diff options
| author | Liam Mitchell <[email protected]> | 2025-08-21 23:58:51 +0000 |
|---|---|---|
| committer | Liam Mitchell <[email protected]> | 2025-08-21 23:58:51 +0000 |
| commit | 33209bd6931f49362dfc2d62c6cb6b87a42c99e1 (patch) | |
| tree | cfc7914634088b3f4feac2d4cec0b5650dfdcc3c /src/zenstore | |
| parent | Fix changelog merge issues (diff) | |
| parent | avoid new in static IoBuffer (#472) (diff) | |
| download | zen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.tar.xz zen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.zip | |
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 782 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 320 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 288 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 134 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 20 | ||||
| -rw-r--r-- | src/zenstore/cas.h | 5 | ||||
| -rw-r--r-- | src/zenstore/cidstore.cpp | 20 | ||||
| -rw-r--r-- | src/zenstore/compactcas.h | 2 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/buildstore/buildstore.h | 87 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 99 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacherpc.h | 18 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacheshared.h | 8 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/structuredcachestore.h | 56 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/upstreamcacheclient.h | 2 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cidstore.h | 5 |
15 files changed, 966 insertions, 880 deletions
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index 20dc55bca..1b2cf036b 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -3,6 +3,7 @@ #include <zenstore/buildstore/buildstore.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory/llm.h> @@ -20,7 +21,6 @@ ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> -# include <zencore/compress.h> # include <zencore/testing.h> # include <zencore/testutils.h> # include <zenutil/workerpools.h> @@ -45,7 +45,7 @@ namespace blobstore::impl { const char* LogExtension = ".slog"; const char* AccessTimeExtension = ".zacs"; - const uint32_t ManifestVersion = (1 << 16) | (0 << 8) | (0); + const uint32_t ManifestVersion = (2 << 16) | (0 << 8) | (0); std::filesystem::path GetManifestPath(const std::filesystem::path& RootDirectory) { @@ -106,13 +106,11 @@ namespace blobstore::impl { } // namespace blobstore::impl -BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc) +BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc, CidStore& BlobStore) : m_Log(logging::Get("builds")) , m_Config(Config) , m_Gc(Gc) -, m_LargeBlobStore(m_Gc) -, m_SmallBlobStore(Gc) -, m_MetadataBlockStore() +, m_BlobStore(BlobStore) { ZEN_TRACE_CPU("BuildStore::BuildStore"); ZEN_MEMSCOPE(GetBuildstoreTag()); @@ -170,75 +168,16 @@ BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc) ManifestWriter.AddDateTime("createdAt", DateTime::Now()); TemporaryFile::SafeWriteFile(ManifestPath, ManifestWriter.Save().GetBuffer().AsIoBuffer()); } - m_LargeBlobStore.Initialize(Config.RootDirectory / "file_cas", IsNew); - m_SmallBlobStore.Initialize(Config.RootDirectory, - "blob_cas", - m_Config.SmallBlobBlockStoreMaxBlockSize, - m_Config.SmallBlobBlockStoreAlignement, - IsNew); - m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20); - - BlockStore::BlockIndexSet KnownBlocks; - for (const BlobEntry& Blob : m_BlobEntries) - { - if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex) - { - const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex]; - KnownBlocks.insert(Metadata.Location.BlockIndex); - } - } - BlockStore::BlockIndexSet MissingBlocks = m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks); m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite); m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite); - if (!MissingBlocks.empty()) - { - std::vector<MetadataDiskEntry> MissingMetadatas; - for (auto& It : m_BlobLookup) - { - const IoHash& BlobHash = It.first; - const BlobIndex ReadBlobIndex = It.second; - const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; - if (ReadBlobEntry.Metadata) - { - const MetadataEntry& MetaData = m_MetadataEntries[ReadBlobEntry.Metadata]; - if (MissingBlocks.contains(MetaData.Location.BlockIndex)) - { - MissingMetadatas.push_back( - MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = BlobHash}); - MissingMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone; - m_MetadataEntries[ReadBlobEntry.Metadata] = {}; - m_BlobEntries[ReadBlobIndex].Metadata = {}; - } - } - } - ZEN_ASSERT(!MissingMetadatas.empty()); - - for (const MetadataDiskEntry& Entry : MissingMetadatas) - { - auto It = m_BlobLookup.find(Entry.BlobHash); - ZEN_ASSERT(It != m_BlobLookup.end()); - - const BlobIndex ReadBlobIndex = It->second; - const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; - if (!ReadBlobEntry.Payload) - { - m_BlobLookup.erase(It); - } - } - m_MetadatalogFile.Append(MissingMetadatas); - CompactState(); - } - m_Gc.AddGcReferencer(*this); m_Gc.AddGcReferenceLocker(*this); - m_Gc.AddGcStorage(this); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to initialize build store. Reason: '{}'", Ex.what()); - m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); } @@ -249,7 +188,6 @@ BuildStore::~BuildStore() try { ZEN_TRACE_CPU("BuildStore::~BuildStore"); - m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); Flush(); @@ -280,21 +218,12 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) } } - uint64_t PayloadSize = Payload.GetSize(); - PayloadEntry Entry; - if (Payload.GetSize() > m_Config.SmallBlobBlockStoreMaxBlockEmbedSize) - { - CasStore::InsertResult Result = m_LargeBlobStore.InsertChunk(Payload, BlobHash); - ZEN_UNUSED(Result); - Entry = PayloadEntry(PayloadEntry::kStandalone, PayloadSize); - } - else - { - CasStore::InsertResult Result = m_SmallBlobStore.InsertChunk(Payload, BlobHash); - ZEN_UNUSED(Result); - Entry = PayloadEntry(0, PayloadSize); - } + uint64_t PayloadSize = Payload.GetSize(); + CidStore::InsertResult Result = m_BlobStore.AddChunk(Payload, BlobHash); + PayloadEntry Entry = PayloadEntry(0, PayloadSize); + ; + IoHash MetadataHash; { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) @@ -310,6 +239,10 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); m_PayloadEntries.push_back(Entry); } + if (Blob.Metadata) + { + MetadataHash = m_MetadataEntries[Blob.Metadata].MetadataHash; + } Blob.LastAccessTime = GcClock::TickCount(); } else @@ -322,6 +255,16 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert({BlobHash, NewBlobIndex}); } + + m_LastAccessTimeUpdateCount++; + if (m_TrackedBlobKeys) + { + m_TrackedBlobKeys->push_back(BlobHash); + if (MetadataHash != IoHash::Zero) + { + m_TrackedBlobKeys->push_back(BlobHash); + } + } } m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); m_LastAccessTimeUpdateCount++; @@ -340,21 +283,9 @@ BuildStore::GetBlob(const IoHash& BlobHash) Blob.LastAccessTime = GcClock::TickCount(); if (Blob.Payload) { - const PayloadEntry& Entry = m_PayloadEntries[Blob.Payload]; - const bool IsStandalone = (Entry.GetFlags() & PayloadEntry::kStandalone) != 0; Lock.ReleaseNow(); + IoBuffer Chunk = m_BlobStore.FindChunkByCid(BlobHash); - IoBuffer Chunk; - if (IsStandalone) - { - ZEN_TRACE_CPU("GetLarge"); - Chunk = m_LargeBlobStore.FindChunk(BlobHash); - } - else - { - ZEN_TRACE_CPU("GetSmall"); - Chunk = m_SmallBlobStore.FindChunk(BlobHash); - } if (Chunk) { Chunk.SetContentType(ZenContentType::kCompressedBinary); @@ -362,7 +293,7 @@ BuildStore::GetBlob(const IoHash& BlobHash) } else { - ZEN_WARN("Inconsistencies in build store, {} is in index but not {}", BlobHash, IsStandalone ? "on disk" : "in block"); + ZEN_WARN("Inconsistencies in build store, {} is in index but not in blob store", BlobHash); } } } @@ -381,10 +312,10 @@ BuildStore::BlobsExists(std::span<const IoHash> BlobHashes) { if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { - const BlobIndex ExistingBlobIndex = It->second; - BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; - bool HasPayload = !!Blob.Payload; - bool HasMetadata = !!Blob.Metadata; + const BlobIndex ExistingBlobIndex = It->second; + const BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + bool HasPayload = !!Blob.Payload; + bool HasMetadata = !!Blob.Metadata; Result.push_back(BlobExistsResult{.HasBody = HasPayload, .HasMetadata = HasMetadata}); } else @@ -396,20 +327,82 @@ BuildStore::BlobsExists(std::span<const IoHash> BlobHashes) } void -BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas) +BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> Metadatas, WorkerThreadPool* OptionalWorkerPool) { ZEN_TRACE_CPU("BuildStore::PutMetadatas"); ZEN_MEMSCOPE(GetBuildstoreTag()); - size_t WriteBlobIndex = 0; - m_MetadataBlockStore.WriteChunks(MetaDatas, m_Config.MetadataBlockStoreAlignement, [&](std::span<BlockStoreLocation> Locations) { + std::vector<IoHash> MetadataHashes; + std::vector<IoBuffer> CompressedMetadataBuffers; + + auto CompressOne = [&BlobHashes, &MetadataHashes, &CompressedMetadataBuffers](const IoBuffer& Buffer, size_t Index) { + if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize)) + { + throw std::runtime_error( + fmt::format("Invalid compressed buffer provided when storing metadata for blob {}", BlobHashes[Index])); + } + else + { + CompressedMetadataBuffers[Index] = Buffer; + MetadataHashes[Index] = RawHash; + } + } + else + { + CompressedBuffer Compressed = + CompressedBuffer::Compress(SharedBuffer(Buffer), OodleCompressor::Mermaid, OodleCompressionLevel::None); + MetadataHashes[Index] = Compressed.DecodeRawHash(); + CompressedMetadataBuffers[Index] = std::move(Compressed.GetCompressed()).Flatten().AsIoBuffer(); + CompressedMetadataBuffers[Index].SetContentType(ZenContentType::kCompressedBinary); + } + }; + + MetadataHashes.resize(Metadatas.size()); + CompressedMetadataBuffers.resize(Metadatas.size()); + if (OptionalWorkerPool) + { + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + for (size_t Index = 0; Index < Metadatas.size(); Index++) + { + Work.ScheduleWork( + *OptionalWorkerPool, + [Index, &BlobHashes, &Metadatas, &MetadataHashes, &CompressedMetadataBuffers, &CompressOne](std::atomic<bool>&) { + const IoBuffer& Buffer = Metadatas[Index]; + CompressOne(Buffer, Index); + }, + {}); + } + Work.Wait(); + } + else + { + for (size_t Index = 0; Index < Metadatas.size(); Index++) + { + const IoBuffer& Buffer = Metadatas[Index]; + CompressOne(Buffer, Index); + } + } + + std::vector<MetadataDiskEntry> AddedMetadataEntries; + AddedMetadataEntries.reserve(MetadataHashes.size()); + + std::vector<CidStore::InsertResult> InsertResults = m_BlobStore.AddChunks(CompressedMetadataBuffers, MetadataHashes); + ZEN_UNUSED(InsertResults); + { RwLock::ExclusiveLockScope _(m_Lock); - for (size_t LocationIndex = 0; LocationIndex < Locations.size(); LocationIndex++) + for (size_t Index = 0; Index < BlobHashes.size(); Index++) { - const IoBuffer& Data = MetaDatas[WriteBlobIndex]; - const IoHash& BlobHash = BlobHashes[WriteBlobIndex]; - const BlockStoreLocation& Location = Locations[LocationIndex]; + const ZenContentType ContentType = Metadatas[Index].GetContentType(); + const IoHash& BlobHash = BlobHashes[Index]; + const IoHash& MetadataHash = MetadataHashes[Index]; + const IoBuffer& Metadata = CompressedMetadataBuffers[Index]; - MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0}; + MetadataEntry Entry(MetadataHash, Metadata.GetSize(), ContentType, 0); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { @@ -435,17 +428,16 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert({BlobHash, NewBlobIndex}); } - - m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); - m_LastAccessTimeUpdateCount++; - WriteBlobIndex++; - if (m_TrackedCacheKeys) + if (m_TrackedBlobKeys) { - m_TrackedCacheKeys->insert(BlobHash); + m_TrackedBlobKeys->push_back(BlobHash); + m_TrackedBlobKeys->push_back(MetadataHash); } + AddedMetadataEntries.push_back(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); } - }); + } + m_MetadatalogFile.Append(AddedMetadataEntries); } std::vector<IoBuffer> @@ -453,9 +445,9 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O { ZEN_TRACE_CPU("BuildStore::GetMetadatas"); ZEN_MEMSCOPE(GetBuildstoreTag()); - std::vector<BlockStoreLocation> MetaLocations; - std::vector<size_t> MetaLocationResultIndexes; - MetaLocations.reserve(BlobHashes.size()); + std::vector<IoHash> MetadataHashes; + std::vector<size_t> MetaLocationResultIndexes; + MetadataHashes.reserve(BlobHashes.size()); MetaLocationResultIndexes.reserve(BlobHashes.size()); tsl::robin_set<uint32_t> ReferencedBlocks; @@ -475,10 +467,9 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O if (ExistingBlobEntry.Metadata) { const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata]; - MetaLocations.push_back(ExistingMetadataEntry.Location); + MetadataHashes.push_back(ExistingMetadataEntry.MetadataHash); MetaLocationResultIndexes.push_back(Index); - ReferencedBlocks.insert(ExistingMetadataEntry.Location.BlockIndex); - ResultContentTypes[Index] = ExistingMetadataEntry.ContentType; + ResultContentTypes[Index] = ExistingMetadataEntry.GetContentType(); } ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::TickCount()); m_LastAccessTimeUpdateCount++; @@ -486,100 +477,35 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O } } - auto DoOneBlock = [this](std::span<const BlockStoreLocation> MetaLocations, - std::span<const size_t> MetaLocationResultIndexes, - std::span<const size_t> ChunkIndexes, - std::vector<IoBuffer>& Result) { - if (ChunkIndexes.size() < 4) - { - for (size_t ChunkIndex : ChunkIndexes) + m_BlobStore.IterateChunks( + MetadataHashes, + [this, &BlobHashes, &MetadataHashes, &MetaLocationResultIndexes, &Result](size_t Index, const IoBuffer& Payload) { + if (Payload) { - IoBuffer Chunk = m_MetadataBlockStore.TryGetChunk(MetaLocations[ChunkIndex]); - if (Chunk) - { - size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; - Result[ResultIndex] = std::move(Chunk); - } - } - return true; - } - return m_MetadataBlockStore.IterateBlock( - MetaLocations, - ChunkIndexes, - [&MetaLocationResultIndexes, &Result](size_t ChunkIndex, const void* Data, uint64_t Size) { - if (Data != nullptr) + size_t ResultIndex = MetaLocationResultIndexes[Index]; + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer CompressedBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); + if (CompressedBuffer) { - size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; - Result[ResultIndex] = IoBuffer(IoBuffer::Clone, Data, Size); - } - return true; - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; - Result[ResultIndex] = File.GetChunk(Offset, Size); - return true; - }, - 8u * 1024u); - }; - - if (!MetaLocations.empty()) - { - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); - - try - { - m_MetadataBlockStore.IterateChunks( - MetaLocations, - [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock]( - uint32_t BlockIndex, - std::span<const size_t> ChunkIndexes) -> bool { - ZEN_UNUSED(BlockIndex); - if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1) + IoBuffer Decompressed = CompressedBuffer.DecompressToComposite().Flatten().AsIoBuffer(); + if (Decompressed) { - return DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result); + Result[ResultIndex] = std::move(Decompressed); } else { - ZEN_ASSERT(OptionalWorkerPool != nullptr); - std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - Work.ScheduleWork( - *OptionalWorkerPool, - [this, - &Result, - &MetaLocations, - &MetaLocationResultIndexes, - DoOneBlock, - ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - return; - } - try - { - if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result)) - { - AbortFlag.store(true); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); - } - }); - return !Work.IsAborted(); + ZEN_WARN("Metadata {} for blob {} is malformed (not a compressed binary format)", + MetadataHashes[ResultIndex], + BlobHashes[ResultIndex]); } - }); - } - catch (const std::exception& Ex) - { - AbortFlag.store(true); - ZEN_WARN("Failed iterating block metadata chunks in {}. Reason: '{}'", m_Config.RootDirectory, Ex.what()); - } + } + } + return true; + }, + OptionalWorkerPool, + 8u * 1024u); - Work.Wait(); - } for (size_t Index = 0; Index < Result.size(); Index++) { if (Result[Index]) @@ -600,9 +526,7 @@ BuildStore::Flush() const auto _ = MakeGuard( [&] { ZEN_INFO("Flushed build store at {} in {}", m_Config.RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_LargeBlobStore.Flush(); - m_SmallBlobStore.Flush(); - m_MetadataBlockStore.Flush(false); + m_BlobStore.Flush(); m_PayloadlogFile.Flush(); m_MetadatalogFile.Flush(); @@ -636,22 +560,14 @@ BuildStore::GetStorageStats() const { const PayloadEntry& Payload = m_PayloadEntries[ReadBlobEntry.Payload]; uint64_t Size = Payload.GetSize(); - if ((Payload.GetFlags() & PayloadEntry::kStandalone) != 0) - { - Result.LargeBlobCount++; - Result.LargeBlobBytes += Size; - } - else - { - Result.SmallBlobCount++; - Result.SmallBlobBytes += Size; - } + Result.BlobCount++; + Result.BlobBytes += Size; } if (ReadBlobEntry.Metadata) { const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata]; Result.MetadataCount++; - Result.MetadataByteCount += Metadata.Location.Size; + Result.MetadataByteCount += Metadata.GetSize(); } } } @@ -882,10 +798,9 @@ BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesy CasLog.Replay( [&](const MetadataDiskEntry& Record) { std::string InvalidEntryReason; - if (Record.Entry.Flags & MetadataEntry::kTombStone) + if (Record.Entry.GetFlags() & MetadataEntry::kTombStone) { // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation - // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end()) { if (!m_BlobEntries[ExistingIt->second].Payload) @@ -1058,7 +973,7 @@ BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString()); return false; } - if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone | PayloadEntry::kStandalone)) + if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.GetFlags(), Entry.BlobHash.ToHexString()); return false; @@ -1083,30 +998,20 @@ BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::strin OutReason = fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString()); return false; } - if (Entry.Entry.Location.Size == 0) + if (Entry.Entry.GetSize() == 0) { - OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.Location.Size); + OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.GetSize()); return false; } - if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0) - { - OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); - return false; - } - if (Entry.Entry.Flags & MetadataEntry::kTombStone) + if (Entry.Entry.GetFlags() & MetadataEntry::kTombStone) { return true; } - if (Entry.Entry.ContentType == ZenContentType::kCOUNT) + if (Entry.Entry.GetContentType() == ZenContentType::kCOUNT) { OutReason = fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString()); return false; } - if (Entry.Reserved1 != 0 || Entry.Reserved2 != 0 || Entry.Reserved3 != 0 || Entry.Reserved4 != 0) - { - OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); - return false; - } return true; } @@ -1114,31 +1019,76 @@ class BuildStoreGcReferenceChecker : public GcReferenceChecker { public: BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {} + ~BuildStoreGcReferenceChecker() + { + try + { + m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedBlobKeys.reset(); }); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~BuildStoreGcReferenceChecker threw exception: '{}'", Ex.what()); + } + } virtual std::string GetGcName(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); } - virtual void PreCache(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); } - - virtual void UpdateLockedState(GcCtx& Ctx) override + virtual void PreCache(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Builds::UpdateLockedState"); - ZEN_MEMSCOPE(GetBuildstoreTag()); + ZEN_TRACE_CPU("Builds::PreCache"); auto Log = [&Ctx]() { return Ctx.Logger; }; - m_References.reserve(m_Store.m_BlobLookup.size()); - for (const auto& It : m_Store.m_BlobLookup) + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: builds [PRECACHE] '{}': found {} references in {}", + m_Store.m_Config.RootDirectory, + m_PrecachedReferences.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedBlobKeys = std::make_unique<std::vector<IoHash>>(); }); + { - const BuildStore::BlobIndex ExistingBlobIndex = It.second; - if (m_Store.m_BlobEntries[ExistingBlobIndex].Payload) + m_PrecachedReferences.reserve(m_Store.m_BlobLookup.size()); + RwLock::SharedLockScope __(m_Store.m_Lock); + for (const auto& It : m_Store.m_BlobLookup) { - m_References.push_back(It.first); + const BuildStore::BlobIndex ExistingBlobIndex = It.second; + const BuildStore::BlobEntry& Entry = m_Store.m_BlobEntries[ExistingBlobIndex]; + if (Entry.Payload) + { + m_PrecachedReferences.push_back(It.first); + } + if (Entry.Metadata) + { + const BuildStore::MetadataEntry& MetadataEntry = m_Store.m_MetadataEntries[Entry.Metadata]; + m_PrecachedReferences.push_back(MetadataEntry.MetadataHash); + } } } - FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", "buildstore"), m_References); + FilterReferences(Ctx, fmt::format("buildstore [PRECACHE] '{}'", m_Store.m_Config.RootDirectory), m_PrecachedReferences); + } + + virtual void UpdateLockedState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Builds::UpdateLockedState"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + ZEN_ASSERT(m_Store.m_TrackedBlobKeys); + + m_AddedReferences = std::move(*m_Store.m_TrackedBlobKeys); + + FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", m_Store.m_Config.RootDirectory), m_AddedReferences); } virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override @@ -1165,14 +1115,16 @@ public: NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids); + std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_PrecachedReferences, IoCids); + UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences); UsedCount = IoCids.size() - UnusedReferences.size(); return UnusedReferences; } private: BuildStore& m_Store; - std::vector<IoHash> m_References; + std::vector<IoHash> m_PrecachedReferences; + std::vector<IoHash> m_AddedReferences; }; std::string @@ -1184,201 +1136,6 @@ BuildStore::GetGcName(GcCtx& Ctx) return fmt::format("buildstore: '{}'", m_Config.RootDirectory.string()); } -class BuildStoreGcCompator : public GcStoreCompactor -{ - using BlobEntry = BuildStore::BlobEntry; - using PayloadEntry = BuildStore::PayloadEntry; - using MetadataEntry = BuildStore::MetadataEntry; - using MetadataDiskEntry = BuildStore::MetadataDiskEntry; - using BlobIndex = BuildStore::BlobIndex; - using PayloadIndex = BuildStore::PayloadIndex; - using MetadataIndex = BuildStore::MetadataIndex; - -public: - BuildStoreGcCompator(BuildStore& Store, std::vector<IoHash>&& RemovedBlobs) : m_Store(Store), m_RemovedBlobs(std::move(RemovedBlobs)) {} - - virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override - { - ZEN_UNUSED(ClaimDiskReserveCallback); - ZEN_TRACE_CPU("Builds::CompactStore"); - ZEN_MEMSCOPE(GetBuildstoreTag()); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: buildstore [COMPACT] '{}': RemovedDisk: {} in {}", - m_Store.m_Config.RootDirectory, - NiceBytes(Stats.RemovedDisk), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - const auto __ = MakeGuard([&] { m_Store.Flush(); }); - - if (!m_RemovedBlobs.empty()) - { - if (Ctx.Settings.CollectSmallObjects) - { - m_Store.m_Lock.WithExclusiveLock([this]() { m_Store.m_TrackedCacheKeys = std::make_unique<HashSet>(); }); - auto __ = MakeGuard([this]() { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedCacheKeys.reset(); }); }); - - BlockStore::BlockUsageMap BlockUsage; - { - RwLock::SharedLockScope __(m_Store.m_Lock); - - for (auto LookupIt : m_Store.m_BlobLookup) - { - const BlobIndex ReadBlobIndex = LookupIt.second; - const BlobEntry& ReadBlobEntry = m_Store.m_BlobEntries[ReadBlobIndex]; - - if (ReadBlobEntry.Metadata) - { - const MetadataEntry& ReadMetadataEntry = m_Store.m_MetadataEntries[ReadBlobEntry.Metadata]; - - uint32_t BlockIndex = ReadMetadataEntry.Location.BlockIndex; - uint64_t ChunkSize = RoundUp(ReadMetadataEntry.Location.Size, m_Store.m_Config.MetadataBlockStoreAlignement); - - if (auto BlockUsageIt = BlockUsage.find(BlockIndex); BlockUsageIt != BlockUsage.end()) - { - BlockStore::BlockUsageInfo& Info = BlockUsageIt.value(); - Info.EntryCount++; - Info.DiskUsage += ChunkSize; - } - else - { - BlockUsage.insert_or_assign(BlockIndex, - BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); - } - } - } - } - - BlockStore::BlockEntryCountMap BlocksToCompact = m_Store.m_MetadataBlockStore.GetBlocksToCompact(BlockUsage, 90); - BlockStoreCompactState BlockCompactState; - std::vector<IoHash> BlockCompactStateKeys; - BlockCompactState.IncludeBlocks(BlocksToCompact); - - if (BlocksToCompact.size() > 0) - { - { - RwLock::SharedLockScope ___(m_Store.m_Lock); - for (const auto& Entry : m_Store.m_BlobLookup) - { - BlobIndex Index = Entry.second; - - if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) - { - if (BlockCompactState.AddKeepLocation(m_Store.m_MetadataEntries[Meta].Location)) - { - BlockCompactStateKeys.push_back(Entry.first); - } - } - } - } - - if (Ctx.Settings.IsDeleteMode) - { - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: buildstore [COMPACT] '{}': compacting {} blocks", - m_Store.m_Config.RootDirectory, - BlocksToCompact.size()); - } - - m_Store.m_MetadataBlockStore.CompactBlocks( - BlockCompactState, - m_Store.m_Config.MetadataBlockStoreAlignement, - [&](const BlockStore::MovedChunksArray& MovedArray, - const BlockStore::ChunkIndexArray& ScrubbedArray, - uint64_t FreedDiskSpace) { - std::vector<MetadataDiskEntry> MovedEntries; - MovedEntries.reserve(MovedArray.size()); - RwLock::ExclusiveLockScope _(m_Store.m_Lock); - for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) - { - size_t ChunkIndex = Moved.first; - const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; - - ZEN_ASSERT(m_Store.m_TrackedCacheKeys); - if (m_Store.m_TrackedCacheKeys->contains(Key)) - { - continue; - } - - if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end()) - { - const BlobIndex Index = It->second; - - if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) - { - m_Store.m_MetadataEntries[Meta].Location = Moved.second; - MovedEntries.push_back( - MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key}); - } - } - } - - for (size_t Scrubbed : ScrubbedArray) - { - const IoHash& Key = BlockCompactStateKeys[Scrubbed]; - if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end()) - { - const BlobIndex Index = It->second; - - if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) - { - MovedEntries.push_back( - MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key}); - MovedEntries.back().Entry.Flags |= MetadataEntry::kTombStone; - m_Store.m_MetadataEntries[Meta] = {}; - m_Store.m_BlobEntries[Index].Metadata = {}; - } - } - } - - m_Store.m_MetadatalogFile.Append(MovedEntries); - - Stats.RemovedDisk += FreedDiskSpace; - if (Ctx.IsCancelledFlag.load()) - { - return false; - } - return true; - }, - ClaimDiskReserveCallback, - fmt::format("GCV2: buildstore [COMPACT] '{}': ", m_Store.m_Config.RootDirectory)); - } - else - { - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: buildstore [COMPACT] '{}': skipped compacting of {} eligible blocks", - m_Store.m_Config.RootDirectory, - BlocksToCompact.size()); - } - } - } - } - } - } - - virtual std::string GetGcName(GcCtx& Ctx) override - { - ZEN_UNUSED(Ctx); - ZEN_MEMSCOPE(GetBuildstoreTag()); - - return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); - } - -private: - BuildStore& m_Store; - const std::vector<IoHash> m_RemovedBlobs; -}; - GcStoreCompactor* BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { @@ -1413,10 +1170,9 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) uint64_t BlobSize = 0; }; - bool DiskSizeExceeded = false; - const uint64_t CurrentDiskSize = - m_LargeBlobStore.StorageSize().DiskSize + m_SmallBlobStore.StorageSize().DiskSize + m_MetadataBlockStore.TotalSize(); - if (CurrentDiskSize > m_Config.MaxDiskSpaceLimit) + bool DiskSizeExceeded = false; + const uint64_t CurrentBlobsDiskSize = m_BlobStore.TotalSize().TotalSize; + if ((m_Config.MaxDiskSpaceLimit > 0) && (CurrentBlobsDiskSize > m_Config.MaxDiskSpaceLimit)) { DiskSizeExceeded = true; } @@ -1444,14 +1200,14 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) if (ReadBlobEntry.Metadata) { const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata]; - Size += Metadata.Location.Size; + Size += Metadata.GetSize(); } const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; if (AccessTick < ExpireTicks) { ExpiredBlobs.push_back(It.first); - ExpiredDataSize += ExpiredDataSize; + ExpiredDataSize += Size; } else if (DiskSizeExceeded) { @@ -1469,7 +1225,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) const uint64_t NewSizeLimit = m_Config.MaxDiskSpaceLimit - (m_Config.MaxDiskSpaceLimit >> 4); // Remove a bit more than just below the limit so we have some space to grow - if ((CurrentDiskSize - ExpiredDataSize) > NewSizeLimit) + if ((CurrentBlobsDiskSize - ExpiredDataSize) > NewSizeLimit) { std::vector<size_t> NonExpiredOrder; NonExpiredOrder.resize(NonExpiredBlobSizeInfos.size()); @@ -1487,7 +1243,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) while (It != NonExpiredOrder.end()) { const SizeInfo& Info = NonExpiredBlobSizeInfos[*It]; - if ((CurrentDiskSize - ExpiredDataSize) < NewSizeLimit) + if ((CurrentBlobsDiskSize - ExpiredDataSize) < NewSizeLimit) { break; } @@ -1539,7 +1295,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { RemoveMetadatas.push_back( MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob}); - RemoveMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone; + RemoveMetadatas.back().Entry.AddFlag(MetadataEntry::kTombStone); m_MetadataEntries[ReadBlobEntry.Metadata] = {}; m_BlobEntries[ReadBlobIndex].Metadata = {}; } @@ -1568,7 +1324,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) CompactState(); } - return new BuildStoreGcCompator(*this, std::move(RemovedBlobs)); + return nullptr; } std::vector<GcReferenceChecker*> @@ -1595,21 +1351,6 @@ BuildStore::LockState(GcCtx& Ctx) return Locks; } -void -BuildStore::ScrubStorage(ScrubContext& ScrubCtx) -{ - ZEN_UNUSED(ScrubCtx); - // TODO -} - -GcStorageSize -BuildStore::StorageSize() const -{ - GcStorageSize Result; - Result.DiskSize = m_MetadataBlockStore.TotalSize(); - return Result; -} - /* ___________ __ \__ ___/___ _______/ |_ ______ @@ -1630,8 +1371,10 @@ TEST_CASE("BuildStore.Blobs") std::vector<IoHash> CompressedBlobsHashes; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { @@ -1658,10 +1401,13 @@ TEST_CASE("BuildStore.Blobs") IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); CHECK(IoHash::HashBuffer(Decompressed) == RawHash); } + BlobStore.Flush(); } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (const IoHash& RawHash : CompressedBlobsHashes) { IoBuffer Payload = Store.GetBlob(RawHash); @@ -1689,8 +1435,10 @@ TEST_CASE("BuildStore.Blobs") } } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (const IoHash& RawHash : CompressedBlobsHashes) { IoBuffer Payload = Store.GetBlob(RawHash); @@ -1709,7 +1457,7 @@ TEST_CASE("BuildStore.Blobs") } namespace blockstore::testing { - IoBuffer MakeMetaData(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues) + IoBuffer MakeMetadata(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues) { CbObjectWriter Writer; Writer.AddHash("rawHash"sv, BlobHash); @@ -1740,16 +1488,18 @@ TEST_CASE("BuildStore.Metadata") std::vector<IoHash> BlobHashes; std::vector<IoBuffer> MetaPayloads; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I))); - MetaPayloads.push_back(MakeMetaData(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); + MetaPayloads.push_back(MakeMetadata(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); MetaPayloads.back().SetContentType(ZenContentType::kCbObject); } - Store.PutMetadatas(BlobHashes, MetaPayloads); + Store.PutMetadatas(BlobHashes, MetaPayloads, &WorkerPool); std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); @@ -1760,8 +1510,10 @@ TEST_CASE("BuildStore.Metadata") } } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) @@ -1776,8 +1528,10 @@ TEST_CASE("BuildStore.Metadata") } std::vector<IoHash> CompressedBlobsHashes; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); @@ -1805,14 +1559,16 @@ TEST_CASE("BuildStore.Metadata") std::vector<IoBuffer> BlobMetaPayloads; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (const IoHash& BlobHash : CompressedBlobsHashes) { - BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } - Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, &WorkerPool); std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); @@ -1824,8 +1580,10 @@ TEST_CASE("BuildStore.Metadata") } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); @@ -1847,14 +1605,16 @@ TEST_CASE("BuildStore.Metadata") for (const IoHash& BlobHash : CompressedBlobsHashes) { BlobMetaPayloads.push_back( - MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}})); + MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } - Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, &WorkerPool); } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); @@ -1886,8 +1646,10 @@ TEST_CASE("BuildStore.GC") std::vector<IoHash> CompressedBlobsHashes; std::vector<IoBuffer> BlobMetaPayloads; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); @@ -1900,14 +1662,16 @@ TEST_CASE("BuildStore.GC") } for (const IoHash& BlobHash : CompressedBlobsHashes) { - BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } - Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, nullptr); } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); { GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1), @@ -1967,8 +1731,10 @@ TEST_CASE("BuildStore.SizeLimit") std::vector<IoHash> CompressedBlobsHashes; std::vector<IoBuffer> BlobMetaPayloads; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 64; I++) { IoBuffer Blob = CreateSemiRandomBlob(65537 + I * 7); @@ -1981,10 +1747,10 @@ TEST_CASE("BuildStore.SizeLimit") } for (const IoHash& BlobHash : CompressedBlobsHashes) { - BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } - Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, nullptr); { for (size_t I = 0; I < 64; I++) @@ -1997,8 +1763,10 @@ TEST_CASE("BuildStore.SizeLimit") } } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); { GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1), @@ -2023,7 +1791,7 @@ TEST_CASE("BuildStore.SizeLimit") CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash); } } - CHECK(DeletedBlobs == 50); + CHECK(DeletedBlobs == 53); std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 15a1c9650..cacbbd966 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -233,6 +233,28 @@ using namespace std::literals; namespace zen::cache::impl { +static bool +UpdateValueWithRawSizeAndHash(ZenCacheValue& Value) +{ + if ((Value.RawSize == 0) && (Value.RawHash == IoHash::Zero)) + { + if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + return CompressedBuffer::ValidateCompressedHeader(Value.Value, Value.RawHash, Value.RawSize); + } + else + { + Value.RawSize = Value.Value.GetSize(); + Value.RawHash = IoHash::HashBuffer(Value.Value); + return true; + } + } + else + { + return true; + } +} + class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; @@ -348,11 +370,20 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + if (KeyArray.Num() != Count) + { + ZEN_WARN("Mismatch in size between 'Keys' ({}) array size and 'Count' ({}) in {}, skipping metadata", + KeyArray.Num(), + Count, + ManifestPath); + return; + } + std::vector<PayloadIndex> KeysIndexes; KeysIndexes.reserve(Count); - CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); for (CbFieldView& KeyView : KeyArray) { if (auto It = Index.find(KeyView.AsHash()); It != Index.end()) @@ -367,19 +398,43 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck size_t KeyIndexOffset = 0; CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); - for (CbFieldView& TimeStampView : TimeStampArray) + if (KeysIndexes.size() != TimeStampArray.Num()) { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - if (KeyIndex) + ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'Timestamps' ({}) arrays in {}, skipping timestamps", + KeysIndexes.size(), + TimeStampArray.Num(), + ManifestPath); + } + else + { + for (CbFieldView& TimeStampView : TimeStampArray) { - AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex) + { + AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } } } KeyIndexOffset = 0; CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); - if (RawHashArray.Num() == RawSizeArray.Num()) + if (RawHashArray.Num() != KeysIndexes.size()) + { + ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'RawHash' ({}) arrays in {}, skipping meta data", + KeysIndexes.size(), + RawHashArray.Num(), + ManifestPath); + } + else if (RawSizeArray.Num() != KeysIndexes.size()) + { + ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'RawSize' ({}) arrays in {}, skipping meta data", + KeysIndexes.size(), + RawSizeArray.Num(), + ManifestPath); + } + else { auto RawHashIt = RawHashArray.CreateViewIterator(); auto RawSizeIt = RawSizeArray.CreateViewIterator(); @@ -404,10 +459,6 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck RawSizeIt++; } } - else - { - ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); - } } Oid @@ -747,6 +798,7 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, IoStoreDDCOverrideSize); } m_Gc.AddGcReferencer(*this); + m_Gc.AddGcStorage(this); } ZenCacheDiskLayer::CacheBucket::~CacheBucket() @@ -761,6 +813,7 @@ ZenCacheDiskLayer::CacheBucket::~CacheBucket() { ZEN_ERROR("~CacheBucket() failed with: ", Ex.what()); } + m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferencer(*this); } @@ -1286,20 +1339,21 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} struct Entry { std::vector<IoHash> HashKeyAndReferences; + bool Overwrite; }; - std::vector<IoBuffer> Buffers; - std::vector<Entry> Entries; - std::vector<size_t> EntryResultIndexes; + std::vector<ZenCacheValue> Buffers; + std::vector<Entry> Entries; + std::vector<size_t> EntryResultIndexes; - std::vector<bool>& OutResults; + std::vector<ZenCacheDiskLayer::PutResult>& OutResults; }; ZenCacheDiskLayer::CacheBucket::PutBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults) +ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) { ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch"); return new PutBatchHandle(OutResults); @@ -1315,23 +1369,40 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept ZEN_ASSERT(Batch); if (!Batch->Buffers.empty()) { - std::vector<uint8_t> EntryFlags; - for (const IoBuffer& Buffer : Batch->Buffers) + ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size()); + std::vector<uint8_t> EntryFlags; + std::vector<size_t> BufferToEntryIndexes; + std::vector<IoBuffer> BuffersToCommit; + BuffersToCommit.reserve(Batch->Buffers.size()); + for (size_t Index = 0; Index < Batch->Entries.size(); Index++) { - uint8_t Flags = 0; - if (Buffer.GetContentType() == ZenContentType::kCbObject) + const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() >= 1); + + ZenCacheValue& Value = Batch->Buffers[Index]; + std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; + OutResult = PutResult{zen::PutStatus::Success}; + if (!ShouldRejectPut(HashKeyAndReferences[0], Value, Batch->Entries[Index].Overwrite, OutResult)) { - Flags |= DiskLocation::kStructured; - } - else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) - { - Flags |= DiskLocation::kCompressed; + BufferToEntryIndexes.push_back(Index); + BuffersToCommit.push_back(Value.Value); + + uint8_t Flags = 0; + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + Flags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + Flags |= DiskLocation::kCompressed; + } + EntryFlags.push_back(Flags); } - EntryFlags.push_back(Flags); } size_t IndexOffset = 0; - m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); std::vector<DiskIndexEntry> DiskEntries; { @@ -1339,8 +1410,9 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept for (size_t Index = 0; Index < Locations.size(); Index++) { DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); - const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 1); + const std::vector<IoHash>& HashKeyAndReferences = + Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() >= 1); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); if (m_TrackedCacheKeys) @@ -1375,12 +1447,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept } } m_SlogFile.Append(DiskEntries); - for (size_t Index = 0; Index < Locations.size(); Index++) - { - size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; - ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = true; - } IndexOffset += Locations.size(); }); } @@ -1876,30 +1942,128 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -void +bool +ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey, + ZenCacheValue& InOutValue, + bool Overwrite, + ZenCacheDiskLayer::PutResult& OutPutResult) +{ + const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; + if (CheckExisting) + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) + { + const PayloadIndex EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + const DiskLocation Location = m_Payloads[EntryIndex].Location; + + const BucketPayload* Payload = &m_Payloads[EntryIndex]; + if (Payload->MetaData) + { + const BucketMetaData MetaData = m_MetaDatas[Payload->MetaData]; + if (MetaData) + { + IndexLock.ReleaseNow(); + if (!cache::impl::UpdateValueWithRawSizeAndHash(InOutValue)) + { + OutPutResult = PutResult{zen::PutStatus::Fail, "Value provided is of bad format"}; + return true; + } + else if (MetaData.RawSize != InOutValue.RawSize || MetaData.RawHash != InOutValue.RawHash) + { + OutPutResult = PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; + return true; + } + return false; + } + } + + ZenCacheValue ExistingValue; + if (Payload->MemCached) + { + ExistingValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload; + IndexLock.ReleaseNow(); + } + else + { + IndexLock.ReleaseNow(); + + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + ExistingValue.Value = GetStandaloneCacheValue(Location, HashKey); + } + else + { + ExistingValue.Value = GetInlineCacheValue(Location); + } + } + + if (ExistingValue.Value) + { + if (cache::impl::UpdateValueWithRawSizeAndHash(ExistingValue)) + { + if (!cache::impl::UpdateValueWithRawSizeAndHash(InOutValue)) + { + OutPutResult = PutResult{zen::PutStatus::Fail, "Value provided is of bad format"}; + return true; + } + + if (ExistingValue.RawSize != InOutValue.RawSize || ExistingValue.RawHash != InOutValue.RawHash) + { + OutPutResult = PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", + ExistingValue.RawSize, + ExistingValue.RawHash)}; + return true; + } + } + } + } + } + return false; +} + +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + PutResult Result{zen::PutStatus::Success}; + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { - PutStandaloneCacheValue(HashKey, Value, References); + ZenCacheValue AcceptedValue = Value; + if (ShouldRejectPut(HashKey, AcceptedValue, Overwrite, Result)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(Result); + } + return Result; + } + PutStandaloneCacheValue(HashKey, AcceptedValue, References); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(true); + OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success}); } } else { - PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle); + Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle); } m_DiskWriteCount++; + return Result; } uint64_t @@ -2425,7 +2589,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) ZenCacheDiskLayer::BucketStats ZenCacheDiskLayer::CacheBucket::Stats() { - GcStorageSize Size = StorageSize(); + CacheStoreSize Size = TotalSize(); return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize, .DiskHitCount = m_DiskHitCount, @@ -2748,38 +2912,49 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck return {}; } -void +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); + PutResult Result{zen::PutStatus::Success}; if (OptionalBatchHandle != nullptr) { - OptionalBatchHandle->Buffers.push_back(Value.Value); + OptionalBatchHandle->Buffers.push_back(Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); + PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back(); + CurrentEntry.Overwrite = Overwrite; std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; - HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); + HashKeyAndReferences.reserve(1 + References.size()); HashKeyAndReferences.push_back(HashKey); - HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end()); - return; + HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end()); + return Result; } + + ZenCacheValue AcceptedValue = Value; + if (ShouldRejectPut(HashKey, AcceptedValue, Overwrite, Result)) + { + return Result; + } + uint8_t EntryFlags = 0; - if (Value.Value.GetContentType() == ZenContentType::kCbObject) + if (AcceptedValue.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + else if (AcceptedValue.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } - m_BlockStore.WriteChunk(Value.Value.Data(), - Value.Value.Size(), + m_BlockStore.WriteChunk(AcceptedValue.Value.Data(), + AcceptedValue.Value.Size(), m_Configuration.PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { ZEN_MEMSCOPE(GetCacheDiskTag()); @@ -2816,6 +2991,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, } m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); + return Result; } std::string @@ -3752,7 +3928,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) struct ZenCacheDiskLayer::PutBatchHandle { - PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3811,13 +3987,13 @@ struct ZenCacheDiskLayer::PutBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector<BucketHandle> BucketHandles; - std::vector<bool>& OutResults; + RwLock Lock; + std::vector<BucketHandle> BucketHandles; + std::vector<ZenCacheDiskLayer::PutResult>& OutResults; }; ZenCacheDiskLayer::PutBatchHandle* -ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults) +ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults) { return new PutBatchHandle(OutResults); } @@ -3954,21 +4130,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -void +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - + PutResult RetVal = {zen::PutStatus::Fail}; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); - Bucket->Put(HashKey, Value, References, BucketBatchHandle); + RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle); TryMemCacheTrim(); } + return RetVal; } void @@ -4241,8 +4419,9 @@ ZenCacheDiskLayer::Flush() } } +#if ZEN_WITH_TESTS void -ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) +ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { ZEN_TRACE_CPU("Z$::ScrubStorage"); @@ -4253,13 +4432,13 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) for (auto& Kv : m_Buckets) { -#if 1 +# if 1 Results.push_back(Ctx.ThreadPool().EnqueueTask( std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); -#else +# else CacheBucket& Bucket = *Kv.second; Bucket.ScrubStorage(Ctx); -#endif +# endif } for (auto& Result : Results) @@ -4275,16 +4454,17 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) } } } +#endif // ZEN_WITH_TESTS -GcStorageSize -ZenCacheDiskLayer::StorageSize() const +CacheStoreSize +ZenCacheDiskLayer::TotalSize() const { - GcStorageSize StorageSize{}; + CacheStoreSize StorageSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { - GcStorageSize BucketSize = Kv.second->StorageSize(); + CacheStoreSize BucketSize = Kv.second->TotalSize(); StorageSize.DiskSize += BucketSize.DiskSize; StorageSize.MemorySize += BucketSize.MemorySize; } @@ -4295,7 +4475,7 @@ ZenCacheDiskLayer::StorageSize() const ZenCacheDiskLayer::DiskStats ZenCacheDiskLayer::Stats() const { - GcStorageSize Size = StorageSize(); + CacheStoreSize Size = TotalSize(); ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; { RwLock::SharedLockScope _(m_Lock); @@ -4319,7 +4499,7 @@ ZenCacheDiskLayer::GetInfo() const { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); - GcStorageSize BucketSize = Kv.second->StorageSize(); + CacheStoreSize BucketSize = Kv.second->TotalSize(); Info.StorageSize.DiskSize += BucketSize.DiskSize; Info.StorageSize.MemorySize += BucketSize.MemorySize; } @@ -4334,7 +4514,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()}; + return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->TotalSize()}; } return {}; } diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index de4b0a37c..ff21d1ede 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -153,13 +153,13 @@ CacheRpcHandler::CacheRpcHandler(LoggerRef InLog, CacheStats& InCacheStats, UpstreamCacheClient& InUpstreamCache, ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& InGetCidStore, const DiskWriteBlocker* InDiskWriteBlocker) : m_Log(InLog) , m_CacheStats(InCacheStats) , m_UpstreamCache(InUpstreamCache) , m_CacheStore(InCacheStore) -, m_CidStore(InCidStore) +, m_GetCidStore(std::move(InGetCidStore)) , m_DiskWriteBlocker(InDiskWriteBlocker) { } @@ -174,6 +174,12 @@ CacheRpcHandler::AreDiskWritesAllowed() const return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } +CidStore& +CacheRpcHandler::GetCidStore(std::string_view Namespace) +{ + return m_GetCidStore(Namespace); +} + CacheRpcHandler::RpcResponseCode CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, std::string_view UriNamespace, @@ -334,13 +340,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co .Policy = std::move(Policy), .Context = Context}; - PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest); - if (Result == PutResult::Invalid) + if (Result == PutStatus::Invalid) { return CbPackage{}; } - Results.push_back(Result == PutResult::Success); + Results.push_back(Result == PutStatus::Success); } if (Results.empty()) { @@ -360,7 +366,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co return RpcResponse; } -PutResult +PutStatus CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) { CbObjectView Record = Request.RecordObject; @@ -381,9 +387,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Stopwatch Timer; + CidStore& ChunkStore = m_GetCidStore(Request.Namespace); + Request.RecordObject.IterateAttachments([this, &Request, Package, + &ChunkStore, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, @@ -412,7 +421,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Count.Invalid++; } } - else if (m_CidStore.ContainsChunk(ValueHash)) + else if (ChunkStore.ContainsChunk(ValueHash)) { ValidAttachments.emplace_back(ValueHash); Count.Valid++; @@ -422,19 +431,33 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (Count.Invalid > 0) { - return PutResult::Invalid; + return PutStatus::Invalid; } ZenCacheValue CacheValue; CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr); + bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && + !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); + // TODO: Propagation for rejected PUTs + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context, + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) + { + return PutResult.Status; + } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) { - std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (size_t Index = 0; Index < InsertResults.size(); Index++) { if (InsertResults[Index].New) @@ -461,12 +484,14 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Request.Namespace, - .Key = Request.Key, - .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, + .Namespace = Request.Namespace, + .Key = Request.Key, + .ValueContentIds = std::move(ValidAttachments)}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } - return PutResult::Success; + return PutStatus::Success; } CbPackage @@ -507,6 +532,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb return CbPackage{}; } + CidStore& ChunkStore = m_GetCidStore(Namespace.value()); + const bool HasUpstream = m_UpstreamCache.IsActive(); eastl::fixed_vector<RecordRequestData, 16> Requests; @@ -606,7 +633,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { - if (m_CidStore.ContainsChunk(Value.ContentId)) + if (ChunkStore.ContainsChunk(Value.ContentId)) { Value.Exists = true; } @@ -627,7 +654,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } else { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) + if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Value.ContentId)) { if (Chunk.GetSize() > 0) { @@ -651,7 +678,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } if (!RequestValueIndexes.empty()) { - m_CidStore.IterateChunks( + ChunkStore.IterateChunks( CidHashes, [this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { try @@ -744,7 +771,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } } - const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &ChunkStore, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -765,18 +792,24 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal); std::vector<IoHash> ReferencedAttachments; ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = {Request.RecordCacheValue}}, - ReferencedAttachments, - nullptr); + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) + { + return; + } m_CacheStats.WriteCount++; } ParseValues(Request); @@ -807,7 +840,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Value.Exists = true; if (StoreLocal) { - m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); + ChunkStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { @@ -924,10 +957,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<bool> BatchResults; - eastl::fixed_vector<size_t, 32> BatchResultIndexes; - eastl::fixed_vector<bool, 32> Results; - eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys; + CidStore& ChunkStore = m_GetCidStore(Namespace.value()); + + std::vector<ZenCacheStore::PutResult> BatchResults; + eastl::fixed_vector<size_t, 32> BatchResultIndexes; + eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results; + eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys; uint64_t RequestCount = RequestsArray.Num(); { @@ -977,34 +1012,39 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); if (RawSize == 0) { RawSize = Chunk.DecodeRawSize(); } - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Batch.get()); - m_CacheStats.WriteCount++; + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + Batch.get()); + if (PutResult.Status == zen::PutStatus::Success) + { + m_CacheStats.WriteCount++; + } if (Batch) { BatchResultIndexes.push_back(Results.size()); - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail}); } else { - Results.push_back(true); + Results.push_back(PutResult); } TransferredSize = Chunk.GetCompressedSize(); } else { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); } Valid = true; } @@ -1020,12 +1060,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); Valid = true; } else { - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)}); } } // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. @@ -1060,27 +1100,49 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { size_t BatchResultIndex = BatchResultIndexes[Index]; ZEN_ASSERT(BatchResultIndex < Results.size()); - ZEN_ASSERT(Results[BatchResultIndex] == false); + ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success); Results[BatchResultIndex] = BatchResults[Index]; } for (std::size_t Index = 0; Index < Results.size(); Index++) { - if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty) + if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); + {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } } { ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response"); CbObjectWriter ResponseObject{1024}; ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) + bool bAnyErrors = false; + for (const ZenCacheStore::PutResult& Value : Results) { - ResponseObject.AddBool(Value); + if (Value.Status == zen::PutStatus::Success) + { + ResponseObject.AddBool(true); + } + else + { + bAnyErrors = true; + ResponseObject.AddBool(false); + } } ResponseObject.EndArray(); + if (bAnyErrors) + { + ResponseObject.BeginArray("ErrorMessages"sv); + for (const ZenCacheStore::PutResult& Value : Results) + { + if (Value.Status != zen::PutStatus::Success) + { + ResponseObject.AddString(Value.Message); + } + } + ResponseObject.EndArray(); + } CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); @@ -1239,6 +1301,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal); const bool IsHit = SkipData || HasData; if (IsHit) { @@ -1249,14 +1312,19 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO if (HasData && StoreData) { - m_CacheStore.Put(Context, - *Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + ZenCacheStore::PutResult PutResult = m_CacheStore.Put( + Context, + *Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) + { + m_CacheStats.WriteCount++; + } } ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", @@ -1494,6 +1562,8 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); + CidStore& ChunkStore = m_GetCidStore(Namespace); + // TODO: BatchGet records? std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) @@ -1527,36 +1597,48 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context]( - CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - CacheKeyRequest& RecordKey = Params.Request; - size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); - RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - RecordBody& Record = Records[RecordIndex]; - - const CacheKey& Key = RecordKey.Key; - Record.Exists = true; - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Record.CacheValue.SetContentType(ZenContentType::kCbObject); - Record.Source = Params.Source; - - bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - std::vector<IoHash> ReferencedAttachments; - ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - }); - m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr); - m_CacheStats.WriteCount++; - } - }; + const auto OnCacheRecordGetComplete = + [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); + RecordBody& Record = Records[RecordIndex]; + + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = Params.Source; + + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) + { + bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal); + std::vector<IoHash> ReferencedAttachments; + ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + }); + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + Namespace, + Key.Bucket, + Key.Hash, + {.Value = Record.CacheValue}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) + { + return; + } + m_CacheStats.WriteCount++; + } + }; m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } @@ -1620,12 +1702,12 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) { - if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) + if (ChunkStore.ContainsChunk(Request->Key->ChunkId)) { Request->Exists = true; } } - else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) + else if (IoBuffer Payload = ChunkStore.FindChunkByCid(Request->Key->ChunkId)) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { @@ -1758,6 +1840,8 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, return; } + CidStore& ChunkStore = m_GetCidStore(Namespace); + CacheChunkRequest& Key = Params.Request; size_t RequestIndex = std::distance(RequestKeys.data(), &Key); ChunkRequest& Request = Requests[RequestIndex]; @@ -1774,20 +1858,26 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal); if (Request.IsRecordRequest) { - m_CidStore.AddChunk(Params.Value, Params.RawHash); + ChunkStore.AddChunk(Params.Value, Params.RawHash); } else { - m_CacheStore.Put(Context, - Namespace, - Key.Key.Bucket, - Key.Key.Hash, - {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + ZenCacheStore::PutResult PutResult = + m_CacheStore.Put(Context, + Namespace, + Key.Key.Bucket, + Key.Key.Hash, + {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) + { + m_CacheStats.WriteCount++; + } } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index d956384ca..1f2d6c37f 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -139,13 +139,10 @@ ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const st CreateDirectories(m_RootDir); m_DiskLayer.DiscoverBuckets(); - - m_Gc.AddGcStorage(this); } ZenCacheNamespace::~ZenCacheNamespace() { - m_Gc.RemoveGcStorage(this); } struct ZenCacheNamespace::PutBatchHandle @@ -154,7 +151,7 @@ struct ZenCacheNamespace::PutBatchHandle }; ZenCacheNamespace::PutBatchHandle* -ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult) +ZenCacheNamespace::BeginPutBatch(std::vector<PutResult>& OutResult) { ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle; Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult); @@ -252,11 +249,12 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc return; } -void +ZenCacheNamespace::PutResult ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU(OptionalBatchHandle ? "Z$::Namespace::Put(Batched)" : "Z$::Namespace::Put"); @@ -268,8 +266,12 @@ ZenCacheNamespace::Put(std::string_view InBucket, ZEN_ASSERT(Value.Value.Size()); ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr; - m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle); - m_WriteCount++; + PutResult RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle); + if (RetVal.Status == zen::PutStatus::Success) + { + m_WriteCount++; + } + return RetVal; } bool @@ -297,7 +299,6 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view std::function<void()> ZenCacheNamespace::Drop() { - m_Gc.RemoveGcStorage(this); return m_DiskLayer.Drop(); } @@ -307,25 +308,19 @@ ZenCacheNamespace::Flush() m_DiskLayer.Flush(); } +#if ZEN_WITH_TESTS void -ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx) +ZenCacheNamespace::Scrub(ScrubContext& Ctx) { - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - ZEN_INFO("scrubbing '{}'", m_RootDir); - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_DiskLayer.ScrubStorage(Ctx); + m_DiskLayer.Scrub(Ctx); } +#endif // ZEN_WITH_TESTS -GcStorageSize -ZenCacheNamespace::StorageSize() const +CacheStoreSize +ZenCacheNamespace::TotalSize() const { - return m_DiskLayer.StorageSize(); + return m_DiskLayer.TotalSize(); } ZenCacheNamespace::Info @@ -557,7 +552,7 @@ ZenCacheStore::LogWorker() } } -ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult) +ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<PutResult>& OutResult) : m_CacheStore(CacheStore) { ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -720,13 +715,14 @@ ZenCacheStore::Get(const CacheRequestContext& Context, m_MissCount++; } -void +ZenCacheStore::PutResult ZenCacheStore::Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatch* OptionalBatchHandle) { // Ad hoc rejection of known bad usage patterns for DDC bucket names @@ -734,7 +730,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (IsKnownBadBucketName(Bucket)) { m_RejectedWriteCount++; - return; + return PutResult{zen::PutStatus::Invalid, "Bad bucket name"}; } ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -764,9 +760,16 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr; - Store->Put(Bucket, HashKey, Value, References, BatchHandle); - m_WriteCount++; - return; + PutResult RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle); + if (RetVal.Status == zen::PutStatus::Success) + { + m_WriteCount++; + } + else + { + m_RejectedWriteCount++; + } + return RetVal; } ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'", @@ -774,6 +777,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, Namespace, Bucket, HashKey.ToHexString()); + return PutResult{zen::PutStatus::Fail, fmt::format("Unknown namespace '{}'", Namespace)}; } bool @@ -822,11 +826,13 @@ ZenCacheStore::Flush() IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); } +#if ZEN_WITH_TESTS void -ZenCacheStore::ScrubStorage(ScrubContext& Ctx) +ZenCacheStore::Scrub(ScrubContext& Ctx) { - IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.ScrubStorage(Ctx); }); + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); }); } +#endif // ZEN_WITH_TESTS CacheValueDetails ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter, @@ -951,12 +957,12 @@ ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Names } } -GcStorageSize -ZenCacheStore::StorageSize() const +CacheStoreSize +ZenCacheStore::TotalSize() const { - GcStorageSize Size; + CacheStoreSize Size; IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { - GcStorageSize StoreSize = Store.StorageSize(); + CacheStoreSize StoreSize = Store.TotalSize(); Size.MemorySize += StoreSize.MemorySize; Size.DiskSize += StoreSize.DiskSize; }); @@ -1026,7 +1032,7 @@ ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig) ZenCacheStore::Info ZenCacheStore::GetInfo() const { - ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()}; + ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = TotalSize()}; IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) { Info.NamespaceNames.push_back(std::string(NamespaceName)); @@ -1378,7 +1384,7 @@ TEST_CASE("cachestore.store") Value.Value = Obj.GetBuffer().AsIoBuffer(); Value.Value.SetContentType(ZenContentType::kCbObject); - Zcs.Put("test_bucket"sv, Key, Value, {}); + Zcs.Put("test_bucket"sv, Key, Value, {}, false); } for (int i = 0; i < kIterationCount; ++i) @@ -1414,7 +1420,7 @@ TEST_CASE("cachestore.size") const size_t Count = 16; ScopedTemporaryDirectory TempDir; - GcStorageSize CacheSize; + CacheStoreSize CacheSize; { GcManager Gc; @@ -1432,10 +1438,10 @@ TEST_CASE("cachestore.size") const size_t Bucket = Key % 4; std::string BucketName = fmt::format("test_bucket-{}", Bucket); IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t)); - Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}); + Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false); Keys.push_back({BucketName, Hash}); } - CacheSize = Zcs.StorageSize(); + CacheSize = Zcs.TotalSize(); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_EQ(0, CacheSize.MemorySize); @@ -1445,7 +1451,7 @@ TEST_CASE("cachestore.size") Zcs.Get(Key.first, Key.second, _); } - CacheSize = Zcs.StorageSize(); + CacheSize = Zcs.TotalSize(); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize); } @@ -1454,7 +1460,7 @@ TEST_CASE("cachestore.size") GcManager Gc; ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - const GcStorageSize SerializedSize = Zcs.StorageSize(); + const CacheStoreSize SerializedSize = Zcs.TotalSize(); CHECK_EQ(SerializedSize.MemorySize, 0); CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); @@ -1462,8 +1468,8 @@ TEST_CASE("cachestore.size") { Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - CHECK_EQ(0, Zcs.StorageSize().MemorySize); + CHECK_EQ(0, Zcs.TotalSize().DiskSize); + CHECK_EQ(0, Zcs.TotalSize().MemorySize); } } @@ -1472,7 +1478,7 @@ TEST_CASE("cachestore.size") const size_t Count = 16; ScopedTemporaryDirectory TempDir; - GcStorageSize CacheSize; + CacheStoreSize CacheSize; { GcManager Gc; @@ -1486,10 +1492,10 @@ TEST_CASE("cachestore.size") for (size_t Key = 0; Key < Count; ++Key) { const size_t Bucket = Key % 4; - Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}); + Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false); } - CacheSize = Zcs.StorageSize(); + CacheSize = Zcs.TotalSize(); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_EQ(0, CacheSize.MemorySize); } @@ -1498,7 +1504,7 @@ TEST_CASE("cachestore.size") GcManager Gc; ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - const GcStorageSize SerializedSize = Zcs.StorageSize(); + const CacheStoreSize SerializedSize = Zcs.TotalSize(); CHECK_EQ(SerializedSize.MemorySize, 0); CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); @@ -1506,7 +1512,7 @@ TEST_CASE("cachestore.size") { Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } - CHECK_EQ(0, Zcs.StorageSize().DiskSize); + CHECK_EQ(0, Zcs.TotalSize().DiskSize); } } } @@ -1569,7 +1575,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}); + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); WorkCompleted.fetch_add(1); }); } @@ -1599,7 +1605,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) GcChunkHashes.swap(RemainingChunkHashes); }; - const uint64_t TotalSize = Zcs.StorageSize().DiskSize; + const uint64_t TotalSize = Zcs.TotalSize().DiskSize; CHECK_LE(kChunkSize * Chunks.size(), TotalSize); { @@ -1650,7 +1656,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) for (const auto& Chunk : NewChunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}); + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); AddedChunkCount.fetch_add(1); WorkCompleted.fetch_add(1); }); @@ -1755,14 +1761,14 @@ TEST_CASE("cachestore.namespaces") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}); + Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}, false); ZenCacheValue GetValue; CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); // This should just be dropped as we don't allow creating of namespaces on the fly - Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}); + Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}, false); CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); } @@ -1778,7 +1784,7 @@ TEST_CASE("cachestore.namespaces") IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); Buffer2.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue2 = {.Value = Buffer2}; - Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}); + Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}, false); ZenCacheValue GetValue; CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); @@ -1820,7 +1826,7 @@ TEST_CASE("cachestore.drop.bucket") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}); + Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false); return Key; }; auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { @@ -1893,7 +1899,7 @@ TEST_CASE("cachestore.drop.namespace") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}); + Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false); return Key; }; auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { @@ -1957,8 +1963,6 @@ TEST_CASE("cachestore.blocked.disklayer.put") { ScopedTemporaryDirectory TempDir; - GcStorageSize CacheSize; - const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector<uint8_t> Buf; Buf.resize(Size, Size & 0xff); @@ -1979,7 +1983,7 @@ TEST_CASE("cachestore.blocked.disklayer.put") size_t Key = Buffer.Size(); IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t)); - Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}, false); ZenCacheValue BufferGet; CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); @@ -1989,7 +1993,7 @@ TEST_CASE("cachestore.blocked.disklayer.put") Buffer2.SetContentType(ZenContentType::kCbObject); // We should be able to overwrite even if the file is open for read - Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}, false); MemoryView OldView = BufferGet.Value.GetView(); @@ -2080,7 +2084,7 @@ TEST_CASE("cachestore.scrub") AttachmentHashes.push_back(Attachment.DecodeRawHash()); CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back()); } - Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes); + Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes, false); } }; @@ -2094,8 +2098,8 @@ TEST_CASE("cachestore.scrub") WorkerThreadPool ThreadPool{1}; ScrubContext ScrubCtx{ThreadPool}; - Zcs.ScrubStorage(ScrubCtx); - CidStore.ScrubStorage(ScrubCtx); + Zcs.Scrub(ScrubCtx); + CidStore.Scrub(ScrubCtx); CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size()); CHECK(ScrubCtx.BadCids().GetSize() == 0); } @@ -2129,7 +2133,8 @@ TEST_CASE("cachestore.newgc.basics") {.Value = Record.second, .RawSize = Record.second.GetSize(), .RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())}, - AttachmentKeys); + AttachmentKeys, + false); for (const auto& Attachment : Attachments) { CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash()); @@ -2145,7 +2150,8 @@ TEST_CASE("cachestore.newgc.basics") {.Value = CacheValue.second, .RawSize = CacheValue.second.GetSize(), .RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())}, - {}); + {}, + false); CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}}); return Key; }; diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 460f0e10d..6b89beb3d 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -73,9 +73,12 @@ public: WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) override; virtual void Flush() override; - virtual void ScrubStorage(ScrubContext& Ctx) override; virtual CidStoreSize TotalSize() const override; +#if ZEN_WITH_TESTS + virtual void Scrub(ScrubContext& Ctx) override; +#endif // ZEN_WITH_TESTS + private: CasContainerStrategy m_TinyStrategy; CasContainerStrategy m_SmallStrategy; @@ -195,7 +198,7 @@ CasImpl::OpenOrCreateManifest() } else { - ZEN_WARN("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult)); + ZEN_WARN("Store manifest validation failed: {}, will generate new manifest to recover", ToString(ValidationResult)); } if (ManifestIsOk) @@ -463,24 +466,19 @@ CasImpl::Flush() m_LargeStrategy.Flush(); } +#if ZEN_WITH_TESTS void -CasImpl::ScrubStorage(ScrubContext& Ctx) +CasImpl::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetCasTag()); ZEN_TRACE_CPU("Cas::ScrubStorage"); - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - m_LastScrubTime = Ctx.ScrubTimestamp(); - m_SmallStrategy.ScrubStorage(Ctx); m_TinyStrategy.ScrubStorage(Ctx); m_LargeStrategy.ScrubStorage(Ctx); } +#endif // ZEN_WITH_TESTS CidStoreSize CasImpl::TotalSize() const @@ -523,7 +521,7 @@ TEST_CASE("CasStore") WorkerThreadPool ThreadPool{1}; ScrubContext Ctx{ThreadPool}; - Store->ScrubStorage(Ctx); + Store->Scrub(Ctx); IoBuffer Value1{16}; memcpy(Value1.MutableData(), "1234567890123456", 16); diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h index e279dd2cc..0f6e2ba9d 100644 --- a/src/zenstore/cas.h +++ b/src/zenstore/cas.h @@ -50,12 +50,13 @@ public: WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) = 0; virtual void Flush() = 0; - virtual void ScrubStorage(ScrubContext& Ctx) = 0; virtual CidStoreSize TotalSize() const = 0; +#if ZEN_WITH_TESTS + virtual void Scrub(ScrubContext& Ctx) = 0; +#endif // ZEN_WITH_TESTS protected: CidStoreConfiguration m_Config; - uint64_t m_LastScrubTime = 0; }; ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc); diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index 2ab769d04..ae1b59dc0 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -127,17 +127,9 @@ struct CidStore::Impl void Flush() { m_CasStore.Flush(); } - void ScrubStorage(ScrubContext& Ctx) - { - if (Ctx.ScrubTimestamp() == m_LastScrubTime) - { - return; - } - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_CasStore.ScrubStorage(Ctx); - } +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx) { m_CasStore.Scrub(Ctx); } +#endif // ZEN_WITH_TESTS CidStoreStats Stats() { @@ -236,11 +228,13 @@ CidStore::Flush() m_Impl->Flush(); } +#if ZEN_WITH_TESTS void -CidStore::ScrubStorage(ScrubContext& Ctx) +CidStore::Scrub(ScrubContext& Ctx) { - m_Impl->ScrubStorage(Ctx); + m_Impl->Scrub(Ctx); } +#endif // ZEN_WITH_TESTS CidStoreSize CidStore::TotalSize() const diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 15e4cbf81..32c256a42 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -68,10 +68,10 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore void Flush(); // GcStorage - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; virtual GcStorageSize StorageSize() const override; + // GcReferenceStore virtual std::string GetGcName(GcCtx& Ctx) override; virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override; diff --git a/src/zenstore/include/zenstore/buildstore/buildstore.h b/src/zenstore/include/zenstore/buildstore/buildstore.h index adf48dc26..87b7dd812 100644 --- a/src/zenstore/include/zenstore/buildstore/buildstore.h +++ b/src/zenstore/include/zenstore/buildstore/buildstore.h @@ -6,9 +6,8 @@ #include <zencore/iohash.h> #include <zenstore/accesstime.h> #include <zenstore/caslog.h> +#include <zenstore/cidstore.h> #include <zenstore/gc.h> -#include "../compactcas.h" -#include "../filecas.h" ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> @@ -19,18 +18,13 @@ namespace zen { struct BuildStoreConfig { std::filesystem::path RootDirectory; - uint32_t SmallBlobBlockStoreMaxBlockSize = 256 * 1024 * 1024; - uint64_t SmallBlobBlockStoreMaxBlockEmbedSize = 1 * 1024 * 1024; - uint32_t SmallBlobBlockStoreAlignement = 16; - uint32_t MetadataBlockStoreMaxBlockSize = 64 * 1024 * 1024; - uint32_t MetadataBlockStoreAlignement = 8; - uint64_t MaxDiskSpaceLimit = 1u * 1024u * 1024u * 1024u * 1024u; // 1TB + uint64_t MaxDiskSpaceLimit = 1u * 1024u * 1024u * 1024u * 1024u; // 1TB }; -class BuildStore : public GcReferencer, public GcReferenceLocker, public GcStorage +class BuildStore : public GcReferencer, public GcReferenceLocker { public: - explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc); + explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc, CidStore& BlobStore); virtual ~BuildStore(); void PutBlob(const IoHash& BlobHashes, const IoBuffer& Payload); @@ -44,7 +38,7 @@ public: std::vector<BlobExistsResult> BlobsExists(std::span<const IoHash> BlobHashes); - void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas); + void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas, WorkerThreadPool* OptionalWorkerPool); std::vector<IoBuffer> GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool); void Flush(); @@ -52,10 +46,8 @@ public: struct StorageStats { uint64_t EntryCount = 0; - uint64_t LargeBlobCount = 0; - uint64_t LargeBlobBytes = 0; - uint64_t SmallBlobCount = 0; - uint64_t SmallBlobBytes = 0; + uint64_t BlobCount = 0; + uint64_t BlobBytes = 0; uint64_t MetadataCount = 0; uint64_t MetadataByteCount = 0; }; @@ -86,23 +78,18 @@ private: //////// GcReferenceLocker virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override; - //////// GcStorage - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; - virtual GcStorageSize StorageSize() const override; - #pragma pack(push) #pragma pack(1) struct PayloadEntry { PayloadEntry() {} - PayloadEntry(uint64_t Flags, uint64_t Size) + PayloadEntry(uint8_t Flags, uint64_t Size) { ZEN_ASSERT((Size & 0x00ffffffffffffffu) == Size); - ZEN_ASSERT((Flags & (kTombStone | kStandalone)) == Flags); + ZEN_ASSERT((Flags & (kTombStone)) == Flags); FlagsAndSize = (Size << 8) | Flags; } - static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value - static const uint8_t kStandalone = 0x20u; // This payload is stored as a standalone value + static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value uint64_t FlagsAndSize = 0; uint64_t GetSize() const { return FlagsAndSize >> 8; } @@ -126,27 +113,47 @@ private: struct MetadataEntry { - BlockStoreLocation Location; // 12 bytes + IoHash MetadataHash; // 20 bytes + + MetadataEntry() {} - ZenContentType ContentType = ZenContentType::kCOUNT; // 1 byte - static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value - uint8_t Flags = 0; // 1 byte + MetadataEntry(const IoHash& Hash, uint64_t Size, ZenContentType ContentType, uint8_t Flags) + { + ZEN_ASSERT((Size & 0x0000ffffffffffffu) == Size); + ZEN_ASSERT((Flags & kTombStone) == Flags); + FlagsContentTypeAndSize = (Size << 16) | ((uint64_t)ContentType << 8) | Flags; + MetadataHash = Hash; + } + + static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value + + uint64_t GetSize() const { return FlagsContentTypeAndSize >> 16; } + void SetSize(uint64_t Size) + { + ZEN_ASSERT((Size & 0x0000ffffffffffffu) == Size); + FlagsContentTypeAndSize = (Size << 16) | (FlagsContentTypeAndSize & 0xffff); + } + + uint8_t GetFlags() const { return uint8_t(FlagsContentTypeAndSize & 0xff); } + void AddFlag(uint8_t Flag) { FlagsContentTypeAndSize |= Flag; } + void SetFlags(uint8_t Flags) { FlagsContentTypeAndSize = (FlagsContentTypeAndSize & 0xffffffffffffff00u) | Flags; } + + ZenContentType GetContentType() const { return ZenContentType((FlagsContentTypeAndSize >> 8) & 0xff); } + void SetContentType(ZenContentType ContentType) + { + FlagsContentTypeAndSize = (FlagsContentTypeAndSize & 0xffffffffffff00ffu) | (uint16_t(ContentType) << 8); + } - uint8_t Reserved1 = 0; - uint8_t Reserved2 = 0; + uint64_t FlagsContentTypeAndSize = ((uint64_t)ZenContentType::kCOUNT << 8); }; - static_assert(sizeof(MetadataEntry) == 16); + static_assert(sizeof(MetadataEntry) == 28); struct MetadataDiskEntry { - MetadataEntry Entry; // 16 bytes + MetadataEntry Entry; // 28 bytes IoHash BlobHash; // 20 bytes - uint8_t Reserved1 = 0; - uint8_t Reserved2 = 0; - uint8_t Reserved3 = 0; - uint8_t Reserved4 = 0; }; - static_assert(sizeof(MetadataDiskEntry) == 40); + static_assert(sizeof(MetadataDiskEntry) == 48); #pragma pack(pop) @@ -206,17 +213,15 @@ private: std::vector<BlobEntry> m_BlobEntries; tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> m_BlobLookup; - FileCasStrategy m_LargeBlobStore; - CasContainerStrategy m_SmallBlobStore; - BlockStore m_MetadataBlockStore; + CidStore& m_BlobStore; TCasLogFile<PayloadDiskEntry> m_PayloadlogFile; TCasLogFile<MetadataDiskEntry> m_MetadatalogFile; uint64_t m_BlobLogFlushPosition = 0; uint64_t m_MetaLogFlushPosition = 0; - std::unique_ptr<HashSet> m_TrackedCacheKeys; - std::atomic<uint64_t> m_LastAccessTimeUpdateCount; + std::unique_ptr<std::vector<IoHash>> m_TrackedBlobKeys; + std::atomic<uint64_t> m_LastAccessTimeUpdateCount; friend class BuildStoreGcReferenceChecker; friend class BuildStoreGcReferencePruner; diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 3cd2d6423..49c52f847 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -106,6 +106,12 @@ static_assert(sizeof(DiskIndexEntry) == 32); ////////////////////////////////////////////////////////////////////////// +struct CacheStoreSize +{ + uint64_t DiskSize = 0; + uint64_t MemorySize = 0; +}; + class ZenCacheDiskLayer { public: @@ -115,6 +121,7 @@ public: uint32_t PayloadAlignment = 1u << 4; uint64_t MemCacheSizeThreshold = 1 * 1024; uint64_t LargeObjectThreshold = 128 * 1024; + bool LimitOverwrites = false; }; struct Configuration @@ -130,8 +137,8 @@ public: struct BucketInfo { - uint64_t EntryCount = 0; - GcStorageSize StorageSize; + uint64_t EntryCount = 0; + CacheStoreSize StorageSize; }; struct Info @@ -140,7 +147,7 @@ public: Configuration Config; std::vector<std::string> BucketNames; uint64_t EntryCount = 0; - GcStorageSize StorageSize; + CacheStoreSize StorageSize; }; struct BucketStats @@ -170,6 +177,12 @@ public: uint64_t MemorySize; }; + struct PutResult + { + zen::PutStatus Status; + std::string Message; + }; + explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheDiskLayer(); @@ -180,22 +193,22 @@ public: void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); + PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResult); void EndPutBatch(PutBatchHandle* Batch) noexcept; - void Put(std::string_view Bucket, + PutResult Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle); std::function<void()> Drop(); std::function<void()> DropBucket(std::string_view Bucket); void Flush(); - void ScrubStorage(ScrubContext& Ctx); - void DiscoverBuckets(); - GcStorageSize StorageSize() const; - DiskStats Stats() const; + void DiscoverBuckets(); + CacheStoreSize TotalSize() const; + DiskStats Stats() const; Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; @@ -212,6 +225,7 @@ public: #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); + void Scrub(ScrubContext& Ctx); #endif // ZEN_WITH_TESTS bool GetContentStats(std::string_view BucketName, CacheContentStats& OutContentStats) const; @@ -219,7 +233,7 @@ public: /** A cache bucket manages a single directory containing metadata and data for that bucket */ - struct CacheBucket : public GcReferencer + struct CacheBucket : public GcReferencer, public GcStorage { CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, @@ -236,13 +250,22 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); - void EndPutBatch(PutBatchHandle* Batch) noexcept; - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, PutBatchHandle* OptionalBatchHandle); - uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); - std::function<void()> Drop(); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); + PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; + PutResult Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + std::function<void()> Drop(); + void Flush(); + inline CacheStoreSize TotalSize() const + { + return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), + .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; + } + RwLock::SharedLockScope GetGcReferencerLock(); struct ReferencesStats @@ -265,11 +288,6 @@ public: std::span<const std::size_t> ChunkIndexes, std::vector<IoHash>& OutReferences) const; - inline GcStorageSize StorageSize() const - { - return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), - .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; - } uint64_t EntryCount() const; BucketStats Stats(); @@ -281,6 +299,20 @@ public: void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS + // GcReferencer + virtual std::string GetGcName(GcCtx& Ctx) override; + virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; + + // GcStorage + virtual void ScrubStorage(ScrubContext& Ctx) override; + virtual GcStorageSize StorageSize() const override + { + CacheStoreSize Size = TotalSize(); + return {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; + } + private: #pragma pack(push) #pragma pack(1) @@ -379,19 +411,16 @@ public: std::atomic_uint64_t m_StandaloneSize{}; std::atomic_uint64_t m_MemCachedSize{}; - virtual std::string GetGcName(GcCtx& Ctx) override; - virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; - virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; - virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; - - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); - IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; - void PutInlineCacheValue(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - PutBatchHandle* OptionalBatchHandle = nullptr); - IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; + bool ShouldRejectPut(const IoHash& HashKey, ZenCacheValue& InOutValue, bool Overwrite, ZenCacheDiskLayer::PutResult& OutPutResult); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; + PutResult PutInlineCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle = nullptr); + IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const; void SetMetaData(RwLock::ExclusiveLockScope&, diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index da8cf69fe..80340d72c 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -4,6 +4,7 @@ #include <zencore/iobuffer.h> #include <zencore/logging.h> +#include <zenstore/cache/cacheshared.h> #include <zenutil/cache/cache.h> #include <atomic> @@ -56,13 +57,6 @@ struct CacheStats std::atomic_uint64_t RpcChunkBatchRequests{}; }; -enum class PutResult -{ - Success, - Fail, - Invalid, -}; - /** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. @@ -76,11 +70,13 @@ IsCompressedBinary(ZenContentType Type) struct CacheRpcHandler { + typedef std::function<CidStore&(std::string_view Context)> GetCidStoreFunc; + CacheRpcHandler(LoggerRef InLog, CacheStats& InCacheStats, UpstreamCacheClient& InUpstreamCache, ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& InGetCidStore, const DiskWriteBlocker* InDiskWriteBlocker); ~CacheRpcHandler(); @@ -100,6 +96,8 @@ struct CacheRpcHandler int& OutTargetProcessId, CbPackage& OutPackage); + CidStore& GetCidStore(std::string_view Namespace); + private: CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest); CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest); @@ -107,7 +105,7 @@ private: CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest); - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + PutStatus PutCacheRecord(PutRequestData& Request, const CbPackage* Package); /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ bool ParseGetCacheChunksRequest(std::string& Namespace, @@ -148,7 +146,7 @@ private: CacheStats& m_CacheStats; UpstreamCacheClient& m_UpstreamCache; ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; + GetCidStoreFunc m_GetCidStore; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; bool AreDiskWritesAllowed() const; diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h index ef1b803de..8f40ae727 100644 --- a/src/zenstore/include/zenstore/cache/cacheshared.h +++ b/src/zenstore/include/zenstore/cache/cacheshared.h @@ -69,6 +69,14 @@ struct CacheContentStats std::vector<IoHash> Attachments; }; +enum class PutStatus +{ + Success, + Fail, + Conflict, + Invalid, +}; + bool IsKnownBadBucketName(std::string_view BucketName); bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer); diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 48fc17960..c51d7312c 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -49,7 +49,7 @@ class JobQueue; */ -class ZenCacheNamespace final : public GcStorage +class ZenCacheNamespace final { public: struct Configuration @@ -78,24 +78,27 @@ public: ZenCacheDiskLayer::DiskStats DiskStats; }; + using PutResult = ZenCacheDiskLayer::PutResult; + ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheNamespace(); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResults); + PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResults); void EndPutBatch(PutBatchHandle* Batch) noexcept; struct GetBatchHandle; GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResults); void EndGetBatch(GetBatchHandle* Batch) noexcept; - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); - void Put(std::string_view Bucket, - const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - PutBatchHandle* OptionalBatchHandle = nullptr); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); + PutResult Put(std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Bucket); void EnumerateBucketContents(std::string_view Bucket, @@ -104,10 +107,7 @@ public: std::function<void()> Drop(); void Flush(); - // GcStorage - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; - virtual GcStorageSize StorageSize() const override; - + CacheStoreSize TotalSize() const; Configuration GetConfig() const { return m_Configuration; } Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; @@ -124,6 +124,7 @@ public: #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); + void Scrub(ScrubContext& ScrubCtx); #endif // ZEN_WITH_TESTS private: @@ -137,7 +138,6 @@ private: std::atomic<uint64_t> m_WriteCount{}; metrics::RequestStats m_PutOps; metrics::RequestStats m_GetOps; - uint64_t m_LastScrubTime = 0; ZenCacheNamespace(const ZenCacheNamespace&) = delete; ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; @@ -175,7 +175,7 @@ public: Configuration Config; std::vector<std::string> NamespaceNames; uint64_t DiskEntryCount = 0; - GcStorageSize StorageSize; + CacheStoreSize StorageSize; }; struct NamedNamespaceStats @@ -196,6 +196,8 @@ public: std::vector<NamedNamespaceStats> NamespaceStats; }; + using PutResult = ZenCacheNamespace::PutResult; + ZenCacheStore(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& BasePath, @@ -206,7 +208,7 @@ public: class PutBatch { public: - PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<bool>& OutResult); + PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<PutResult>& OutResult); ~PutBatch(); private: @@ -243,24 +245,24 @@ public: const IoHash& HashKey, GetBatch& BatchHandle); - void Put(const CacheRequestContext& Context, - std::string_view Namespace, - std::string_view Bucket, - const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - PutBatch* OptionalBatchHandle = nullptr); + PutResult Put(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatch* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Namespace, std::string_view Bucket); bool DropNamespace(std::string_view Namespace); void Flush(); - void ScrubStorage(ScrubContext& Ctx); CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter, const std::string_view BucketFilter, const std::string_view ValueFilter) const; - GcStorageSize StorageSize() const; + CacheStoreSize TotalSize() const; CacheStoreStats Stats(bool IncludeNamespaceStats = true); Configuration GetConfiguration() const { return m_Configuration; } @@ -290,6 +292,10 @@ public: bool GetContentStats(std::string_view Namespace, std::string_view BucketName, CacheContentStats& OutContentStats) const; +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx); +#endif // ZEN_WITH_TESTS + private: const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; ZenCacheNamespace* GetNamespace(std::string_view Namespace); diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h index 152031c3a..c3993c028 100644 --- a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h +++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h @@ -113,7 +113,7 @@ public: std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheChunksGetComplete&& OnComplete) = 0; - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) = 0; }; } // namespace zen diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index b3d00fec0..8918b119f 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -87,12 +87,15 @@ public: bool ContainsChunk(const IoHash& DecompressedId); void FilterChunks(HashKeySet& InOutChunks); void Flush(); - void ScrubStorage(ScrubContext& Ctx); CidStoreSize TotalSize() const; CidStoreStats Stats() const; virtual void ReportMetrics(StatsMetrics& Statsd) override; +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx); +#endif // ZEN_WITH_TESTS + private: struct Impl; std::unique_ptr<CasStore> m_CasStore; |