aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorzousar <[email protected]>2022-02-08 18:06:53 -0700
committerzousar <[email protected]>2022-02-08 18:06:53 -0700
commit4c08a6f32966a94cf925b7b1503623a3953340b3 (patch)
tree3372483df06be6878f79c50dff9bdc34410badbe /zenserver/upstream/upstreamcache.cpp
parentMerge branch 'main' of https://github.com/EpicGames/zen (diff)
downloadzen-4c08a6f32966a94cf925b7b1503623a3953340b3.tar.xz
zen-4c08a6f32966a94cf925b7b1503623a3953340b3.zip
Change Value propagation to Zen or Jupiter
This change ensures we retain the right content type of kCompressedBinary when propagating values from Zen->UpstreamZen. This is done via an RPC that posts a CbPackage. Furthermore when propagating from Zen->Jupiter, it composes its own referencing CbObject for them instead of sending a octet content type and Jupiter defining the referencing CbObject. When fetching Values from Jupiter, this new composed CbObject is still interpreted correctly by Zen.
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp303
1 files changed, 198 insertions, 105 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 5990536a9..b165c3e7e 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -402,150 +402,193 @@ namespace detail {
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- else
+ else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
{
- int64_t TotalBytes = 0ull;
- double TotalElapsedSeconds = 0.0;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue));
+ if (!Compressed)
+ {
+ return {.Reason = std::string("Invalid compressed value buffer"), .Success = false};
+ }
- const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
- for (const IoHash& ValueContentId : ValueContentIds)
- {
- const auto It =
- std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId);
+ IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
- if (It == std::end(CacheRecord.ValueContentIds))
- {
- OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId);
- return false;
- }
+ CbObjectWriter ReferencingObject;
+ ReferencingObject.AddBinaryAttachment("RawHash", RawHash);
+ ReferencingObject.AddInteger("RawSize", Compressed.GetRawSize());
- const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It);
+ return PerformStructuredPut(Session, CacheRecord.Key, ReferencingObject.Save().GetBuffer().AsIoBuffer(), MaxAttempts,
+ [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) {
- CloudCacheResult BlobResult;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
+ if (ValueContentId != RawHash)
{
- BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]);
+ OutReason = fmt::format("Value '{}' MISMATCHED from compressed buffer raw hash {}", ValueContentId, RawHash);
+ return false;
}
- m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
+ OutBuffer = RecordValue;
+ return true;
+ });
+ }
+ else
+ {
+ return PerformStructuredPut(Session, CacheRecord.Key, RecordValue, MaxAttempts, [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) {
- if (!BlobResult.Success)
- {
- OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
- return false;
- }
+ const auto It =
+ std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId);
- TotalBytes += BlobResult.Bytes;
- TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ if (It == std::end(CacheRecord.ValueContentIds))
+ {
+ OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId);
+ return false;
}
+ const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It);
+
+ OutBuffer = Values[Idx];
return true;
- };
+ });
+ }
+ }
+ catch (std::exception& Err)
+ {
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
- PutRefResult RefResult;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
- {
- RefResult = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject);
- }
+ return {.Reason = std::string(Err.what()), .Success = false};
+ }
+ }
- m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
+ virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
- if (!RefResult.Success)
- {
- return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- RefResult.Reason),
- .Success = false};
- }
+ private:
+ static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
+ {
+ Out.Success &= Result.Success;
+ Out.Bytes += Result.Bytes;
+ Out.ElapsedSeconds += Result.ElapsedSeconds;
+
+ if (Result.ErrorCode)
+ {
+ Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ }
+ };
- TotalBytes += RefResult.Bytes;
- TotalElapsedSeconds += RefResult.ElapsedSeconds;
+ PutUpstreamCacheResult PerformStructuredPut(CloudCacheSession& Session, const CacheKey& Key, IoBuffer ObjectBuffer, const int32_t MaxAttempts, std::function<bool(const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason)>&& BlobFetchFn)
+ {
+ int64_t TotalBytes = 0ull;
+ double TotalElapsedSeconds = 0.0;
- std::string Reason;
- if (!PutBlobs(RefResult.Needs, Reason))
+ const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
+ for (const IoHash& ValueContentId : ValueContentIds)
+ {
+ IoBuffer BlobBuffer;
+ if (!BlobFetchFn(ValueContentId, BlobBuffer, OutReason))
{
- return {.Reason = std::move(Reason), .Success = false};
+ return false;
}
- const IoHash RefHash = IoHash::HashBuffer(RecordValue);
- FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
+ CloudCacheResult BlobResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
+ {
+ BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer);
+ }
- m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
+ m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
- if (!FinalizeResult.Success)
+ if (!BlobResult.Success)
{
- return {.Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- FinalizeResult.Reason),
- .Success = false};
+ OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
+ return false;
}
- if (!FinalizeResult.Needs.empty())
- {
- if (!PutBlobs(FinalizeResult.Needs, Reason))
- {
- return {.Reason = std::move(Reason), .Success = false};
- }
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ }
- FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
+ return true;
+ };
- m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
+ PutRefResult RefResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
+ {
+ RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject);
+ }
- if (!FinalizeResult.Success)
- {
- return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- FinalizeResult.Reason),
- .Success = false};
- }
+ m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
- if (!FinalizeResult.Needs.empty())
- {
- ExtendableStringBuilder<256> Sb;
- for (const IoHash& MissingHash : FinalizeResult.Needs)
- {
- Sb << MissingHash.ToHexString() << ",";
- }
+ if (!RefResult.Success)
+ {
+ return { .Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'",
+ Key.Bucket,
+ Key.Hash,
+ RefResult.Reason),
+ .Success = false };
+ }
- return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- Sb.ToString()),
- .Success = false};
- }
- }
+ TotalBytes += RefResult.Bytes;
+ TotalElapsedSeconds += RefResult.ElapsedSeconds;
+
+ std::string Reason;
+ if (!PutBlobs(RefResult.Needs, Reason))
+ {
+ return { .Reason = std::move(Reason), .Success = false };
+ }
- TotalBytes += FinalizeResult.Bytes;
- TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+ const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer);
+ FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash);
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true};
- }
+ m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
+
+ if (!FinalizeResult.Success)
+ {
+ return { .Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'",
+ Key.Bucket,
+ Key.Hash,
+ FinalizeResult.Reason),
+ .Success = false };
}
- catch (std::exception& Err)
+
+ if (!FinalizeResult.Needs.empty())
{
- m_Status.Set(UpstreamEndpointState::kError, Err.what());
+ if (!PutBlobs(FinalizeResult.Needs, Reason))
+ {
+ return { .Reason = std::move(Reason), .Success = false };
+ }
- return {.Reason = std::string(Err.what()), .Success = false};
- }
- }
+ FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash);
- virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
+ m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
- private:
- static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
- {
- Out.Success &= Result.Success;
- Out.Bytes += Result.Bytes;
- Out.ElapsedSeconds += Result.ElapsedSeconds;
+ if (!FinalizeResult.Success)
+ {
+ return { .Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'",
+ Key.Bucket,
+ Key.Hash,
+ FinalizeResult.Reason),
+ .Success = false };
+ }
- if (Result.ErrorCode)
- {
- Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ if (!FinalizeResult.Needs.empty())
+ {
+ ExtendableStringBuilder<256> Sb;
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
+ {
+ Sb << MissingHash.ToHexString() << ",";
+ }
+
+ return { .Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'",
+ Key.Bucket,
+ Key.Hash,
+ Sb.ToString()),
+ .Success = false };
+ }
}
- };
+
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+
+ return { .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true };
+
+ }
spdlog::logger& Log() { return m_Log; }
@@ -921,7 +964,7 @@ namespace detail {
}
else
{
- return {.Reason = std::string("invalid value buffer"), .Success = false};
+ return {.Reason = std::string("Invalid value buffer"), .Success = false};
}
}
@@ -939,6 +982,56 @@ namespace detail {
TotalBytes = Result.Bytes;
TotalElapsedSeconds = Result.ElapsedSeconds;
}
+ else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue));
+ if (!Compressed)
+ {
+ return {.Reason = std::string("Invalid value compressed buffer"), .Success = false};
+ }
+
+ CbPackage BatchPackage;
+ CbObjectWriter BatchWriter;
+ BatchWriter << "Method"sv
+ << "PutCacheValues";
+
+ BatchWriter.BeginObject("Params"sv);
+ {
+ // DefaultPolicy unspecified and expected to be Default
+
+ BatchWriter.BeginArray("Requests"sv);
+ {
+ BatchWriter.BeginObject();
+ {
+ const CacheKey& Key = CacheRecord.Key;
+ BatchWriter.BeginObject("Key"sv);
+ {
+ BatchWriter << "Bucket"sv << Key.Bucket;
+ BatchWriter << "Hash"sv << Key.Hash;
+ }
+ BatchWriter.EndObject();
+ // Policy unspecified and expected to be Default
+ BatchWriter.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Compressed.GetRawHash()));
+ BatchPackage.AddAttachment(CbAttachment(Compressed));
+ }
+ BatchWriter.EndObject();
+ }
+ BatchWriter.EndArray();
+ }
+ BatchWriter.EndObject();
+ BatchPackage.SetObject(BatchWriter.Save());
+
+ Result.Success = false;
+ for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.InvokeRpc(BatchPackage);
+ }
+
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ }
else
{
for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++)