aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/httpstructuredcache.cpp
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2025-08-21 23:58:51 +0000
committerLiam Mitchell <[email protected]>2025-08-21 23:58:51 +0000
commit33209bd6931f49362dfc2d62c6cb6b87a42c99e1 (patch)
treecfc7914634088b3f4feac2d4cec0b5650dfdcc3c /src/zenserver/cache/httpstructuredcache.cpp
parentFix changelog merge issues (diff)
parentavoid new in static IoBuffer (#472) (diff)
downloadzen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.tar.xz
zen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.zip
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenserver/cache/httpstructuredcache.cpp')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp420
1 files changed, 283 insertions, 137 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index bb0c55618..19ac3a216 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -85,7 +85,7 @@ namespace {
//////////////////////////////////////////////////////////////////////////
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
+ GetCidStoreFunc&& GetCidStore,
HttpStatsService& StatsService,
HttpStatusService& StatusService,
UpstreamCache& UpstreamCache,
@@ -95,11 +95,10 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
, m_CacheStore(InCacheStore)
, m_StatsService(StatsService)
, m_StatusService(StatusService)
-, m_CidStore(InCidStore)
, m_UpstreamCache(UpstreamCache)
, m_DiskWriteBlocker(InDiskWriteBlocker)
, m_OpenProcessCache(InOpenProcessCache)
-, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker)
+, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, std::move(GetCidStore), InDiskWriteBlocker)
{
m_StatsService.RegisterHandler("z$", *this);
m_StatusService.RegisterHandler("z$", *this);
@@ -131,24 +130,6 @@ HttpStructuredCacheService::Flush()
}
void
-HttpStructuredCacheService::ScrubStorage(ScrubContext& Ctx)
-{
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
- ZenCacheStore::Info Info = m_CacheStore.GetInfo();
-
- ZEN_INFO("scrubbing '{}'", Info.BasePath);
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_CidStore.ScrubStorage(Ctx);
- m_CacheStore.ScrubStorage(Ctx);
-}
-
-void
HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
{
std::string_view Key = Request.RelativeUri();
@@ -243,6 +224,9 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
for (const auto& NamespaceIt : ValueDetails.Namespaces)
{
const std::string& Namespace = NamespaceIt.first;
+
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace);
+
for (const auto& BucketIt : NamespaceIt.second.Buckets)
{
const std::string& Bucket = BucketIt.first;
@@ -252,7 +236,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
{
for (const IoHash& Hash : ValueIt.second.Attachments)
{
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ IoBuffer Payload = ChunkStore.FindChunkByCid(Hash);
CSVWriter << "\r\n"
<< Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString()
<< ", " << gsl::narrow<uint64_t>(Payload.GetSize());
@@ -270,7 +254,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
size_t AttachmentsSize = 0;
for (const IoHash& Hash : ValueIt.second.Attachments)
{
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ IoBuffer Payload = ChunkStore.FindChunkByCid(Hash);
AttachmentsSize += Payload.GetSize();
}
CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize);
@@ -292,6 +276,9 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
for (const auto& NamespaceIt : ValueDetails.Namespaces)
{
const std::string& Namespace = NamespaceIt.first;
+
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace);
+
Cbo.BeginObject();
{
Cbo.AddString("name", Namespace);
@@ -334,7 +321,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
{
Cbo.BeginObject();
Cbo.AddHash("cid", Hash);
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ IoBuffer Payload = ChunkStore.FindChunkByCid(Hash);
Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize()));
Cbo.EndObject();
}
@@ -348,7 +335,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
size_t AttachmentsSize = 0;
for (const IoHash& Hash : ValueIt.second.Attachments)
{
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ IoBuffer Payload = ChunkStore.FindChunkByCid(Hash);
AttachmentsSize += Payload.GetSize();
}
Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize));
@@ -623,6 +610,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque
ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount);
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName);
+
if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty())
{
ResponseWriter.BeginObject("BucketSizes");
@@ -681,7 +670,7 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque
uint64_t AttachmentsSize = 0;
- m_CidStore.IterateChunks(
+ ChunkStore.IterateChunks(
AllAttachments,
[&](size_t Index, const IoBuffer& Payload) {
ZEN_UNUSED(Index);
@@ -749,6 +738,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName);
+
if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true")
{
CacheContentStats ContentStats;
@@ -775,7 +766,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
- m_CidStore.IterateChunks(
+ ChunkStore.IterateChunks(
ContentStats.Attachments,
[&](size_t Index, const IoBuffer& Payload) {
ZEN_UNUSED(Index);
@@ -850,6 +841,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
Stopwatch Timer;
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+
if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) &&
m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue))
{
@@ -864,17 +857,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
uint32_t MissingCount = 0;
CbObjectView CacheRecord(ClientResultValue.Value.Data());
- CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
+ CacheRecord.IterateAttachments([this, &ChunkStore, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
if (SkipData)
{
- if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
+ if (!ChunkStore.ContainsChunk(AttachmentHash.AsHash()))
{
MissingCount++;
}
}
else
{
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ if (IoBuffer Chunk = ChunkStore.FindChunkByCid(AttachmentHash.AsHash()))
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
if (Compressed)
@@ -974,6 +967,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
{
Success = true;
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+
ClientResultValue.Value = UpstreamResult.Value;
ClientResultValue.Value.SetContentType(AcceptType);
@@ -997,8 +992,14 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (Success && StoreLocal)
{
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr);
- m_CacheStats.WriteCount++;
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore
+ .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
}
else if (AcceptType == ZenContentType::kCbPackage)
@@ -1018,6 +1019,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
CacheRecord.IterateAttachments([this,
&Package,
&Ref,
+ &ChunkStore,
&WriteAttachmentBuffers,
&WriteRawHashes,
&ReferencedAttachments,
@@ -1052,12 +1054,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
{
if (SkipData)
{
- if (m_CidStore.ContainsChunk(Hash))
+ if (ChunkStore.ContainsChunk(Hash))
{
Count.Valid++;
}
}
- else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
+ else if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Hash))
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
if (Compressed)
@@ -1083,30 +1085,35 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (StoreLocal)
{
- m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- CacheValue,
- ReferencedAttachments,
- nullptr);
- m_CacheStats.WriteCount++;
-
- if (!WriteAttachmentBuffers.empty())
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
{
- std::vector<CidStore::InsertResult> InsertResults =
- m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (const CidStore::InsertResult& Result : InsertResults)
+ m_CacheStats.WriteCount++;
+
+ if (!WriteAttachmentBuffers.empty())
{
- if (Result.New)
+ std::vector<CidStore::InsertResult> InsertResults =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (const CidStore::InsertResult& Result : InsertResults)
{
- Count.New++;
+ if (Result.New)
+ {
+ Count.New++;
+ }
}
}
- }
- WriteAttachmentBuffers = {};
- WriteRawHashes = {};
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
+ }
}
BinaryWriter MemStream;
@@ -1197,6 +1204,24 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
}
+ auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) {
+ ZEN_UNUSED(PutResult);
+
+ HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError;
+ switch (PutResult.Status)
+ {
+ case zen::PutStatus::Conflict:
+ ResponseCode = HttpResponseCode::Conflict;
+ break;
+ case zen::PutStatus::Invalid:
+ ResponseCode = HttpResponseCode::BadRequest;
+ break;
+ }
+
+ return PutResult.Message.empty() ? Request.WriteResponse(ResponseCode)
+ : Request.WriteResponse(ResponseCode, zen::HttpContentType::kText, PutResult.Message);
+ };
+
const HttpContentType ContentType = Request.RequestContentType();
Body.SetContentType(ContentType);
@@ -1225,18 +1250,28 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
{
RawHash = IoHash::HashBuffer(SharedBuffer(Body));
}
- m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
- {},
- nullptr);
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
m_CacheStats.WriteCount++;
if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}});
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}",
@@ -1270,17 +1305,34 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
std::vector<IoHash> ReferencedAttachments;
int32_t TotalCount = 0;
- CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) {
- const IoHash Hash = AttachmentHash.AsHash();
- ReferencedAttachments.push_back(Hash);
- if (m_CidStore.ContainsChunk(Hash))
- {
- ValidAttachments.emplace_back(Hash);
- }
- TotalCount++;
- });
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr);
+ CacheRecord.IterateAttachments(
+ [this, &ChunkStore, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) {
+ const IoHash Hash = AttachmentHash.AsHash();
+ ReferencedAttachments.push_back(Hash);
+ if (ChunkStore.ContainsChunk(Hash))
+ {
+ ValidAttachments.emplace_back(Hash);
+ }
+ TotalCount++;
+ });
+
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
m_CacheStats.WriteCount++;
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}",
@@ -1298,10 +1350,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
CachePolicy Policy = PolicyFromUrl;
if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .Namespace = Ref.Namespace,
- .Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCbObject,
+ .Namespace = Ref.Namespace,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -1334,38 +1388,46 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
WriteAttachmentBuffers.reserve(NumAttachments);
WriteRawHashes.reserve(NumAttachments);
- CacheRecord.IterateAttachments(
- [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count](
- CbFieldView HashView) {
- const IoHash Hash = HashView.AsHash();
- ReferencedAttachments.push_back(Hash);
- if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
- {
- if (Attachment->IsCompressedBinary())
- {
- WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(Hash);
- ValidAttachments.emplace_back(Hash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(HttpContentType::kCbPackage),
- Hash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(Hash))
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+
+ CacheRecord.IterateAttachments([this,
+ &Ref,
+ &Package,
+ &ChunkStore,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
+ &ValidAttachments,
+ &ReferencedAttachments,
+ &Count](CbFieldView HashView) {
+ const IoHash Hash = HashView.AsHash();
+ ReferencedAttachments.push_back(Hash);
+ if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
+ {
+ if (Attachment->IsCompressedBinary())
{
+ WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Hash);
ValidAttachments.emplace_back(Hash);
Count.Valid++;
}
- Count.Total++;
- });
+ else
+ {
+ ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(HttpContentType::kCbPackage),
+ Hash);
+ Count.Invalid++;
+ }
+ }
+ else if (ChunkStore.ContainsChunk(Hash))
+ {
+ ValidAttachments.emplace_back(Hash);
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
if (Count.Invalid > 0)
{
@@ -1373,15 +1435,23 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
}
+ const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+
ZenCacheValue CacheValue;
CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
{
- std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
for (const CidStore::InsertResult& InsertResult : InsertResults)
{
if (InsertResult.New)
@@ -1408,10 +1478,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Ref.Namespace,
- .Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCbPackage,
+ .Namespace = Ref.Namespace,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -1445,7 +1517,9 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons
{
Stopwatch Timer;
- IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+
+ IoBuffer Value = ChunkStore.FindChunkByCid(Ref.ValueContentId);
const UpstreamEndpointInfo* Source = nullptr;
CachePolicy Policy = PolicyFromUrl;
@@ -1467,7 +1541,7 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons
{
if (AreDiskWritesAllowed())
{
- m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ ChunkStore.AddChunk(UpstreamResult.Value, RawHash);
}
Source = UpstreamResult.Source;
}
@@ -1561,7 +1635,9 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
"ValueContentId does not match attachment hash"sv);
}
- CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash);
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+
+ CidStore::InsertResult Result = ChunkStore.AddChunk(Body, RawHash);
ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}",
Ref.Namespace,
@@ -1776,16 +1852,51 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
EmitSnapshot("requests", m_HttpRequests, Cbo);
- const uint64_t HitCount = m_CacheStats.HitCount;
- const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
- const uint64_t MissCount = m_CacheStats.MissCount;
- const uint64_t WriteCount = m_CacheStats.WriteCount;
- const uint64_t BadRequestCount = m_CacheStats.BadRequestCount;
- struct CidStoreStats StoreStats = m_CidStore.Stats();
- const uint64_t ChunkHitCount = StoreStats.HitCount;
- const uint64_t ChunkMissCount = StoreStats.MissCount;
- const uint64_t ChunkWriteCount = StoreStats.WriteCount;
- const uint64_t TotalCount = HitCount + MissCount;
+ const uint64_t HitCount = m_CacheStats.HitCount;
+ const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
+ const uint64_t MissCount = m_CacheStats.MissCount;
+ const uint64_t WriteCount = m_CacheStats.WriteCount;
+ const uint64_t BadRequestCount = m_CacheStats.BadRequestCount;
+
+ uint64_t TotalChunkHitCount = 0;
+ uint64_t TotalChunkMissCount = 0;
+ uint64_t TotalChunkWriteCount = 0;
+ CidStoreSize TotalCidSize;
+
+ tsl::robin_map<CidStore*, std::string> UniqueStores;
+ {
+ std::vector<std::string> NamespaceNames = m_CacheStore.GetNamespaces();
+
+ for (const std::string& NamespaceName : NamespaceNames)
+ {
+ CidStore* Store = &m_RpcHandler.GetCidStore(NamespaceName);
+ if (auto It = UniqueStores.find(Store); It == UniqueStores.end())
+ {
+ UniqueStores.insert_or_assign(Store, NamespaceName);
+ }
+ else
+ {
+ UniqueStores.insert_or_assign(Store, std::string{});
+ }
+ }
+
+ for (auto It : UniqueStores)
+ {
+ CidStore* ChunkStore = It.first;
+
+ CidStoreStats StoreStats = ChunkStore->Stats();
+ CidStoreSize StoreSize = ChunkStore->TotalSize();
+
+ TotalChunkHitCount += StoreStats.HitCount;
+ TotalChunkMissCount += StoreStats.MissCount;
+ TotalChunkWriteCount += StoreStats.WriteCount;
+
+ TotalCidSize.TinySize += StoreSize.TinySize;
+ TotalCidSize.SmallSize += StoreSize.SmallSize;
+ TotalCidSize.LargeSize += StoreSize.LargeSize;
+ TotalCidSize.TotalSize += StoreSize.TotalSize;
+ }
+ }
const uint64_t RpcRequests = m_CacheStats.RpcRequests;
const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests;
@@ -1795,17 +1906,11 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests;
const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests;
- const CidStoreSize CidSize = m_CidStore.TotalSize();
- const GcStorageSize CacheSize = m_CacheStore.StorageSize();
+ const CacheStoreSize CacheSize = m_CacheStore.TotalSize();
bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true";
bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true";
- CidStoreStats CidStoreStats = {};
- if (ShowCidStoreStats)
- {
- CidStoreStats = m_CidStore.Stats();
- }
ZenCacheStore::CacheStoreStats CacheStoreStats = {};
if (ShowCacheStoreStats)
{
@@ -1840,6 +1945,7 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
Cbo.EndObject();
Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount;
+ const uint64_t TotalCount = HitCount + MissCount;
Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0);
if (m_UpstreamCache.IsActive())
@@ -1850,7 +1956,9 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
}
- Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount;
+ Cbo << "cidhits" << TotalChunkHitCount << "cidmisses" << TotalChunkMissCount << "cidwrites" << TotalChunkWriteCount;
+ const uint64_t TotalChunkCount = TotalChunkHitCount + TotalChunkMissCount;
+ Cbo << "cidhit_ratio" << (TotalChunkHitCount ? (double(TotalChunkCount) / double(TotalChunkHitCount)) : 0.0);
if (ShowCacheStoreStats)
{
@@ -1959,20 +2067,58 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
{
Cbo.BeginObject("size");
{
- Cbo << "tiny" << CidSize.TinySize;
- Cbo << "small" << CidSize.SmallSize;
- Cbo << "large" << CidSize.LargeSize;
- Cbo << "total" << CidSize.TotalSize;
+ Cbo << "tiny" << TotalCidSize.TinySize;
+ Cbo << "small" << TotalCidSize.SmallSize;
+ Cbo << "large" << TotalCidSize.LargeSize;
+ Cbo << "total" << TotalCidSize.TotalSize;
}
Cbo.EndObject();
if (ShowCidStoreStats)
{
Cbo.BeginObject("store");
- Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount;
- EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo);
- EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo);
- // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo);
+
+ auto OutputStats = [&](CidStore& ChunkStore) {
+ CidStoreStats StoreStats = ChunkStore.Stats();
+ Cbo << "hits" << StoreStats.HitCount << "misses" << StoreStats.MissCount << "writes" << StoreStats.WriteCount;
+ const uint64_t Count = StoreStats.HitCount + StoreStats.MissCount;
+ Cbo << "hit_ratio" << (Count ? (double(StoreStats.HitCount) / double(Count)) : 0.0);
+ EmitSnapshot("read", StoreStats.FindChunkOps, Cbo);
+ EmitSnapshot("write", StoreStats.AddChunkOps, Cbo);
+ };
+
+ if (UniqueStores.size() > 1)
+ {
+ Cbo.BeginArray("namespaces");
+ for (auto It : UniqueStores)
+ {
+ CidStore* ChunkStore = It.first;
+ const std::string& Namespace = It.second;
+ CidStoreSize ChunkStoreSize = ChunkStore->TotalSize();
+ Cbo.BeginObject();
+ {
+ Cbo << "namespace" << Namespace;
+ Cbo.BeginObject("stats");
+ OutputStats(*ChunkStore);
+ Cbo.EndObject();
+
+ Cbo.BeginObject("size");
+ {
+ Cbo << "tiny" << ChunkStoreSize.TinySize;
+ Cbo << "small" << ChunkStoreSize.SmallSize;
+ Cbo << "large" << ChunkStoreSize.LargeSize;
+ Cbo << "total" << ChunkStoreSize.TotalSize;
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndArray(); // namespaces
+ }
+ else if (UniqueStores.size() != 0)
+ {
+ OutputStats(*UniqueStores.begin()->first);
+ }
Cbo.EndObject();
}
}