diff options
| author | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
| commit | bb298631ba35a323827dda0b8cd6158e276b5f61 (patch) | |
| tree | 7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenstore/cache | |
| parent | Change to PutResult structure (diff) | |
| parent | 5.6.14 (diff) | |
| download | zen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip | |
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenstore/cache')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 671 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 116 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 51 |
3 files changed, 530 insertions, 308 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 72a767645..1ebb8f144 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -14,6 +14,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zencore/xxhash.h> +#include <zenutil/parallelwork.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -195,34 +196,33 @@ namespace cache::impl { return true; } - bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) + std::filesystem::path MoveDroppedDirectory(const std::filesystem::path& Dir) { int DropIndex = 0; do { - if (!std::filesystem::exists(Dir)) + if (!IsDir(Dir)) { - return false; + return {}; } std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; - if (std::filesystem::exists(DroppedBucketPath)) + if (IsDir(DroppedBucketPath)) { DropIndex++; continue; } std::error_code Ec; - std::filesystem::rename(Dir, DroppedBucketPath, Ec); + RenameDirectory(Dir, DroppedBucketPath, Ec); if (!Ec) { - DeleteDirectories(DroppedBucketPath); - return true; + return DroppedBucketPath; } - // TODO: Do we need to bail at some point? zen::Sleep(100); - } while (true); + } while (DropIndex < 10); + return {}; } } // namespace cache::impl @@ -373,10 +373,10 @@ private: #pragma pack(4) struct ManifestData { - uint32_t RawSize; // 4 - AccessTime Timestamp; // 4 - IoHash RawHash; // 20 - IoHash Key; // 20 + uint32_t RawSize; // 4 + uint32_t SecondsSinceEpoch; // 4 + IoHash RawHash; // 20 + IoHash Key; // 20 }; #pragma pack(pop) @@ -658,7 +658,7 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex]; - AccessTimes[PlIndex] = Entry->Timestamp; + AccessTimes[PlIndex].SetSecondsSinceEpoch(Entry->SecondsSinceEpoch); if (Entry->RawSize && Entry->RawHash != IoHash::Zero) { @@ -685,6 +685,16 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, { ZEN_TRACE_CPU("Z$::WriteSidecarFile"); + ZEN_DEBUG("writing store sidecar for '{}'", SidecarPath); + const uint64_t EntryCount = Index.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store sidecar for '{}' containing {} entries in {}", + SidecarPath, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + BucketMetaHeader Header; Header.EntryCount = m_ManifestEntryCount; Header.LogPosition = SnapshotLogPosition; @@ -702,43 +712,44 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, SidecarFile.Write(&Header, sizeof Header, 0); - // TODO: make this batching for better performance { uint64_t WriteOffset = sizeof Header; - // BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); + const size_t MaxManifestDataBufferCount = (512u * 1024u) / sizeof(ManifestData); - std::vector<ManifestData> ManifestDataBuffer; - const size_t MaxManifestDataBufferCount = Min(Index.size(), 8192u); // 512 Kb - ManifestDataBuffer.reserve(MaxManifestDataBufferCount); + std::vector<ManifestData> ManifestDataBuffer(Min(m_ManifestEntryCount, MaxManifestDataBufferCount)); + auto WriteIt = ManifestDataBuffer.begin(); for (auto& Kv : Index) { - const IoHash& Key = Kv.first; - const PayloadIndex PlIndex = Kv.second; + ManifestData& Data = *WriteIt++; - IoHash RawHash = IoHash::Zero; - uint32_t RawSize = 0; + const PayloadIndex PlIndex = Kv.second; + Data.Key = Kv.first; + Data.SecondsSinceEpoch = AccessTimes[PlIndex].GetSecondsSinceEpoch(); if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData) { - RawHash = MetaDatas[MetaIndex].RawHash; - RawSize = MetaDatas[MetaIndex].RawSize; + Data.RawHash = MetaDatas[MetaIndex].RawHash; + Data.RawSize = MetaDatas[MetaIndex].RawSize; + } + else + { + Data.RawHash = IoHash::Zero; + Data.RawSize = 0; } - ManifestDataBuffer.emplace_back( - ManifestData{.RawSize = RawSize, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Key = Key}); - if (ManifestDataBuffer.size() == MaxManifestDataBufferCount) + if (WriteIt == ManifestDataBuffer.end()) { - const uint64_t WriteSize = sizeof(ManifestData) * ManifestDataBuffer.size(); + uint64_t WriteSize = std::distance(ManifestDataBuffer.begin(), WriteIt) * sizeof(ManifestData); SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset); WriteOffset += WriteSize; - ManifestDataBuffer.clear(); - ManifestDataBuffer.reserve(MaxManifestDataBufferCount); + WriteIt = ManifestDataBuffer.begin(); } } - if (ManifestDataBuffer.size() > 0) + if (WriteIt != ManifestDataBuffer.begin()) { - SidecarFile.Write(ManifestDataBuffer.data(), sizeof(ManifestData) * ManifestDataBuffer.size(), WriteOffset); + uint64_t WriteSize = std::distance(ManifestDataBuffer.begin(), WriteIt) * sizeof(ManifestData); + SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset); } } @@ -763,11 +774,11 @@ namespace zen { ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, - std::string BucketName, + std::string_view BucketName, const BucketConfiguration& Config) : m_Gc(Gc) , m_OuterCacheMemoryUsage(OuterCacheMemoryUsage) -, m_BucketName(std::move(BucketName)) +, m_BucketName(BucketName) , m_Configuration(Config) , m_BucketId(Oid::Zero) { @@ -795,6 +806,16 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, ZenCacheDiskLayer::CacheBucket::~CacheBucket() { + try + { + m_SlogFile.Flush(); + m_SlogFile.Close(); + m_BlockStore.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CacheBucket() failed with: ", Ex.what()); + } m_Gc.RemoveGcReferencer(*this); } @@ -868,12 +889,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } void -ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, const std::function<uint64_t()>& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(uint64_t LogPosition, + bool ResetLog, + const std::function<uint64_t()>& ClaimDiskReserveFunc) { ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot"); - const uint64_t LogCount = FlushLockPosition ? 0 : m_SlogFile.GetLogCount(); - if (m_LogFlushPosition == LogCount) + if (m_LogFlushPosition == LogPosition) { return; } @@ -890,7 +912,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, namespace fs = std::filesystem; - fs::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName); + const fs::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName); try { @@ -922,66 +944,70 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); } - { - // This is in a separate scope just to ensure IndexWriter goes out - // of scope before the file is flushed/closed, in order to ensure - // all data is written to the file - BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024); + const uint64_t IndexLogPosition = ResetLog ? 0 : LogPosition; - cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount, - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; + cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount, + .LogPosition = IndexLogPosition, + .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; - Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header); - IndexWriter.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0); + Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header); + ObjectIndexFile.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0); + if (EntryCount > 0) + { uint64_t IndexWriteOffset = sizeof(cache::impl::CacheBucketIndexHeader); + size_t MaxWriteEntryCount = (512u * 1024u) / sizeof(DiskIndexEntry); + std::vector<DiskIndexEntry> DiskEntryBuffer(Min(m_Index.size(), MaxWriteEntryCount)); + + auto WriteIt = DiskEntryBuffer.begin(); for (auto& Entry : m_Index) { - DiskIndexEntry IndexEntry; - IndexEntry.Key = Entry.first; - IndexEntry.Location = m_Payloads[Entry.second].Location; - IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset); - - IndexWriteOffset += sizeof(DiskIndexEntry); + *WriteIt++ = {.Key = Entry.first, .Location = m_Payloads[Entry.second].Location}; + if (WriteIt == DiskEntryBuffer.end()) + { + uint64_t WriteSize = std::distance(DiskEntryBuffer.begin(), WriteIt) * sizeof(DiskIndexEntry); + ObjectIndexFile.Write(DiskEntryBuffer.data(), WriteSize, IndexWriteOffset); + IndexWriteOffset += WriteSize; + WriteIt = DiskEntryBuffer.begin(); + } } - IndexWriter.Flush(); + if (WriteIt != DiskEntryBuffer.begin()) + { + uint64_t WriteSize = std::distance(DiskEntryBuffer.begin(), WriteIt) * sizeof(DiskIndexEntry); + ObjectIndexFile.Write(DiskEntryBuffer.data(), WriteSize, IndexWriteOffset); + } } ObjectIndexFile.Flush(); ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { - std::filesystem::path TempFilePath = ObjectIndexFile.GetPath(); - ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message()); + throw std::system_error(Ec, + fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", + ObjectIndexFile.GetPath(), + IndexPath, + Ec.message())); } - else + + if (ResetLog) { - // We must only update the log flush position once the snapshot write succeeds - if (FlushLockPosition) - { - std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName); + const std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName); - if (std::filesystem::is_regular_file(LogPath)) + if (IsFile(LogPath)) + { + m_SlogFile.Close(); + if (!RemoveFile(LogPath, Ec) || Ec) { - if (!std::filesystem::remove(LogPath, Ec) || Ec) - { - ZEN_WARN("snapshot failed to clean log file '{}', removing index at '{}', reason: '{}'", - LogPath, - IndexPath, - Ec.message()); - std::error_code RemoveIndexEc; - std::filesystem::remove(IndexPath, RemoveIndexEc); - } + // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in + // the end it will be the same result + ZEN_WARN("snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } - } - if (!Ec) - { - m_LogFlushPosition = LogCount; + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); } } + m_LogFlushPosition = IndexLogPosition; } catch (const std::exception& Err) { @@ -994,7 +1020,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const { ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); - if (!std::filesystem::is_regular_file(IndexPath)) + if (!IsFile(IndexPath)) { return 0; } @@ -1078,7 +1104,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std:: { ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); - if (!std::filesystem::is_regular_file(LogPath)) + if (!IsFile(LogPath)) { return 0; } @@ -1158,47 +1184,40 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco if (IsNew) { - fs::remove(LogPath); - fs::remove(IndexPath); - fs::remove_all(m_BlocksBasePath); + RemoveFile(LogPath); + RemoveFile(IndexPath); + DeleteDirectories(m_BlocksBasePath); } CreateDirectories(m_BucketDir); m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); - if (std::filesystem::is_regular_file(IndexPath)) + if (IsFile(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); - std::filesystem::remove(IndexPath); + RemoveFile(IndexPath); } } uint64_t LogEntryCount = 0; - if (std::filesystem::is_regular_file(LogPath)) + if (IsFile(LogPath)) { if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath)) { LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition); } - else if (fs::is_regular_file(LogPath)) + else if (IsFile(LogPath)) { ZEN_WARN("removing invalid log at '{}'", LogPath); - std::filesystem::remove(LogPath); + RemoveFile(LogPath); } } - if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0) - { - WriteIndexSnapshot(IndexLock, /*Flush log*/ true); - } - - m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_Index) { @@ -1216,7 +1235,53 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco KnownBlocks.insert(BlockIndex); } } - m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + bool RemovedEntries = false; + if (!MissingBlocks.empty()) + { + std::vector<DiskIndexEntry> MissingEntries; + + for (auto& It : m_Index) + { + BucketPayload& Payload = m_Payloads[It.second]; + DiskLocation Location = Payload.Location; + if (!Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + if (MissingBlocks.contains(Location.Location.BlockLocation.GetBlockIndex())) + { + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); + } + } + Location.Flags |= DiskLocation::kTombStone; + MissingEntries.push_back(DiskIndexEntry{.Key = It.first, .Location = Location}); + } + + ZEN_ASSERT(!MissingEntries.empty()); + + for (const DiskIndexEntry& Entry : MissingEntries) + { + m_Index.erase(Entry.Key); + } + m_SlogFile.Append(MissingEntries); + m_SlogFile.Flush(); + { + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + std::vector<BucketMetaData> MetaDatas; + std::vector<MemCacheData> MemCachedPayloads; + IndexMap Index; + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index); + } + RemovedEntries = true; + } + + if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0 || RemovedEntries) + { + WriteIndexSnapshot(IndexLock, m_SlogFile.GetLogCount(), /*Flush log*/ true); + } } void @@ -1384,7 +1449,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle { - GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) + GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults) { Keys.reserve(OutResults.capacity()); ResultIndexes.reserve(OutResults.capacity()); @@ -1395,11 +1460,11 @@ struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle std::vector<IoHash> Keys; std::vector<size_t> ResultIndexes; - std::vector<ZenCacheValue>& OutResults; + ZenCacheValueVec_t& OutResults; }; ZenCacheDiskLayer::CacheBucket::GetBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult) +ZenCacheDiskLayer::CacheBucket::BeginGetBatch(ZenCacheValueVec_t& OutResult) { ZEN_TRACE_CPU("Z$::Bucket::BeginGetBatch"); return new GetBatchHandle(OutResult); @@ -1419,13 +1484,13 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept if (!Batch->ResultIndexes.empty()) { - std::vector<DiskLocation> StandaloneDiskLocations; - std::vector<size_t> StandaloneKeyIndexes; - std::vector<size_t> MemCachedKeyIndexes; - std::vector<DiskLocation> InlineDiskLocations; - std::vector<BlockStoreLocation> InlineBlockLocations; - std::vector<size_t> InlineKeyIndexes; - std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false); + eastl::fixed_vector<DiskLocation, 16> StandaloneDiskLocations; + eastl::fixed_vector<size_t, 16> StandaloneKeyIndexes; + eastl::fixed_vector<size_t, 16> MemCachedKeyIndexes; + eastl::fixed_vector<DiskLocation, 16> InlineDiskLocations; + eastl::fixed_vector<BlockStoreLocation, 16> InlineBlockLocations; + eastl::fixed_vector<size_t, 16> InlineKeyIndexes; + eastl::fixed_vector<bool, 16> FillRawHashAndRawSize(Batch->Keys.size(), false); { RwLock::SharedLockScope IndexLock(m_IndexLock); for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++) @@ -1479,6 +1544,13 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept } } } + else + { + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } + } } } @@ -1487,7 +1559,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept // Often we will find the metadata due to the thread setting the mem cached part doing it before us so it is worth // checking if it is present once more before spending time fetching and setting the RawHash and RawSize in metadata - auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) { + auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value, bool UsesTemporaryMemory) { if (!Value) { return; @@ -1510,6 +1582,12 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept } } + if (AddToMemCache || UsesTemporaryMemory) + { + // We need to own it if we want to add it to the memcache or the buffer is just a range of the block iteration buffer + OutValue.Value.MakeOwned(); + } + if (SetMetaInfo) { // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the @@ -1581,33 +1659,42 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept if (!InlineDiskLocations.empty()) { ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline"); - m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { - // Only read into memory the IoBuffers we could potentially add to memcache - const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u); - m_BlockStore.IterateBlock( - InlineBlockLocations, - ChunkIndexes, - [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, - const void* Data, - uint64_t Size) -> bool { - if (Data != nullptr) - { - FillOne(InlineDiskLocations[ChunkIndex], - InlineKeyIndexes[ChunkIndex], - IoBufferBuilder::MakeCloneFromMemory(Data, Size)); - } - return true; - }, - [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, - BlockStoreFile& File, - uint64_t Offset, - uint64_t Size) -> bool { - FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size)); - return true; - }, - LargeChunkSizeLimit); - return true; - }); + m_BlockStore.IterateChunks(std::span{begin(InlineBlockLocations), end(InlineBlockLocations)}, + [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { + // Up to 8KB or m_Configuration.MemCacheSizeThreshold depending on configuration + const uint64_t LargeChunkSizeLimit = + m_Configuration.MemCacheSizeThreshold == 0 + ? Min(m_Configuration.LargeObjectThreshold, 8u * 1024u) + : Max(m_Configuration.MemCacheSizeThreshold, 8u * 1024u); + + m_BlockStore.IterateBlock( + std::span{begin(InlineBlockLocations), end(InlineBlockLocations)}, + ChunkIndexes, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, + const void* Data, + uint64_t Size) -> bool { + if (Data != nullptr) + { + FillOne(InlineDiskLocations[ChunkIndex], + InlineKeyIndexes[ChunkIndex], + IoBufferBuilder::MakeFromMemory(MemoryView(Data, Size)), + /*UsesTemporaryMemory*/ true); + } + return true; + }, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, + BlockStoreFile& File, + uint64_t Offset, + uint64_t Size) -> bool { + FillOne(InlineDiskLocations[ChunkIndex], + InlineKeyIndexes[ChunkIndex], + File.GetChunk(Offset, Size), + /*UsesTemporaryMemory*/ false); + return true; + }, + LargeChunkSizeLimit); + return true; + }); } if (!StandaloneDiskLocations.empty()) @@ -1617,7 +1704,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept { size_t KeyIndex = StandaloneKeyIndexes[Index]; const DiskLocation& Location = StandaloneDiskLocations[Index]; - FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex])); + FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]), /*UsesTemporaryMemory*/ false); } } @@ -1697,10 +1784,6 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept else { m_DiskMissCount++; - if (m_Configuration.MemCacheSizeThreshold > 0) - { - m_MemoryMissCount++; - } } } } @@ -2029,11 +2112,13 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock } } -bool +std::function<void()> ZenCacheDiskLayer::CacheBucket::Drop() { ZEN_TRACE_CPU("Z$::Bucket::Drop"); + m_Gc.RemoveGcReferencer(*this); + RwLock::ExclusiveLockScope _(m_IndexLock); std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; @@ -2045,7 +2130,7 @@ ZenCacheDiskLayer::CacheBucket::Drop() m_BlockStore.Close(); m_SlogFile.Close(); - const bool Deleted = cache::impl::MoveAndDeleteDirectory(m_BucketDir); + std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(m_BucketDir); m_Index.clear(); m_Payloads.clear(); @@ -2058,7 +2143,21 @@ ZenCacheDiskLayer::CacheBucket::Drop() m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load()); m_MemCachedSize.store(0); - return Deleted; + if (DroppedPath.empty()) + { + return {}; + } + else + { + return [DroppedPath = std::move(DroppedPath)]() { + std::error_code Ec; + (void)DeleteDirectories(DroppedPath, Ec); + if (Ec) + { + ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message()); + } + }; + } } void @@ -2093,6 +2192,9 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot"); try { + // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock + const uint64_t LogPosition = m_SlogFile.GetLogCount(); + bool UseLegacyScheme = false; IoBuffer Buffer; @@ -2107,7 +2209,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl { RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(IndexLock, /*Flush log*/ false); + WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false); // Note: this copy could be eliminated on shutdown to // reduce memory usage and execution time Index = m_Index; @@ -2147,7 +2249,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl else { RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(IndexLock, /*Flush log*/ false); + WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false); const uint64_t EntryCount = m_Index.size(); Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); uint64_t SidecarSize = ManifestWriter.GetSidecarSize(); @@ -2257,7 +2359,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); std::error_code Ec; - uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); + uintmax_t size = FileSizeFromPath(DataFilePath.ToPath(), Ec); if (Ec) { ReportBadKey(HashKey); @@ -2398,11 +2500,11 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) BuildPath(Path, Entry.Key); fs::path FilePath = Path.ToPath(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); - if (fs::is_regular_file(FilePath)) + if (IsFile(FilePath)) { ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); std::error_code Ec; - fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... + RemoveFile(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... } } } @@ -2535,7 +2637,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c if (CleanUpTempFile) { std::error_code Ec; - std::filesystem::remove(DataFile.GetPath(), Ec); + RemoveFile(DataFile.GetPath(), Ec); if (Ec) { ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", @@ -2563,7 +2665,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); // We do a speculative remove of the file instead of probing with a exists call and check the error code instead - std::filesystem::remove(FsPath, Ec); + RemoveFile(FsPath, Ec); if (Ec) { if (Ec.value() != ENOENT) @@ -2571,7 +2673,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); Sleep(100); Ec.clear(); - std::filesystem::remove(FsPath, Ec); + RemoveFile(FsPath, Ec); if (Ec && Ec.value() != ENOENT) { throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); @@ -2796,7 +2898,6 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, ZEN_MEMSCOPE(GetCacheDiskTag()); ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation"); DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); - m_SlogFile.Append({.Key = HashKey, .Location = Location}); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); if (m_TrackedCacheKeys) @@ -2826,6 +2927,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } + m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); } @@ -2842,9 +2944,10 @@ class DiskBucketStoreCompactor : public GcStoreCompactor using CacheBucket = ZenCacheDiskLayer::CacheBucket; public: - DiskBucketStoreCompactor(CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys) + DiskBucketStoreCompactor(CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys, bool FlushBucket) : m_Bucket(Bucket) , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys)) + , m_FlushBucket(FlushBucket) { m_ExpiredStandaloneKeys.shrink_to_fit(); } @@ -2902,7 +3005,7 @@ public: ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); std::error_code Ec; - if (!fs::remove(FilePath, Ec)) + if (!RemoveFile(FilePath, Ec)) { continue; } @@ -2923,7 +3026,7 @@ public: ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); std::error_code Ec; - bool Existed = std::filesystem::is_regular_file(FilePath, Ec); + bool Existed = IsFile(FilePath, Ec); if (Ec) { ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'", @@ -3023,10 +3126,12 @@ public: m_Bucket.m_BlockStore.CompactBlocks( BlockCompactState, m_Bucket.m_Configuration.PayloadAlignment, - [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + [&](const BlockStore::MovedChunksArray& MovedArray, + const BlockStore::ChunkIndexArray& ScrubbedArray, + uint64_t FreedDiskSpace) { std::vector<DiskIndexEntry> MovedEntries; MovedEntries.reserve(MovedArray.size()); - RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock); + RwLock::ExclusiveLockScope IndexLock(m_Bucket.m_IndexLock); for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) { size_t ChunkIndex = Moved.first; @@ -3048,6 +3153,24 @@ public: MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); } } + + for (size_t ScrubbedIndex : ScrubbedArray) + { + const IoHash& Key = BlockCompactStateKeys[ScrubbedIndex]; + + if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) + { + BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; + DiskLocation Location = Payload.Location; + + m_Bucket.RemoveMemCachedData(IndexLock, Payload); + m_Bucket.RemoveMetaData(IndexLock, Payload); + + Location.Flags |= DiskLocation::kTombStone; + MovedEntries.push_back(DiskIndexEntry{.Key = Key, .Location = Location}); + } + } + m_Bucket.m_SlogFile.Append(MovedEntries); Stats.RemovedDisk += FreedDiskSpace; if (Ctx.IsCancelledFlag.load()) @@ -3071,6 +3194,10 @@ public: } } } + if (m_FlushBucket) + { + m_Bucket.Flush(); + } } virtual std::string GetGcName(GcCtx& Ctx) override { return m_Bucket.GetGcName(Ctx); } @@ -3078,6 +3205,7 @@ public: private: ZenCacheDiskLayer::CacheBucket& m_Bucket; std::vector<std::pair<IoHash, uint64_t>> m_ExpiredStandaloneKeys; + bool m_FlushBucket = false; }; GcStoreCompactor* @@ -3101,24 +3229,6 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } - if (Stats.DeletedCount > 0) - { - bool Expected = false; - if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) - { - return; - } - auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - - try - { - SaveSnapshot([]() { return 0; }); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed to write index and manifest after RemoveExpiredData in '{}'. Reason: '{}'", m_BucketDir, Ex.what()); - } - } }); const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count(); @@ -3170,7 +3280,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) return nullptr; } - if (Ctx.Settings.IsDeleteMode) + if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty()) { for (const DiskIndexEntry& Entry : ExpiredEntries) { @@ -3205,7 +3315,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) return nullptr; } - return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); + return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys), /*FlushBucket*/ Stats.DeletedCount > 0); } bool @@ -3395,7 +3505,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger, CaptureAttachments(ChunkIndex, File.GetChunk(Offset, Size).GetView()); return !IsCancelledFlag.load(); }, - 0); + 32u * 1024); if (Continue) { @@ -3698,11 +3808,11 @@ ZenCacheDiskLayer::CacheBucket* ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) { ZEN_TRACE_CPU("Z$::GetOrCreateBucket"); - const auto BucketName = std::string(InBucket); { RwLock::SharedLockScope SharedLock(m_Lock); - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), eastl::equal_to_2<std::string, std::string_view>()); + It != m_Buckets.end()) { return It->second.get(); } @@ -3710,31 +3820,40 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) // We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock. // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock - std::unique_ptr<CacheBucket> Bucket( - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig; + if (auto It = m_Configuration.BucketConfigMap.find_as(InBucket, + std::hash<std::string_view>(), + eastl::equal_to_2<std::string, std::string_view>()); + It != m_Configuration.BucketConfigMap.end()) + { + BucketConfig = &It->second; + } + std::unique_ptr<CacheBucket> Bucket(std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, InBucket, *BucketConfig)); RwLock::ExclusiveLockScope Lock(m_Lock); - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), eastl::equal_to_2<std::string, std::string_view>()); + It != m_Buckets.end()) { return It->second.get(); } std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; + BucketPath /= InBucket; try { if (!Bucket->OpenOrCreate(BucketPath)) { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", InBucket, m_RootDir); return nullptr; } } catch (const std::exception& Err) { - ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", InBucket, BucketPath, Err.what()); throw; } + std::string BucketName{InBucket}; CacheBucket* Result = Bucket.get(); m_Buckets.emplace(BucketName, std::move(Bucket)); if (m_CapturedBuckets) @@ -3833,7 +3952,7 @@ ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept struct ZenCacheDiskLayer::GetBatchHandle { - GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {} + GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3893,13 +4012,13 @@ struct ZenCacheDiskLayer::GetBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector<BucketHandle> BucketHandles; - std::vector<ZenCacheValue>& OutResults; + RwLock Lock; + eastl::fixed_vector<BucketHandle, 4> BucketHandles; + ZenCacheValueVec_t& OutResults; }; ZenCacheDiskLayer::GetBatchHandle* -ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults) +ZenCacheDiskLayer::BeginGetBatch(ZenCacheValueVec_t& OutResults) { return new GetBatchHandle(OutResults); } @@ -3994,7 +4113,11 @@ ZenCacheDiskLayer::DiscoverBuckets() if (IsKnownBadBucketName(BucketName)) { BadBucketDirectories.push_back(BucketPath); - + continue; + } + else if (BucketName.starts_with("[dropped]")) + { + BadBucketDirectories.push_back(BucketPath); continue; } @@ -4027,50 +4150,66 @@ ZenCacheDiskLayer::DiscoverBuckets() RwLock SyncLock; WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); - for (auto& BucketPath : FoundBucketDirectories) + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + try { - WorkLatch.AddCount(1); - Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() { - ZEN_MEMSCOPE(GetCacheDiskTag()); - - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); - const std::string BucketName = PathToUtf8(BucketPath.stem()); - try - { - std::unique_ptr<CacheBucket> NewBucket = - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig); + for (auto& BucketPath : FoundBucketDirectories) + { + Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) { + ZEN_MEMSCOPE(GetCacheDiskTag()); - CacheBucket* Bucket = nullptr; + const std::string BucketName = PathToUtf8(BucketPath.stem()); + try { - RwLock::ExclusiveLockScope __(SyncLock); - auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket)); - Bucket = InsertResult.first->second.get(); - } - ZEN_ASSERT(Bucket); + BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig; + if (auto It = m_Configuration.BucketConfigMap.find_as(std::string_view(BucketName), + std::hash<std::string_view>(), + eastl::equal_to_2<std::string, std::string_view>()); + It != m_Configuration.BucketConfigMap.end()) + { + BucketConfig = &It->second; + } - if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + std::unique_ptr<CacheBucket> NewBucket = + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, *BucketConfig); + CacheBucket* Bucket = nullptr; { RwLock::ExclusiveLockScope __(SyncLock); - m_Buckets.erase(BucketName); + auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket)); + Bucket = InsertResult.first->second.get(); + } + ZEN_ASSERT(Bucket); + + if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + + { + RwLock::ExclusiveLockScope __(SyncLock); + m_Buckets.erase(BucketName); + } } } - } - catch (const std::exception& Err) - { - ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } - }); + catch (const std::exception& Err) + { + ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + return; + } + }); + } + } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed discovering buckets in {}. Reason: '{}'", m_RootDir, Ex.what()); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } -bool +std::function<void()> ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { ZEN_TRACE_CPU("Z$::DropBucket"); @@ -4088,33 +4227,72 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket) return Bucket.Drop(); } - // Make sure we remove the folder even if we don't know about the bucket std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); - return cache::impl::MoveAndDeleteDirectory(BucketPath); + std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(BucketPath); + if (DroppedPath.empty()) + { + return {}; + } + else + { + return [DroppedPath = std::move(DroppedPath)]() { + std::error_code Ec; + (void)DeleteDirectories(DroppedPath, Ec); + if (Ec) + { + ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message()); + } + }; + } } -bool +std::function<void()> ZenCacheDiskLayer::Drop() { ZEN_TRACE_CPU("Z$::Drop"); - RwLock::ExclusiveLockScope _(m_Lock); - - std::vector<std::unique_ptr<CacheBucket>> Buckets; - Buckets.reserve(m_Buckets.size()); - while (!m_Buckets.empty()) + std::vector<std::function<void()>> PostDropOps; { - const auto& It = m_Buckets.begin(); - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It->first); - if (!Bucket.Drop()) + RwLock::ExclusiveLockScope _(m_Lock); + PostDropOps.reserve(m_Buckets.size()); + while (!m_Buckets.empty()) { - return false; + const auto& It = m_Buckets.begin(); + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It->first); + if (std::function<void()> PostDropOp = Bucket.Drop(); !PostDropOp) + { + return {}; + } + else + { + PostDropOps.emplace_back(std::move(PostDropOp)); + } } } - return cache::impl::MoveAndDeleteDirectory(m_RootDir); + + std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(m_RootDir); + if (DroppedPath.empty()) + { + return {}; + } + else + { + return [DroppedPath = std::move(DroppedPath), PostDropOps = std::move(PostDropOps)]() { + for (auto& PostDropOp : PostDropOps) + { + PostDropOp(); + } + std::error_code Ec; + (void)DeleteDirectories(DroppedPath, Ec); + if (Ec) + { + ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message()); + } + }; + } } void @@ -4144,16 +4322,16 @@ ZenCacheDiskLayer::Flush() } { WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); try { for (auto& Bucket : Buckets) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&WorkLatch, Bucket]() { + Work.ScheduleWork(Pool, [Bucket](std::atomic<bool>&) { ZEN_MEMSCOPE(GetCacheDiskTag()); - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); try { Bucket->Flush(); @@ -4167,13 +4345,14 @@ ZenCacheDiskLayer::Flush() } catch (const std::exception& Ex) { + AbortFlag.store(true); ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what()); } - WorkLatch.CountDown(); - while (!WorkLatch.Wait(1000)) - { - ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); - } + + Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) { + ZEN_UNUSED(IsAborted, IsPaused); + ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir); + }); } } diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 20c244250..436e8a083 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -20,6 +20,8 @@ #include <zencore/memory/llm.h> +#include <EASTL/fixed_vector.h> + ////////////////////////////////////////////////////////////////////////// namespace zen { @@ -89,7 +91,7 @@ GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) return false; } IoHash Hash = HashField.AsHash(); - Key = CacheKey::Create(*Bucket, Hash); + Key = CacheKey::CreateValidated(std::move(*Bucket), Hash); return true; } @@ -218,6 +220,11 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, ZEN_WARN("Content format not supported, expected package message format"); return RpcResponseCode::BadRequest; } + if (CbValidateError Error = ValidateCompactBinary(Object.GetView(), CbValidateMode::Default); Error != CbValidateError::None) + { + ZEN_WARN("Content format is corrupt, compact binary format validation failed. Reason: '{}'", ToString(Error)); + return RpcResponseCode::BadRequest; + } } if (!UriNamespace.empty()) @@ -305,7 +312,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co } DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::vector<bool> Results; + eastl::fixed_vector<bool, 32> Results; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); for (CbFieldView RequestField : RequestsArray) @@ -495,16 +502,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb bool Exists = false; bool ReadFromUpstream = false; }; - struct RecordRequestData + struct RecordRequestData : public CacheKeyRequest { - CacheKeyRequest Upstream; - CbObjectView RecordObject; - IoBuffer RecordCacheValue; - CacheRecordPolicy DownstreamPolicy; - std::vector<ValueRequestData> Values; - bool Complete = false; - const UpstreamEndpointInfo* Source = nullptr; - uint64_t ElapsedTimeUs; + CbObjectView RecordObject; + IoBuffer RecordCacheValue; + CacheRecordPolicy DownstreamPolicy; + eastl::fixed_vector<ValueRequestData, 4> Values; + bool Complete = false; + const UpstreamEndpointInfo* Source = nullptr; + uint64_t ElapsedTimeUs; }; std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); @@ -517,8 +523,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const bool HasUpstream = m_UpstreamCache.IsActive(); - std::vector<RecordRequestData> Requests; - std::vector<size_t> UpstreamIndexes; + eastl::fixed_vector<RecordRequestData, 16> Requests; + eastl::fixed_vector<size_t, 16> UpstreamIndexes; auto ParseValues = [](RecordRequestData& Request) { CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView(); @@ -549,7 +555,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CacheKey& Key = Request.Upstream.Key; + CacheKey& Key = Request.Key; if (!GetRpcRequestCacheKey(KeyObject, Key)) { return CbPackage{}; @@ -571,6 +577,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb { FoundLocalInvalid = true; } + else if (CbValidateError Error = ValidateCompactBinary(Request.RecordCacheValue.GetView(), CbValidateMode::Default); + Error != CbValidateError::None) + { + ZEN_WARN("HandleRpcGetCacheRecords stored record is corrupt, compact binary format validation failed. Reason: '{}'", + ToString(Error)); + FoundLocalInvalid = true; + } else { Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); @@ -654,7 +667,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb { m_CidStore.IterateChunks( CidHashes, - [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { + [this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { try { const size_t ValueIndex = RequestValueIndexes[Index]; @@ -721,7 +734,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb for (size_t Index : UpstreamIndexes) { RecordRequestData& Request = Requests[Index]; - UpstreamRequests.push_back(&Request.Upstream); + UpstreamRequests.push_back(&Request); if (Request.Values.size()) { @@ -735,13 +748,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None; Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy); } - Request.Upstream.Policy = Builder.Build(); + Request.Policy = Builder.Build(); } else { // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants, // and convert the CacheRecordPolicy to an upstream policy - Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream(); + Request.Policy = Request.DownstreamPolicy.ConvertToUpstream(); } } @@ -751,10 +764,9 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb return; } - RecordRequestData& Request = - *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream)); + RecordRequestData& Request = *static_cast<RecordRequestData*>(&Params.Request); Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - const CacheKey& Key = Request.Upstream.Key; + const CacheKey& Key = Request.Key; Stopwatch Timer; auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); }); if (!Request.RecordObject) @@ -852,10 +864,12 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb CbPackage ResponsePackage; CbObjectWriter ResponseObject{2048}; + ResponsePackage.ReserveAttachments(Requests.size()); + ResponseObject.BeginArray("Result"sv); for (RecordRequestData& Request : Requests) { - const CacheKey& Key = Request.Upstream.Key; + const CacheKey& Key = Request.Key; if (Request.Complete || (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { @@ -930,11 +944,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<ZenCacheStore::PutResult> BatchResults; - std::vector<size_t> BatchResultIndexes; - std::vector<ZenCacheStore::PutResult> Results; - std::vector<CacheKey> UpstreamCacheKeys; - uint64_t RequestCount = RequestsArray.Num(); + std::vector<ZenCacheStore::PutResult> BatchResults; + eastl::fixed_vector<size_t, 32> BatchResultIndexes; + eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results; + eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys; + + uint64_t RequestCount = RequestsArray.Num(); { Results.reserve(RequestCount); std::unique_ptr<ZenCacheStore::PutBatch> Batch; @@ -1145,15 +1160,15 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO uint64_t RawSize = 0; CompressedBuffer Result; }; - std::vector<RequestData> Requests; + eastl::fixed_vector<RequestData, 16> Requests; - std::vector<size_t> RemoteRequestIndexes; + eastl::fixed_vector<size_t, 16> RemoteRequestIndexes; const bool HasUpstream = m_UpstreamCache.IsActive(); - CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<ZenCacheValue> CacheValues; - const uint64_t RequestCount = RequestsArray.Num(); + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + ZenCacheValueVec_t CacheValues; + const uint64_t RequestCount = RequestsArray.Num(); CacheValues.reserve(RequestCount); { std::unique_ptr<ZenCacheStore::GetBatch> Batch; @@ -1182,7 +1197,6 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO CacheKey& Key = Request.Key; CachePolicy Policy = Request.Policy; - ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (Batch) @@ -1328,6 +1342,9 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response"); CbPackage RpcResponse; CbObjectWriter ResponseObject{1024}; + + RpcResponse.ReserveAttachments(Requests.size()); + ResponseObject.BeginArray("Result"sv); for (const RequestData& Request : Requests) { @@ -1622,18 +1639,27 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, Record.ValuesRead = true; if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) { - CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); - CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); - Record.Values.reserve(ValuesArray.Num()); - for (CbFieldView ValueField : ValuesArray) + if (CbValidateError Error = ValidateCompactBinary(Record.CacheValue.GetView(), CbValidateMode::Default); + Error != CbValidateError::None) { - CbObjectView ValueObject = ValueField.AsObjectView(); - Oid ValueId = ValueObject["Id"sv].AsObjectId(); - CbFieldView RawHashField = ValueObject["RawHash"sv]; - IoHash RawHash = RawHashField.AsBinaryAttachment(); - if (ValueId && !RawHashField.HasError()) + ZEN_WARN("GetLocalCacheRecords stored record for is corrupt, compact binary format validation failed. Reason: '{}'", + ToString(Error)); + } + else + { + CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); + CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); + Record.Values.reserve(ValuesArray.Num()); + for (CbFieldView ValueField : ValuesArray) { - Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ValueId = ValueObject["Id"sv].AsObjectId(); + CbFieldView RawHashField = ValueObject["RawHash"sv]; + IoHash RawHash = RawHashField.AsBinaryAttachment(); + if (ValueId && !RawHashField.HasError()) + { + Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); + } } } } @@ -1706,7 +1732,7 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); - std::vector<ZenCacheValue> Chunks; + ZenCacheValueVec_t Chunks; Chunks.reserve(ValueRequests.size()); { std::unique_ptr<ZenCacheStore::GetBatch> Batch; @@ -1866,6 +1892,8 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest CbPackage RpcResponse; CbObjectWriter Writer{1024}; + RpcResponse.ReserveAttachments(Requests.size()); + Writer.BeginArray("Result"sv); for (ChunkRequest& Request : Requests) { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index a3f80099f..973af52b2 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -178,13 +178,13 @@ ZenCacheNamespace::EndPutBatch(PutBatchHandle* Batch) noexcept struct ZenCacheNamespace::GetBatchHandle { - GetBatchHandle(std::vector<ZenCacheValue>& OutResult) : Results(OutResult) {} - std::vector<ZenCacheValue>& Results; + GetBatchHandle(ZenCacheValueVec_t& OutResult) : Results(OutResult) {} + ZenCacheValueVec_t& Results; ZenCacheDiskLayer::GetBatchHandle* DiskLayerHandle = nullptr; }; ZenCacheNamespace::GetBatchHandle* -ZenCacheNamespace::BeginGetBatch(std::vector<ZenCacheValue>& OutResult) +ZenCacheNamespace::BeginGetBatch(ZenCacheValueVec_t& OutResult) { ZenCacheNamespace::GetBatchHandle* Handle = new ZenCacheNamespace::GetBatchHandle(OutResult); Handle->DiskLayerHandle = m_DiskLayer.BeginGetBatch(OutResult); @@ -282,11 +282,14 @@ ZenCacheNamespace::DropBucket(std::string_view Bucket) { ZEN_INFO("dropping bucket '{}'", Bucket); - const bool Dropped = m_DiskLayer.DropBucket(Bucket); - - ZEN_INFO("bucket '{}' was {}", Bucket, Dropped ? "dropped" : "not found"); - - return Dropped; + std::function<void()> PostDropOp = m_DiskLayer.DropBucket(Bucket); + if (!PostDropOp) + { + ZEN_INFO("bucket '{}' was not found in {}", Bucket, m_RootDir); + return false; + } + PostDropOp(); + return true; } void @@ -296,9 +299,10 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view m_DiskLayer.EnumerateBucketContents(Bucket, Fn); } -bool +std::function<void()> ZenCacheNamespace::Drop() { + m_Gc.RemoveGcStorage(this); return m_DiskLayer.Drop(); } @@ -585,7 +589,7 @@ ZenCacheStore::PutBatch::~PutBatch() } } -ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<ZenCacheValue>& OutResult) +ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, ZenCacheValueVec_t& OutResult) : m_CacheStore(CacheStore) , Results(OutResult) { @@ -800,16 +804,27 @@ ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) bool ZenCacheStore::DropNamespace(std::string_view InNamespace) { - RwLock::SharedLockScope _(m_NamespacesLock); - if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end()) + std::function<void()> PostDropOp; + { + RwLock::SharedLockScope _(m_NamespacesLock); + if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end()) + { + ZenCacheNamespace& Namespace = *It->second; + m_DroppedNamespaces.push_back(std::move(It->second)); + m_Namespaces.erase(It); + PostDropOp = Namespace.Drop(); + } + else + { + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace); + return false; + } + } + if (PostDropOp) { - ZenCacheNamespace& Namespace = *It->second; - m_DroppedNamespaces.push_back(std::move(It->second)); - m_Namespaces.erase(It); - return Namespace.Drop(); + PostDropOp(); } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace); - return false; + return true; } void |