aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2025-08-21 23:58:51 +0000
committerLiam Mitchell <[email protected]>2025-08-21 23:58:51 +0000
commit33209bd6931f49362dfc2d62c6cb6b87a42c99e1 (patch)
treecfc7914634088b3f4feac2d4cec0b5650dfdcc3c /src/zenstore
parentFix changelog merge issues (diff)
parentavoid new in static IoBuffer (#472) (diff)
downloadzen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.tar.xz
zen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.zip
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/buildstore/buildstore.cpp782
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp320
-rw-r--r--src/zenstore/cache/cacherpc.cpp288
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp134
-rw-r--r--src/zenstore/cas.cpp20
-rw-r--r--src/zenstore/cas.h5
-rw-r--r--src/zenstore/cidstore.cpp20
-rw-r--r--src/zenstore/compactcas.h2
-rw-r--r--src/zenstore/include/zenstore/buildstore/buildstore.h87
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h99
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h18
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h8
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h56
-rw-r--r--src/zenstore/include/zenstore/cache/upstreamcacheclient.h2
-rw-r--r--src/zenstore/include/zenstore/cidstore.h5
15 files changed, 966 insertions, 880 deletions
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index 20dc55bca..1b2cf036b 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -3,6 +3,7 @@
#include <zenstore/buildstore/buildstore.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/memory/llm.h>
@@ -20,7 +21,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
-# include <zencore/compress.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
# include <zenutil/workerpools.h>
@@ -45,7 +45,7 @@ namespace blobstore::impl {
const char* LogExtension = ".slog";
const char* AccessTimeExtension = ".zacs";
- const uint32_t ManifestVersion = (1 << 16) | (0 << 8) | (0);
+ const uint32_t ManifestVersion = (2 << 16) | (0 << 8) | (0);
std::filesystem::path GetManifestPath(const std::filesystem::path& RootDirectory)
{
@@ -106,13 +106,11 @@ namespace blobstore::impl {
} // namespace blobstore::impl
-BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc)
+BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc, CidStore& BlobStore)
: m_Log(logging::Get("builds"))
, m_Config(Config)
, m_Gc(Gc)
-, m_LargeBlobStore(m_Gc)
-, m_SmallBlobStore(Gc)
-, m_MetadataBlockStore()
+, m_BlobStore(BlobStore)
{
ZEN_TRACE_CPU("BuildStore::BuildStore");
ZEN_MEMSCOPE(GetBuildstoreTag());
@@ -170,75 +168,16 @@ BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc)
ManifestWriter.AddDateTime("createdAt", DateTime::Now());
TemporaryFile::SafeWriteFile(ManifestPath, ManifestWriter.Save().GetBuffer().AsIoBuffer());
}
- m_LargeBlobStore.Initialize(Config.RootDirectory / "file_cas", IsNew);
- m_SmallBlobStore.Initialize(Config.RootDirectory,
- "blob_cas",
- m_Config.SmallBlobBlockStoreMaxBlockSize,
- m_Config.SmallBlobBlockStoreAlignement,
- IsNew);
- m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20);
-
- BlockStore::BlockIndexSet KnownBlocks;
- for (const BlobEntry& Blob : m_BlobEntries)
- {
- if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex)
- {
- const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex];
- KnownBlocks.insert(Metadata.Location.BlockIndex);
- }
- }
- BlockStore::BlockIndexSet MissingBlocks = m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite);
m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite);
- if (!MissingBlocks.empty())
- {
- std::vector<MetadataDiskEntry> MissingMetadatas;
- for (auto& It : m_BlobLookup)
- {
- const IoHash& BlobHash = It.first;
- const BlobIndex ReadBlobIndex = It.second;
- const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
- if (ReadBlobEntry.Metadata)
- {
- const MetadataEntry& MetaData = m_MetadataEntries[ReadBlobEntry.Metadata];
- if (MissingBlocks.contains(MetaData.Location.BlockIndex))
- {
- MissingMetadatas.push_back(
- MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = BlobHash});
- MissingMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone;
- m_MetadataEntries[ReadBlobEntry.Metadata] = {};
- m_BlobEntries[ReadBlobIndex].Metadata = {};
- }
- }
- }
- ZEN_ASSERT(!MissingMetadatas.empty());
-
- for (const MetadataDiskEntry& Entry : MissingMetadatas)
- {
- auto It = m_BlobLookup.find(Entry.BlobHash);
- ZEN_ASSERT(It != m_BlobLookup.end());
-
- const BlobIndex ReadBlobIndex = It->second;
- const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
- if (!ReadBlobEntry.Payload)
- {
- m_BlobLookup.erase(It);
- }
- }
- m_MetadatalogFile.Append(MissingMetadatas);
- CompactState();
- }
-
m_Gc.AddGcReferencer(*this);
m_Gc.AddGcReferenceLocker(*this);
- m_Gc.AddGcStorage(this);
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to initialize build store. Reason: '{}'", Ex.what());
- m_Gc.RemoveGcStorage(this);
m_Gc.RemoveGcReferenceLocker(*this);
m_Gc.RemoveGcReferencer(*this);
}
@@ -249,7 +188,6 @@ BuildStore::~BuildStore()
try
{
ZEN_TRACE_CPU("BuildStore::~BuildStore");
- m_Gc.RemoveGcStorage(this);
m_Gc.RemoveGcReferenceLocker(*this);
m_Gc.RemoveGcReferencer(*this);
Flush();
@@ -280,21 +218,12 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload)
}
}
- uint64_t PayloadSize = Payload.GetSize();
- PayloadEntry Entry;
- if (Payload.GetSize() > m_Config.SmallBlobBlockStoreMaxBlockEmbedSize)
- {
- CasStore::InsertResult Result = m_LargeBlobStore.InsertChunk(Payload, BlobHash);
- ZEN_UNUSED(Result);
- Entry = PayloadEntry(PayloadEntry::kStandalone, PayloadSize);
- }
- else
- {
- CasStore::InsertResult Result = m_SmallBlobStore.InsertChunk(Payload, BlobHash);
- ZEN_UNUSED(Result);
- Entry = PayloadEntry(0, PayloadSize);
- }
+ uint64_t PayloadSize = Payload.GetSize();
+ CidStore::InsertResult Result = m_BlobStore.AddChunk(Payload, BlobHash);
+ PayloadEntry Entry = PayloadEntry(0, PayloadSize);
+ ;
+ IoHash MetadataHash;
{
RwLock::ExclusiveLockScope _(m_Lock);
if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
@@ -310,6 +239,10 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload)
Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
m_PayloadEntries.push_back(Entry);
}
+ if (Blob.Metadata)
+ {
+ MetadataHash = m_MetadataEntries[Blob.Metadata].MetadataHash;
+ }
Blob.LastAccessTime = GcClock::TickCount();
}
else
@@ -322,6 +255,16 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload)
m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
m_BlobLookup.insert({BlobHash, NewBlobIndex});
}
+
+ m_LastAccessTimeUpdateCount++;
+ if (m_TrackedBlobKeys)
+ {
+ m_TrackedBlobKeys->push_back(BlobHash);
+ if (MetadataHash != IoHash::Zero)
+ {
+ m_TrackedBlobKeys->push_back(BlobHash);
+ }
+ }
}
m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
m_LastAccessTimeUpdateCount++;
@@ -340,21 +283,9 @@ BuildStore::GetBlob(const IoHash& BlobHash)
Blob.LastAccessTime = GcClock::TickCount();
if (Blob.Payload)
{
- const PayloadEntry& Entry = m_PayloadEntries[Blob.Payload];
- const bool IsStandalone = (Entry.GetFlags() & PayloadEntry::kStandalone) != 0;
Lock.ReleaseNow();
+ IoBuffer Chunk = m_BlobStore.FindChunkByCid(BlobHash);
- IoBuffer Chunk;
- if (IsStandalone)
- {
- ZEN_TRACE_CPU("GetLarge");
- Chunk = m_LargeBlobStore.FindChunk(BlobHash);
- }
- else
- {
- ZEN_TRACE_CPU("GetSmall");
- Chunk = m_SmallBlobStore.FindChunk(BlobHash);
- }
if (Chunk)
{
Chunk.SetContentType(ZenContentType::kCompressedBinary);
@@ -362,7 +293,7 @@ BuildStore::GetBlob(const IoHash& BlobHash)
}
else
{
- ZEN_WARN("Inconsistencies in build store, {} is in index but not {}", BlobHash, IsStandalone ? "on disk" : "in block");
+ ZEN_WARN("Inconsistencies in build store, {} is in index but not in blob store", BlobHash);
}
}
}
@@ -381,10 +312,10 @@ BuildStore::BlobsExists(std::span<const IoHash> BlobHashes)
{
if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
{
- const BlobIndex ExistingBlobIndex = It->second;
- BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
- bool HasPayload = !!Blob.Payload;
- bool HasMetadata = !!Blob.Metadata;
+ const BlobIndex ExistingBlobIndex = It->second;
+ const BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
+ bool HasPayload = !!Blob.Payload;
+ bool HasMetadata = !!Blob.Metadata;
Result.push_back(BlobExistsResult{.HasBody = HasPayload, .HasMetadata = HasMetadata});
}
else
@@ -396,20 +327,82 @@ BuildStore::BlobsExists(std::span<const IoHash> BlobHashes)
}
void
-BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas)
+BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> Metadatas, WorkerThreadPool* OptionalWorkerPool)
{
ZEN_TRACE_CPU("BuildStore::PutMetadatas");
ZEN_MEMSCOPE(GetBuildstoreTag());
- size_t WriteBlobIndex = 0;
- m_MetadataBlockStore.WriteChunks(MetaDatas, m_Config.MetadataBlockStoreAlignement, [&](std::span<BlockStoreLocation> Locations) {
+ std::vector<IoHash> MetadataHashes;
+ std::vector<IoBuffer> CompressedMetadataBuffers;
+
+ auto CompressOne = [&BlobHashes, &MetadataHashes, &CompressedMetadataBuffers](const IoBuffer& Buffer, size_t Index) {
+ if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize))
+ {
+ throw std::runtime_error(
+ fmt::format("Invalid compressed buffer provided when storing metadata for blob {}", BlobHashes[Index]));
+ }
+ else
+ {
+ CompressedMetadataBuffers[Index] = Buffer;
+ MetadataHashes[Index] = RawHash;
+ }
+ }
+ else
+ {
+ CompressedBuffer Compressed =
+ CompressedBuffer::Compress(SharedBuffer(Buffer), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+ MetadataHashes[Index] = Compressed.DecodeRawHash();
+ CompressedMetadataBuffers[Index] = std::move(Compressed.GetCompressed()).Flatten().AsIoBuffer();
+ CompressedMetadataBuffers[Index].SetContentType(ZenContentType::kCompressedBinary);
+ }
+ };
+
+ MetadataHashes.resize(Metadatas.size());
+ CompressedMetadataBuffers.resize(Metadatas.size());
+ if (OptionalWorkerPool)
+ {
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
+ for (size_t Index = 0; Index < Metadatas.size(); Index++)
+ {
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [Index, &BlobHashes, &Metadatas, &MetadataHashes, &CompressedMetadataBuffers, &CompressOne](std::atomic<bool>&) {
+ const IoBuffer& Buffer = Metadatas[Index];
+ CompressOne(Buffer, Index);
+ },
+ {});
+ }
+ Work.Wait();
+ }
+ else
+ {
+ for (size_t Index = 0; Index < Metadatas.size(); Index++)
+ {
+ const IoBuffer& Buffer = Metadatas[Index];
+ CompressOne(Buffer, Index);
+ }
+ }
+
+ std::vector<MetadataDiskEntry> AddedMetadataEntries;
+ AddedMetadataEntries.reserve(MetadataHashes.size());
+
+ std::vector<CidStore::InsertResult> InsertResults = m_BlobStore.AddChunks(CompressedMetadataBuffers, MetadataHashes);
+ ZEN_UNUSED(InsertResults);
+ {
RwLock::ExclusiveLockScope _(m_Lock);
- for (size_t LocationIndex = 0; LocationIndex < Locations.size(); LocationIndex++)
+ for (size_t Index = 0; Index < BlobHashes.size(); Index++)
{
- const IoBuffer& Data = MetaDatas[WriteBlobIndex];
- const IoHash& BlobHash = BlobHashes[WriteBlobIndex];
- const BlockStoreLocation& Location = Locations[LocationIndex];
+ const ZenContentType ContentType = Metadatas[Index].GetContentType();
+ const IoHash& BlobHash = BlobHashes[Index];
+ const IoHash& MetadataHash = MetadataHashes[Index];
+ const IoBuffer& Metadata = CompressedMetadataBuffers[Index];
- MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0};
+ MetadataEntry Entry(MetadataHash, Metadata.GetSize(), ContentType, 0);
if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
{
@@ -435,17 +428,16 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB
m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
m_BlobLookup.insert({BlobHash, NewBlobIndex});
}
-
- m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
-
m_LastAccessTimeUpdateCount++;
- WriteBlobIndex++;
- if (m_TrackedCacheKeys)
+ if (m_TrackedBlobKeys)
{
- m_TrackedCacheKeys->insert(BlobHash);
+ m_TrackedBlobKeys->push_back(BlobHash);
+ m_TrackedBlobKeys->push_back(MetadataHash);
}
+ AddedMetadataEntries.push_back(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
}
- });
+ }
+ m_MetadatalogFile.Append(AddedMetadataEntries);
}
std::vector<IoBuffer>
@@ -453,9 +445,9 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
{
ZEN_TRACE_CPU("BuildStore::GetMetadatas");
ZEN_MEMSCOPE(GetBuildstoreTag());
- std::vector<BlockStoreLocation> MetaLocations;
- std::vector<size_t> MetaLocationResultIndexes;
- MetaLocations.reserve(BlobHashes.size());
+ std::vector<IoHash> MetadataHashes;
+ std::vector<size_t> MetaLocationResultIndexes;
+ MetadataHashes.reserve(BlobHashes.size());
MetaLocationResultIndexes.reserve(BlobHashes.size());
tsl::robin_set<uint32_t> ReferencedBlocks;
@@ -475,10 +467,9 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
if (ExistingBlobEntry.Metadata)
{
const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata];
- MetaLocations.push_back(ExistingMetadataEntry.Location);
+ MetadataHashes.push_back(ExistingMetadataEntry.MetadataHash);
MetaLocationResultIndexes.push_back(Index);
- ReferencedBlocks.insert(ExistingMetadataEntry.Location.BlockIndex);
- ResultContentTypes[Index] = ExistingMetadataEntry.ContentType;
+ ResultContentTypes[Index] = ExistingMetadataEntry.GetContentType();
}
ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::TickCount());
m_LastAccessTimeUpdateCount++;
@@ -486,100 +477,35 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
}
}
- auto DoOneBlock = [this](std::span<const BlockStoreLocation> MetaLocations,
- std::span<const size_t> MetaLocationResultIndexes,
- std::span<const size_t> ChunkIndexes,
- std::vector<IoBuffer>& Result) {
- if (ChunkIndexes.size() < 4)
- {
- for (size_t ChunkIndex : ChunkIndexes)
+ m_BlobStore.IterateChunks(
+ MetadataHashes,
+ [this, &BlobHashes, &MetadataHashes, &MetaLocationResultIndexes, &Result](size_t Index, const IoBuffer& Payload) {
+ if (Payload)
{
- IoBuffer Chunk = m_MetadataBlockStore.TryGetChunk(MetaLocations[ChunkIndex]);
- if (Chunk)
- {
- size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
- Result[ResultIndex] = std::move(Chunk);
- }
- }
- return true;
- }
- return m_MetadataBlockStore.IterateBlock(
- MetaLocations,
- ChunkIndexes,
- [&MetaLocationResultIndexes, &Result](size_t ChunkIndex, const void* Data, uint64_t Size) {
- if (Data != nullptr)
+ size_t ResultIndex = MetaLocationResultIndexes[Index];
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer CompressedBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize);
+ if (CompressedBuffer)
{
- size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
- Result[ResultIndex] = IoBuffer(IoBuffer::Clone, Data, Size);
- }
- return true;
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
- Result[ResultIndex] = File.GetChunk(Offset, Size);
- return true;
- },
- 8u * 1024u);
- };
-
- if (!MetaLocations.empty())
- {
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
-
- try
- {
- m_MetadataBlockStore.IterateChunks(
- MetaLocations,
- [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock](
- uint32_t BlockIndex,
- std::span<const size_t> ChunkIndexes) -> bool {
- ZEN_UNUSED(BlockIndex);
- if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1)
+ IoBuffer Decompressed = CompressedBuffer.DecompressToComposite().Flatten().AsIoBuffer();
+ if (Decompressed)
{
- return DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result);
+ Result[ResultIndex] = std::move(Decompressed);
}
else
{
- ZEN_ASSERT(OptionalWorkerPool != nullptr);
- std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- Work.ScheduleWork(
- *OptionalWorkerPool,
- [this,
- &Result,
- &MetaLocations,
- &MetaLocationResultIndexes,
- DoOneBlock,
- ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- try
- {
- if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result))
- {
- AbortFlag.store(true);
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
- }
- });
- return !Work.IsAborted();
+ ZEN_WARN("Metadata {} for blob {} is malformed (not a compressed binary format)",
+ MetadataHashes[ResultIndex],
+ BlobHashes[ResultIndex]);
}
- });
- }
- catch (const std::exception& Ex)
- {
- AbortFlag.store(true);
- ZEN_WARN("Failed iterating block metadata chunks in {}. Reason: '{}'", m_Config.RootDirectory, Ex.what());
- }
+ }
+ }
+ return true;
+ },
+ OptionalWorkerPool,
+ 8u * 1024u);
- Work.Wait();
- }
for (size_t Index = 0; Index < Result.size(); Index++)
{
if (Result[Index])
@@ -600,9 +526,7 @@ BuildStore::Flush()
const auto _ = MakeGuard(
[&] { ZEN_INFO("Flushed build store at {} in {}", m_Config.RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- m_LargeBlobStore.Flush();
- m_SmallBlobStore.Flush();
- m_MetadataBlockStore.Flush(false);
+ m_BlobStore.Flush();
m_PayloadlogFile.Flush();
m_MetadatalogFile.Flush();
@@ -636,22 +560,14 @@ BuildStore::GetStorageStats() const
{
const PayloadEntry& Payload = m_PayloadEntries[ReadBlobEntry.Payload];
uint64_t Size = Payload.GetSize();
- if ((Payload.GetFlags() & PayloadEntry::kStandalone) != 0)
- {
- Result.LargeBlobCount++;
- Result.LargeBlobBytes += Size;
- }
- else
- {
- Result.SmallBlobCount++;
- Result.SmallBlobBytes += Size;
- }
+ Result.BlobCount++;
+ Result.BlobBytes += Size;
}
if (ReadBlobEntry.Metadata)
{
const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata];
Result.MetadataCount++;
- Result.MetadataByteCount += Metadata.Location.Size;
+ Result.MetadataByteCount += Metadata.GetSize();
}
}
}
@@ -882,10 +798,9 @@ BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesy
CasLog.Replay(
[&](const MetadataDiskEntry& Record) {
std::string InvalidEntryReason;
- if (Record.Entry.Flags & MetadataEntry::kTombStone)
+ if (Record.Entry.GetFlags() & MetadataEntry::kTombStone)
{
// Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation
- // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation
if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end())
{
if (!m_BlobEntries[ExistingIt->second].Payload)
@@ -1058,7 +973,7 @@ BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string&
OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString());
return false;
}
- if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone | PayloadEntry::kStandalone))
+ if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone))
{
OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.GetFlags(), Entry.BlobHash.ToHexString());
return false;
@@ -1083,30 +998,20 @@ BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::strin
OutReason = fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString());
return false;
}
- if (Entry.Entry.Location.Size == 0)
+ if (Entry.Entry.GetSize() == 0)
{
- OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.Location.Size);
+ OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.GetSize());
return false;
}
- if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0)
- {
- OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString());
- return false;
- }
- if (Entry.Entry.Flags & MetadataEntry::kTombStone)
+ if (Entry.Entry.GetFlags() & MetadataEntry::kTombStone)
{
return true;
}
- if (Entry.Entry.ContentType == ZenContentType::kCOUNT)
+ if (Entry.Entry.GetContentType() == ZenContentType::kCOUNT)
{
OutReason = fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString());
return false;
}
- if (Entry.Reserved1 != 0 || Entry.Reserved2 != 0 || Entry.Reserved3 != 0 || Entry.Reserved4 != 0)
- {
- OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString());
- return false;
- }
return true;
}
@@ -1114,31 +1019,76 @@ class BuildStoreGcReferenceChecker : public GcReferenceChecker
{
public:
BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {}
+ ~BuildStoreGcReferenceChecker()
+ {
+ try
+ {
+ m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedBlobKeys.reset(); });
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~BuildStoreGcReferenceChecker threw exception: '{}'", Ex.what());
+ }
+ }
virtual std::string GetGcName(GcCtx& Ctx) override
{
ZEN_UNUSED(Ctx);
return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string());
}
- virtual void PreCache(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); }
-
- virtual void UpdateLockedState(GcCtx& Ctx) override
+ virtual void PreCache(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Builds::UpdateLockedState");
- ZEN_MEMSCOPE(GetBuildstoreTag());
+ ZEN_TRACE_CPU("Builds::PreCache");
auto Log = [&Ctx]() { return Ctx.Logger; };
- m_References.reserve(m_Store.m_BlobLookup.size());
- for (const auto& It : m_Store.m_BlobLookup)
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: builds [PRECACHE] '{}': found {} references in {}",
+ m_Store.m_Config.RootDirectory,
+ m_PrecachedReferences.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedBlobKeys = std::make_unique<std::vector<IoHash>>(); });
+
{
- const BuildStore::BlobIndex ExistingBlobIndex = It.second;
- if (m_Store.m_BlobEntries[ExistingBlobIndex].Payload)
+ m_PrecachedReferences.reserve(m_Store.m_BlobLookup.size());
+ RwLock::SharedLockScope __(m_Store.m_Lock);
+ for (const auto& It : m_Store.m_BlobLookup)
{
- m_References.push_back(It.first);
+ const BuildStore::BlobIndex ExistingBlobIndex = It.second;
+ const BuildStore::BlobEntry& Entry = m_Store.m_BlobEntries[ExistingBlobIndex];
+ if (Entry.Payload)
+ {
+ m_PrecachedReferences.push_back(It.first);
+ }
+ if (Entry.Metadata)
+ {
+ const BuildStore::MetadataEntry& MetadataEntry = m_Store.m_MetadataEntries[Entry.Metadata];
+ m_PrecachedReferences.push_back(MetadataEntry.MetadataHash);
+ }
}
}
- FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", "buildstore"), m_References);
+ FilterReferences(Ctx, fmt::format("buildstore [PRECACHE] '{}'", m_Store.m_Config.RootDirectory), m_PrecachedReferences);
+ }
+
+ virtual void UpdateLockedState(GcCtx& Ctx) override
+ {
+ ZEN_TRACE_CPU("Builds::UpdateLockedState");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ ZEN_ASSERT(m_Store.m_TrackedBlobKeys);
+
+ m_AddedReferences = std::move(*m_Store.m_TrackedBlobKeys);
+
+ FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", m_Store.m_Config.RootDirectory), m_AddedReferences);
}
virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
@@ -1165,14 +1115,16 @@ public:
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids);
+ std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_PrecachedReferences, IoCids);
+ UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences);
UsedCount = IoCids.size() - UnusedReferences.size();
return UnusedReferences;
}
private:
BuildStore& m_Store;
- std::vector<IoHash> m_References;
+ std::vector<IoHash> m_PrecachedReferences;
+ std::vector<IoHash> m_AddedReferences;
};
std::string
@@ -1184,201 +1136,6 @@ BuildStore::GetGcName(GcCtx& Ctx)
return fmt::format("buildstore: '{}'", m_Config.RootDirectory.string());
}
-class BuildStoreGcCompator : public GcStoreCompactor
-{
- using BlobEntry = BuildStore::BlobEntry;
- using PayloadEntry = BuildStore::PayloadEntry;
- using MetadataEntry = BuildStore::MetadataEntry;
- using MetadataDiskEntry = BuildStore::MetadataDiskEntry;
- using BlobIndex = BuildStore::BlobIndex;
- using PayloadIndex = BuildStore::PayloadIndex;
- using MetadataIndex = BuildStore::MetadataIndex;
-
-public:
- BuildStoreGcCompator(BuildStore& Store, std::vector<IoHash>&& RemovedBlobs) : m_Store(Store), m_RemovedBlobs(std::move(RemovedBlobs)) {}
-
- virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
- {
- ZEN_UNUSED(ClaimDiskReserveCallback);
- ZEN_TRACE_CPU("Builds::CompactStore");
- ZEN_MEMSCOPE(GetBuildstoreTag());
-
- auto Log = [&Ctx]() { return Ctx.Logger; };
-
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- if (!Ctx.Settings.Verbose)
- {
- return;
- }
- ZEN_INFO("GCV2: buildstore [COMPACT] '{}': RemovedDisk: {} in {}",
- m_Store.m_Config.RootDirectory,
- NiceBytes(Stats.RemovedDisk),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- const auto __ = MakeGuard([&] { m_Store.Flush(); });
-
- if (!m_RemovedBlobs.empty())
- {
- if (Ctx.Settings.CollectSmallObjects)
- {
- m_Store.m_Lock.WithExclusiveLock([this]() { m_Store.m_TrackedCacheKeys = std::make_unique<HashSet>(); });
- auto __ = MakeGuard([this]() { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedCacheKeys.reset(); }); });
-
- BlockStore::BlockUsageMap BlockUsage;
- {
- RwLock::SharedLockScope __(m_Store.m_Lock);
-
- for (auto LookupIt : m_Store.m_BlobLookup)
- {
- const BlobIndex ReadBlobIndex = LookupIt.second;
- const BlobEntry& ReadBlobEntry = m_Store.m_BlobEntries[ReadBlobIndex];
-
- if (ReadBlobEntry.Metadata)
- {
- const MetadataEntry& ReadMetadataEntry = m_Store.m_MetadataEntries[ReadBlobEntry.Metadata];
-
- uint32_t BlockIndex = ReadMetadataEntry.Location.BlockIndex;
- uint64_t ChunkSize = RoundUp(ReadMetadataEntry.Location.Size, m_Store.m_Config.MetadataBlockStoreAlignement);
-
- if (auto BlockUsageIt = BlockUsage.find(BlockIndex); BlockUsageIt != BlockUsage.end())
- {
- BlockStore::BlockUsageInfo& Info = BlockUsageIt.value();
- Info.EntryCount++;
- Info.DiskUsage += ChunkSize;
- }
- else
- {
- BlockUsage.insert_or_assign(BlockIndex,
- BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1});
- }
- }
- }
- }
-
- BlockStore::BlockEntryCountMap BlocksToCompact = m_Store.m_MetadataBlockStore.GetBlocksToCompact(BlockUsage, 90);
- BlockStoreCompactState BlockCompactState;
- std::vector<IoHash> BlockCompactStateKeys;
- BlockCompactState.IncludeBlocks(BlocksToCompact);
-
- if (BlocksToCompact.size() > 0)
- {
- {
- RwLock::SharedLockScope ___(m_Store.m_Lock);
- for (const auto& Entry : m_Store.m_BlobLookup)
- {
- BlobIndex Index = Entry.second;
-
- if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
- {
- if (BlockCompactState.AddKeepLocation(m_Store.m_MetadataEntries[Meta].Location))
- {
- BlockCompactStateKeys.push_back(Entry.first);
- }
- }
- }
- }
-
- if (Ctx.Settings.IsDeleteMode)
- {
- if (Ctx.Settings.Verbose)
- {
- ZEN_INFO("GCV2: buildstore [COMPACT] '{}': compacting {} blocks",
- m_Store.m_Config.RootDirectory,
- BlocksToCompact.size());
- }
-
- m_Store.m_MetadataBlockStore.CompactBlocks(
- BlockCompactState,
- m_Store.m_Config.MetadataBlockStoreAlignement,
- [&](const BlockStore::MovedChunksArray& MovedArray,
- const BlockStore::ChunkIndexArray& ScrubbedArray,
- uint64_t FreedDiskSpace) {
- std::vector<MetadataDiskEntry> MovedEntries;
- MovedEntries.reserve(MovedArray.size());
- RwLock::ExclusiveLockScope _(m_Store.m_Lock);
- for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
- {
- size_t ChunkIndex = Moved.first;
- const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
-
- ZEN_ASSERT(m_Store.m_TrackedCacheKeys);
- if (m_Store.m_TrackedCacheKeys->contains(Key))
- {
- continue;
- }
-
- if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end())
- {
- const BlobIndex Index = It->second;
-
- if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
- {
- m_Store.m_MetadataEntries[Meta].Location = Moved.second;
- MovedEntries.push_back(
- MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key});
- }
- }
- }
-
- for (size_t Scrubbed : ScrubbedArray)
- {
- const IoHash& Key = BlockCompactStateKeys[Scrubbed];
- if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end())
- {
- const BlobIndex Index = It->second;
-
- if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
- {
- MovedEntries.push_back(
- MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key});
- MovedEntries.back().Entry.Flags |= MetadataEntry::kTombStone;
- m_Store.m_MetadataEntries[Meta] = {};
- m_Store.m_BlobEntries[Index].Metadata = {};
- }
- }
- }
-
- m_Store.m_MetadatalogFile.Append(MovedEntries);
-
- Stats.RemovedDisk += FreedDiskSpace;
- if (Ctx.IsCancelledFlag.load())
- {
- return false;
- }
- return true;
- },
- ClaimDiskReserveCallback,
- fmt::format("GCV2: buildstore [COMPACT] '{}': ", m_Store.m_Config.RootDirectory));
- }
- else
- {
- if (Ctx.Settings.Verbose)
- {
- ZEN_INFO("GCV2: buildstore [COMPACT] '{}': skipped compacting of {} eligible blocks",
- m_Store.m_Config.RootDirectory,
- BlocksToCompact.size());
- }
- }
- }
- }
- }
- }
-
- virtual std::string GetGcName(GcCtx& Ctx) override
- {
- ZEN_UNUSED(Ctx);
- ZEN_MEMSCOPE(GetBuildstoreTag());
-
- return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string());
- }
-
-private:
- BuildStore& m_Store;
- const std::vector<IoHash> m_RemovedBlobs;
-};
-
GcStoreCompactor*
BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
{
@@ -1413,10 +1170,9 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
uint64_t BlobSize = 0;
};
- bool DiskSizeExceeded = false;
- const uint64_t CurrentDiskSize =
- m_LargeBlobStore.StorageSize().DiskSize + m_SmallBlobStore.StorageSize().DiskSize + m_MetadataBlockStore.TotalSize();
- if (CurrentDiskSize > m_Config.MaxDiskSpaceLimit)
+ bool DiskSizeExceeded = false;
+ const uint64_t CurrentBlobsDiskSize = m_BlobStore.TotalSize().TotalSize;
+ if ((m_Config.MaxDiskSpaceLimit > 0) && (CurrentBlobsDiskSize > m_Config.MaxDiskSpaceLimit))
{
DiskSizeExceeded = true;
}
@@ -1444,14 +1200,14 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
if (ReadBlobEntry.Metadata)
{
const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata];
- Size += Metadata.Location.Size;
+ Size += Metadata.GetSize();
}
const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime;
if (AccessTick < ExpireTicks)
{
ExpiredBlobs.push_back(It.first);
- ExpiredDataSize += ExpiredDataSize;
+ ExpiredDataSize += Size;
}
else if (DiskSizeExceeded)
{
@@ -1469,7 +1225,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
const uint64_t NewSizeLimit =
m_Config.MaxDiskSpaceLimit -
(m_Config.MaxDiskSpaceLimit >> 4); // Remove a bit more than just below the limit so we have some space to grow
- if ((CurrentDiskSize - ExpiredDataSize) > NewSizeLimit)
+ if ((CurrentBlobsDiskSize - ExpiredDataSize) > NewSizeLimit)
{
std::vector<size_t> NonExpiredOrder;
NonExpiredOrder.resize(NonExpiredBlobSizeInfos.size());
@@ -1487,7 +1243,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
while (It != NonExpiredOrder.end())
{
const SizeInfo& Info = NonExpiredBlobSizeInfos[*It];
- if ((CurrentDiskSize - ExpiredDataSize) < NewSizeLimit)
+ if ((CurrentBlobsDiskSize - ExpiredDataSize) < NewSizeLimit)
{
break;
}
@@ -1539,7 +1295,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
{
RemoveMetadatas.push_back(
MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob});
- RemoveMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone;
+ RemoveMetadatas.back().Entry.AddFlag(MetadataEntry::kTombStone);
m_MetadataEntries[ReadBlobEntry.Metadata] = {};
m_BlobEntries[ReadBlobIndex].Metadata = {};
}
@@ -1568,7 +1324,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
CompactState();
}
- return new BuildStoreGcCompator(*this, std::move(RemovedBlobs));
+ return nullptr;
}
std::vector<GcReferenceChecker*>
@@ -1595,21 +1351,6 @@ BuildStore::LockState(GcCtx& Ctx)
return Locks;
}
-void
-BuildStore::ScrubStorage(ScrubContext& ScrubCtx)
-{
- ZEN_UNUSED(ScrubCtx);
- // TODO
-}
-
-GcStorageSize
-BuildStore::StorageSize() const
-{
- GcStorageSize Result;
- Result.DiskSize = m_MetadataBlockStore.TotalSize();
- return Result;
-}
-
/*
___________ __
\__ ___/___ _______/ |_ ______
@@ -1630,8 +1371,10 @@ TEST_CASE("BuildStore.Blobs")
std::vector<IoHash> CompressedBlobsHashes;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (size_t I = 0; I < 5; I++)
{
@@ -1658,10 +1401,13 @@ TEST_CASE("BuildStore.Blobs")
IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
}
+ BlobStore.Flush();
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (const IoHash& RawHash : CompressedBlobsHashes)
{
IoBuffer Payload = Store.GetBlob(RawHash);
@@ -1689,8 +1435,10 @@ TEST_CASE("BuildStore.Blobs")
}
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (const IoHash& RawHash : CompressedBlobsHashes)
{
IoBuffer Payload = Store.GetBlob(RawHash);
@@ -1709,7 +1457,7 @@ TEST_CASE("BuildStore.Blobs")
}
namespace blockstore::testing {
- IoBuffer MakeMetaData(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues)
+ IoBuffer MakeMetadata(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues)
{
CbObjectWriter Writer;
Writer.AddHash("rawHash"sv, BlobHash);
@@ -1740,16 +1488,18 @@ TEST_CASE("BuildStore.Metadata")
std::vector<IoHash> BlobHashes;
std::vector<IoBuffer> MetaPayloads;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (size_t I = 0; I < 5; I++)
{
BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I)));
- MetaPayloads.push_back(MakeMetaData(BlobHashes.back(), {{"index", fmt::format("{}", I)}}));
+ MetaPayloads.push_back(MakeMetadata(BlobHashes.back(), {{"index", fmt::format("{}", I)}}));
MetaPayloads.back().SetContentType(ZenContentType::kCbObject);
}
- Store.PutMetadatas(BlobHashes, MetaPayloads);
+ Store.PutMetadatas(BlobHashes, MetaPayloads, &WorkerPool);
std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool);
CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
@@ -1760,8 +1510,10 @@ TEST_CASE("BuildStore.Metadata")
}
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool);
CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
@@ -1776,8 +1528,10 @@ TEST_CASE("BuildStore.Metadata")
}
std::vector<IoHash> CompressedBlobsHashes;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (size_t I = 0; I < 5; I++)
{
IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
@@ -1805,14 +1559,16 @@ TEST_CASE("BuildStore.Metadata")
std::vector<IoBuffer> BlobMetaPayloads;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (const IoHash& BlobHash : CompressedBlobsHashes)
{
- BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+ BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
}
- Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, &WorkerPool);
std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
@@ -1824,8 +1580,10 @@ TEST_CASE("BuildStore.Metadata")
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
@@ -1847,14 +1605,16 @@ TEST_CASE("BuildStore.Metadata")
for (const IoHash& BlobHash : CompressedBlobsHashes)
{
BlobMetaPayloads.push_back(
- MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}}));
+ MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}}));
BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
}
- Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, &WorkerPool);
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
@@ -1886,8 +1646,10 @@ TEST_CASE("BuildStore.GC")
std::vector<IoHash> CompressedBlobsHashes;
std::vector<IoBuffer> BlobMetaPayloads;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (size_t I = 0; I < 5; I++)
{
IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
@@ -1900,14 +1662,16 @@ TEST_CASE("BuildStore.GC")
}
for (const IoHash& BlobHash : CompressedBlobsHashes)
{
- BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+ BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
}
- Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, nullptr);
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
{
GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
@@ -1967,8 +1731,10 @@ TEST_CASE("BuildStore.SizeLimit")
std::vector<IoHash> CompressedBlobsHashes;
std::vector<IoBuffer> BlobMetaPayloads;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (size_t I = 0; I < 64; I++)
{
IoBuffer Blob = CreateSemiRandomBlob(65537 + I * 7);
@@ -1981,10 +1747,10 @@ TEST_CASE("BuildStore.SizeLimit")
}
for (const IoHash& BlobHash : CompressedBlobsHashes)
{
- BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+ BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
}
- Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, nullptr);
{
for (size_t I = 0; I < 64; I++)
@@ -1997,8 +1763,10 @@ TEST_CASE("BuildStore.SizeLimit")
}
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
{
GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
@@ -2023,7 +1791,7 @@ TEST_CASE("BuildStore.SizeLimit")
CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash);
}
}
- CHECK(DeletedBlobs == 50);
+ CHECK(DeletedBlobs == 53);
std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 15a1c9650..cacbbd966 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -233,6 +233,28 @@ using namespace std::literals;
namespace zen::cache::impl {
+static bool
+UpdateValueWithRawSizeAndHash(ZenCacheValue& Value)
+{
+ if ((Value.RawSize == 0) && (Value.RawHash == IoHash::Zero))
+ {
+ if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value, Value.RawHash, Value.RawSize);
+ }
+ else
+ {
+ Value.RawSize = Value.Value.GetSize();
+ Value.RawHash = IoHash::HashBuffer(Value.Value);
+ return true;
+ }
+ }
+ else
+ {
+ return true;
+ }
+}
+
class BucketManifestSerializer
{
using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
@@ -348,11 +370,20 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck
Stopwatch Timer;
const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- const uint64_t Count = Manifest["Count"sv].AsUInt64(0);
+ const uint64_t Count = Manifest["Count"sv].AsUInt64(0);
+ CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
+ if (KeyArray.Num() != Count)
+ {
+ ZEN_WARN("Mismatch in size between 'Keys' ({}) array size and 'Count' ({}) in {}, skipping metadata",
+ KeyArray.Num(),
+ Count,
+ ManifestPath);
+ return;
+ }
+
std::vector<PayloadIndex> KeysIndexes;
KeysIndexes.reserve(Count);
- CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
for (CbFieldView& KeyView : KeyArray)
{
if (auto It = Index.find(KeyView.AsHash()); It != Index.end())
@@ -367,19 +398,43 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck
size_t KeyIndexOffset = 0;
CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
- for (CbFieldView& TimeStampView : TimeStampArray)
+ if (KeysIndexes.size() != TimeStampArray.Num())
{
- const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
- if (KeyIndex)
+ ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'Timestamps' ({}) arrays in {}, skipping timestamps",
+ KeysIndexes.size(),
+ TimeStampArray.Num(),
+ ManifestPath);
+ }
+ else
+ {
+ for (CbFieldView& TimeStampView : TimeStampArray)
{
- AccessTimes[KeyIndex] = TimeStampView.AsInt64();
+ const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
+ if (KeyIndex)
+ {
+ AccessTimes[KeyIndex] = TimeStampView.AsInt64();
+ }
}
}
KeyIndexOffset = 0;
CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
- if (RawHashArray.Num() == RawSizeArray.Num())
+ if (RawHashArray.Num() != KeysIndexes.size())
+ {
+ ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'RawHash' ({}) arrays in {}, skipping meta data",
+ KeysIndexes.size(),
+ RawHashArray.Num(),
+ ManifestPath);
+ }
+ else if (RawSizeArray.Num() != KeysIndexes.size())
+ {
+ ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'RawSize' ({}) arrays in {}, skipping meta data",
+ KeysIndexes.size(),
+ RawSizeArray.Num(),
+ ManifestPath);
+ }
+ else
{
auto RawHashIt = RawHashArray.CreateViewIterator();
auto RawSizeIt = RawSizeArray.CreateViewIterator();
@@ -404,10 +459,6 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck
RawSizeIt++;
}
}
- else
- {
- ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
- }
}
Oid
@@ -747,6 +798,7 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, IoStoreDDCOverrideSize);
}
m_Gc.AddGcReferencer(*this);
+ m_Gc.AddGcStorage(this);
}
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
@@ -761,6 +813,7 @@ ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
ZEN_ERROR("~CacheBucket() failed with: ", Ex.what());
}
+ m_Gc.RemoveGcStorage(this);
m_Gc.RemoveGcReferencer(*this);
}
@@ -1286,20 +1339,21 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
- PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
struct Entry
{
std::vector<IoHash> HashKeyAndReferences;
+ bool Overwrite;
};
- std::vector<IoBuffer> Buffers;
- std::vector<Entry> Entries;
- std::vector<size_t> EntryResultIndexes;
+ std::vector<ZenCacheValue> Buffers;
+ std::vector<Entry> Entries;
+ std::vector<size_t> EntryResultIndexes;
- std::vector<bool>& OutResults;
+ std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
};
ZenCacheDiskLayer::CacheBucket::PutBatchHandle*
-ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
+ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults)
{
ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch");
return new PutBatchHandle(OutResults);
@@ -1315,23 +1369,40 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
ZEN_ASSERT(Batch);
if (!Batch->Buffers.empty())
{
- std::vector<uint8_t> EntryFlags;
- for (const IoBuffer& Buffer : Batch->Buffers)
+ ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size());
+ std::vector<uint8_t> EntryFlags;
+ std::vector<size_t> BufferToEntryIndexes;
+ std::vector<IoBuffer> BuffersToCommit;
+ BuffersToCommit.reserve(Batch->Buffers.size());
+ for (size_t Index = 0; Index < Batch->Entries.size(); Index++)
{
- uint8_t Flags = 0;
- if (Buffer.GetContentType() == ZenContentType::kCbObject)
+ const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences;
+ ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
+
+ ZenCacheValue& Value = Batch->Buffers[Index];
+ std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
+ PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]];
+ OutResult = PutResult{zen::PutStatus::Success};
+ if (!ShouldRejectPut(HashKeyAndReferences[0], Value, Batch->Entries[Index].Overwrite, OutResult))
{
- Flags |= DiskLocation::kStructured;
- }
- else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
- {
- Flags |= DiskLocation::kCompressed;
+ BufferToEntryIndexes.push_back(Index);
+ BuffersToCommit.push_back(Value.Value);
+
+ uint8_t Flags = 0;
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ Flags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ Flags |= DiskLocation::kCompressed;
+ }
+ EntryFlags.push_back(Flags);
}
- EntryFlags.push_back(Flags);
}
size_t IndexOffset = 0;
- m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
ZEN_MEMSCOPE(GetCacheDiskTag());
std::vector<DiskIndexEntry> DiskEntries;
{
@@ -1339,8 +1410,9 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
for (size_t Index = 0; Index < Locations.size(); Index++)
{
DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
- const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
- ZEN_ASSERT(HashKeyAndReferences.size() > 1);
+ const std::vector<IoHash>& HashKeyAndReferences =
+ Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences;
+ ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
const IoHash HashKey = HashKeyAndReferences[0];
DiskEntries.push_back({.Key = HashKey, .Location = Location});
if (m_TrackedCacheKeys)
@@ -1375,12 +1447,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
}
}
m_SlogFile.Append(DiskEntries);
- for (size_t Index = 0; Index < Locations.size(); Index++)
- {
- size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
- ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
- Batch->OutResults[ResultIndex] = true;
- }
IndexOffset += Locations.size();
});
}
@@ -1876,30 +1942,128 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
}
-void
+bool
+ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey,
+ ZenCacheValue& InOutValue,
+ bool Overwrite,
+ ZenCacheDiskLayer::PutResult& OutPutResult)
+{
+ const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite;
+ if (CheckExisting)
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ auto It = m_Index.find(HashKey);
+ if (It != m_Index.end())
+ {
+ const PayloadIndex EntryIndex = It.value();
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ const DiskLocation Location = m_Payloads[EntryIndex].Location;
+
+ const BucketPayload* Payload = &m_Payloads[EntryIndex];
+ if (Payload->MetaData)
+ {
+ const BucketMetaData MetaData = m_MetaDatas[Payload->MetaData];
+ if (MetaData)
+ {
+ IndexLock.ReleaseNow();
+ if (!cache::impl::UpdateValueWithRawSizeAndHash(InOutValue))
+ {
+ OutPutResult = PutResult{zen::PutStatus::Fail, "Value provided is of bad format"};
+ return true;
+ }
+ else if (MetaData.RawSize != InOutValue.RawSize || MetaData.RawHash != InOutValue.RawHash)
+ {
+ OutPutResult = PutResult{
+ zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)};
+ return true;
+ }
+ return false;
+ }
+ }
+
+ ZenCacheValue ExistingValue;
+ if (Payload->MemCached)
+ {
+ ExistingValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload;
+ IndexLock.ReleaseNow();
+ }
+ else
+ {
+ IndexLock.ReleaseNow();
+
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ ExistingValue.Value = GetStandaloneCacheValue(Location, HashKey);
+ }
+ else
+ {
+ ExistingValue.Value = GetInlineCacheValue(Location);
+ }
+ }
+
+ if (ExistingValue.Value)
+ {
+ if (cache::impl::UpdateValueWithRawSizeAndHash(ExistingValue))
+ {
+ if (!cache::impl::UpdateValueWithRawSizeAndHash(InOutValue))
+ {
+ OutPutResult = PutResult{zen::PutStatus::Fail, "Value provided is of bad format"};
+ return true;
+ }
+
+ if (ExistingValue.RawSize != InOutValue.RawSize || ExistingValue.RawHash != InOutValue.RawHash)
+ {
+ OutPutResult = PutResult{zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'",
+ ExistingValue.RawSize,
+ ExistingValue.RawHash)};
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+}
+
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::Put");
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
+ PutResult Result{zen::PutStatus::Success};
+
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
- PutStandaloneCacheValue(HashKey, Value, References);
+ ZenCacheValue AcceptedValue = Value;
+ if (ShouldRejectPut(HashKey, AcceptedValue, Overwrite, Result))
+ {
+ if (OptionalBatchHandle)
+ {
+ OptionalBatchHandle->OutResults.push_back(Result);
+ }
+ return Result;
+ }
+ PutStandaloneCacheValue(HashKey, AcceptedValue, References);
if (OptionalBatchHandle)
{
- OptionalBatchHandle->OutResults.push_back(true);
+ OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success});
}
}
else
{
- PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle);
+ Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle);
}
m_DiskWriteCount++;
+ return Result;
}
uint64_t
@@ -2425,7 +2589,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
ZenCacheDiskLayer::BucketStats
ZenCacheDiskLayer::CacheBucket::Stats()
{
- GcStorageSize Size = StorageSize();
+ CacheStoreSize Size = TotalSize();
return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize,
.MemorySize = Size.MemorySize,
.DiskHitCount = m_DiskHitCount,
@@ -2748,38 +2912,49 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck
return {};
}
-void
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
+ PutResult Result{zen::PutStatus::Success};
if (OptionalBatchHandle != nullptr)
{
- OptionalBatchHandle->Buffers.push_back(Value.Value);
+ OptionalBatchHandle->Buffers.push_back(Value);
OptionalBatchHandle->Entries.push_back({});
OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size());
- OptionalBatchHandle->OutResults.push_back(false);
+ OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail});
+ PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back();
+ CurrentEntry.Overwrite = Overwrite;
std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences;
- HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size());
+ HashKeyAndReferences.reserve(1 + References.size());
HashKeyAndReferences.push_back(HashKey);
- HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end());
- return;
+ HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end());
+ return Result;
}
+
+ ZenCacheValue AcceptedValue = Value;
+ if (ShouldRejectPut(HashKey, AcceptedValue, Overwrite, Result))
+ {
+ return Result;
+ }
+
uint8_t EntryFlags = 0;
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ if (AcceptedValue.Value.GetContentType() == ZenContentType::kCbObject)
{
EntryFlags |= DiskLocation::kStructured;
}
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ else if (AcceptedValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
EntryFlags |= DiskLocation::kCompressed;
}
- m_BlockStore.WriteChunk(Value.Value.Data(),
- Value.Value.Size(),
+ m_BlockStore.WriteChunk(AcceptedValue.Value.Data(),
+ AcceptedValue.Value.Size(),
m_Configuration.PayloadAlignment,
[&](const BlockStoreLocation& BlockStoreLocation) {
ZEN_MEMSCOPE(GetCacheDiskTag());
@@ -2816,6 +2991,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
}
m_SlogFile.Append({.Key = HashKey, .Location = Location});
});
+ return Result;
}
std::string
@@ -3752,7 +3928,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
struct ZenCacheDiskLayer::PutBatchHandle
{
- PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
CacheBucket* Bucket;
@@ -3811,13 +3987,13 @@ struct ZenCacheDiskLayer::PutBatchHandle
return NewBucketHandle;
}
- RwLock Lock;
- std::vector<BucketHandle> BucketHandles;
- std::vector<bool>& OutResults;
+ RwLock Lock;
+ std::vector<BucketHandle> BucketHandles;
+ std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
};
ZenCacheDiskLayer::PutBatchHandle*
-ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
+ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults)
{
return new PutBatchHandle(OutResults);
}
@@ -3954,21 +4130,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
}
}
-void
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
-
+ PutResult RetVal = {zen::PutStatus::Fail};
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
- Bucket->Put(HashKey, Value, References, BucketBatchHandle);
+ RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle);
TryMemCacheTrim();
}
+ return RetVal;
}
void
@@ -4241,8 +4419,9 @@ ZenCacheDiskLayer::Flush()
}
}
+#if ZEN_WITH_TESTS
void
-ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
+ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
{
ZEN_TRACE_CPU("Z$::ScrubStorage");
@@ -4253,13 +4432,13 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
for (auto& Kv : m_Buckets)
{
-#if 1
+# if 1
Results.push_back(Ctx.ThreadPool().EnqueueTask(
std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
-#else
+# else
CacheBucket& Bucket = *Kv.second;
Bucket.ScrubStorage(Ctx);
-#endif
+# endif
}
for (auto& Result : Results)
@@ -4275,16 +4454,17 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
}
}
}
+#endif // ZEN_WITH_TESTS
-GcStorageSize
-ZenCacheDiskLayer::StorageSize() const
+CacheStoreSize
+ZenCacheDiskLayer::TotalSize() const
{
- GcStorageSize StorageSize{};
+ CacheStoreSize StorageSize{};
RwLock::SharedLockScope _(m_Lock);
for (auto& Kv : m_Buckets)
{
- GcStorageSize BucketSize = Kv.second->StorageSize();
+ CacheStoreSize BucketSize = Kv.second->TotalSize();
StorageSize.DiskSize += BucketSize.DiskSize;
StorageSize.MemorySize += BucketSize.MemorySize;
}
@@ -4295,7 +4475,7 @@ ZenCacheDiskLayer::StorageSize() const
ZenCacheDiskLayer::DiskStats
ZenCacheDiskLayer::Stats() const
{
- GcStorageSize Size = StorageSize();
+ CacheStoreSize Size = TotalSize();
ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
{
RwLock::SharedLockScope _(m_Lock);
@@ -4319,7 +4499,7 @@ ZenCacheDiskLayer::GetInfo() const
{
Info.BucketNames.push_back(Kv.first);
Info.EntryCount += Kv.second->EntryCount();
- GcStorageSize BucketSize = Kv.second->StorageSize();
+ CacheStoreSize BucketSize = Kv.second->TotalSize();
Info.StorageSize.DiskSize += BucketSize.DiskSize;
Info.StorageSize.MemorySize += BucketSize.MemorySize;
}
@@ -4334,7 +4514,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
{
- return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()};
+ return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->TotalSize()};
}
return {};
}
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index de4b0a37c..ff21d1ede 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -153,13 +153,13 @@ CacheRpcHandler::CacheRpcHandler(LoggerRef InLog,
CacheStats& InCacheStats,
UpstreamCacheClient& InUpstreamCache,
ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
+ GetCidStoreFunc&& InGetCidStore,
const DiskWriteBlocker* InDiskWriteBlocker)
: m_Log(InLog)
, m_CacheStats(InCacheStats)
, m_UpstreamCache(InUpstreamCache)
, m_CacheStore(InCacheStore)
-, m_CidStore(InCidStore)
+, m_GetCidStore(std::move(InGetCidStore))
, m_DiskWriteBlocker(InDiskWriteBlocker)
{
}
@@ -174,6 +174,12 @@ CacheRpcHandler::AreDiskWritesAllowed() const
return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
}
+CidStore&
+CacheRpcHandler::GetCidStore(std::string_view Namespace)
+{
+ return m_GetCidStore(Namespace);
+}
+
CacheRpcHandler::RpcResponseCode
CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
std::string_view UriNamespace,
@@ -334,13 +340,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
.Policy = std::move(Policy),
.Context = Context};
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+ PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest);
- if (Result == PutResult::Invalid)
+ if (Result == PutStatus::Invalid)
{
return CbPackage{};
}
- Results.push_back(Result == PutResult::Success);
+ Results.push_back(Result == PutStatus::Success);
}
if (Results.empty())
{
@@ -360,7 +366,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
return RpcResponse;
}
-PutResult
+PutStatus
CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
{
CbObjectView Record = Request.RecordObject;
@@ -381,9 +387,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
Stopwatch Timer;
+ CidStore& ChunkStore = m_GetCidStore(Request.Namespace);
+
Request.RecordObject.IterateAttachments([this,
&Request,
Package,
+ &ChunkStore,
&WriteAttachmentBuffers,
&WriteRawHashes,
&ValidAttachments,
@@ -412,7 +421,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
Count.Invalid++;
}
}
- else if (m_CidStore.ContainsChunk(ValueHash))
+ else if (ChunkStore.ContainsChunk(ValueHash))
{
ValidAttachments.emplace_back(ValueHash);
Count.Valid++;
@@ -422,19 +431,33 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (Count.Invalid > 0)
{
- return PutResult::Invalid;
+ return PutStatus::Invalid;
}
ZenCacheValue CacheValue;
CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr);
+ bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) &&
+ !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context,
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return PutResult.Status;
+ }
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
{
- std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
for (size_t Index = 0; Index < InsertResults.size(); Index++)
{
if (InsertResults[Index].New)
@@ -461,12 +484,14 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Request.Namespace,
- .Key = Request.Key,
- .ValueContentIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCbPackage,
+ .Namespace = Request.Namespace,
+ .Key = Request.Key,
+ .ValueContentIds = std::move(ValidAttachments)},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
- return PutResult::Success;
+ return PutStatus::Success;
}
CbPackage
@@ -507,6 +532,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
return CbPackage{};
}
+ CidStore& ChunkStore = m_GetCidStore(Namespace.value());
+
const bool HasUpstream = m_UpstreamCache.IsActive();
eastl::fixed_vector<RecordRequestData, 16> Requests;
@@ -606,7 +633,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
- if (m_CidStore.ContainsChunk(Value.ContentId))
+ if (ChunkStore.ContainsChunk(Value.ContentId))
{
Value.Exists = true;
}
@@ -627,7 +654,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
else
{
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
+ if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Value.ContentId))
{
if (Chunk.GetSize() > 0)
{
@@ -651,7 +678,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
if (!RequestValueIndexes.empty())
{
- m_CidStore.IterateChunks(
+ ChunkStore.IterateChunks(
CidHashes,
[this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
try
@@ -744,7 +771,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
}
- const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, Namespace, &ChunkStore, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
@@ -765,18 +792,24 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
+ bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal);
std::vector<IoHash> ReferencedAttachments;
ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = {Request.RecordCacheValue}},
- ReferencedAttachments,
- nullptr);
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = {Request.RecordCacheValue}},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return;
+ }
m_CacheStats.WriteCount++;
}
ParseValues(Request);
@@ -807,7 +840,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Value.Exists = true;
if (StoreLocal)
{
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
+ ChunkStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
@@ -924,10 +957,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<bool> BatchResults;
- eastl::fixed_vector<size_t, 32> BatchResultIndexes;
- eastl::fixed_vector<bool, 32> Results;
- eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
+ CidStore& ChunkStore = m_GetCidStore(Namespace.value());
+
+ std::vector<ZenCacheStore::PutResult> BatchResults;
+ eastl::fixed_vector<size_t, 32> BatchResultIndexes;
+ eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
+ eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
uint64_t RequestCount = RequestsArray.Num();
{
@@ -977,34 +1012,39 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
{
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
Value.SetContentType(ZenContentType::kCompressedBinary);
if (RawSize == 0)
{
RawSize = Chunk.DecodeRawSize();
}
- m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
- {},
- Batch.get());
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ Batch.get());
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
if (Batch)
{
BatchResultIndexes.push_back(Results.size());
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail});
}
else
{
- Results.push_back(true);
+ Results.push_back(PutResult);
}
TransferredSize = Chunk.GetCompressedSize();
}
else
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
}
Valid = true;
}
@@ -1020,12 +1060,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
IsCompressedBinary(ExistingValue.Value.GetContentType()))
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
Valid = true;
}
else
{
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)});
}
}
// We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
@@ -1060,27 +1100,49 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
{
size_t BatchResultIndex = BatchResultIndexes[Index];
ZEN_ASSERT(BatchResultIndex < Results.size());
- ZEN_ASSERT(Results[BatchResultIndex] == false);
+ ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success);
Results[BatchResultIndex] = BatchResults[Index];
}
for (std::size_t Index = 0; Index < Results.size(); Index++)
{
- if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty)
+ if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty)
{
m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
+ {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
}
{
ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
CbObjectWriter ResponseObject{1024};
ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
+ bool bAnyErrors = false;
+ for (const ZenCacheStore::PutResult& Value : Results)
{
- ResponseObject.AddBool(Value);
+ if (Value.Status == zen::PutStatus::Success)
+ {
+ ResponseObject.AddBool(true);
+ }
+ else
+ {
+ bAnyErrors = true;
+ ResponseObject.AddBool(false);
+ }
}
ResponseObject.EndArray();
+ if (bAnyErrors)
+ {
+ ResponseObject.BeginArray("ErrorMessages"sv);
+ for (const ZenCacheStore::PutResult& Value : Results)
+ {
+ if (Value.Status != zen::PutStatus::Success)
+ {
+ ResponseObject.AddString(Value.Message);
+ }
+ }
+ ResponseObject.EndArray();
+ }
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
@@ -1239,6 +1301,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal);
const bool IsHit = SkipData || HasData;
if (IsHit)
{
@@ -1249,14 +1312,19 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
if (HasData && StoreData)
{
- m_CacheStore.Put(Context,
- *Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {},
- nullptr);
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(
+ Context,
+ *Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
@@ -1494,6 +1562,8 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
+ CidStore& ChunkStore = m_GetCidStore(Namespace);
+
// TODO: BatchGet records?
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
@@ -1527,36 +1597,48 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](
- CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr);
- m_CacheStats.WriteCount++;
- }
- };
+ const auto OnCacheRecordGetComplete =
+ [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ RecordBody& Record = Records[RecordIndex];
+
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = Params.Source;
+
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal);
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Record.CacheValue},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return;
+ }
+ m_CacheStats.WriteCount++;
+ }
+ };
m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
@@ -1620,12 +1702,12 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
{
- if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
+ if (ChunkStore.ContainsChunk(Request->Key->ChunkId))
{
Request->Exists = true;
}
}
- else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
+ else if (IoBuffer Payload = ChunkStore.FindChunkByCid(Request->Key->ChunkId))
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
@@ -1758,6 +1840,8 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
return;
}
+ CidStore& ChunkStore = m_GetCidStore(Namespace);
+
CacheChunkRequest& Key = Params.Request;
size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
ChunkRequest& Request = Requests[RequestIndex];
@@ -1774,20 +1858,26 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
+ bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal);
if (Request.IsRecordRequest)
{
- m_CidStore.AddChunk(Params.Value, Params.RawHash);
+ ChunkStore.AddChunk(Params.Value, Params.RawHash);
}
else
{
- m_CacheStore.Put(Context,
- Namespace,
- Key.Key.Bucket,
- Key.Key.Hash,
- {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
- {},
- nullptr);
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(Context,
+ Namespace,
+ Key.Key.Bucket,
+ Key.Key.Hash,
+ {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
}
if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index d956384ca..1f2d6c37f 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -139,13 +139,10 @@ ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const st
CreateDirectories(m_RootDir);
m_DiskLayer.DiscoverBuckets();
-
- m_Gc.AddGcStorage(this);
}
ZenCacheNamespace::~ZenCacheNamespace()
{
- m_Gc.RemoveGcStorage(this);
}
struct ZenCacheNamespace::PutBatchHandle
@@ -154,7 +151,7 @@ struct ZenCacheNamespace::PutBatchHandle
};
ZenCacheNamespace::PutBatchHandle*
-ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult)
+ZenCacheNamespace::BeginPutBatch(std::vector<PutResult>& OutResult)
{
ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle;
Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
@@ -252,11 +249,12 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
return;
}
-void
+ZenCacheNamespace::PutResult
ZenCacheNamespace::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU(OptionalBatchHandle ? "Z$::Namespace::Put(Batched)" : "Z$::Namespace::Put");
@@ -268,8 +266,12 @@ ZenCacheNamespace::Put(std::string_view InBucket,
ZEN_ASSERT(Value.Value.Size());
ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr;
- m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle);
- m_WriteCount++;
+ PutResult RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle);
+ if (RetVal.Status == zen::PutStatus::Success)
+ {
+ m_WriteCount++;
+ }
+ return RetVal;
}
bool
@@ -297,7 +299,6 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view
std::function<void()>
ZenCacheNamespace::Drop()
{
- m_Gc.RemoveGcStorage(this);
return m_DiskLayer.Drop();
}
@@ -307,25 +308,19 @@ ZenCacheNamespace::Flush()
m_DiskLayer.Flush();
}
+#if ZEN_WITH_TESTS
void
-ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx)
+ZenCacheNamespace::Scrub(ScrubContext& Ctx)
{
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
ZEN_INFO("scrubbing '{}'", m_RootDir);
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_DiskLayer.ScrubStorage(Ctx);
+ m_DiskLayer.Scrub(Ctx);
}
+#endif // ZEN_WITH_TESTS
-GcStorageSize
-ZenCacheNamespace::StorageSize() const
+CacheStoreSize
+ZenCacheNamespace::TotalSize() const
{
- return m_DiskLayer.StorageSize();
+ return m_DiskLayer.TotalSize();
}
ZenCacheNamespace::Info
@@ -557,7 +552,7 @@ ZenCacheStore::LogWorker()
}
}
-ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult)
+ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<PutResult>& OutResult)
: m_CacheStore(CacheStore)
{
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -720,13 +715,14 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
m_MissCount++;
}
-void
+ZenCacheStore::PutResult
ZenCacheStore::Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatch* OptionalBatchHandle)
{
// Ad hoc rejection of known bad usage patterns for DDC bucket names
@@ -734,7 +730,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (IsKnownBadBucketName(Bucket))
{
m_RejectedWriteCount++;
- return;
+ return PutResult{zen::PutStatus::Invalid, "Bad bucket name"};
}
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -764,9 +760,16 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
{
ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
- Store->Put(Bucket, HashKey, Value, References, BatchHandle);
- m_WriteCount++;
- return;
+ PutResult RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle);
+ if (RetVal.Status == zen::PutStatus::Success)
+ {
+ m_WriteCount++;
+ }
+ else
+ {
+ m_RejectedWriteCount++;
+ }
+ return RetVal;
}
ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'",
@@ -774,6 +777,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
Namespace,
Bucket,
HashKey.ToHexString());
+ return PutResult{zen::PutStatus::Fail, fmt::format("Unknown namespace '{}'", Namespace)};
}
bool
@@ -822,11 +826,13 @@ ZenCacheStore::Flush()
IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
}
+#if ZEN_WITH_TESTS
void
-ZenCacheStore::ScrubStorage(ScrubContext& Ctx)
+ZenCacheStore::Scrub(ScrubContext& Ctx)
{
- IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.ScrubStorage(Ctx); });
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); });
}
+#endif // ZEN_WITH_TESTS
CacheValueDetails
ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter,
@@ -951,12 +957,12 @@ ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Names
}
}
-GcStorageSize
-ZenCacheStore::StorageSize() const
+CacheStoreSize
+ZenCacheStore::TotalSize() const
{
- GcStorageSize Size;
+ CacheStoreSize Size;
IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) {
- GcStorageSize StoreSize = Store.StorageSize();
+ CacheStoreSize StoreSize = Store.TotalSize();
Size.MemorySize += StoreSize.MemorySize;
Size.DiskSize += StoreSize.DiskSize;
});
@@ -1026,7 +1032,7 @@ ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig)
ZenCacheStore::Info
ZenCacheStore::GetInfo() const
{
- ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()};
+ ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = TotalSize()};
IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) {
Info.NamespaceNames.push_back(std::string(NamespaceName));
@@ -1378,7 +1384,7 @@ TEST_CASE("cachestore.store")
Value.Value = Obj.GetBuffer().AsIoBuffer();
Value.Value.SetContentType(ZenContentType::kCbObject);
- Zcs.Put("test_bucket"sv, Key, Value, {});
+ Zcs.Put("test_bucket"sv, Key, Value, {}, false);
}
for (int i = 0; i < kIterationCount; ++i)
@@ -1414,7 +1420,7 @@ TEST_CASE("cachestore.size")
const size_t Count = 16;
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
+ CacheStoreSize CacheSize;
{
GcManager Gc;
@@ -1432,10 +1438,10 @@ TEST_CASE("cachestore.size")
const size_t Bucket = Key % 4;
std::string BucketName = fmt::format("test_bucket-{}", Bucket);
IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {});
+ Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false);
Keys.push_back({BucketName, Hash});
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_EQ(0, CacheSize.MemorySize);
@@ -1445,7 +1451,7 @@ TEST_CASE("cachestore.size")
Zcs.Get(Key.first, Key.second, _);
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize);
}
@@ -1454,7 +1460,7 @@ TEST_CASE("cachestore.size")
GcManager Gc;
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
- const GcStorageSize SerializedSize = Zcs.StorageSize();
+ const CacheStoreSize SerializedSize = Zcs.TotalSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
@@ -1462,8 +1468,8 @@ TEST_CASE("cachestore.size")
{
Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
}
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
- CHECK_EQ(0, Zcs.StorageSize().MemorySize);
+ CHECK_EQ(0, Zcs.TotalSize().DiskSize);
+ CHECK_EQ(0, Zcs.TotalSize().MemorySize);
}
}
@@ -1472,7 +1478,7 @@ TEST_CASE("cachestore.size")
const size_t Count = 16;
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
+ CacheStoreSize CacheSize;
{
GcManager Gc;
@@ -1486,10 +1492,10 @@ TEST_CASE("cachestore.size")
for (size_t Key = 0; Key < Count; ++Key)
{
const size_t Bucket = Key % 4;
- Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {});
+ Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false);
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_EQ(0, CacheSize.MemorySize);
}
@@ -1498,7 +1504,7 @@ TEST_CASE("cachestore.size")
GcManager Gc;
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
- const GcStorageSize SerializedSize = Zcs.StorageSize();
+ const CacheStoreSize SerializedSize = Zcs.TotalSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
@@ -1506,7 +1512,7 @@ TEST_CASE("cachestore.size")
{
Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
}
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ CHECK_EQ(0, Zcs.TotalSize().DiskSize);
}
}
}
@@ -1569,7 +1575,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : Chunks)
{
ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
WorkCompleted.fetch_add(1);
});
}
@@ -1599,7 +1605,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
GcChunkHashes.swap(RemainingChunkHashes);
};
- const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+ const uint64_t TotalSize = Zcs.TotalSize().DiskSize;
CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
{
@@ -1650,7 +1656,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : NewChunks)
{
ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
AddedChunkCount.fetch_add(1);
WorkCompleted.fetch_add(1);
});
@@ -1755,14 +1761,14 @@ TEST_CASE("cachestore.namespaces")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {});
+ Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}, false);
ZenCacheValue GetValue;
CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
// This should just be dropped as we don't allow creating of namespaces on the fly
- Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {});
+ Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}, false);
CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
}
@@ -1778,7 +1784,7 @@ TEST_CASE("cachestore.namespaces")
IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
Buffer2.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue2 = {.Value = Buffer2};
- Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {});
+ Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}, false);
ZenCacheValue GetValue;
CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
@@ -1820,7 +1826,7 @@ TEST_CASE("cachestore.drop.bucket")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false);
return Key;
};
auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
@@ -1893,7 +1899,7 @@ TEST_CASE("cachestore.drop.namespace")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false);
return Key;
};
auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
@@ -1957,8 +1963,6 @@ TEST_CASE("cachestore.blocked.disklayer.put")
{
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
-
const auto CreateCacheValue = [](size_t Size) -> CbObject {
std::vector<uint8_t> Buf;
Buf.resize(Size, Size & 0xff);
@@ -1979,7 +1983,7 @@ TEST_CASE("cachestore.blocked.disklayer.put")
size_t Key = Buffer.Size();
IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {});
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}, false);
ZenCacheValue BufferGet;
CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
@@ -1989,7 +1993,7 @@ TEST_CASE("cachestore.blocked.disklayer.put")
Buffer2.SetContentType(ZenContentType::kCbObject);
// We should be able to overwrite even if the file is open for read
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {});
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}, false);
MemoryView OldView = BufferGet.Value.GetView();
@@ -2080,7 +2084,7 @@ TEST_CASE("cachestore.scrub")
AttachmentHashes.push_back(Attachment.DecodeRawHash());
CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back());
}
- Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes);
+ Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes, false);
}
};
@@ -2094,8 +2098,8 @@ TEST_CASE("cachestore.scrub")
WorkerThreadPool ThreadPool{1};
ScrubContext ScrubCtx{ThreadPool};
- Zcs.ScrubStorage(ScrubCtx);
- CidStore.ScrubStorage(ScrubCtx);
+ Zcs.Scrub(ScrubCtx);
+ CidStore.Scrub(ScrubCtx);
CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
CHECK(ScrubCtx.BadCids().GetSize() == 0);
}
@@ -2129,7 +2133,8 @@ TEST_CASE("cachestore.newgc.basics")
{.Value = Record.second,
.RawSize = Record.second.GetSize(),
.RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())},
- AttachmentKeys);
+ AttachmentKeys,
+ false);
for (const auto& Attachment : Attachments)
{
CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash());
@@ -2145,7 +2150,8 @@ TEST_CASE("cachestore.newgc.basics")
{.Value = CacheValue.second,
.RawSize = CacheValue.second.GetSize(),
.RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())},
- {});
+ {},
+ false);
CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}});
return Key;
};
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index 460f0e10d..6b89beb3d 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -73,9 +73,12 @@ public:
WorkerThreadPool* OptionalWorkerPool,
uint64_t LargeSizeLimit) override;
virtual void Flush() override;
- virtual void ScrubStorage(ScrubContext& Ctx) override;
virtual CidStoreSize TotalSize() const override;
+#if ZEN_WITH_TESTS
+ virtual void Scrub(ScrubContext& Ctx) override;
+#endif // ZEN_WITH_TESTS
+
private:
CasContainerStrategy m_TinyStrategy;
CasContainerStrategy m_SmallStrategy;
@@ -195,7 +198,7 @@ CasImpl::OpenOrCreateManifest()
}
else
{
- ZEN_WARN("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult));
+ ZEN_WARN("Store manifest validation failed: {}, will generate new manifest to recover", ToString(ValidationResult));
}
if (ManifestIsOk)
@@ -463,24 +466,19 @@ CasImpl::Flush()
m_LargeStrategy.Flush();
}
+#if ZEN_WITH_TESTS
void
-CasImpl::ScrubStorage(ScrubContext& Ctx)
+CasImpl::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetCasTag());
ZEN_TRACE_CPU("Cas::ScrubStorage");
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
m_SmallStrategy.ScrubStorage(Ctx);
m_TinyStrategy.ScrubStorage(Ctx);
m_LargeStrategy.ScrubStorage(Ctx);
}
+#endif // ZEN_WITH_TESTS
CidStoreSize
CasImpl::TotalSize() const
@@ -523,7 +521,7 @@ TEST_CASE("CasStore")
WorkerThreadPool ThreadPool{1};
ScrubContext Ctx{ThreadPool};
- Store->ScrubStorage(Ctx);
+ Store->Scrub(Ctx);
IoBuffer Value1{16};
memcpy(Value1.MutableData(), "1234567890123456", 16);
diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h
index e279dd2cc..0f6e2ba9d 100644
--- a/src/zenstore/cas.h
+++ b/src/zenstore/cas.h
@@ -50,12 +50,13 @@ public:
WorkerThreadPool* OptionalWorkerPool,
uint64_t LargeSizeLimit) = 0;
virtual void Flush() = 0;
- virtual void ScrubStorage(ScrubContext& Ctx) = 0;
virtual CidStoreSize TotalSize() const = 0;
+#if ZEN_WITH_TESTS
+ virtual void Scrub(ScrubContext& Ctx) = 0;
+#endif // ZEN_WITH_TESTS
protected:
CidStoreConfiguration m_Config;
- uint64_t m_LastScrubTime = 0;
};
ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc);
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index 2ab769d04..ae1b59dc0 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -127,17 +127,9 @@ struct CidStore::Impl
void Flush() { m_CasStore.Flush(); }
- void ScrubStorage(ScrubContext& Ctx)
- {
- if (Ctx.ScrubTimestamp() == m_LastScrubTime)
- {
- return;
- }
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_CasStore.ScrubStorage(Ctx);
- }
+#if ZEN_WITH_TESTS
+ void Scrub(ScrubContext& Ctx) { m_CasStore.Scrub(Ctx); }
+#endif // ZEN_WITH_TESTS
CidStoreStats Stats()
{
@@ -236,11 +228,13 @@ CidStore::Flush()
m_Impl->Flush();
}
+#if ZEN_WITH_TESTS
void
-CidStore::ScrubStorage(ScrubContext& Ctx)
+CidStore::Scrub(ScrubContext& Ctx)
{
- m_Impl->ScrubStorage(Ctx);
+ m_Impl->Scrub(Ctx);
}
+#endif // ZEN_WITH_TESTS
CidStoreSize
CidStore::TotalSize() const
diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h
index 15e4cbf81..32c256a42 100644
--- a/src/zenstore/compactcas.h
+++ b/src/zenstore/compactcas.h
@@ -68,10 +68,10 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore
void Flush();
// GcStorage
-
virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
virtual GcStorageSize StorageSize() const override;
+ // GcReferenceStore
virtual std::string GetGcName(GcCtx& Ctx) override;
virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override;
diff --git a/src/zenstore/include/zenstore/buildstore/buildstore.h b/src/zenstore/include/zenstore/buildstore/buildstore.h
index adf48dc26..87b7dd812 100644
--- a/src/zenstore/include/zenstore/buildstore/buildstore.h
+++ b/src/zenstore/include/zenstore/buildstore/buildstore.h
@@ -6,9 +6,8 @@
#include <zencore/iohash.h>
#include <zenstore/accesstime.h>
#include <zenstore/caslog.h>
+#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
-#include "../compactcas.h"
-#include "../filecas.h"
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
@@ -19,18 +18,13 @@ namespace zen {
struct BuildStoreConfig
{
std::filesystem::path RootDirectory;
- uint32_t SmallBlobBlockStoreMaxBlockSize = 256 * 1024 * 1024;
- uint64_t SmallBlobBlockStoreMaxBlockEmbedSize = 1 * 1024 * 1024;
- uint32_t SmallBlobBlockStoreAlignement = 16;
- uint32_t MetadataBlockStoreMaxBlockSize = 64 * 1024 * 1024;
- uint32_t MetadataBlockStoreAlignement = 8;
- uint64_t MaxDiskSpaceLimit = 1u * 1024u * 1024u * 1024u * 1024u; // 1TB
+ uint64_t MaxDiskSpaceLimit = 1ull * 1024u * 1024u * 1024u * 1024u; // 1TB (64-bit first operand: all-32-bit unsigned math would wrap 2^40 to 0)
};
-class BuildStore : public GcReferencer, public GcReferenceLocker, public GcStorage
+class BuildStore : public GcReferencer, public GcReferenceLocker
{
public:
- explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc);
+ explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc, CidStore& BlobStore);
virtual ~BuildStore();
void PutBlob(const IoHash& BlobHashes, const IoBuffer& Payload);
@@ -44,7 +38,7 @@ public:
std::vector<BlobExistsResult> BlobsExists(std::span<const IoHash> BlobHashes);
- void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas);
+ void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas, WorkerThreadPool* OptionalWorkerPool);
std::vector<IoBuffer> GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool);
void Flush();
@@ -52,10 +46,8 @@ public:
struct StorageStats
{
uint64_t EntryCount = 0;
- uint64_t LargeBlobCount = 0;
- uint64_t LargeBlobBytes = 0;
- uint64_t SmallBlobCount = 0;
- uint64_t SmallBlobBytes = 0;
+ uint64_t BlobCount = 0;
+ uint64_t BlobBytes = 0;
uint64_t MetadataCount = 0;
uint64_t MetadataByteCount = 0;
};
@@ -86,23 +78,18 @@ private:
//////// GcReferenceLocker
virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override;
- //////// GcStorage
- virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
- virtual GcStorageSize StorageSize() const override;
-
#pragma pack(push)
#pragma pack(1)
struct PayloadEntry
{
PayloadEntry() {}
- PayloadEntry(uint64_t Flags, uint64_t Size)
+ PayloadEntry(uint8_t Flags, uint64_t Size)
{
ZEN_ASSERT((Size & 0x00ffffffffffffffu) == Size);
- ZEN_ASSERT((Flags & (kTombStone | kStandalone)) == Flags);
+ ZEN_ASSERT((Flags & (kTombStone)) == Flags);
FlagsAndSize = (Size << 8) | Flags;
}
- static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value
- static const uint8_t kStandalone = 0x20u; // This payload is stored as a standalone value
+ static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value
uint64_t FlagsAndSize = 0;
uint64_t GetSize() const { return FlagsAndSize >> 8; }
@@ -126,27 +113,47 @@ private:
struct MetadataEntry
{
- BlockStoreLocation Location; // 12 bytes
+ IoHash MetadataHash; // 20 bytes
+
+ MetadataEntry() {}
- ZenContentType ContentType = ZenContentType::kCOUNT; // 1 byte
- static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value
- uint8_t Flags = 0; // 1 byte
+ MetadataEntry(const IoHash& Hash, uint64_t Size, ZenContentType ContentType, uint8_t Flags)
+ {
+ ZEN_ASSERT((Size & 0x0000ffffffffffffu) == Size);
+ ZEN_ASSERT((Flags & kTombStone) == Flags);
+ FlagsContentTypeAndSize = (Size << 16) | ((uint64_t)ContentType << 8) | Flags;
+ MetadataHash = Hash;
+ }
+
+ static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value
+
+ uint64_t GetSize() const { return FlagsContentTypeAndSize >> 16; }
+ void SetSize(uint64_t Size)
+ {
+ ZEN_ASSERT((Size & 0x0000ffffffffffffu) == Size);
+ FlagsContentTypeAndSize = (Size << 16) | (FlagsContentTypeAndSize & 0xffff);
+ }
+
+ uint8_t GetFlags() const { return uint8_t(FlagsContentTypeAndSize & 0xff); }
+ void AddFlag(uint8_t Flag) { FlagsContentTypeAndSize |= Flag; }
+ void SetFlags(uint8_t Flags) { FlagsContentTypeAndSize = (FlagsContentTypeAndSize & 0xffffffffffffff00u) | Flags; }
+
+ ZenContentType GetContentType() const { return ZenContentType((FlagsContentTypeAndSize >> 8) & 0xff); }
+ void SetContentType(ZenContentType ContentType)
+ {
+ FlagsContentTypeAndSize = (FlagsContentTypeAndSize & 0xffffffffffff00ffu) | (uint16_t(ContentType) << 8);
+ }
- uint8_t Reserved1 = 0;
- uint8_t Reserved2 = 0;
+ uint64_t FlagsContentTypeAndSize = ((uint64_t)ZenContentType::kCOUNT << 8);
};
- static_assert(sizeof(MetadataEntry) == 16);
+ static_assert(sizeof(MetadataEntry) == 28);
struct MetadataDiskEntry
{
- MetadataEntry Entry; // 16 bytes
+ MetadataEntry Entry; // 28 bytes
IoHash BlobHash; // 20 bytes
- uint8_t Reserved1 = 0;
- uint8_t Reserved2 = 0;
- uint8_t Reserved3 = 0;
- uint8_t Reserved4 = 0;
};
- static_assert(sizeof(MetadataDiskEntry) == 40);
+ static_assert(sizeof(MetadataDiskEntry) == 48);
#pragma pack(pop)
@@ -206,17 +213,15 @@ private:
std::vector<BlobEntry> m_BlobEntries;
tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> m_BlobLookup;
- FileCasStrategy m_LargeBlobStore;
- CasContainerStrategy m_SmallBlobStore;
- BlockStore m_MetadataBlockStore;
+ CidStore& m_BlobStore;
TCasLogFile<PayloadDiskEntry> m_PayloadlogFile;
TCasLogFile<MetadataDiskEntry> m_MetadatalogFile;
uint64_t m_BlobLogFlushPosition = 0;
uint64_t m_MetaLogFlushPosition = 0;
- std::unique_ptr<HashSet> m_TrackedCacheKeys;
- std::atomic<uint64_t> m_LastAccessTimeUpdateCount;
+ std::unique_ptr<std::vector<IoHash>> m_TrackedBlobKeys;
+ std::atomic<uint64_t> m_LastAccessTimeUpdateCount;
friend class BuildStoreGcReferenceChecker;
friend class BuildStoreGcReferencePruner;
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 3cd2d6423..49c52f847 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -106,6 +106,12 @@ static_assert(sizeof(DiskIndexEntry) == 32);
//////////////////////////////////////////////////////////////////////////
+struct CacheStoreSize
+{
+ uint64_t DiskSize = 0;
+ uint64_t MemorySize = 0;
+};
+
class ZenCacheDiskLayer
{
public:
@@ -115,6 +121,7 @@ public:
uint32_t PayloadAlignment = 1u << 4;
uint64_t MemCacheSizeThreshold = 1 * 1024;
uint64_t LargeObjectThreshold = 128 * 1024;
+ bool LimitOverwrites = false;
};
struct Configuration
@@ -130,8 +137,8 @@ public:
struct BucketInfo
{
- uint64_t EntryCount = 0;
- GcStorageSize StorageSize;
+ uint64_t EntryCount = 0;
+ CacheStoreSize StorageSize;
};
struct Info
@@ -140,7 +147,7 @@ public:
Configuration Config;
std::vector<std::string> BucketNames;
uint64_t EntryCount = 0;
- GcStorageSize StorageSize;
+ CacheStoreSize StorageSize;
};
struct BucketStats
@@ -170,6 +177,12 @@ public:
uint64_t MemorySize;
};
+ struct PutResult
+ {
+ zen::PutStatus Status;
+ std::string Message;
+ };
+
explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheDiskLayer();
@@ -180,22 +193,22 @@ public:
void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle);
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
+ PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResult);
void EndPutBatch(PutBatchHandle* Batch) noexcept;
- void Put(std::string_view Bucket,
+ PutResult Put(std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle);
std::function<void()> Drop();
std::function<void()> DropBucket(std::string_view Bucket);
void Flush();
- void ScrubStorage(ScrubContext& Ctx);
- void DiscoverBuckets();
- GcStorageSize StorageSize() const;
- DiskStats Stats() const;
+ void DiscoverBuckets();
+ CacheStoreSize TotalSize() const;
+ DiskStats Stats() const;
Info GetInfo() const;
std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
@@ -212,6 +225,7 @@ public:
#if ZEN_WITH_TESTS
void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
+ void Scrub(ScrubContext& Ctx);
#endif // ZEN_WITH_TESTS
bool GetContentStats(std::string_view BucketName, CacheContentStats& OutContentStats) const;
@@ -219,7 +233,7 @@ public:
/** A cache bucket manages a single directory containing
metadata and data for that bucket
*/
- struct CacheBucket : public GcReferencer
+ struct CacheBucket : public GcReferencer, public GcStorage
{
CacheBucket(GcManager& Gc,
std::atomic_uint64_t& OuterCacheMemoryUsage,
@@ -236,13 +250,22 @@ public:
void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle);
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
- void EndPutBatch(PutBatchHandle* Batch) noexcept;
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, PutBatchHandle* OptionalBatchHandle);
- uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
- std::function<void()> Drop();
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
+ PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult);
+ void EndPutBatch(PutBatchHandle* Batch) noexcept;
+ PutResult Put(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle);
+ uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
+ std::function<void()> Drop();
+ void Flush();
+ inline CacheStoreSize TotalSize() const
+ {
+ return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(),
+ .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)};
+ }
+
RwLock::SharedLockScope GetGcReferencerLock();
struct ReferencesStats
@@ -265,11 +288,6 @@ public:
std::span<const std::size_t> ChunkIndexes,
std::vector<IoHash>& OutReferences) const;
- inline GcStorageSize StorageSize() const
- {
- return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(),
- .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)};
- }
uint64_t EntryCount() const;
BucketStats Stats();
@@ -281,6 +299,20 @@ public:
void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time);
#endif // ZEN_WITH_TESTS
+ // GcReferencer
+ virtual std::string GetGcName(GcCtx& Ctx) override;
+ virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override;
+ virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
+ virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override;
+
+ // GcStorage
+ virtual void ScrubStorage(ScrubContext& Ctx) override;
+ virtual GcStorageSize StorageSize() const override
+ {
+ CacheStoreSize Size = TotalSize();
+ return {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
+ }
+
private:
#pragma pack(push)
#pragma pack(1)
@@ -379,19 +411,16 @@ public:
std::atomic_uint64_t m_StandaloneSize{};
std::atomic_uint64_t m_MemCachedSize{};
- virtual std::string GetGcName(GcCtx& Ctx) override;
- virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override;
- virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
- virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override;
-
- void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const;
- void PutInlineCacheValue(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- PutBatchHandle* OptionalBatchHandle = nullptr);
- IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
+ void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
+ bool ShouldRejectPut(const IoHash& HashKey, ZenCacheValue& InOutValue, bool Overwrite, ZenCacheDiskLayer::PutResult& OutPutResult);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const;
+ PutResult PutInlineCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle = nullptr);
+ IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const;
void SetMetaData(RwLock::ExclusiveLockScope&,
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
index da8cf69fe..80340d72c 100644
--- a/src/zenstore/include/zenstore/cache/cacherpc.h
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -4,6 +4,7 @@
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
+#include <zenstore/cache/cacheshared.h>
#include <zenutil/cache/cache.h>
#include <atomic>
@@ -56,13 +57,6 @@ struct CacheStats
std::atomic_uint64_t RpcChunkBatchRequests{};
};
-enum class PutResult
-{
- Success,
- Fail,
- Invalid,
-};
-
/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers.
@@ -76,11 +70,13 @@ IsCompressedBinary(ZenContentType Type)
struct CacheRpcHandler
{
+ typedef std::function<CidStore&(std::string_view Context)> GetCidStoreFunc;
+
CacheRpcHandler(LoggerRef InLog,
CacheStats& InCacheStats,
UpstreamCacheClient& InUpstreamCache,
ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
+ GetCidStoreFunc&& InGetCidStore,
const DiskWriteBlocker* InDiskWriteBlocker);
~CacheRpcHandler();
@@ -100,6 +96,8 @@ struct CacheRpcHandler
int& OutTargetProcessId,
CbPackage& OutPackage);
+ CidStore& GetCidStore(std::string_view Namespace);
+
private:
CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest);
CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
@@ -107,7 +105,7 @@ private:
CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest);
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+ PutStatus PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
/** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
bool ParseGetCacheChunksRequest(std::string& Namespace,
@@ -148,7 +146,7 @@ private:
CacheStats& m_CacheStats;
UpstreamCacheClient& m_UpstreamCache;
ZenCacheStore& m_CacheStore;
- CidStore& m_CidStore;
+ GetCidStoreFunc m_GetCidStore;
const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
bool AreDiskWritesAllowed() const;
diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index ef1b803de..8f40ae727 100644
--- a/src/zenstore/include/zenstore/cache/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
@@ -69,6 +69,14 @@ struct CacheContentStats
std::vector<IoHash> Attachments;
};
+enum class PutStatus
+{
+ Success,
+ Fail,
+ Conflict,
+ Invalid,
+};
+
bool IsKnownBadBucketName(std::string_view BucketName);
bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer);
diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 48fc17960..c51d7312c 100644
--- a/src/zenstore/include/zenstore/cache/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -49,7 +49,7 @@ class JobQueue;
*/
-class ZenCacheNamespace final : public GcStorage
+class ZenCacheNamespace final
{
public:
struct Configuration
@@ -78,24 +78,27 @@ public:
ZenCacheDiskLayer::DiskStats DiskStats;
};
+ using PutResult = ZenCacheDiskLayer::PutResult;
+
ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheNamespace();
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResults);
+ PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResults);
void EndPutBatch(PutBatchHandle* Batch) noexcept;
struct GetBatchHandle;
GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResults);
void EndGetBatch(GetBatchHandle* Batch) noexcept;
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle);
- void Put(std::string_view Bucket,
- const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- PutBatchHandle* OptionalBatchHandle = nullptr);
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle);
+ PutResult Put(std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Bucket);
void EnumerateBucketContents(std::string_view Bucket,
@@ -104,10 +107,7 @@ public:
std::function<void()> Drop();
void Flush();
- // GcStorage
- virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
- virtual GcStorageSize StorageSize() const override;
-
+ CacheStoreSize TotalSize() const;
Configuration GetConfig() const { return m_Configuration; }
Info GetInfo() const;
std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
@@ -124,6 +124,7 @@ public:
#if ZEN_WITH_TESTS
void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
+ void Scrub(ScrubContext& ScrubCtx);
#endif // ZEN_WITH_TESTS
private:
@@ -137,7 +138,6 @@ private:
std::atomic<uint64_t> m_WriteCount{};
metrics::RequestStats m_PutOps;
metrics::RequestStats m_GetOps;
- uint64_t m_LastScrubTime = 0;
ZenCacheNamespace(const ZenCacheNamespace&) = delete;
ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete;
@@ -175,7 +175,7 @@ public:
Configuration Config;
std::vector<std::string> NamespaceNames;
uint64_t DiskEntryCount = 0;
- GcStorageSize StorageSize;
+ CacheStoreSize StorageSize;
};
struct NamedNamespaceStats
@@ -196,6 +196,8 @@ public:
std::vector<NamedNamespaceStats> NamespaceStats;
};
+ using PutResult = ZenCacheNamespace::PutResult;
+
ZenCacheStore(GcManager& Gc,
JobQueue& JobQueue,
const std::filesystem::path& BasePath,
@@ -206,7 +208,7 @@ public:
class PutBatch
{
public:
- PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<bool>& OutResult);
+ PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<PutResult>& OutResult);
~PutBatch();
private:
@@ -243,24 +245,24 @@ public:
const IoHash& HashKey,
GetBatch& BatchHandle);
- void Put(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::string_view Bucket,
- const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- PutBatch* OptionalBatchHandle = nullptr);
+ PutResult Put(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatch* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Namespace, std::string_view Bucket);
bool DropNamespace(std::string_view Namespace);
void Flush();
- void ScrubStorage(ScrubContext& Ctx);
CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter,
const std::string_view BucketFilter,
const std::string_view ValueFilter) const;
- GcStorageSize StorageSize() const;
+ CacheStoreSize TotalSize() const;
CacheStoreStats Stats(bool IncludeNamespaceStats = true);
Configuration GetConfiguration() const { return m_Configuration; }
@@ -290,6 +292,10 @@ public:
bool GetContentStats(std::string_view Namespace, std::string_view BucketName, CacheContentStats& OutContentStats) const;
+#if ZEN_WITH_TESTS
+ void Scrub(ScrubContext& Ctx);
+#endif // ZEN_WITH_TESTS
+
private:
const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const;
ZenCacheNamespace* GetNamespace(std::string_view Namespace);
diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
index 152031c3a..c3993c028 100644
--- a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
+++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
@@ -113,7 +113,7 @@ public:
std::span<CacheChunkRequest*> CacheChunkRequests,
OnCacheChunksGetComplete&& OnComplete) = 0;
- virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) = 0;
};
} // namespace zen
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
index b3d00fec0..8918b119f 100644
--- a/src/zenstore/include/zenstore/cidstore.h
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -87,12 +87,15 @@ public:
bool ContainsChunk(const IoHash& DecompressedId);
void FilterChunks(HashKeySet& InOutChunks);
void Flush();
- void ScrubStorage(ScrubContext& Ctx);
CidStoreSize TotalSize() const;
CidStoreStats Stats() const;
virtual void ReportMetrics(StatsMetrics& Statsd) override;
+#if ZEN_WITH_TESTS
+ void Scrub(ScrubContext& Ctx);
+#endif // ZEN_WITH_TESTS
+
private:
struct Impl;
std::unique_ptr<CasStore> m_CasStore;