aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorzousar <[email protected]>2025-12-19 00:49:38 -0700
committerzousar <[email protected]>2025-12-19 00:49:38 -0700
commit774d39bf193dd84b76437abcd41df2478e3431c0 (patch)
treeaf6550d5f2f0e292775843473260df1e2872e63a /src
parentChange default limit-overwrite behavior to true (diff)
downloadzen-774d39bf193dd84b76437abcd41df2478e3431c0.tar.xz
zen-774d39bf193dd84b76437abcd41df2478e3431c0.zip
Ensure upstream put propagation includes overwrite
When changing the default limit-overwrite behavior, a unit test surfaced a bug where an put of data with overwrite cache policy would not get propagated via zen's built-in upstream mechanism with a matching overwrite cache policy to the upstream. This change ensures that it does and leaves the unit test configured to exercise this scenario.
Diffstat (limited to 'src')
-rw-r--r--src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h7
-rw-r--r--src/zenremotestore/jupiter/jupitersession.cpp17
-rw-r--r--src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp2
-rw-r--r--src/zenserver-test/cache-tests.cpp2
-rw-r--r--src/zenserver/storage/cache/httpstructuredcache.cpp9
-rw-r--r--src/zenserver/storage/upstream/upstreamcache.cpp18
-rw-r--r--src/zenserver/storage/upstream/zen.cpp14
-rw-r--r--src/zenserver/storage/upstream/zen.h3
-rw-r--r--src/zenstore/cache/cacherpc.cpp17
-rw-r--r--src/zenstore/include/zenstore/cache/upstreamcacheclient.h1
10 files changed, 73 insertions, 17 deletions
diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h
index 1945f6cff..15077376c 100644
--- a/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h
+++ b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h
@@ -81,7 +81,12 @@ public:
IoHash& OutPayloadHash,
std::filesystem::path TempFolderPath = {});
- PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
+ PutRefResult PutRef(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ bool Overwrite,
+ IoBuffer Ref,
+ ZenContentType RefType);
JupiterResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob);
diff --git a/src/zenremotestore/jupiter/jupitersession.cpp b/src/zenremotestore/jupiter/jupitersession.cpp
index 3dac87d96..dd0e5ad1f 100644
--- a/src/zenremotestore/jupiter/jupitersession.cpp
+++ b/src/zenremotestore/jupiter/jupitersession.cpp
@@ -184,7 +184,12 @@ JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key)
}
PutRefResult
-JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
+JupiterSession::PutRef(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ bool Overwrite,
+ IoBuffer Ref,
+ ZenContentType RefType)
{
ZEN_TRACE_CPU("JupiterClient::PutRef");
@@ -192,9 +197,13 @@ JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, co
IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
- HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
- Ref,
- {{"X-Jupiter-IoHash", Hash.ToHexString()}});
+ HttpClient::KeyValueMap AdditionalHeaders({{"X-Jupiter-IoHash", Hash.ToHexString()}});
+ if (Overwrite)
+ {
+ AdditionalHeaders.Entries.emplace("X-Jupiter-Allow-Overwrite", "True");
+ }
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref, AdditionalHeaders);
PutRefResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
index 6d888ea01..462de2988 100644
--- a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
@@ -80,7 +80,7 @@ public:
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
+ PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, /*Overwrite*/ false, Payload, ZenContentType::kCbObject);
AddStats(PutResult);
SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp
index 8d50df0e6..854590987 100644
--- a/src/zenserver-test/cache-tests.cpp
+++ b/src/zenserver-test/cache-tests.cpp
@@ -1031,7 +1031,7 @@ TEST_CASE("zcache.rpc")
{
using namespace utils;
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber(), "--cache-bucket-limit-overwrites=false");
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber(), "--cache-bucket-limit-overwrites");
ZenServerInstance UpstreamServer(TestEnv);
SpawnServer(UpstreamServer, UpstreamCfg);
diff --git a/src/zenserver/storage/cache/httpstructuredcache.cpp b/src/zenserver/storage/cache/httpstructuredcache.cpp
index 03d2140a1..72f29d14e 100644
--- a/src/zenserver/storage/cache/httpstructuredcache.cpp
+++ b/src/zenserver/storage/cache/httpstructuredcache.cpp
@@ -1268,7 +1268,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}});
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}, .Overwrite = Overwrite});
}
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}",
@@ -1347,7 +1348,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
.Namespace = Ref.Namespace,
.Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
+ .ValueContentIds = std::move(ValidAttachments),
+ .Overwrite = Overwrite});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -1465,7 +1467,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
.Namespace = Ref.Namespace,
.Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
+ .ValueContentIds = std::move(ValidAttachments),
+ .Overwrite = Overwrite});
}
Request.WriteResponse(HttpResponseCode::Created);
diff --git a/src/zenserver/storage/upstream/upstreamcache.cpp b/src/zenserver/storage/upstream/upstreamcache.cpp
index 938a1a011..b26c57414 100644
--- a/src/zenserver/storage/upstream/upstreamcache.cpp
+++ b/src/zenserver/storage/upstream/upstreamcache.cpp
@@ -554,6 +554,7 @@ namespace detail {
Result = Session.PutRef(BlobStoreNamespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
+ CacheRecord.Overwrite,
RecordValue,
ZenContentType::kBinary);
}
@@ -585,6 +586,7 @@ namespace detail {
Session,
CacheRecord.Namespace,
CacheRecord.Key,
+ CacheRecord.Overwrite,
ReferencingObject.Save().GetBuffer().AsIoBuffer(),
MaxAttempts,
[&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) {
@@ -605,6 +607,7 @@ namespace detail {
Session,
CacheRecord.Namespace,
CacheRecord.Key,
+ CacheRecord.Overwrite,
RecordValue,
MaxAttempts,
[&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) {
@@ -651,6 +654,7 @@ namespace detail {
JupiterSession& Session,
std::string_view Namespace,
const CacheKey& Key,
+ bool Overwrite,
IoBuffer ObjectBuffer,
const int32_t MaxAttempts,
std::function<bool(const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason)>&& BlobFetchFn)
@@ -692,7 +696,7 @@ namespace detail {
PutRefResult RefResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
{
- RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject);
+ RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, Overwrite, ObjectBuffer, ZenContentType::kCbObject);
}
m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
@@ -1302,6 +1306,7 @@ namespace detail {
Result = Session.PutCacheRecord(CacheRecord.Namespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
+ CacheRecord.Overwrite ? CachePolicy::Store : CachePolicy::Default,
PackagePayload,
CacheRecord.Type);
}
@@ -1329,7 +1334,14 @@ namespace detail {
BatchWriter.BeginObject("Params"sv);
{
- // DefaultPolicy unspecified and expected to be Default
+ if (CacheRecord.Overwrite)
+ {
+ BatchWriter << "DefaultPolicy"sv << WriteToString<128>(CachePolicy::Store).ToView();
+ }
+ else
+ {
+ // DefaultPolicy unspecified and expected to be Default
+ }
BatchWriter << "Namespace"sv << CacheRecord.Namespace;
@@ -1376,6 +1388,7 @@ namespace detail {
Result = Session.PutCacheValue(CacheRecord.Namespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
+ CacheRecord.Overwrite ? CachePolicy::Store : CachePolicy::Default,
CacheRecord.ValueContentIds[Idx],
Values[Idx]);
}
@@ -1400,6 +1413,7 @@ namespace detail {
Result = Session.PutCacheRecord(CacheRecord.Namespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
+ CacheRecord.Overwrite ? CachePolicy::Store : CachePolicy::Default,
RecordValue,
CacheRecord.Type);
}
diff --git a/src/zenserver/storage/upstream/zen.cpp b/src/zenserver/storage/upstream/zen.cpp
index 25fd3a3bb..423c9039c 100644
--- a/src/zenserver/storage/upstream/zen.cpp
+++ b/src/zenserver/storage/upstream/zen.cpp
@@ -127,6 +127,7 @@ ZenCacheResult
ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace,
std::string_view BucketId,
const IoHash& Key,
+ CachePolicy Policy,
IoBuffer Value,
ZenContentType Type)
{
@@ -140,6 +141,12 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace,
}
Uri << BucketId << "/" << Key.ToHexString();
+ if (Policy != CachePolicy::Default)
+ {
+ Uri << "?Policy=";
+ Uri << Policy;
+ }
+
Value.SetContentType(Type);
HttpClient::Response Response = Http.Put(Uri, Value);
@@ -159,6 +166,7 @@ ZenCacheResult
ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace,
std::string_view BucketId,
const IoHash& Key,
+ CachePolicy Policy,
const IoHash& ValueContentId,
IoBuffer Payload)
{
@@ -172,6 +180,12 @@ ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace,
}
Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
+ if (Policy != CachePolicy::Default)
+ {
+ Uri << "?Policy=";
+ Uri << Policy;
+ }
+
Payload.SetContentType(HttpContentType::kCompressedBinary);
HttpClient::Response Response = Http.Put(Uri, Payload);
diff --git a/src/zenserver/storage/upstream/zen.h b/src/zenserver/storage/upstream/zen.h
index 6321b46b1..54fb063c2 100644
--- a/src/zenserver/storage/upstream/zen.h
+++ b/src/zenserver/storage/upstream/zen.h
@@ -8,6 +8,7 @@
#include <zencore/memoryview.h>
#include <zencore/uid.h>
#include <zencore/zencore.h>
+#include <zenstore/cache/cachepolicy.h>
#include <chrono>
@@ -59,11 +60,13 @@ public:
ZenCacheResult PutCacheRecord(std::string_view Namespace,
std::string_view BucketId,
const IoHash& Key,
+ CachePolicy Policy,
IoBuffer Value,
ZenContentType Type);
ZenCacheResult PutCacheValue(std::string_view Namespace,
std::string_view BucketId,
const IoHash& Key,
+ CachePolicy Policy,
const IoHash& ValueContentId,
IoBuffer Payload);
ZenCacheResult InvokeRpc(const CbObjectView& Request);
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 6469d6c31..660c66b9a 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -494,7 +494,8 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
.Namespace = Request.Namespace,
.Key = Request.Key,
- .ValueContentIds = std::move(ValidAttachments)});
+ .ValueContentIds = std::move(ValidAttachments),
+ .Overwrite = Overwrite});
}
return PutResult;
}
@@ -998,6 +999,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
eastl::fixed_vector<size_t, 32> BatchResultIndexes;
eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
+ eastl::fixed_vector<bool, 32> Overwrites;
uint64_t RequestCount = RequestsArray.Num();
{
@@ -1008,6 +1010,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
Batch = std::make_unique<ZenCacheStore::PutBatch>(m_CacheStore, *Namespace, BatchResults);
BatchResults.reserve(RequestCount);
BatchResultIndexes.reserve(RequestCount);
+ Overwrites.reserve(RequestCount);
}
for (CbFieldView RequestField : RequestsArray)
{
@@ -1032,6 +1035,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
bool Valid = false;
uint64_t TransferredSize = 0;
+ bool Overwrite = false;
if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
{
@@ -1047,8 +1051,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
{
- bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
Value.SetContentType(ZenContentType::kCompressedBinary);
if (RawSize == 0)
{
@@ -1116,6 +1120,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
{
UpstreamCacheKeys.push_back(CacheKey::Empty);
}
+ Overwrites.push_back(Overwrite);
ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
*Namespace,
@@ -1150,8 +1155,10 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
{
if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty)
{
- m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary,
+ .Namespace = *Namespace,
+ .Key = UpstreamCacheKeys[Index],
+ .Overwrite = Overwrites[Index]});
}
}
{
diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
index ff4a8c3f7..f5a861e2b 100644
--- a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
+++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
@@ -27,6 +27,7 @@ struct UpstreamCacheRecord
CacheKey Key;
std::vector<IoHash> ValueContentIds;
CacheRequestContext Context;
+ bool Overwrite = false;
};
struct UpstreamError