aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp671
1 files changed, 425 insertions, 246 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 72a767645..1ebb8f144 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zencore/xxhash.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
@@ -195,34 +196,33 @@ namespace cache::impl {
return true;
}
- bool MoveAndDeleteDirectory(const std::filesystem::path& Dir)
+ std::filesystem::path MoveDroppedDirectory(const std::filesystem::path& Dir)
{
int DropIndex = 0;
do
{
- if (!std::filesystem::exists(Dir))
+ if (!IsDir(Dir))
{
- return false;
+ return {};
}
std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex);
std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName;
- if (std::filesystem::exists(DroppedBucketPath))
+ if (IsDir(DroppedBucketPath))
{
DropIndex++;
continue;
}
std::error_code Ec;
- std::filesystem::rename(Dir, DroppedBucketPath, Ec);
+ RenameDirectory(Dir, DroppedBucketPath, Ec);
if (!Ec)
{
- DeleteDirectories(DroppedBucketPath);
- return true;
+ return DroppedBucketPath;
}
- // TODO: Do we need to bail at some point?
zen::Sleep(100);
- } while (true);
+ } while (DropIndex < 10);
+ return {};
}
} // namespace cache::impl
@@ -373,10 +373,10 @@ private:
#pragma pack(4)
struct ManifestData
{
- uint32_t RawSize; // 4
- AccessTime Timestamp; // 4
- IoHash RawHash; // 20
- IoHash Key; // 20
+ uint32_t RawSize; // 4
+ uint32_t SecondsSinceEpoch; // 4
+ IoHash RawHash; // 20
+ IoHash Key; // 20
};
#pragma pack(pop)
@@ -658,7 +658,7 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B
ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex];
- AccessTimes[PlIndex] = Entry->Timestamp;
+ AccessTimes[PlIndex].SetSecondsSinceEpoch(Entry->SecondsSinceEpoch);
if (Entry->RawSize && Entry->RawHash != IoHash::Zero)
{
@@ -685,6 +685,16 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
{
ZEN_TRACE_CPU("Z$::WriteSidecarFile");
+ ZEN_DEBUG("writing store sidecar for '{}'", SidecarPath);
+ const uint64_t EntryCount = Index.size();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("wrote store sidecar for '{}' containing {} entries in {}",
+ SidecarPath,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
BucketMetaHeader Header;
Header.EntryCount = m_ManifestEntryCount;
Header.LogPosition = SnapshotLogPosition;
@@ -702,43 +712,44 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
SidecarFile.Write(&Header, sizeof Header, 0);
- // TODO: make this batching for better performance
{
uint64_t WriteOffset = sizeof Header;
- // BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024);
+ const size_t MaxManifestDataBufferCount = (512u * 1024u) / sizeof(ManifestData);
- std::vector<ManifestData> ManifestDataBuffer;
- const size_t MaxManifestDataBufferCount = Min(Index.size(), 8192u); // 512 Kb
- ManifestDataBuffer.reserve(MaxManifestDataBufferCount);
+ std::vector<ManifestData> ManifestDataBuffer(Min(m_ManifestEntryCount, MaxManifestDataBufferCount));
+ auto WriteIt = ManifestDataBuffer.begin();
for (auto& Kv : Index)
{
- const IoHash& Key = Kv.first;
- const PayloadIndex PlIndex = Kv.second;
+ ManifestData& Data = *WriteIt++;
- IoHash RawHash = IoHash::Zero;
- uint32_t RawSize = 0;
+ const PayloadIndex PlIndex = Kv.second;
+ Data.Key = Kv.first;
+ Data.SecondsSinceEpoch = AccessTimes[PlIndex].GetSecondsSinceEpoch();
if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData)
{
- RawHash = MetaDatas[MetaIndex].RawHash;
- RawSize = MetaDatas[MetaIndex].RawSize;
+ Data.RawHash = MetaDatas[MetaIndex].RawHash;
+ Data.RawSize = MetaDatas[MetaIndex].RawSize;
+ }
+ else
+ {
+ Data.RawHash = IoHash::Zero;
+ Data.RawSize = 0;
}
- ManifestDataBuffer.emplace_back(
- ManifestData{.RawSize = RawSize, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Key = Key});
- if (ManifestDataBuffer.size() == MaxManifestDataBufferCount)
+ if (WriteIt == ManifestDataBuffer.end())
{
- const uint64_t WriteSize = sizeof(ManifestData) * ManifestDataBuffer.size();
+ uint64_t WriteSize = std::distance(ManifestDataBuffer.begin(), WriteIt) * sizeof(ManifestData);
SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset);
WriteOffset += WriteSize;
- ManifestDataBuffer.clear();
- ManifestDataBuffer.reserve(MaxManifestDataBufferCount);
+ WriteIt = ManifestDataBuffer.begin();
}
}
- if (ManifestDataBuffer.size() > 0)
+ if (WriteIt != ManifestDataBuffer.begin())
{
- SidecarFile.Write(ManifestDataBuffer.data(), sizeof(ManifestData) * ManifestDataBuffer.size(), WriteOffset);
+ uint64_t WriteSize = std::distance(ManifestDataBuffer.begin(), WriteIt) * sizeof(ManifestData);
+ SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset);
}
}
@@ -763,11 +774,11 @@ namespace zen {
ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
std::atomic_uint64_t& OuterCacheMemoryUsage,
- std::string BucketName,
+ std::string_view BucketName,
const BucketConfiguration& Config)
: m_Gc(Gc)
, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage)
-, m_BucketName(std::move(BucketName))
+, m_BucketName(BucketName)
, m_Configuration(Config)
, m_BucketId(Oid::Zero)
{
@@ -795,6 +806,16 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
+ try
+ {
+ m_SlogFile.Flush();
+ m_SlogFile.Close();
+ m_BlockStore.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~CacheBucket() failed with: ", Ex.what());
+ }
m_Gc.RemoveGcReferencer(*this);
}
@@ -868,12 +889,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
void
-ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(uint64_t LogPosition,
+ bool ResetLog,
+ const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
- const uint64_t LogCount = FlushLockPosition ? 0 : m_SlogFile.GetLogCount();
- if (m_LogFlushPosition == LogCount)
+ if (m_LogFlushPosition == LogPosition)
{
return;
}
@@ -890,7 +912,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition,
namespace fs = std::filesystem;
- fs::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName);
+ const fs::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName);
try
{
@@ -922,66 +944,70 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition,
throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
}
- {
- // This is in a separate scope just to ensure IndexWriter goes out
- // of scope before the file is flushed/closed, in order to ensure
- // all data is written to the file
- BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024);
+ const uint64_t IndexLogPosition = ResetLog ? 0 : LogPosition;
- cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+ cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
+ .LogPosition = IndexLogPosition,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
- Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header);
- IndexWriter.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0);
+ Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header);
+ ObjectIndexFile.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0);
+ if (EntryCount > 0)
+ {
uint64_t IndexWriteOffset = sizeof(cache::impl::CacheBucketIndexHeader);
+ size_t MaxWriteEntryCount = (512u * 1024u) / sizeof(DiskIndexEntry);
+ std::vector<DiskIndexEntry> DiskEntryBuffer(Min(m_Index.size(), MaxWriteEntryCount));
+
+ auto WriteIt = DiskEntryBuffer.begin();
for (auto& Entry : m_Index)
{
- DiskIndexEntry IndexEntry;
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = m_Payloads[Entry.second].Location;
- IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset);
-
- IndexWriteOffset += sizeof(DiskIndexEntry);
+ *WriteIt++ = {.Key = Entry.first, .Location = m_Payloads[Entry.second].Location};
+ if (WriteIt == DiskEntryBuffer.end())
+ {
+ uint64_t WriteSize = std::distance(DiskEntryBuffer.begin(), WriteIt) * sizeof(DiskIndexEntry);
+ ObjectIndexFile.Write(DiskEntryBuffer.data(), WriteSize, IndexWriteOffset);
+ IndexWriteOffset += WriteSize;
+ WriteIt = DiskEntryBuffer.begin();
+ }
}
- IndexWriter.Flush();
+ if (WriteIt != DiskEntryBuffer.begin())
+ {
+ uint64_t WriteSize = std::distance(DiskEntryBuffer.begin(), WriteIt) * sizeof(DiskIndexEntry);
+ ObjectIndexFile.Write(DiskEntryBuffer.data(), WriteSize, IndexWriteOffset);
+ }
}
ObjectIndexFile.Flush();
ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
if (Ec)
{
- std::filesystem::path TempFilePath = ObjectIndexFile.GetPath();
- ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message());
+ throw std::system_error(Ec,
+ fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'",
+ ObjectIndexFile.GetPath(),
+ IndexPath,
+ Ec.message()));
}
- else
+
+ if (ResetLog)
{
- // We must only update the log flush position once the snapshot write succeeds
- if (FlushLockPosition)
- {
- std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName);
+ const std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName);
- if (std::filesystem::is_regular_file(LogPath))
+ if (IsFile(LogPath))
+ {
+ m_SlogFile.Close();
+ if (!RemoveFile(LogPath, Ec) || Ec)
{
- if (!std::filesystem::remove(LogPath, Ec) || Ec)
- {
- ZEN_WARN("snapshot failed to clean log file '{}', removing index at '{}', reason: '{}'",
- LogPath,
- IndexPath,
- Ec.message());
- std::error_code RemoveIndexEc;
- std::filesystem::remove(IndexPath, RemoveIndexEc);
- }
+ // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in
+ // the end it will be the same result
+ ZEN_WARN("snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
}
- }
- if (!Ec)
- {
- m_LogFlushPosition = LogCount;
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
+ m_LogFlushPosition = IndexLogPosition;
}
catch (const std::exception& Err)
{
@@ -994,7 +1020,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
{
ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile");
- if (!std::filesystem::is_regular_file(IndexPath))
+ if (!IsFile(IndexPath))
{
return 0;
}
@@ -1078,7 +1104,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::
{
ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
- if (!std::filesystem::is_regular_file(LogPath))
+ if (!IsFile(LogPath))
{
return 0;
}
@@ -1158,47 +1184,40 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
if (IsNew)
{
- fs::remove(LogPath);
- fs::remove(IndexPath);
- fs::remove_all(m_BlocksBasePath);
+ RemoveFile(LogPath);
+ RemoveFile(IndexPath);
+ DeleteDirectories(m_BlocksBasePath);
}
CreateDirectories(m_BucketDir);
m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
- if (std::filesystem::is_regular_file(IndexPath))
+ if (IsFile(IndexPath))
{
uint32_t IndexVersion = 0;
m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion);
if (IndexVersion == 0)
{
ZEN_WARN("removing invalid index file at '{}'", IndexPath);
- std::filesystem::remove(IndexPath);
+ RemoveFile(IndexPath);
}
}
uint64_t LogEntryCount = 0;
- if (std::filesystem::is_regular_file(LogPath))
+ if (IsFile(LogPath))
{
if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath))
{
LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition);
}
- else if (fs::is_regular_file(LogPath))
+ else if (IsFile(LogPath))
{
ZEN_WARN("removing invalid log at '{}'", LogPath);
- std::filesystem::remove(LogPath);
+ RemoveFile(LogPath);
}
}
- if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0)
- {
- WriteIndexSnapshot(IndexLock, /*Flush log*/ true);
- }
-
- m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
-
BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_Index)
{
@@ -1216,7 +1235,53 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
KnownBlocks.insert(BlockIndex);
}
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ bool RemovedEntries = false;
+ if (!MissingBlocks.empty())
+ {
+ std::vector<DiskIndexEntry> MissingEntries;
+
+ for (auto& It : m_Index)
+ {
+ BucketPayload& Payload = m_Payloads[It.second];
+ DiskLocation Location = Payload.Location;
+ if (!Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ if (MissingBlocks.contains(Location.Location.BlockLocation.GetBlockIndex()))
+ {
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
+ }
+ }
+ Location.Flags |= DiskLocation::kTombStone;
+ MissingEntries.push_back(DiskIndexEntry{.Key = It.first, .Location = Location});
+ }
+
+ ZEN_ASSERT(!MissingEntries.empty());
+
+ for (const DiskIndexEntry& Entry : MissingEntries)
+ {
+ m_Index.erase(Entry.Key);
+ }
+ m_SlogFile.Append(MissingEntries);
+ m_SlogFile.Flush();
+ {
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketMetaData> MetaDatas;
+ std::vector<MemCacheData> MemCachedPayloads;
+ IndexMap Index;
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index);
+ }
+ RemovedEntries = true;
+ }
+
+ if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0 || RemovedEntries)
+ {
+ WriteIndexSnapshot(IndexLock, m_SlogFile.GetLogCount(), /*Flush log*/ true);
+ }
}
void
@@ -1384,7 +1449,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
{
- GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults)
+ GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults)
{
Keys.reserve(OutResults.capacity());
ResultIndexes.reserve(OutResults.capacity());
@@ -1395,11 +1460,11 @@ struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
std::vector<IoHash> Keys;
std::vector<size_t> ResultIndexes;
- std::vector<ZenCacheValue>& OutResults;
+ ZenCacheValueVec_t& OutResults;
};
ZenCacheDiskLayer::CacheBucket::GetBatchHandle*
-ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult)
+ZenCacheDiskLayer::CacheBucket::BeginGetBatch(ZenCacheValueVec_t& OutResult)
{
ZEN_TRACE_CPU("Z$::Bucket::BeginGetBatch");
return new GetBatchHandle(OutResult);
@@ -1419,13 +1484,13 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
if (!Batch->ResultIndexes.empty())
{
- std::vector<DiskLocation> StandaloneDiskLocations;
- std::vector<size_t> StandaloneKeyIndexes;
- std::vector<size_t> MemCachedKeyIndexes;
- std::vector<DiskLocation> InlineDiskLocations;
- std::vector<BlockStoreLocation> InlineBlockLocations;
- std::vector<size_t> InlineKeyIndexes;
- std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false);
+ eastl::fixed_vector<DiskLocation, 16> StandaloneDiskLocations;
+ eastl::fixed_vector<size_t, 16> StandaloneKeyIndexes;
+ eastl::fixed_vector<size_t, 16> MemCachedKeyIndexes;
+ eastl::fixed_vector<DiskLocation, 16> InlineDiskLocations;
+ eastl::fixed_vector<BlockStoreLocation, 16> InlineBlockLocations;
+ eastl::fixed_vector<size_t, 16> InlineKeyIndexes;
+ eastl::fixed_vector<bool, 16> FillRawHashAndRawSize(Batch->Keys.size(), false);
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
@@ -1479,6 +1544,13 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
}
}
}
+ else
+ {
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ }
}
}
@@ -1487,7 +1559,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
// Often we will find the metadata due to the thread setting the mem cached part doing it before us so it is worth
// checking if it is present once more before spending time fetching and setting the RawHash and RawSize in metadata
- auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) {
+ auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value, bool UsesTemporaryMemory) {
if (!Value)
{
return;
@@ -1510,6 +1582,12 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
}
}
+ if (AddToMemCache || UsesTemporaryMemory)
+ {
+ // We need to own it if we want to add it to the memcache or the buffer is just a range of the block iteration buffer
+ OutValue.Value.MakeOwned();
+ }
+
if (SetMetaInfo)
{
// See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
@@ -1581,33 +1659,42 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
if (!InlineDiskLocations.empty())
{
ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
- m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
- // Only read into memory the IoBuffers we could potentially add to memcache
- const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u);
- m_BlockStore.IterateBlock(
- InlineBlockLocations,
- ChunkIndexes,
- [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
- const void* Data,
- uint64_t Size) -> bool {
- if (Data != nullptr)
- {
- FillOne(InlineDiskLocations[ChunkIndex],
- InlineKeyIndexes[ChunkIndex],
- IoBufferBuilder::MakeCloneFromMemory(Data, Size));
- }
- return true;
- },
- [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
- BlockStoreFile& File,
- uint64_t Offset,
- uint64_t Size) -> bool {
- FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size));
- return true;
- },
- LargeChunkSizeLimit);
- return true;
- });
+ m_BlockStore.IterateChunks(std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
+ [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
+ // Up to 8KB or m_Configuration.MemCacheSizeThreshold depending on configuration
+ const uint64_t LargeChunkSizeLimit =
+ m_Configuration.MemCacheSizeThreshold == 0
+ ? Min(m_Configuration.LargeObjectThreshold, 8u * 1024u)
+ : Max(m_Configuration.MemCacheSizeThreshold, 8u * 1024u);
+
+ m_BlockStore.IterateBlock(
+ std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
+ ChunkIndexes,
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ const void* Data,
+ uint64_t Size) -> bool {
+ if (Data != nullptr)
+ {
+ FillOne(InlineDiskLocations[ChunkIndex],
+ InlineKeyIndexes[ChunkIndex],
+ IoBufferBuilder::MakeFromMemory(MemoryView(Data, Size)),
+ /*UsesTemporaryMemory*/ true);
+ }
+ return true;
+ },
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ BlockStoreFile& File,
+ uint64_t Offset,
+ uint64_t Size) -> bool {
+ FillOne(InlineDiskLocations[ChunkIndex],
+ InlineKeyIndexes[ChunkIndex],
+ File.GetChunk(Offset, Size),
+ /*UsesTemporaryMemory*/ false);
+ return true;
+ },
+ LargeChunkSizeLimit);
+ return true;
+ });
}
if (!StandaloneDiskLocations.empty())
@@ -1617,7 +1704,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
{
size_t KeyIndex = StandaloneKeyIndexes[Index];
const DiskLocation& Location = StandaloneDiskLocations[Index];
- FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]));
+ FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]), /*UsesTemporaryMemory*/ false);
}
}
@@ -1697,10 +1784,6 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
else
{
m_DiskMissCount++;
- if (m_Configuration.MemCacheSizeThreshold > 0)
- {
- m_MemoryMissCount++;
- }
}
}
}
@@ -2029,11 +2112,13 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock
}
}
-bool
+std::function<void()>
ZenCacheDiskLayer::CacheBucket::Drop()
{
ZEN_TRACE_CPU("Z$::Bucket::Drop");
+ m_Gc.RemoveGcReferencer(*this);
+
RwLock::ExclusiveLockScope _(m_IndexLock);
std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks;
@@ -2045,7 +2130,7 @@ ZenCacheDiskLayer::CacheBucket::Drop()
m_BlockStore.Close();
m_SlogFile.Close();
- const bool Deleted = cache::impl::MoveAndDeleteDirectory(m_BucketDir);
+ std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(m_BucketDir);
m_Index.clear();
m_Payloads.clear();
@@ -2058,7 +2143,21 @@ ZenCacheDiskLayer::CacheBucket::Drop()
m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
m_MemCachedSize.store(0);
- return Deleted;
+ if (DroppedPath.empty())
+ {
+ return {};
+ }
+ else
+ {
+ return [DroppedPath = std::move(DroppedPath)]() {
+ std::error_code Ec;
+ (void)DeleteDirectories(DroppedPath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message());
+ }
+ };
+ }
}
void
@@ -2093,6 +2192,9 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
try
{
+ // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
+ const uint64_t LogPosition = m_SlogFile.GetLogCount();
+
bool UseLegacyScheme = false;
IoBuffer Buffer;
@@ -2107,7 +2209,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
- WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
+ WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
// Note: this copy could be eliminated on shutdown to
// reduce memory usage and execution time
Index = m_Index;
@@ -2147,7 +2249,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
else
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
- WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
+ WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
const uint64_t EntryCount = m_Index.size();
Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
uint64_t SidecarSize = ManifestWriter.GetSidecarSize();
@@ -2257,7 +2359,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
std::error_code Ec;
- uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
+ uintmax_t size = FileSizeFromPath(DataFilePath.ToPath(), Ec);
if (Ec)
{
ReportBadKey(HashKey);
@@ -2398,11 +2500,11 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
BuildPath(Path, Entry.Key);
fs::path FilePath = Path.ToPath();
RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key));
- if (fs::is_regular_file(FilePath))
+ if (IsFile(FilePath))
{
ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8());
std::error_code Ec;
- fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file...
+ RemoveFile(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file...
}
}
}
@@ -2535,7 +2637,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
if (CleanUpTempFile)
{
std::error_code Ec;
- std::filesystem::remove(DataFile.GetPath(), Ec);
+ RemoveFile(DataFile.GetPath(), Ec);
if (Ec)
{
ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'",
@@ -2563,7 +2665,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
// We do a speculative remove of the file instead of probing with a exists call and check the error code instead
- std::filesystem::remove(FsPath, Ec);
+ RemoveFile(FsPath, Ec);
if (Ec)
{
if (Ec.value() != ENOENT)
@@ -2571,7 +2673,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message());
Sleep(100);
Ec.clear();
- std::filesystem::remove(FsPath, Ec);
+ RemoveFile(FsPath, Ec);
if (Ec && Ec.value() != ENOENT)
{
throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir));
@@ -2796,7 +2898,6 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
ZEN_MEMSCOPE(GetCacheDiskTag());
ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation");
DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
- m_SlogFile.Append({.Key = HashKey, .Location = Location});
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
if (m_TrackedCacheKeys)
@@ -2826,6 +2927,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
m_AccessTimes.emplace_back(GcClock::TickCount());
m_Index.insert_or_assign(HashKey, EntryIndex);
}
+ m_SlogFile.Append({.Key = HashKey, .Location = Location});
});
}
@@ -2842,9 +2944,10 @@ class DiskBucketStoreCompactor : public GcStoreCompactor
using CacheBucket = ZenCacheDiskLayer::CacheBucket;
public:
- DiskBucketStoreCompactor(CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys)
+ DiskBucketStoreCompactor(CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys, bool FlushBucket)
: m_Bucket(Bucket)
, m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys))
+ , m_FlushBucket(FlushBucket)
{
m_ExpiredStandaloneKeys.shrink_to_fit();
}
@@ -2902,7 +3005,7 @@ public:
ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
std::error_code Ec;
- if (!fs::remove(FilePath, Ec))
+ if (!RemoveFile(FilePath, Ec))
{
continue;
}
@@ -2923,7 +3026,7 @@ public:
ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
std::error_code Ec;
- bool Existed = std::filesystem::is_regular_file(FilePath, Ec);
+ bool Existed = IsFile(FilePath, Ec);
if (Ec)
{
ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'",
@@ -3023,10 +3126,12 @@ public:
m_Bucket.m_BlockStore.CompactBlocks(
BlockCompactState,
m_Bucket.m_Configuration.PayloadAlignment,
- [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
+ [&](const BlockStore::MovedChunksArray& MovedArray,
+ const BlockStore::ChunkIndexArray& ScrubbedArray,
+ uint64_t FreedDiskSpace) {
std::vector<DiskIndexEntry> MovedEntries;
MovedEntries.reserve(MovedArray.size());
- RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock);
+ RwLock::ExclusiveLockScope IndexLock(m_Bucket.m_IndexLock);
for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
{
size_t ChunkIndex = Moved.first;
@@ -3048,6 +3153,24 @@ public:
MovedEntries.push_back({.Key = Key, .Location = Payload.Location});
}
}
+
+ for (size_t ScrubbedIndex : ScrubbedArray)
+ {
+ const IoHash& Key = BlockCompactStateKeys[ScrubbedIndex];
+
+ if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end())
+ {
+ BucketPayload& Payload = m_Bucket.m_Payloads[It->second];
+ DiskLocation Location = Payload.Location;
+
+ m_Bucket.RemoveMemCachedData(IndexLock, Payload);
+ m_Bucket.RemoveMetaData(IndexLock, Payload);
+
+ Location.Flags |= DiskLocation::kTombStone;
+ MovedEntries.push_back(DiskIndexEntry{.Key = Key, .Location = Location});
+ }
+ }
+
m_Bucket.m_SlogFile.Append(MovedEntries);
Stats.RemovedDisk += FreedDiskSpace;
if (Ctx.IsCancelledFlag.load())
@@ -3071,6 +3194,10 @@ public:
}
}
}
+ if (m_FlushBucket)
+ {
+ m_Bucket.Flush();
+ }
}
virtual std::string GetGcName(GcCtx& Ctx) override { return m_Bucket.GetGcName(Ctx); }
@@ -3078,6 +3205,7 @@ public:
private:
ZenCacheDiskLayer::CacheBucket& m_Bucket;
std::vector<std::pair<IoHash, uint64_t>> m_ExpiredStandaloneKeys;
+ bool m_FlushBucket = false;
};
GcStoreCompactor*
@@ -3101,24 +3229,6 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
NiceBytes(Stats.FreedMemory),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
- if (Stats.DeletedCount > 0)
- {
- bool Expected = false;
- if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
- {
- return;
- }
- auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
-
- try
- {
- SaveSnapshot([]() { return 0; });
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed to write index and manifest after RemoveExpiredData in '{}'. Reason: '{}'", m_BucketDir, Ex.what());
- }
- }
});
const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count();
@@ -3170,7 +3280,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
return nullptr;
}
- if (Ctx.Settings.IsDeleteMode)
+ if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty())
{
for (const DiskIndexEntry& Entry : ExpiredEntries)
{
@@ -3205,7 +3315,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
return nullptr;
}
- return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys));
+ return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys), /*FlushBucket*/ Stats.DeletedCount > 0);
}
bool
@@ -3395,7 +3505,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger,
CaptureAttachments(ChunkIndex, File.GetChunk(Offset, Size).GetView());
return !IsCancelledFlag.load();
},
- 0);
+ 32u * 1024);
if (Continue)
{
@@ -3698,11 +3808,11 @@ ZenCacheDiskLayer::CacheBucket*
ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
{
ZEN_TRACE_CPU("Z$::GetOrCreateBucket");
- const auto BucketName = std::string(InBucket);
{
RwLock::SharedLockScope SharedLock(m_Lock);
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Buckets.end())
{
return It->second.get();
}
@@ -3710,31 +3820,40 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
// We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock.
// This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock
- std::unique_ptr<CacheBucket> Bucket(
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+ BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig;
+ if (auto It = m_Configuration.BucketConfigMap.find_as(InBucket,
+ std::hash<std::string_view>(),
+ eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Configuration.BucketConfigMap.end())
+ {
+ BucketConfig = &It->second;
+ }
+ std::unique_ptr<CacheBucket> Bucket(std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, InBucket, *BucketConfig));
RwLock::ExclusiveLockScope Lock(m_Lock);
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Buckets.end())
{
return It->second.get();
}
std::filesystem::path BucketPath = m_RootDir;
- BucketPath /= BucketName;
+ BucketPath /= InBucket;
try
{
if (!Bucket->OpenOrCreate(BucketPath))
{
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", InBucket, m_RootDir);
return nullptr;
}
}
catch (const std::exception& Err)
{
- ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", InBucket, BucketPath, Err.what());
throw;
}
+ std::string BucketName{InBucket};
CacheBucket* Result = Bucket.get();
m_Buckets.emplace(BucketName, std::move(Bucket));
if (m_CapturedBuckets)
@@ -3833,7 +3952,7 @@ ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept
struct ZenCacheDiskLayer::GetBatchHandle
{
- GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {}
+ GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
CacheBucket* Bucket;
@@ -3893,13 +4012,13 @@ struct ZenCacheDiskLayer::GetBatchHandle
return NewBucketHandle;
}
- RwLock Lock;
- std::vector<BucketHandle> BucketHandles;
- std::vector<ZenCacheValue>& OutResults;
+ RwLock Lock;
+ eastl::fixed_vector<BucketHandle, 4> BucketHandles;
+ ZenCacheValueVec_t& OutResults;
};
ZenCacheDiskLayer::GetBatchHandle*
-ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults)
+ZenCacheDiskLayer::BeginGetBatch(ZenCacheValueVec_t& OutResults)
{
return new GetBatchHandle(OutResults);
}
@@ -3994,7 +4113,11 @@ ZenCacheDiskLayer::DiscoverBuckets()
if (IsKnownBadBucketName(BucketName))
{
BadBucketDirectories.push_back(BucketPath);
-
+ continue;
+ }
+ else if (BucketName.starts_with("[dropped]"))
+ {
+ BadBucketDirectories.push_back(BucketPath);
continue;
}
@@ -4027,50 +4150,66 @@ ZenCacheDiskLayer::DiscoverBuckets()
RwLock SyncLock;
WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
- for (auto& BucketPath : FoundBucketDirectories)
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
+ try
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() {
- ZEN_MEMSCOPE(GetCacheDiskTag());
-
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
- const std::string BucketName = PathToUtf8(BucketPath.stem());
- try
- {
- std::unique_ptr<CacheBucket> NewBucket =
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig);
+ for (auto& BucketPath : FoundBucketDirectories)
+ {
+ Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
+ ZEN_MEMSCOPE(GetCacheDiskTag());
- CacheBucket* Bucket = nullptr;
+ const std::string BucketName = PathToUtf8(BucketPath.stem());
+ try
{
- RwLock::ExclusiveLockScope __(SyncLock);
- auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
- Bucket = InsertResult.first->second.get();
- }
- ZEN_ASSERT(Bucket);
+ BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig;
+ if (auto It = m_Configuration.BucketConfigMap.find_as(std::string_view(BucketName),
+ std::hash<std::string_view>(),
+ eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Configuration.BucketConfigMap.end())
+ {
+ BucketConfig = &It->second;
+ }
- if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ std::unique_ptr<CacheBucket> NewBucket =
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, *BucketConfig);
+ CacheBucket* Bucket = nullptr;
{
RwLock::ExclusiveLockScope __(SyncLock);
- m_Buckets.erase(BucketName);
+ auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
+ Bucket = InsertResult.first->second.get();
+ }
+ ZEN_ASSERT(Bucket);
+
+ if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+
+ {
+ RwLock::ExclusiveLockScope __(SyncLock);
+ m_Buckets.erase(BucketName);
+ }
}
}
- }
- catch (const std::exception& Err)
- {
- ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
- return;
- }
- });
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ return;
+ }
+ });
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed discovering buckets in {}. Reason: '{}'", m_RootDir, Ex.what());
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
-bool
+std::function<void()>
ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
ZEN_TRACE_CPU("Z$::DropBucket");
@@ -4088,33 +4227,72 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
return Bucket.Drop();
}
- // Make sure we remove the folder even if we don't know about the bucket
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= std::string(InBucket);
- return cache::impl::MoveAndDeleteDirectory(BucketPath);
+ std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(BucketPath);
+ if (DroppedPath.empty())
+ {
+ return {};
+ }
+ else
+ {
+ return [DroppedPath = std::move(DroppedPath)]() {
+ std::error_code Ec;
+ (void)DeleteDirectories(DroppedPath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message());
+ }
+ };
+ }
}
-bool
+std::function<void()>
ZenCacheDiskLayer::Drop()
{
ZEN_TRACE_CPU("Z$::Drop");
- RwLock::ExclusiveLockScope _(m_Lock);
-
- std::vector<std::unique_ptr<CacheBucket>> Buckets;
- Buckets.reserve(m_Buckets.size());
- while (!m_Buckets.empty())
+ std::vector<std::function<void()>> PostDropOps;
{
- const auto& It = m_Buckets.begin();
- CacheBucket& Bucket = *It->second;
- m_DroppedBuckets.push_back(std::move(It->second));
- m_Buckets.erase(It->first);
- if (!Bucket.Drop())
+ RwLock::ExclusiveLockScope _(m_Lock);
+ PostDropOps.reserve(m_Buckets.size());
+ while (!m_Buckets.empty())
{
- return false;
+ const auto& It = m_Buckets.begin();
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It->first);
+ if (std::function<void()> PostDropOp = Bucket.Drop(); !PostDropOp)
+ {
+ return {};
+ }
+ else
+ {
+ PostDropOps.emplace_back(std::move(PostDropOp));
+ }
}
}
- return cache::impl::MoveAndDeleteDirectory(m_RootDir);
+
+ std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(m_RootDir);
+ if (DroppedPath.empty())
+ {
+ return {};
+ }
+ else
+ {
+ return [DroppedPath = std::move(DroppedPath), PostDropOps = std::move(PostDropOps)]() {
+ for (auto& PostDropOp : PostDropOps)
+ {
+ PostDropOp();
+ }
+ std::error_code Ec;
+ (void)DeleteDirectories(DroppedPath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message());
+ }
+ };
+ }
}
void
@@ -4144,16 +4322,16 @@ ZenCacheDiskLayer::Flush()
}
{
WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
try
{
for (auto& Bucket : Buckets)
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([&WorkLatch, Bucket]() {
+ Work.ScheduleWork(Pool, [Bucket](std::atomic<bool>&) {
ZEN_MEMSCOPE(GetCacheDiskTag());
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
try
{
Bucket->Flush();
@@ -4167,13 +4345,14 @@ ZenCacheDiskLayer::Flush()
}
catch (const std::exception& Ex)
{
+ AbortFlag.store(true);
ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
}
- WorkLatch.CountDown();
- while (!WorkLatch.Wait(1000))
- {
- ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
- }
+
+ Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
+ ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir);
+ });
}
}