diff options
| author | zousar <[email protected]> | 2025-12-19 00:49:38 -0700 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-12-19 00:49:38 -0700 |
| commit | 774d39bf193dd84b76437abcd41df2478e3431c0 (patch) | |
| tree | af6550d5f2f0e292775843473260df1e2872e63a /src | |
| parent | Change default limit-overwrite behavior to true (diff) | |
| download | zen-774d39bf193dd84b76437abcd41df2478e3431c0.tar.xz zen-774d39bf193dd84b76437abcd41df2478e3431c0.zip | |
Ensure upstream put propagation includes overwrite
When changing the default limit-overwrite behavior, a unit test surfaced a bug where an put of data with overwrite cache policy would not get propagated via zen's built-in upstream mechanism with a matching overwrite cache policy to the upstream. This change ensures that it does and leaves the unit test configured to exercise this scenario.
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h | 7 | ||||
| -rw-r--r-- | src/zenremotestore/jupiter/jupitersession.cpp | 17 | ||||
| -rw-r--r-- | src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver-test/cache-tests.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/storage/cache/httpstructuredcache.cpp | 9 | ||||
| -rw-r--r-- | src/zenserver/storage/upstream/upstreamcache.cpp | 18 | ||||
| -rw-r--r-- | src/zenserver/storage/upstream/zen.cpp | 14 | ||||
| -rw-r--r-- | src/zenserver/storage/upstream/zen.h | 3 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 17 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/upstreamcacheclient.h | 1 |
10 files changed, 73 insertions, 17 deletions
diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h index 1945f6cff..15077376c 100644 --- a/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h +++ b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h @@ -81,7 +81,12 @@ public: IoHash& OutPayloadHash, std::filesystem::path TempFolderPath = {}); - PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + PutRefResult PutRef(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + bool Overwrite, + IoBuffer Ref, + ZenContentType RefType); JupiterResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob); diff --git a/src/zenremotestore/jupiter/jupitersession.cpp b/src/zenremotestore/jupiter/jupitersession.cpp index 3dac87d96..dd0e5ad1f 100644 --- a/src/zenremotestore/jupiter/jupitersession.cpp +++ b/src/zenremotestore/jupiter/jupitersession.cpp @@ -184,7 +184,12 @@ JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key) } PutRefResult -JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) +JupiterSession::PutRef(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + bool Overwrite, + IoBuffer Ref, + ZenContentType RefType) { ZEN_TRACE_CPU("JupiterClient::PutRef"); @@ -192,9 +197,13 @@ JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, co IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - Ref, - {{"X-Jupiter-IoHash", Hash.ToHexString()}}); + HttpClient::KeyValueMap AdditionalHeaders({{"X-Jupiter-IoHash", Hash.ToHexString()}}); + if (Overwrite) + { + AdditionalHeaders.Entries.emplace("X-Jupiter-Allow-Overwrite", "True"); + } + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref, AdditionalHeaders); PutRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp index 6d888ea01..462de2988 100644 --- a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp @@ -80,7 +80,7 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); + PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, /*Overwrite*/ false, Payload, ZenContentType::kCbObject); AddStats(PutResult); SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp index 8d50df0e6..854590987 100644 --- a/src/zenserver-test/cache-tests.cpp +++ b/src/zenserver-test/cache-tests.cpp @@ -1031,7 +1031,7 @@ TEST_CASE("zcache.rpc") { using namespace utils; - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber(), "--cache-bucket-limit-overwrites=false"); + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber(), "--cache-bucket-limit-overwrites"); ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); diff --git a/src/zenserver/storage/cache/httpstructuredcache.cpp b/src/zenserver/storage/cache/httpstructuredcache.cpp index 03d2140a1..72f29d14e 100644 --- a/src/zenserver/storage/cache/httpstructuredcache.cpp +++ b/src/zenserver/storage/cache/httpstructuredcache.cpp @@ -1268,7 +1268,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}, .Overwrite = Overwrite}); } ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", @@ -1347,7 +1348,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); + .ValueContentIds = std::move(ValidAttachments), + .Overwrite = Overwrite}); } Request.WriteResponse(HttpResponseCode::Created); @@ -1465,7 +1467,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); + .ValueContentIds = std::move(ValidAttachments), + .Overwrite = Overwrite}); } Request.WriteResponse(HttpResponseCode::Created); diff --git a/src/zenserver/storage/upstream/upstreamcache.cpp b/src/zenserver/storage/upstream/upstreamcache.cpp index 938a1a011..b26c57414 100644 --- a/src/zenserver/storage/upstream/upstreamcache.cpp +++ b/src/zenserver/storage/upstream/upstreamcache.cpp @@ -554,6 +554,7 @@ namespace detail { Result = Session.PutRef(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, + CacheRecord.Overwrite, RecordValue, ZenContentType::kBinary); } @@ -585,6 +586,7 @@ namespace detail { Session, CacheRecord.Namespace, CacheRecord.Key, + CacheRecord.Overwrite, ReferencingObject.Save().GetBuffer().AsIoBuffer(), MaxAttempts, [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { @@ -605,6 +607,7 @@ namespace detail { Session, CacheRecord.Namespace, CacheRecord.Key, + CacheRecord.Overwrite, RecordValue, MaxAttempts, [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { @@ -651,6 +654,7 @@ namespace detail { JupiterSession& Session, std::string_view Namespace, const CacheKey& Key, + bool Overwrite, IoBuffer ObjectBuffer, const int32_t MaxAttempts, std::function<bool(const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason)>&& BlobFetchFn) @@ -692,7 +696,7 @@ namespace detail { PutRefResult RefResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) { - RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); + RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, Overwrite, ObjectBuffer, ZenContentType::kCbObject); } m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -1302,6 +1306,7 @@ namespace detail { Result = Session.PutCacheRecord(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, + CacheRecord.Overwrite ? CachePolicy::Store : CachePolicy::Default, PackagePayload, CacheRecord.Type); } @@ -1329,7 +1334,14 @@ namespace detail { BatchWriter.BeginObject("Params"sv); { - // DefaultPolicy unspecified and expected to be Default + if (CacheRecord.Overwrite) + { + BatchWriter << "DefaultPolicy"sv << WriteToString<128>(CachePolicy::Store).ToView(); + } + else + { + // DefaultPolicy unspecified and expected to be Default + } BatchWriter << "Namespace"sv << CacheRecord.Namespace; @@ -1376,6 +1388,7 @@ namespace detail { Result = Session.PutCacheValue(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, + CacheRecord.Overwrite ? CachePolicy::Store : CachePolicy::Default, CacheRecord.ValueContentIds[Idx], Values[Idx]); } @@ -1400,6 +1413,7 @@ namespace detail { Result = Session.PutCacheRecord(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, + CacheRecord.Overwrite ? CachePolicy::Store : CachePolicy::Default, RecordValue, CacheRecord.Type); } diff --git a/src/zenserver/storage/upstream/zen.cpp b/src/zenserver/storage/upstream/zen.cpp index 25fd3a3bb..423c9039c 100644 --- a/src/zenserver/storage/upstream/zen.cpp +++ b/src/zenserver/storage/upstream/zen.cpp @@ -127,6 +127,7 @@ ZenCacheResult ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, + CachePolicy Policy, IoBuffer Value, ZenContentType Type) { @@ -140,6 +141,12 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, } Uri << BucketId << "/" << Key.ToHexString(); + if (Policy != CachePolicy::Default) + { + Uri << "?Policy="; + Uri << Policy; + } + Value.SetContentType(Type); HttpClient::Response Response = Http.Put(Uri, Value); @@ -159,6 +166,7 @@ ZenCacheResult ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, + CachePolicy Policy, const IoHash& ValueContentId, IoBuffer Payload) { @@ -172,6 +180,12 @@ ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + if (Policy != CachePolicy::Default) + { + Uri << "?Policy="; + Uri << Policy; + } + Payload.SetContentType(HttpContentType::kCompressedBinary); HttpClient::Response Response = Http.Put(Uri, Payload); diff --git a/src/zenserver/storage/upstream/zen.h b/src/zenserver/storage/upstream/zen.h index 6321b46b1..54fb063c2 100644 --- a/src/zenserver/storage/upstream/zen.h +++ b/src/zenserver/storage/upstream/zen.h @@ -8,6 +8,7 @@ #include <zencore/memoryview.h> #include <zencore/uid.h> #include <zencore/zencore.h> +#include <zenstore/cache/cachepolicy.h> #include <chrono> @@ -59,11 +60,13 @@ public: ZenCacheResult PutCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, + CachePolicy Policy, IoBuffer Value, ZenContentType Type); ZenCacheResult PutCacheValue(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, + CachePolicy Policy, const IoHash& ValueContentId, IoBuffer Payload); ZenCacheResult InvokeRpc(const CbObjectView& Request); diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 6469d6c31..660c66b9a 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -494,7 +494,8 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, .Namespace = Request.Namespace, .Key = Request.Key, - .ValueContentIds = std::move(ValidAttachments)}); + .ValueContentIds = std::move(ValidAttachments), + .Overwrite = Overwrite}); } return PutResult; } @@ -998,6 +999,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con eastl::fixed_vector<size_t, 32> BatchResultIndexes; eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results; eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys; + eastl::fixed_vector<bool, 32> Overwrites; uint64_t RequestCount = RequestsArray.Num(); { @@ -1008,6 +1010,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con Batch = std::make_unique<ZenCacheStore::PutBatch>(m_CacheStore, *Namespace, BatchResults); BatchResults.reserve(RequestCount); BatchResultIndexes.reserve(RequestCount); + Overwrites.reserve(RequestCount); } for (CbFieldView RequestField : RequestsArray) { @@ -1032,6 +1035,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); bool Valid = false; uint64_t TransferredSize = 0; + bool Overwrite = false; if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) { @@ -1047,8 +1051,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); if (RawSize == 0) { @@ -1116,6 +1120,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { UpstreamCacheKeys.push_back(CacheKey::Empty); } + Overwrites.push_back(Overwrite); ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}", *Namespace, @@ -1150,8 +1155,10 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, + .Namespace = *Namespace, + .Key = UpstreamCacheKeys[Index], + .Overwrite = Overwrites[Index]}); } } { diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h index ff4a8c3f7..f5a861e2b 100644 --- a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h +++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h @@ -27,6 +27,7 @@ struct UpstreamCacheRecord CacheKey Key; std::vector<IoHash> ValueContentIds; CacheRequestContext Context; + bool Overwrite = false; }; struct UpstreamError |