From 8cb2b93f31c59b3261951a70c48e5da548194002 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Wed, 31 Jan 2024 15:55:45 +0100 Subject: changed RPC recording to MPSC setup (#638) fixes rare race condition when using RPC recording for long periods of time --- src/zenserver/cache/httpstructuredcache.cpp | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index c62b5325e..95c85d6c8 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1665,14 +1665,12 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) auto HandleRpc = [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { - uint64_t RequestIndex = ~0ull; - if (m_RequestRecordingEnabled) { RwLock::SharedLockScope _(m_RequestRecordingLock); if (m_RequestRecorder) { - RequestIndex = m_RequestRecorder->RecordRequest( + m_RequestRecorder->RecordRequest( {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, Body); } @@ -1712,14 +1710,6 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId); } CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); - if (RequestIndex != ~0ull) - { - RwLock::SharedLockScope _(m_RequestRecordingLock); - if (m_RequestRecorder) - { - m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); - } - } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else @@ -1727,16 +1717,6 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) BinaryWriter MemStream; RpcResult.Save(MemStream); - if (RequestIndex != ~0ull) - { - RwLock::SharedLockScope _(m_RequestRecordingLock); - if (m_RequestRecorder) - { - m_RequestRecorder->RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); -- cgit v1.2.3 From badf0c81d059e3cdd6051dba310dc7db5b7cc8d1 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 27 Feb 2024 09:39:30 +0100 Subject: remove reference caching (#658) * remove reference caching --- src/zenserver/cache/httpstructuredcache.cpp | 1 - 1 file changed, 1 deletion(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 95c85d6c8..4cf7c9a01 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -740,7 +740,6 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque ResponseWriter.AddInteger("MemCacheTargetFootprintBytes"sv, Info->Config.DiskLayerConfig.MemCacheTargetFootprintBytes); ResponseWriter.AddInteger("MemCacheTrimIntervalSeconds"sv, Info->Config.DiskLayerConfig.MemCacheTrimIntervalSeconds); ResponseWriter.AddInteger("MemCacheMaxAgeSeconds"sv, Info->Config.DiskLayerConfig.MemCacheMaxAgeSeconds); - ResponseWriter.AddBool("EnableReferenceCaching"sv, Info->Config.DiskLayerConfig.BucketConfig.EnableReferenceCaching); } ResponseWriter.EndObject(); -- cgit v1.2.3 From 45bfc721dd049193abf3b2be2aa7b097466ddbe9 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 14 Mar 2024 16:33:17 +0100 Subject: clean up test linking (#4) - Improvement: Add zenhttp-test and zenutil-test - Improvement: Moved cachepolicy test to cachepolicy.cpp - Improvement: Renamed cachestore tests from z$ to cachestore - Improvement: Moved test linking so test for a lib is linked by -test - Improvement: Removed HttpRequestParseRelativeUri in httpstructuredcache.cpp and use the one in cacherequests.h instead --- src/zenserver/cache/httpstructuredcache.cpp | 280 +--------------------------- 1 file changed, 2 insertions(+), 278 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 4cf7c9a01..f64b9c5a5 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -39,11 +39,6 @@ #include #include -#if ZEN_WITH_TESTS -# include -# include -#endif - namespace zen { using namespace std::literals; @@ -64,64 +59,6 @@ namespace { static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv; static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv; - constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; - constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; - - std::optional GetValidNamespaceName(std::string_view Name) - { - if (Name.empty()) - { - ZEN_WARN("Namespace is invalid, empty namespace is not allowed"); - return {}; - } - - if (Name.length() > 64) - { - ZEN_WARN("Namespace '{}' is invalid, length exceeds 64 characters", Name); - return {}; - } - - if (!AsciiSet::HasOnly(Name, ValidNamespaceNameCharactersSet)) - { - ZEN_WARN("Namespace '{}' is invalid, invalid characters detected", Name); - return {}; - } - - return ToLower(Name); - } - - std::optional GetValidBucketName(std::string_view Name) - { - if (Name.empty()) - { - ZEN_WARN("Bucket name is invalid, empty bucket name is not allowed"); - return {}; - } - - if (!AsciiSet::HasOnly(Name, ValidBucketNameCharactersSet)) - { - ZEN_WARN("Bucket name '{}' is invalid, invalid characters detected", Name); - return {}; - } - - return ToLower(Name); - } - - std::optional GetValidIoHash(std::string_view Hash) - { - if (Hash.length() != IoHash::StringLength) - { - return {}; - } - - IoHash KeyHash; - if (!ParseHexBytes(Hash.data(), Hash.size(), KeyHash.Hash)) - { - return {}; - } - return KeyHash; - } - struct HttpRequestData { std::optional Namespace; @@ -130,117 +67,6 @@ namespace { std::optional ValueContentId; }; - bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data) - { - std::vector Tokens; - uint32_t TokenCount = ForEachStrTok(Key, '/', [&](const std::string_view& Token) { - Tokens.push_back(Token); - return true; - }); - - switch (TokenCount) - { - case 0: - return true; - case 1: - Data.Namespace = GetValidNamespaceName(Tokens[0]); - return Data.Namespace.has_value(); - case 2: - { - std::optional PossibleHashKey = GetValidIoHash(Tokens[1]); - if (PossibleHashKey.has_value()) - { - // Legacy bucket/key request - Data.Bucket = GetValidBucketName(Tokens[0]); - if (!Data.Bucket.has_value()) - { - return false; - } - Data.HashKey = PossibleHashKey; - Data.Namespace = ZenCacheStore::DefaultNamespace; - return true; - } - Data.Namespace = GetValidNamespaceName(Tokens[0]); - if (!Data.Namespace.has_value()) - { - return false; - } - Data.Bucket = GetValidBucketName(Tokens[1]); - if (!Data.Bucket.has_value()) - { - return false; - } - return true; - } - case 3: - { - std::optional PossibleHashKey = GetValidIoHash(Tokens[1]); - if (PossibleHashKey.has_value()) - { - // Legacy bucket/key/valueid request - Data.Bucket = GetValidBucketName(Tokens[0]); - if (!Data.Bucket.has_value()) - { - return false; - } - Data.HashKey = PossibleHashKey; - Data.ValueContentId = GetValidIoHash(Tokens[2]); - if (!Data.ValueContentId.has_value()) - { - return false; - } - Data.Namespace = ZenCacheStore::DefaultNamespace; - return true; - } - Data.Namespace = GetValidNamespaceName(Tokens[0]); - if (!Data.Namespace.has_value()) - { - return false; - } - Data.Bucket = GetValidBucketName(Tokens[1]); - if (!Data.Bucket.has_value()) - { - return false; - } - Data.HashKey = GetValidIoHash(Tokens[2]); - if (!Data.HashKey) - { - return false; - } - return true; - } - case 4: - { - Data.Namespace = GetValidNamespaceName(Tokens[0]); - if (!Data.Namespace.has_value()) - { - return false; - } - - Data.Bucket = GetValidBucketName(Tokens[1]); - if (!Data.Bucket.has_value()) - { - return false; - } - - Data.HashKey = GetValidIoHash(Tokens[2]); - if (!Data.HashKey.has_value()) - { - return false; - } - - Data.ValueContentId = GetValidIoHash(Tokens[3]); - if (!Data.ValueContentId.has_value()) - { - return false; - } - return true; - } - default: - return false; - } - } - } // namespace ////////////////////////////////////////////////////////////////////////// @@ -614,8 +440,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) return; } - HttpRequestData RequestData; - if (!HttpRequestParseRelativeUri(Key, RequestData)) + cacherequests::HttpRequestData RequestData; + if (!cacherequests::HttpRequestParseRelativeUri(Key, ZenCacheStore::DefaultNamespace, RequestData)) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL @@ -1968,106 +1794,4 @@ HttpStructuredCacheService::AreDiskWritesAllowed() const return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } -#if ZEN_WITH_TESTS - -TEST_CASE("z$service.parse.relative.Uri") -{ - HttpRequestData RootRequest; - CHECK(HttpRequestParseRelativeUri("", RootRequest)); - CHECK(!RootRequest.Namespace.has_value()); - CHECK(!RootRequest.Bucket.has_value()); - CHECK(!RootRequest.HashKey.has_value()); - CHECK(!RootRequest.ValueContentId.has_value()); - - RootRequest = {}; - CHECK(HttpRequestParseRelativeUri("/", RootRequest)); - CHECK(!RootRequest.Namespace.has_value()); - CHECK(!RootRequest.Bucket.has_value()); - CHECK(!RootRequest.HashKey.has_value()); - CHECK(!RootRequest.ValueContentId.has_value()); - - HttpRequestData LegacyBucketRequestBecomesNamespaceRequest; - CHECK(HttpRequestParseRelativeUri("test", LegacyBucketRequestBecomesNamespaceRequest)); - CHECK(LegacyBucketRequestBecomesNamespaceRequest.Namespace == "test"sv); - CHECK(!LegacyBucketRequestBecomesNamespaceRequest.Bucket.has_value()); - CHECK(!LegacyBucketRequestBecomesNamespaceRequest.HashKey.has_value()); - CHECK(!LegacyBucketRequestBecomesNamespaceRequest.ValueContentId.has_value()); - - HttpRequestData LegacyHashKeyRequest; - CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", LegacyHashKeyRequest)); - CHECK(LegacyHashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); - CHECK(LegacyHashKeyRequest.Bucket == "test"sv); - CHECK(LegacyHashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(!LegacyHashKeyRequest.ValueContentId.has_value()); - - HttpRequestData LegacyValueContentIdRequest; - CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", - LegacyValueContentIdRequest)); - CHECK(LegacyValueContentIdRequest.Namespace == ZenCacheStore::DefaultNamespace); - CHECK(LegacyValueContentIdRequest.Bucket == "test"sv); - CHECK(LegacyValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(LegacyValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); - - HttpRequestData V2DefaultNamespaceRequest; - CHECK(HttpRequestParseRelativeUri("ue4.ddc", V2DefaultNamespaceRequest)); - CHECK(V2DefaultNamespaceRequest.Namespace == "ue4.ddc"); - CHECK(!V2DefaultNamespaceRequest.Bucket.has_value()); - CHECK(!V2DefaultNamespaceRequest.HashKey.has_value()); - CHECK(!V2DefaultNamespaceRequest.ValueContentId.has_value()); - - HttpRequestData V2NamespaceRequest; - CHECK(HttpRequestParseRelativeUri("nicenamespace", V2NamespaceRequest)); - CHECK(V2NamespaceRequest.Namespace == "nicenamespace"sv); - CHECK(!V2NamespaceRequest.Bucket.has_value()); - CHECK(!V2NamespaceRequest.HashKey.has_value()); - CHECK(!V2NamespaceRequest.ValueContentId.has_value()); - - HttpRequestData V2BucketRequestWithDefaultNamespace; - CHECK(HttpRequestParseRelativeUri("ue4.ddc/test", V2BucketRequestWithDefaultNamespace)); - CHECK(V2BucketRequestWithDefaultNamespace.Namespace == "ue4.ddc"); - CHECK(V2BucketRequestWithDefaultNamespace.Bucket == "test"sv); - CHECK(!V2BucketRequestWithDefaultNamespace.HashKey.has_value()); - CHECK(!V2BucketRequestWithDefaultNamespace.ValueContentId.has_value()); - - HttpRequestData V2BucketRequestWithNamespace; - CHECK(HttpRequestParseRelativeUri("nicenamespace/test", V2BucketRequestWithNamespace)); - CHECK(V2BucketRequestWithNamespace.Namespace == "nicenamespace"sv); - CHECK(V2BucketRequestWithNamespace.Bucket == "test"sv); - CHECK(!V2BucketRequestWithNamespace.HashKey.has_value()); - CHECK(!V2BucketRequestWithNamespace.ValueContentId.has_value()); - - HttpRequestData V2HashKeyRequest; - CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest)); - CHECK(V2HashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); - CHECK(V2HashKeyRequest.Bucket == "test"); - CHECK(V2HashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(!V2HashKeyRequest.ValueContentId.has_value()); - - HttpRequestData V2ValueContentIdRequest; - CHECK( - HttpRequestParseRelativeUri("nicenamespace/test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", - V2ValueContentIdRequest)); - CHECK(V2ValueContentIdRequest.Namespace == "nicenamespace"sv); - CHECK(V2ValueContentIdRequest.Bucket == "test"sv); - CHECK(V2ValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(V2ValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); - - HttpRequestData Invalid; - CHECK(!HttpRequestParseRelativeUri("bad\2_namespace", Invalid)); - CHECK(!HttpRequestParseRelativeUri("nice/\2\1bucket", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789a", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcdef1234", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/pppppppp89abcdef12340123456789abcdef1234", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcd", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/ppppppppdef12345678956789abcdef123456789", - Invalid)); -} - -#endif - -void -z$service_forcelink() -{ -} - } // namespace zen -- cgit v1.2.3 From 96f44f2f2d8cbcda254d0b193f5a1aece645daeb Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 22 Apr 2024 20:21:02 +0200 Subject: InsertChunks for CAS store (#55) - Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import --- src/zenserver/cache/httpstructuredcache.cpp | 77 ++++++++++++++++++----------- 1 file changed, 47 insertions(+), 30 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index f64b9c5a5..8106e9db9 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -848,17 +848,20 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CbPackage Package; if (Package.TryLoad(ClientResultValue.Value)) { - CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector AttachmentsToStoreLocally; - std::vector ReferencedAttachments; - AttachmentsToStoreLocally.reserve(NumAttachments); + CbObject CacheRecord = Package.GetObject(); + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector ReferencedAttachments; + std::vector WriteAttachmentBuffers; + WriteAttachmentBuffers.reserve(NumAttachments); + std::vector WriteRawHashes; + WriteRawHashes.reserve(NumAttachments); CacheRecord.IterateAttachments([this, &Package, &Ref, - &AttachmentsToStoreLocally, + &WriteAttachmentBuffers, + &WriteRawHashes, &ReferencedAttachments, &Count, QueryLocal, @@ -872,7 +875,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { if (StoreLocal) { - AttachmentsToStoreLocally.emplace_back(Attachment); + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Attachment->GetHash()); } Count.Valid++; } @@ -923,18 +928,22 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con m_CacheStore .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); m_CacheStats.WriteCount++; - } - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) - { - ZEN_ASSERT_SLOW(StoreLocal); - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) + if (!WriteAttachmentBuffers.empty()) { - Count.New++; + std::vector InsertResults = + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& Result : InsertResults) + { + if (Result.New) + { + Count.New++; + } + } } + + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; } BinaryWriter MemStream; @@ -1151,23 +1160,27 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector ValidAttachments; - std::vector ReferencedAttachments; - std::vector AttachmentsToStoreLocally; + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector ValidAttachments; + std::vector ReferencedAttachments; ValidAttachments.reserve(NumAttachments); - AttachmentsToStoreLocally.reserve(NumAttachments); + std::vector WriteAttachmentBuffers; + std::vector WriteRawHashes; + WriteAttachmentBuffers.reserve(NumAttachments); + WriteRawHashes.reserve(NumAttachments); CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count](CbFieldView HashView) { + [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count]( + CbFieldView HashView) { const IoHash Hash = HashView.AsHash(); ReferencedAttachments.push_back(Hash); if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { - AttachmentsToStoreLocally.emplace_back(Attachment); + WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Hash); ValidAttachments.emplace_back(Hash); Count.Valid++; } @@ -1202,14 +1215,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); m_CacheStats.WriteCount++; - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + if (!WriteAttachmentBuffers.empty()) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) + std::vector InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& InsertResult : InsertResults) { - Count.New++; + if (InsertResult.New) + { + Count.New++; + } } + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; } ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}", -- cgit v1.2.3 From 1bafcb32cb48b2256a9d72995388b7df72058039 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 2 May 2024 10:53:15 +0200 Subject: batch cache put (#67) - Improvement: Batch scope for put of cache values --- src/zenserver/cache/httpstructuredcache.cpp | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 8106e9db9..135eee57c 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -839,7 +839,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (Success && StoreLocal) { - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}); + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr); m_CacheStats.WriteCount++; } } @@ -925,8 +925,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (StoreLocal) { - m_CacheStore - .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); + m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + CacheValue, + ReferencedAttachments, + nullptr); m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) @@ -1067,7 +1072,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con Ref.BucketSegment, Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, - {}); + {}, + nullptr); m_CacheStats.WriteCount++; if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) @@ -1116,7 +1122,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con TotalCount++; }); - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments); + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr); m_CacheStats.WriteCount++; ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", -- cgit v1.2.3 From 1155ed2048a24f4dfcfa65d63243b77973f820d6 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 2 May 2024 17:40:30 +0200 Subject: fix zero size attachment replies (#69) - Bugfix: Don't try to respond with zero size partial cache value when partial size is zero - Improvement: Added more validation of data read from cache / cas --- src/zenserver/cache/httpstructuredcache.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 135eee57c..449a43653 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -727,7 +727,15 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); - Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); + if (Compressed) + { + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); + } + else + { + ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash.AsHash()); + MissingCount++; + } } else { -- cgit v1.2.3 From 8ce1dc72cce381b2adae256504331f2e8893f262 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 30 May 2024 14:44:34 +0200 Subject: cache optimizations (#88) * message formatting optimizations * bump iostorecompression small value threshold to 1MB --- src/zenserver/cache/httpstructuredcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 449a43653..52e31ff40 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -883,7 +883,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { if (StoreLocal) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + const CompressedBuffer& Chunk = Attachment->AsCompressedBinary(); WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); WriteRawHashes.push_back(Attachment->GetHash()); } -- cgit v1.2.3 From b9d5ea52a837167b08113823f38e53b59b44a7fe Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 23 Sep 2024 19:28:26 +0200 Subject: Added namespace qualifier (optional) for z$ rpc requests (#166) This change adds support for a namespace-qualified RPC endpoint for z$ at `/z$//$rpc` which may be used to validate RPC requests by URL inspection. The old scheme is still supported. --- src/zenserver/cache/httpstructuredcache.cpp | 125 ++++++++++++++++------------ 1 file changed, 74 insertions(+), 51 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 52e31ff40..109fb34f6 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -366,10 +366,27 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) metrics::OperationTiming::Scope $(m_HttpRequests); - std::string_view Key = Request.RelativeUri(); - if (Key == HttpZCacheRPCPrefix) + const std::string_view Key = Request.RelativeUri(); + + std::string_view UriNamespace; + + if (Key.ends_with(HttpZCacheRPCPrefix)) { - return HandleRpcRequest(Request); + const size_t RpcOffset = Key.length() - HttpZCacheRPCPrefix.length(); + + if (RpcOffset) + { + std::string_view KeyPrefix = Key.substr(0, RpcOffset); + + if (KeyPrefix.back() == '/') + { + KeyPrefix.remove_suffix(1); + + UriNamespace = KeyPrefix; + } + } + + return HandleRpcRequest(Request, UriNamespace); } if (Key == HttpZCacheUtilStartRecording) @@ -1452,6 +1469,7 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co int TargetPid = 0; CbPackage RpcResult; if (m_RpcHandler.HandleRpcRequest(Context, + /* UriNamespace */ {}, RequestInfo.ContentType, std::move(Body), AcceptMagic, @@ -1497,7 +1515,7 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } void -HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) +HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::string_view UriNamespace) { ZEN_TRACE_CPU("z$::Http::HandleRpcRequest"); @@ -1519,65 +1537,70 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) return Request.WriteResponse(HttpResponseCode::BadRequest); } - auto HandleRpc = - [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { - if (m_RequestRecordingEnabled) + auto HandleRpc = [this, + RequestContext, + Body = Request.ReadPayload(), + ContentType, + AcceptType, + UriNamespaceString = std::string{UriNamespace}](HttpServerRequest& AsyncRequest) mutable { + if (m_RequestRecordingEnabled) + { + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) { - RwLock::SharedLockScope _(m_RequestRecordingLock); - if (m_RequestRecorder) - { - m_RequestRecorder->RecordRequest( - {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, - Body); - } + m_RequestRecorder->RecordRequest( + {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, + Body); } + } - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - int TargetProcessId = 0; - CbPackage RpcResult; + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + int TargetProcessId = 0; + CbPackage RpcResult; - CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext, - ContentType, - std::move(Body), - AcceptMagic, - AcceptFlags, - TargetProcessId, - RpcResult); + CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext, + UriNamespaceString, + ContentType, + std::move(Body), + /* out */ AcceptMagic, + /* out */ AcceptFlags, + /* out */ TargetProcessId, + /* out */ RpcResult); - HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode)); + HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode)); - if (!IsHttpSuccessCode(HttpResultCode)) - { - return AsyncRequest.WriteResponse(HttpResultCode); - } + if (!IsHttpSuccessCode(HttpResultCode)) + { + return AsyncRequest.WriteResponse(HttpResultCode); + } - if (AcceptMagic == kCbPkgMagic) + if (AcceptMagic == kCbPkgMagic) + { + void* TargetProcessHandle = nullptr; + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { - void* TargetProcessHandle = nullptr; - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId); + Flags |= FormatFlags::kDenyPartialLocalReferences; } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); - AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId); } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); + AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); - AsyncRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - }; + AsyncRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + }; if (HasUpstream) { -- cgit v1.2.3 From efd016d84d0940bf616e3efef135532cbf5fedef Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 21 Oct 2024 15:40:13 +0200 Subject: bucket size queries (#203) - Feature: Added options --bucketsize and --bucketsizes to zen cache-info to get data sizes in cache buckets and attachments --- src/zenserver/cache/httpstructuredcache.cpp | 115 ++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 109fb34f6..551b5a76d 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include "upstream/jupiter.h" #include "upstream/upstreamcache.h" @@ -606,6 +607,81 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount); + if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty()) + { + ResponseWriter.BeginObject("BucketSizes"); + + ResponseWriter.BeginArray("Buckets"); + + std::vector BucketNames; + if (Buckets == "*") // Get all - empty FieldFilter equal getting all fields + { + BucketNames = Info.value().BucketNames; + } + else + { + ForEachStrTok(Buckets, ',', [&](std::string_view BucketName) { + BucketNames.push_back(std::string(BucketName)); + return true; + }); + } + WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); + std::vector AllAttachments; + for (const std::string& BucketName : BucketNames) + { + ResponseWriter.BeginObject(); + ResponseWriter << "Name" << BucketName; + CacheContentStats ContentStats; + bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats); + if (Success) + { + size_t ValuesSize = 0; + for (const uint64_t Size : ContentStats.ValueSizes) + { + ValuesSize += Size; + } + + std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end()); + + ResponseWriter << "Count" << ContentStats.ValueSizes.size(); + ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount; + ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount; + ResponseWriter << "Size" << ValuesSize; + ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size(); + + AllAttachments.insert(AllAttachments.end(), ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + } + ResponseWriter.EndObject(); + } + + ResponseWriter.EndArray(); + + ResponseWriter.BeginObject("Attachments"); + std::sort(AllAttachments.begin(), AllAttachments.end()); + auto NewEnd = std::unique(AllAttachments.begin(), AllAttachments.end()); + AllAttachments.erase(NewEnd, AllAttachments.end()); + + uint64_t AttachmentsSize = 0; + + m_CidStore.IterateChunks( + AllAttachments, + [&](size_t Index, const IoBuffer& Payload) { + ZEN_UNUSED(Index); + AttachmentsSize += Payload.GetSize(); + return true; + }, + &WorkerPool); + + ResponseWriter << "Count" << AllAttachments.size(); + ResponseWriter << "Size" << AttachmentsSize; + + ResponseWriter.EndObject(); + + ResponseWriter.EndObject(); + } + return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); } break; @@ -656,6 +732,45 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); + if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true") + { + CacheContentStats ContentStats; + bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats); + if (Success) + { + size_t ValuesSize = 0; + for (const uint64_t Size : ContentStats.ValueSizes) + { + ValuesSize += Size; + } + + std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end()); + + ResponseWriter << "Count" << ContentStats.ValueSizes.size(); + ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount; + ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount; + ResponseWriter << "Size" << ValuesSize; + ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size(); + + uint64_t AttachmentsSize = 0; + + WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); + + m_CidStore.IterateChunks( + ContentStats.Attachments, + [&](size_t Index, const IoBuffer& Payload) { + ZEN_UNUSED(Index); + AttachmentsSize += Payload.GetSize(); + return true; + }, + &WorkerPool); + + ResponseWriter << "AttachmentsSize" << AttachmentsSize; + } + } + return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); } break; -- cgit v1.2.3 From bcb81b326a373aa86d7e6a046febc8ba74f21c04 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 25 Nov 2024 14:49:04 +0100 Subject: caller controls threshold for bulk-loading chunks in IterateChunks (#222) * Allow caller to control threshold for bulk-loading chunks in IterateChunks * use smaller batch chunk reading for /fileinfos and /chunkinfos as we do not intend to read the payload * use smaller batch read buffer when just querying for size of attachments --- src/zenserver/cache/httpstructuredcache.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 551b5a76d..f49f6a645 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -672,7 +672,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque AttachmentsSize += Payload.GetSize(); return true; }, - &WorkerPool); + &WorkerPool, + 8u * 1024u); ResponseWriter << "Count" << AllAttachments.size(); ResponseWriter << "Size" << AttachmentsSize; @@ -765,7 +766,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, AttachmentsSize += Payload.GetSize(); return true; }, - &WorkerPool); + &WorkerPool, + 8u * 1024u); ResponseWriter << "AttachmentsSize" << AttachmentsSize; } -- cgit v1.2.3 From e6f44577f469e891ed8dab1492a4c53224e0b765 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 2 Dec 2024 12:21:53 +0100 Subject: added support for dynamic LLM tags (#245) * added FLLMTag which can be used to register memory tags outside of core * changed `UE_MEMSCOPE` -> `ZEN_MEMSCOPE` for consistency * instrumented some subsystems with dynamic tags --- src/zenserver/cache/httpstructuredcache.cpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index f49f6a645..a6606c7ad 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,14 @@ namespace zen { +const FLLMTag& +GetCacheHttpTag() +{ + static FLLMTag CacheHttpTag("http", FLLMTag("cache")); + + return CacheHttpTag; +} + using namespace std::literals; ////////////////////////////////////////////////////////////////////////// @@ -365,6 +374,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { ZEN_TRACE_CPU("z$::Http::HandleRequest"); + ZEN_MEMSCOPE(GetCacheHttpTag()); + metrics::OperationTiming::Scope $(m_HttpRequests); const std::string_view Key = Request.RelativeUri(); @@ -1742,6 +1753,8 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::st void HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) { + ZEN_MEMSCOPE(GetCacheHttpTag()); + CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); -- cgit v1.2.3 From e4fc54356fc002256d4a59f238868c6a8841c140 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 5 Dec 2024 14:45:44 +0100 Subject: global open process cache for projectstore (#257) * move openprocess cache to central location * enable openprocesscache in projectstore so "getchunks" can send filehandles when requested --- src/zenserver/cache/httpstructuredcache.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index a6606c7ad..fd116ba8e 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -86,7 +86,8 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, - const DiskWriteBlocker* InDiskWriteBlocker) + const DiskWriteBlocker* InDiskWriteBlocker, + OpenProcessCache& InOpenProcessCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) @@ -94,6 +95,7 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) , m_DiskWriteBlocker(InDiskWriteBlocker) +, m_OpenProcessCache(InOpenProcessCache) , m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); -- cgit v1.2.3 From 376ba6bf28792971275e9f56181f4b5230b05066 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Wed, 11 Dec 2024 12:58:21 +0100 Subject: Memory tracking improvements (#262) * added LLM tag to properly tag RPC allocations * annotated some more httpsys functions with memory tags * only emit memory scope events if the active tag is different from the new tag --- src/zenserver/cache/httpstructuredcache.cpp | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index fd116ba8e..925c7b42d 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -51,6 +51,8 @@ GetCacheHttpTag() return CacheHttpTag; } +extern const FLLMTag& GetCacheRpcTag(); + using namespace std::literals; ////////////////////////////////////////////////////////////////////////// @@ -1647,6 +1649,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co void HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::string_view UriNamespace) { + ZEN_MEMSCOPE(GetCacheRpcTag()); + ZEN_TRACE_CPU("z$::Http::HandleRpcRequest"); const bool HasUpstream = m_UpstreamCache.IsActive(); -- cgit v1.2.3 From a5158f9fc806d506590dd9bf0e3282cb76c3ac4e Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 16 Jan 2025 09:19:08 +0100 Subject: move basicfile.h/cpp -> zencore (#273) move jupiter.h/cpp -> zenutil move packageformat.h/.cpp -> zenhttp zenutil now depends on zenhttp instead of the inverse --- src/zenserver/cache/httpstructuredcache.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 925c7b42d..b4dc4c7f0 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -18,15 +18,15 @@ #include #include #include +#include #include #include #include #include #include -#include +#include #include -#include "upstream/jupiter.h" #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" -- cgit v1.2.3 From 0bd78e41254a74daccd0a9209e5d4a0589ca20fc Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 22 Jan 2025 15:26:32 +0100 Subject: jupiter code cleanup (#276) * cleanup jupiter * move jupiter files to separate folder * CloudCache -> Jupiter * split up jupiter files * kill redundant JupiterAccessTokenProvider --- src/zenserver/cache/httpstructuredcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index b4dc4c7f0..b9a9ca380 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -24,7 +24,7 @@ #include #include #include -#include +#include #include #include "upstream/upstreamcache.h" -- cgit v1.2.3 From 0bbd1fb43bbd9f878a2aa326ef06f2dc503a3b3f Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 11:49:05 -0700 Subject: Enforce Overwrite Prevention According To Cache Policy Overwrite with differing value should be denied if QueryLocal is not present and StoreLocal is present. Overwrite with equal value should succeed regardless of policy flags. --- src/zenserver/cache/httpstructuredcache.cpp | 88 +++++++++++++++++++---------- 1 file changed, 59 insertions(+), 29 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index b9a9ca380..d5dd28f68 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -996,8 +996,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (Success && StoreLocal) { - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr); - m_CacheStats.WriteCount++; + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (m_CacheStore + .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr)) + { + m_CacheStats.WriteCount++; + } } } else if (AcceptType == ZenContentType::kCbPackage) @@ -1082,30 +1086,34 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (StoreLocal) { - m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - CacheValue, - ReferencedAttachments, - nullptr); - m_CacheStats.WriteCount++; - - if (!WriteAttachmentBuffers.empty()) + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr)) { - std::vector InsertResults = - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (const CidStore::InsertResult& Result : InsertResults) + m_CacheStats.WriteCount++; + + if (!WriteAttachmentBuffers.empty()) { - if (Result.New) + std::vector InsertResults = + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& Result : InsertResults) { - Count.New++; + if (Result.New) + { + Count.New++; + } } } - } - WriteAttachmentBuffers = {}; - WriteRawHashes = {}; + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; + } } BinaryWriter MemStream; @@ -1224,13 +1232,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con { RawHash = IoHash::HashBuffer(SharedBuffer(Body)); } - m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, - {}, - nullptr); + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (!m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + nullptr)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) @@ -1279,7 +1292,19 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con TotalCount++; }); - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr); + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + + if (!m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", @@ -1372,10 +1397,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } + const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); + if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) -- cgit v1.2.3 From 25985163796ba45b028b40662146e44e8eff47a8 Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Mon, 24 Mar 2025 23:30:03 -0600 Subject: Establish TODOs and unit test for rejected PUT propagation --- src/zenserver/cache/httpstructuredcache.cpp | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index d5dd28f68..932e456d2 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1233,6 +1233,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con RawHash = IoHash::HashBuffer(SharedBuffer(Body)); } const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + // TODO: Propagation for rejected PUTs if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, @@ -1294,6 +1295,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + // TODO: Propagation for rejected PUTs if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, @@ -1402,6 +1404,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); + // TODO: Propagation for rejected PUTs if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite)) { return Request.WriteResponse(HttpResponseCode::Conflict); -- cgit v1.2.3 From 49701314f570da3622f11eb37cc889c7d39d9a93 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 19 May 2025 22:25:58 +0200 Subject: handle exception with batch work (#401) * use ParallelWork in rpc playback * use ParallelWork in projectstore * use ParallelWork in buildstore * use ParallelWork in cachedisklayer * use ParallelWork in compactcas * use ParallelWork in filecas * don't set abort flag in ParallelWork destructor * add PrepareFileForScatteredWrite for temp files in httpclient * Use PrepareFileForScatteredWrite when stream-decompressing files * be more relaxed when deleting temp files * allow explicit zen-cache when using direct host url without resolving * fix lambda capture when writing loose chunks * no delay when attempting to remove temp files --- src/zenserver/cache/httpstructuredcache.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index b9a9ca380..f7e63433b 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "upstream/upstreamcache.h" @@ -1585,12 +1586,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co WorkerThreadPool WorkerPool(ThreadCount); uint64_t RequestCount = Replayer.GetRequestCount(); Stopwatch Timer; - auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); - Latch JobLatch(RequestCount); + auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { - WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() { + Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic&) { IoBuffer Body; zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); @@ -1634,16 +1636,15 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } } } - JobLatch.CountDown(); }); } - while (!JobLatch.Wait(10000)) - { + Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted); ZEN_INFO("Replayed {} of {} requests, elapsed {}", - RequestCount - JobLatch.Remaining(), + RequestCount - PendingWork, RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - } + }); } void -- cgit v1.2.3 From 40b9386054de3c23f77da74eefaa743240d164fd Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 5 Jun 2025 14:40:02 +0200 Subject: pause, resume and abort running builds cmd (#421) - Feature: `zen builds pause`, `zen builds resume` and `zen builds abort` commands to control a running `zen builds` command - `--process-id` the process id to control, if omitted it tries to find a running process using the same executable as itself - Improvement: Process report now indicates if it is pausing or aborting --- src/zenserver/cache/httpstructuredcache.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index f7e63433b..9f2e826d6 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1588,7 +1588,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co Stopwatch Timer; auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); std::atomic AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { @@ -1638,8 +1639,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } }); } - Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted); + Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); ZEN_INFO("Replayed {} of {} requests, elapsed {}", RequestCount - PendingWork, RequestCount, -- cgit v1.2.3 From d000167e12c6dde651ef86be9f67552291ff1b7d Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 16 Jun 2025 13:17:54 +0200 Subject: graceful wait in parallelwork destructor (#438) * exception safety when issuing ParallelWork * add asserts to Latch usage to catch usage errors * extended error messaging and recovery handling in ParallelWork destructor to help find issues --- src/zenserver/cache/httpstructuredcache.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 9f2e826d6..bb0c55618 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1593,10 +1593,19 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { - Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic&) { + if (AbortFlag) + { + break; + } + Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic& AbortFlag) { IoBuffer Body; zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); + if (AbortFlag) + { + return; + } + if (Body) { uint32_t AcceptMagic = 0; -- cgit v1.2.3 From 081b55a5cf3d9af66d4d0be64fc38ea0055acede Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 24 Jun 2025 00:42:13 -0600 Subject: Change to PutResult structure Result structure contains status and a string message (may be empty) --- src/zenserver/cache/httpstructuredcache.cpp | 89 ++++++++++++++++++----------- 1 file changed, 57 insertions(+), 32 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 932e456d2..224cc6678 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -996,9 +996,11 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (Success && StoreLocal) { - const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - if (m_CacheStore - .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr)) + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + ZenCacheStore::PutResult PutResult = + m_CacheStore + .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } @@ -1086,15 +1088,16 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (StoreLocal) { - const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - if (m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - CacheValue, - ReferencedAttachments, - Overwrite, - nullptr)) + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; @@ -1204,6 +1207,24 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con return Request.WriteResponse(HttpResponseCode::InsufficientStorage); } + auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) { + ZEN_UNUSED(PutResult); + + HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError; + switch (PutResult.Status) + { + case zen::PutStatus::Conflict: + ResponseCode = HttpResponseCode::Conflict; + break; + case zen::PutStatus::Invalid: + ResponseCode = HttpResponseCode::BadRequest; + break; + } + + return PutResult.Message.empty() ? Request.WriteResponse(ResponseCode) + : Request.WriteResponse(ResponseCode, zen::HttpContentType::kText, PutResult.Message); + }; + const HttpContentType ContentType = Request.RequestContentType(); Body.SetContentType(ContentType); @@ -1234,16 +1255,17 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con } const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { - return Request.WriteResponse(HttpResponseCode::Conflict); + return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; @@ -1296,16 +1318,17 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { - return Request.WriteResponse(HttpResponseCode::Conflict); + return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; @@ -1405,9 +1428,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite)) + ZenCacheStore::PutResult PutResult = + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite); + if (PutResult.Status != zen::PutStatus::Success) { - return Request.WriteResponse(HttpResponseCode::Conflict); + return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; -- cgit v1.2.3 From 4c05d1041461b630cd5770dae5e8d03147d5674b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 20 Aug 2025 12:33:03 +0200 Subject: per namespace/project cas prep refactor (#470) - Refactor so we can have more than one cas store for project store and cache. - Refactor `UpstreamCacheClient` so it is not tied to a specific CidStore - Refactor scrub to keep the GC interface ScrubStorage function separate from scrub accessor functions (renamed to Scrub). - Refactor storage size to keep GC interface StorageSize function separate from size accessor functions (renamed to TotalSize) - Refactor cache storage so `ZenCacheDiskLayer::CacheBucket` implements GcStorage interface rather than `ZenCacheNamespace` --- src/zenserver/cache/httpstructuredcache.cpp | 306 ++++++++++++++++++---------- 1 file changed, 197 insertions(+), 109 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index acb6053d9..19ac3a216 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -85,7 +85,7 @@ namespace { ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& GetCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, @@ -95,11 +95,10 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) -, m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) , m_DiskWriteBlocker(InDiskWriteBlocker) , m_OpenProcessCache(InOpenProcessCache) -, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker) +, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, std::move(GetCidStore), InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); @@ -130,24 +129,6 @@ HttpStructuredCacheService::Flush() m_CacheStore.Flush(); } -void -HttpStructuredCacheService::ScrubStorage(ScrubContext& Ctx) -{ - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - ZenCacheStore::Info Info = m_CacheStore.GetInfo(); - - ZEN_INFO("scrubbing '{}'", Info.BasePath); - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_CidStore.ScrubStorage(Ctx); - m_CacheStore.ScrubStorage(Ctx); -} - void HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { @@ -243,6 +224,9 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) for (const auto& NamespaceIt : ValueDetails.Namespaces) { const std::string& Namespace = NamespaceIt.first; + + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace); + for (const auto& BucketIt : NamespaceIt.second.Buckets) { const std::string& Bucket = BucketIt.first; @@ -252,7 +236,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString() << ", " << gsl::narrow(Payload.GetSize()); @@ -270,7 +254,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) size_t AttachmentsSize = 0; for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); AttachmentsSize += Payload.GetSize(); } CSVWriter << ", " << gsl::narrow(AttachmentsSize); @@ -292,6 +276,9 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) for (const auto& NamespaceIt : ValueDetails.Namespaces) { const std::string& Namespace = NamespaceIt.first; + + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace); + Cbo.BeginObject(); { Cbo.AddString("name", Namespace); @@ -334,7 +321,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { Cbo.BeginObject(); Cbo.AddHash("cid", Hash); - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); Cbo.AddInteger("size", gsl::narrow(Payload.GetSize())); Cbo.EndObject(); } @@ -348,7 +335,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) size_t AttachmentsSize = 0; for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); AttachmentsSize += Payload.GetSize(); } Cbo.AddInteger("attachmentssize", gsl::narrow(AttachmentsSize)); @@ -623,6 +610,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName); + if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty()) { ResponseWriter.BeginObject("BucketSizes"); @@ -681,7 +670,7 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque uint64_t AttachmentsSize = 0; - m_CidStore.IterateChunks( + ChunkStore.IterateChunks( AllAttachments, [&](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index); @@ -749,6 +738,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName); + if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true") { CacheContentStats ContentStats; @@ -775,7 +766,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); - m_CidStore.IterateChunks( + ChunkStore.IterateChunks( ContentStats.Attachments, [&](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index); @@ -850,6 +841,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; Stopwatch Timer; + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { @@ -864,17 +857,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con uint32_t MissingCount = 0; CbObjectView CacheRecord(ClientResultValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &ChunkStore, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { if (SkipData) { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + if (!ChunkStore.ContainsChunk(AttachmentHash.AsHash())) { MissingCount++; } } else { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + if (IoBuffer Chunk = ChunkStore.FindChunkByCid(AttachmentHash.AsHash())) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) @@ -974,6 +967,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { Success = true; + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + ClientResultValue.Value = UpstreamResult.Value; ClientResultValue.Value.SetContentType(AcceptType); @@ -1024,6 +1019,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CacheRecord.IterateAttachments([this, &Package, &Ref, + &ChunkStore, &WriteAttachmentBuffers, &WriteRawHashes, &ReferencedAttachments, @@ -1058,12 +1054,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { if (SkipData) { - if (m_CidStore.ContainsChunk(Hash)) + if (ChunkStore.ContainsChunk(Hash)) { Count.Valid++; } } - else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) + else if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Hash)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) @@ -1105,7 +1101,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (!WriteAttachmentBuffers.empty()) { std::vector InsertResults = - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (const CidStore::InsertResult& Result : InsertResults) { if (Result.New) @@ -1272,7 +1268,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + m_UpstreamCache.EnqueueUpstream( + {.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", @@ -1306,15 +1305,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con std::vector ReferencedAttachments; int32_t TotalCount = 0; - CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { - const IoHash Hash = AttachmentHash.AsHash(); - ReferencedAttachments.push_back(Hash); - if (m_CidStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - } - TotalCount++; - }); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + CacheRecord.IterateAttachments( + [this, &ChunkStore, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { + const IoHash Hash = AttachmentHash.AsHash(); + ReferencedAttachments.push_back(Hash); + if (ChunkStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + } + TotalCount++; + }); const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); @@ -1348,10 +1350,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CachePolicy Policy = PolicyFromUrl; if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbObject, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } Request.WriteResponse(HttpResponseCode::Created); @@ -1384,38 +1388,46 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con WriteAttachmentBuffers.reserve(NumAttachments); WriteRawHashes.reserve(NumAttachments); - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count]( - CbFieldView HashView) { - const IoHash Hash = HashView.AsHash(); - ReferencedAttachments.push_back(Hash); - if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) - { - if (Attachment->IsCompressedBinary()) - { - WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(Hash); - ValidAttachments.emplace_back(Hash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(HttpContentType::kCbPackage), - Hash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(Hash)) + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + CacheRecord.IterateAttachments([this, + &Ref, + &Package, + &ChunkStore, + &WriteAttachmentBuffers, + &WriteRawHashes, + &ValidAttachments, + &ReferencedAttachments, + &Count](CbFieldView HashView) { + const IoHash Hash = HashView.AsHash(); + ReferencedAttachments.push_back(Hash); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) + { + if (Attachment->IsCompressedBinary()) { + WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Hash); ValidAttachments.emplace_back(Hash); Count.Valid++; } - Count.Total++; - }); + else + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(HttpContentType::kCbPackage), + Hash); + Count.Invalid++; + } + } + else if (ChunkStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + Count.Valid++; + } + Count.Total++; + }); if (Count.Invalid > 0) { @@ -1439,7 +1451,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (!WriteAttachmentBuffers.empty()) { - std::vector InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + std::vector InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (const CidStore::InsertResult& InsertResult : InsertResults) { if (InsertResult.New) @@ -1466,10 +1478,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } Request.WriteResponse(HttpResponseCode::Created); @@ -1503,7 +1517,9 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { Stopwatch Timer; - IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + IoBuffer Value = ChunkStore.FindChunkByCid(Ref.ValueContentId); const UpstreamEndpointInfo* Source = nullptr; CachePolicy Policy = PolicyFromUrl; @@ -1525,7 +1541,7 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { if (AreDiskWritesAllowed()) { - m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + ChunkStore.AddChunk(UpstreamResult.Value, RawHash); } Source = UpstreamResult.Source; } @@ -1619,7 +1635,9 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons "ValueContentId does not match attachment hash"sv); } - CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + CidStore::InsertResult Result = ChunkStore.AddChunk(Body, RawHash); ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, @@ -1834,16 +1852,51 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) EmitSnapshot("requests", m_HttpRequests, Cbo); - const uint64_t HitCount = m_CacheStats.HitCount; - const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; - const uint64_t MissCount = m_CacheStats.MissCount; - const uint64_t WriteCount = m_CacheStats.WriteCount; - const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; - struct CidStoreStats StoreStats = m_CidStore.Stats(); - const uint64_t ChunkHitCount = StoreStats.HitCount; - const uint64_t ChunkMissCount = StoreStats.MissCount; - const uint64_t ChunkWriteCount = StoreStats.WriteCount; - const uint64_t TotalCount = HitCount + MissCount; + const uint64_t HitCount = m_CacheStats.HitCount; + const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; + const uint64_t MissCount = m_CacheStats.MissCount; + const uint64_t WriteCount = m_CacheStats.WriteCount; + const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; + + uint64_t TotalChunkHitCount = 0; + uint64_t TotalChunkMissCount = 0; + uint64_t TotalChunkWriteCount = 0; + CidStoreSize TotalCidSize; + + tsl::robin_map UniqueStores; + { + std::vector NamespaceNames = m_CacheStore.GetNamespaces(); + + for (const std::string& NamespaceName : NamespaceNames) + { + CidStore* Store = &m_RpcHandler.GetCidStore(NamespaceName); + if (auto It = UniqueStores.find(Store); It == UniqueStores.end()) + { + UniqueStores.insert_or_assign(Store, NamespaceName); + } + else + { + UniqueStores.insert_or_assign(Store, std::string{}); + } + } + + for (auto It : UniqueStores) + { + CidStore* ChunkStore = It.first; + + CidStoreStats StoreStats = ChunkStore->Stats(); + CidStoreSize StoreSize = ChunkStore->TotalSize(); + + TotalChunkHitCount += StoreStats.HitCount; + TotalChunkMissCount += StoreStats.MissCount; + TotalChunkWriteCount += StoreStats.WriteCount; + + TotalCidSize.TinySize += StoreSize.TinySize; + TotalCidSize.SmallSize += StoreSize.SmallSize; + TotalCidSize.LargeSize += StoreSize.LargeSize; + TotalCidSize.TotalSize += StoreSize.TotalSize; + } + } const uint64_t RpcRequests = m_CacheStats.RpcRequests; const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests; @@ -1853,17 +1906,11 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests; const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests; - const CidStoreSize CidSize = m_CidStore.TotalSize(); - const GcStorageSize CacheSize = m_CacheStore.StorageSize(); + const CacheStoreSize CacheSize = m_CacheStore.TotalSize(); bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true"; bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true"; - CidStoreStats CidStoreStats = {}; - if (ShowCidStoreStats) - { - CidStoreStats = m_CidStore.Stats(); - } ZenCacheStore::CacheStoreStats CacheStoreStats = {}; if (ShowCacheStoreStats) { @@ -1898,6 +1945,7 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) Cbo.EndObject(); Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount; + const uint64_t TotalCount = HitCount + MissCount; Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); if (m_UpstreamCache.IsActive()) @@ -1908,7 +1956,9 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); } - Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount; + Cbo << "cidhits" << TotalChunkHitCount << "cidmisses" << TotalChunkMissCount << "cidwrites" << TotalChunkWriteCount; + const uint64_t TotalChunkCount = TotalChunkHitCount + TotalChunkMissCount; + Cbo << "cidhit_ratio" << (TotalChunkHitCount ? (double(TotalChunkCount) / double(TotalChunkHitCount)) : 0.0); if (ShowCacheStoreStats) { @@ -2017,20 +2067,58 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) { Cbo.BeginObject("size"); { - Cbo << "tiny" << CidSize.TinySize; - Cbo << "small" << CidSize.SmallSize; - Cbo << "large" << CidSize.LargeSize; - Cbo << "total" << CidSize.TotalSize; + Cbo << "tiny" << TotalCidSize.TinySize; + Cbo << "small" << TotalCidSize.SmallSize; + Cbo << "large" << TotalCidSize.LargeSize; + Cbo << "total" << TotalCidSize.TotalSize; } Cbo.EndObject(); if (ShowCidStoreStats) { Cbo.BeginObject("store"); - Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount; - EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo); - EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo); - // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo); + + auto OutputStats = [&](CidStore& ChunkStore) { + CidStoreStats StoreStats = ChunkStore.Stats(); + Cbo << "hits" << StoreStats.HitCount << "misses" << StoreStats.MissCount << "writes" << StoreStats.WriteCount; + const uint64_t Count = StoreStats.HitCount + StoreStats.MissCount; + Cbo << "hit_ratio" << (Count ? (double(StoreStats.HitCount) / double(Count)) : 0.0); + EmitSnapshot("read", StoreStats.FindChunkOps, Cbo); + EmitSnapshot("write", StoreStats.AddChunkOps, Cbo); + }; + + if (UniqueStores.size() > 1) + { + Cbo.BeginArray("namespaces"); + for (auto It : UniqueStores) + { + CidStore* ChunkStore = It.first; + const std::string& Namespace = It.second; + CidStoreSize ChunkStoreSize = ChunkStore->TotalSize(); + Cbo.BeginObject(); + { + Cbo << "namespace" << Namespace; + Cbo.BeginObject("stats"); + OutputStats(*ChunkStore); + Cbo.EndObject(); + + Cbo.BeginObject("size"); + { + Cbo << "tiny" << ChunkStoreSize.TinySize; + Cbo << "small" << ChunkStoreSize.SmallSize; + Cbo << "large" << ChunkStoreSize.LargeSize; + Cbo << "total" << ChunkStoreSize.TotalSize; + } + Cbo.EndObject(); + } + Cbo.EndObject(); + } + Cbo.EndArray(); // namespaces + } + else if (UniqueStores.size() != 0) + { + OutputStats(*UniqueStores.begin()->first); + } Cbo.EndObject(); } } -- cgit v1.2.3 From fb137cf9c8b7a9d1659b03472c9591c4863e9173 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 26 Aug 2025 11:43:37 +0200 Subject: revert multi-cid store (#475) --- src/zenserver/cache/httpstructuredcache.cpp | 286 +++++++++------------------- 1 file changed, 90 insertions(+), 196 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 19ac3a216..68f1c602e 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -85,7 +85,7 @@ namespace { ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - GetCidStoreFunc&& GetCidStore, + CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, @@ -95,10 +95,11 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) +, m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) , m_DiskWriteBlocker(InDiskWriteBlocker) , m_OpenProcessCache(InOpenProcessCache) -, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, std::move(GetCidStore), InDiskWriteBlocker) +, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); @@ -224,9 +225,6 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) for (const auto& NamespaceIt : ValueDetails.Namespaces) { const std::string& Namespace = NamespaceIt.first; - - CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace); - for (const auto& BucketIt : NamespaceIt.second.Buckets) { const std::string& Bucket = BucketIt.first; @@ -236,7 +234,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString() << ", " << gsl::narrow(Payload.GetSize()); @@ -254,7 +252,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) size_t AttachmentsSize = 0; for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); AttachmentsSize += Payload.GetSize(); } CSVWriter << ", " << gsl::narrow(AttachmentsSize); @@ -276,9 +274,6 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) for (const auto& NamespaceIt : ValueDetails.Namespaces) { const std::string& Namespace = NamespaceIt.first; - - CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace); - Cbo.BeginObject(); { Cbo.AddString("name", Namespace); @@ -321,7 +316,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { Cbo.BeginObject(); Cbo.AddHash("cid", Hash); - IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); Cbo.AddInteger("size", gsl::narrow(Payload.GetSize())); Cbo.EndObject(); } @@ -335,7 +330,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) size_t AttachmentsSize = 0; for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); AttachmentsSize += Payload.GetSize(); } Cbo.AddInteger("attachmentssize", gsl::narrow(AttachmentsSize)); @@ -610,8 +605,6 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount); - CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName); - if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty()) { ResponseWriter.BeginObject("BucketSizes"); @@ -670,7 +663,7 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque uint64_t AttachmentsSize = 0; - ChunkStore.IterateChunks( + m_CidStore.IterateChunks( AllAttachments, [&](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index); @@ -738,8 +731,6 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); - CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName); - if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true") { CacheContentStats ContentStats; @@ -766,7 +757,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); - ChunkStore.IterateChunks( + m_CidStore.IterateChunks( ContentStats.Attachments, [&](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index); @@ -841,8 +832,6 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; Stopwatch Timer; - CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); - if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { @@ -857,17 +846,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con uint32_t MissingCount = 0; CbObjectView CacheRecord(ClientResultValue.Value.Data()); - CacheRecord.IterateAttachments([this, &ChunkStore, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { if (SkipData) { - if (!ChunkStore.ContainsChunk(AttachmentHash.AsHash())) + if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) { MissingCount++; } } else { - if (IoBuffer Chunk = ChunkStore.FindChunkByCid(AttachmentHash.AsHash())) + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) @@ -967,8 +956,6 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { Success = true; - CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); - ClientResultValue.Value = UpstreamResult.Value; ClientResultValue.Value.SetContentType(AcceptType); @@ -1019,7 +1006,6 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CacheRecord.IterateAttachments([this, &Package, &Ref, - &ChunkStore, &WriteAttachmentBuffers, &WriteRawHashes, &ReferencedAttachments, @@ -1054,12 +1040,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { if (SkipData) { - if (ChunkStore.ContainsChunk(Hash)) + if (m_CidStore.ContainsChunk(Hash)) { Count.Valid++; } } - else if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Hash)) + else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) @@ -1101,7 +1087,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (!WriteAttachmentBuffers.empty()) { std::vector InsertResults = - ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (const CidStore::InsertResult& Result : InsertResults) { if (Result.New) @@ -1268,10 +1254,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { - CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); - m_UpstreamCache.EnqueueUpstream( - {.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}, - [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); + m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); } ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", @@ -1305,18 +1288,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con std::vector ReferencedAttachments; int32_t TotalCount = 0; - CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); - - CacheRecord.IterateAttachments( - [this, &ChunkStore, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { - const IoHash Hash = AttachmentHash.AsHash(); - ReferencedAttachments.push_back(Hash); - if (ChunkStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - } - TotalCount++; - }); + CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { + const IoHash Hash = AttachmentHash.AsHash(); + ReferencedAttachments.push_back(Hash); + if (m_CidStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + } + TotalCount++; + }); const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); @@ -1350,12 +1330,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CachePolicy Policy = PolicyFromUrl; if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbObject, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}, - [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); @@ -1388,46 +1366,38 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con WriteAttachmentBuffers.reserve(NumAttachments); WriteRawHashes.reserve(NumAttachments); - CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); - - CacheRecord.IterateAttachments([this, - &Ref, - &Package, - &ChunkStore, - &WriteAttachmentBuffers, - &WriteRawHashes, - &ValidAttachments, - &ReferencedAttachments, - &Count](CbFieldView HashView) { - const IoHash Hash = HashView.AsHash(); - ReferencedAttachments.push_back(Hash); - if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) - { - if (Attachment->IsCompressedBinary()) + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count]( + CbFieldView HashView) { + const IoHash Hash = HashView.AsHash(); + ReferencedAttachments.push_back(Hash); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { - WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(Hash); - ValidAttachments.emplace_back(Hash); - Count.Valid++; + if (Attachment->IsCompressedBinary()) + { + WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Hash); + ValidAttachments.emplace_back(Hash); + Count.Valid++; + } + else + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(HttpContentType::kCbPackage), + Hash); + Count.Invalid++; + } } - else + else if (m_CidStore.ContainsChunk(Hash)) { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(HttpContentType::kCbPackage), - Hash); - Count.Invalid++; + ValidAttachments.emplace_back(Hash); + Count.Valid++; } - } - else if (ChunkStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - Count.Valid++; - } - Count.Total++; - }); + Count.Total++; + }); if (Count.Invalid > 0) { @@ -1451,7 +1421,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (!WriteAttachmentBuffers.empty()) { - std::vector InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + std::vector InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (const CidStore::InsertResult& InsertResult : InsertResults) { if (InsertResult.New) @@ -1478,12 +1448,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbPackage, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}, - [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); @@ -1517,9 +1485,7 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { Stopwatch Timer; - CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); - - IoBuffer Value = ChunkStore.FindChunkByCid(Ref.ValueContentId); + IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); const UpstreamEndpointInfo* Source = nullptr; CachePolicy Policy = PolicyFromUrl; @@ -1541,7 +1507,7 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { if (AreDiskWritesAllowed()) { - ChunkStore.AddChunk(UpstreamResult.Value, RawHash); + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); } Source = UpstreamResult.Source; } @@ -1635,9 +1601,7 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons "ValueContentId does not match attachment hash"sv); } - CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); - - CidStore::InsertResult Result = ChunkStore.AddChunk(Body, RawHash); + CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, @@ -1852,51 +1816,16 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) EmitSnapshot("requests", m_HttpRequests, Cbo); - const uint64_t HitCount = m_CacheStats.HitCount; - const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; - const uint64_t MissCount = m_CacheStats.MissCount; - const uint64_t WriteCount = m_CacheStats.WriteCount; - const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; - - uint64_t TotalChunkHitCount = 0; - uint64_t TotalChunkMissCount = 0; - uint64_t TotalChunkWriteCount = 0; - CidStoreSize TotalCidSize; - - tsl::robin_map UniqueStores; - { - std::vector NamespaceNames = m_CacheStore.GetNamespaces(); - - for (const std::string& NamespaceName : NamespaceNames) - { - CidStore* Store = &m_RpcHandler.GetCidStore(NamespaceName); - if (auto It = UniqueStores.find(Store); It == UniqueStores.end()) - { - UniqueStores.insert_or_assign(Store, NamespaceName); - } - else - { - UniqueStores.insert_or_assign(Store, std::string{}); - } - } - - for (auto It : UniqueStores) - { - CidStore* ChunkStore = It.first; - - CidStoreStats StoreStats = ChunkStore->Stats(); - CidStoreSize StoreSize = ChunkStore->TotalSize(); - - TotalChunkHitCount += StoreStats.HitCount; - TotalChunkMissCount += StoreStats.MissCount; - TotalChunkWriteCount += StoreStats.WriteCount; - - TotalCidSize.TinySize += StoreSize.TinySize; - TotalCidSize.SmallSize += StoreSize.SmallSize; - TotalCidSize.LargeSize += StoreSize.LargeSize; - TotalCidSize.TotalSize += StoreSize.TotalSize; - } - } + const uint64_t HitCount = m_CacheStats.HitCount; + const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; + const uint64_t MissCount = m_CacheStats.MissCount; + const uint64_t WriteCount = m_CacheStats.WriteCount; + const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; + struct CidStoreStats StoreStats = m_CidStore.Stats(); + const uint64_t ChunkHitCount = StoreStats.HitCount; + const uint64_t ChunkMissCount = StoreStats.MissCount; + const uint64_t ChunkWriteCount = StoreStats.WriteCount; + const uint64_t TotalCount = HitCount + MissCount; const uint64_t RpcRequests = m_CacheStats.RpcRequests; const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests; @@ -1906,11 +1835,17 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests; const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests; + const CidStoreSize CidSize = m_CidStore.TotalSize(); const CacheStoreSize CacheSize = m_CacheStore.TotalSize(); bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true"; bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true"; + CidStoreStats CidStoreStats = {}; + if (ShowCidStoreStats) + { + CidStoreStats = m_CidStore.Stats(); + } ZenCacheStore::CacheStoreStats CacheStoreStats = {}; if (ShowCacheStoreStats) { @@ -1945,7 +1880,6 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) Cbo.EndObject(); Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount; - const uint64_t TotalCount = HitCount + MissCount; Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); if (m_UpstreamCache.IsActive()) @@ -1956,9 +1890,7 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); } - Cbo << "cidhits" << TotalChunkHitCount << "cidmisses" << TotalChunkMissCount << "cidwrites" << TotalChunkWriteCount; - const uint64_t TotalChunkCount = TotalChunkHitCount + TotalChunkMissCount; - Cbo << "cidhit_ratio" << (TotalChunkHitCount ? (double(TotalChunkCount) / double(TotalChunkHitCount)) : 0.0); + Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount; if (ShowCacheStoreStats) { @@ -2067,58 +1999,20 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) { Cbo.BeginObject("size"); { - Cbo << "tiny" << TotalCidSize.TinySize; - Cbo << "small" << TotalCidSize.SmallSize; - Cbo << "large" << TotalCidSize.LargeSize; - Cbo << "total" << TotalCidSize.TotalSize; + Cbo << "tiny" << CidSize.TinySize; + Cbo << "small" << CidSize.SmallSize; + Cbo << "large" << CidSize.LargeSize; + Cbo << "total" << CidSize.TotalSize; } Cbo.EndObject(); if (ShowCidStoreStats) { Cbo.BeginObject("store"); - - auto OutputStats = [&](CidStore& ChunkStore) { - CidStoreStats StoreStats = ChunkStore.Stats(); - Cbo << "hits" << StoreStats.HitCount << "misses" << StoreStats.MissCount << "writes" << StoreStats.WriteCount; - const uint64_t Count = StoreStats.HitCount + StoreStats.MissCount; - Cbo << "hit_ratio" << (Count ? (double(StoreStats.HitCount) / double(Count)) : 0.0); - EmitSnapshot("read", StoreStats.FindChunkOps, Cbo); - EmitSnapshot("write", StoreStats.AddChunkOps, Cbo); - }; - - if (UniqueStores.size() > 1) - { - Cbo.BeginArray("namespaces"); - for (auto It : UniqueStores) - { - CidStore* ChunkStore = It.first; - const std::string& Namespace = It.second; - CidStoreSize ChunkStoreSize = ChunkStore->TotalSize(); - Cbo.BeginObject(); - { - Cbo << "namespace" << Namespace; - Cbo.BeginObject("stats"); - OutputStats(*ChunkStore); - Cbo.EndObject(); - - Cbo.BeginObject("size"); - { - Cbo << "tiny" << ChunkStoreSize.TinySize; - Cbo << "small" << ChunkStoreSize.SmallSize; - Cbo << "large" << ChunkStoreSize.LargeSize; - Cbo << "total" << ChunkStoreSize.TotalSize; - } - Cbo.EndObject(); - } - Cbo.EndObject(); - } - Cbo.EndArray(); // namespaces - } - else if (UniqueStores.size() != 0) - { - OutputStats(*UniqueStores.begin()->first); - } + Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount; + EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo); + EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo); + // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo); Cbo.EndObject(); } } -- cgit v1.2.3 From 9f575bd416e1f7afbd11d4b221074f34bb89605c Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 4 Sep 2025 13:17:25 +0200 Subject: add validation of compact binary payloads before reading them (#483) * add validation of compact binary payloads before reading them --- src/zenserver/cache/httpstructuredcache.cpp | 67 ++++++++++++++++++----------- 1 file changed, 42 insertions(+), 25 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 68f1c602e..08d0b12a7 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -842,45 +843,61 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { if (ContentType == ZenContentType::kCbObject) { - CbPackage Package; - uint32_t MissingCount = 0; - - CbObjectView CacheRecord(ClientResultValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { - if (SkipData) - { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + CbPackage Package; + uint32_t MissingCount = 0; + CbValidateError ValidateError = CbValidateError::None; + if (CbObject PackageObject = ValidateAndReadCompactBinaryObject(std::move(ClientResultValue.Value), ValidateError); + ValidateError == CbValidateError::None) + { + CbObjectView CacheRecord(ClientResultValue.Value.Data()); + CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + if (SkipData) { - MissingCount++; + if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + { + MissingCount++; + } } - } - else - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + else { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); - if (Compressed) + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); + if (Compressed) + { + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); + } + else + { + ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash.AsHash()); + MissingCount++; + } } else { - ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash.AsHash()); MissingCount++; } } - else - { - MissingCount++; - } - } - }); + }); - Success = MissingCount == 0 || PartialRecord; + Success = MissingCount == 0 || PartialRecord; + } + else + { + ZEN_WARN("Invalid compact binary payload returned for {}/{}/{} ({}). Reason: '{}'", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + ToString(ValidateError)); + Success = false; + } if (Success) { - Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); + CbObject PackageObject = LoadCompactBinaryObject(std::move(ClientResultValue.Value)); + + Package.SetObject(std::move(PackageObject)); BinaryWriter MemStream; Package.Save(MemStream); -- cgit v1.2.3 From 339668ac935f781c06225d2d685642e27348772b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 10 Sep 2025 16:38:33 +0200 Subject: add EMode to WorkerTheadPool to avoid thread starvation (#492) - Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498 --- src/zenserver/cache/httpstructuredcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 08d0b12a7..c83065506 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1646,7 +1646,7 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); std::atomic AbortFlag; std::atomic PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { -- cgit v1.2.3 From 7a94b22eafdbd3f394fb9200e713cbb3b2b0cd56 Mon Sep 17 00:00:00 2001 From: zousar Date: Fri, 19 Sep 2025 23:46:52 -0600 Subject: Change batch put responses for client reporting Conflicts are now treated as successes, and we optionally return a Details array instead of an ErrorMessages array. Details are returned for all requests in a batch, or no requests in a batch depending on whether there are any details to be shared about any of the put requests. The details for a conflict include the raw hash and raw size of the item. If the item is a record, we also include the record as an object. --- src/zenserver/cache/httpstructuredcache.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index c83065506..dc14465de 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1221,8 +1221,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con break; } - return PutResult.Message.empty() ? Request.WriteResponse(ResponseCode) - : Request.WriteResponse(ResponseCode, zen::HttpContentType::kText, PutResult.Message); + if (PutResult.Details) + { + Request.WriteResponse(ResponseCode, PutResult.Details); + } + return Request.WriteResponse(ResponseCode); }; const HttpContentType ContentType = Request.RequestContentType(); -- cgit v1.2.3 From ebfade799e7199f7c6b981f17a55ed67d4323c41 Mon Sep 17 00:00:00 2001 From: zousar Date: Wed, 24 Sep 2025 22:12:11 -0600 Subject: Report Incomplete Records To Client When requesting partial records, report back when a record is incomplete via an "Incomplete" array of bools that is a sibling to the "Result" array for batch/rpc operations, or via the HttpResponseCode::PartialContent status code for individual record requests. --- src/zenserver/cache/httpstructuredcache.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index dc14465de..bc3f4ee20 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -821,7 +821,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); - bool Success = false; + bool Success = false; + uint32_t MissingCount = 0; ZenCacheValue ClientResultValue; if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query)) { @@ -844,7 +845,6 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (ContentType == ZenContentType::kCbObject) { CbPackage Package; - uint32_t MissingCount = 0; CbValidateError ValidateError = CbValidateError::None; if (CbObject PackageObject = ValidateAndReadCompactBinaryObject(std::move(ClientResultValue.Value), ValidateError); ValidateError == CbValidateError::None) @@ -936,7 +936,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con else { // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData - return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); + return Request.WriteResponse((MissingCount == 0) ? HttpResponseCode::OK : HttpResponseCode::PartialContent, + ClientResultValue.Value.GetContentType(), + ClientResultValue.Value); } } else if (!HasUpstream || !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) -- cgit v1.2.3 From 2f0efec7ab0430f4f4858db87b7eecfbccc0f47c Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 29 Sep 2025 10:36:32 +0200 Subject: make cpr a HttpClient implementation detail (#517) these changes remove cpr from anything which is not `HttpClient` internals. The goal is to eventually replace cpr with a more direct curl interface to eliminate cpr since it's proven problematic due to their development practices which frequently breaks APIs and prevents us from updating vcpkg. But this PR is limited to refactoring existing cpr code to use `HttpClient` instead. --- src/zenserver/cache/httpstructuredcache.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index bc3f4ee20..dad4ed803 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -40,7 +40,6 @@ #include #include -#include #include namespace zen { @@ -391,8 +390,9 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) if (Key == HttpZCacheUtilStartRecording) { - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); + + std::string RecordPath = UrlDecode(Params.GetValue("path")); { RwLock::ExclusiveLockScope _(m_RequestRecordingLock); @@ -429,9 +429,11 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) m_RequestRecorder.reset(); } - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); - uint32_t ThreadCount = std::thread::hardware_concurrency(); + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); + + std::string RecordPath = UrlDecode(Params.GetValue("path")); + + uint32_t ThreadCount = std::thread::hardware_concurrency(); if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) { if (auto Value = ParseInt(Param)) -- cgit v1.2.3 From f7664974cd32286b0623082a395c91ba1ceb9ce0 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Fri, 3 Oct 2025 10:10:05 +0200 Subject: cache RPC replay fixes (minor) (#544) * fixes misleading server side log message when using `--onhost` * fixed `--onhost` behaviour in zen (no longer replays twice) --- src/zenserver/cache/httpstructuredcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index dad4ed803..a9d6be8f2 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -447,7 +447,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) std::unique_ptr Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount); - ZEN_INFO("cache RPC replay STARTED"); + ZEN_INFO("cache RPC replay COMPLETED"); Request.WriteResponse(HttpResponseCode::OK); return; -- cgit v1.2.3 From 5361ee1c77b68bb14237169660840d6d63a74892 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 3 Oct 2025 12:38:35 +0200 Subject: remove zenutil dependency in zenremotestore (#547) * remove dependency to zenutil/workerpools.h from remoteprojectstore.cpp * remove dependency to zenutil/workerpools.h from buildstoragecache.cpp * remove unneded include * move jupiter helpers to zenremotestore * move parallelwork to zencore * remove zenutil dependency from zenremotestore * clean up test project dependencies - use indirect dependencies --- src/zenserver/cache/httpstructuredcache.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index a9d6be8f2..884684eed 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -20,13 +21,12 @@ #include #include #include +#include #include #include #include #include #include -#include -#include #include #include "upstream/upstreamcache.h" -- cgit v1.2.3 From 42a2c2582b10a598ce5ef50f7feb4bab394b8fc1 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 3 Oct 2025 15:57:42 +0200 Subject: cacherequests helpers test only (#551) * don't use cacherequests utils in cache_cmd.cpp * make zenutil/cacherequests code into test code helpers only --- src/zenserver/cache/httpstructuredcache.cpp | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 884684eed..ab3f1b56c 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -22,10 +22,9 @@ #include #include #include +#include #include #include -#include -#include #include #include @@ -71,15 +70,6 @@ namespace { static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv; static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv; static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv; - - struct HttpRequestData - { - std::optional Namespace; - std::optional Bucket; - std::optional HashKey; - std::optional ValueContentId; - }; - } // namespace ////////////////////////////////////////////////////////////////////////// @@ -459,8 +449,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) return; } - cacherequests::HttpRequestData RequestData; - if (!cacherequests::HttpRequestParseRelativeUri(Key, ZenCacheStore::DefaultNamespace, RequestData)) + HttpCacheRequestData RequestData; + if (!HttpCacheRequestParseRelativeUri(Key, ZenCacheStore::DefaultNamespace, RequestData)) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL -- cgit v1.2.3 From f8dae0f66d17a904dfa4a54771df031b62bce10e Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 3 Oct 2025 17:17:44 +0200 Subject: move rpcrecorder out from cache subfolder (#552) --- src/zenserver/cache/httpstructuredcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index ab3f1b56c..dd5bf05cb 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -25,7 +25,7 @@ #include #include #include -#include +#include #include #include "upstream/upstreamcache.h" -- cgit v1.2.3