aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzousar <[email protected]>2022-02-08 18:06:53 -0700
committerzousar <[email protected]>2022-02-08 18:06:53 -0700
commit4c08a6f32966a94cf925b7b1503623a3953340b3 (patch)
tree3372483df06be6878f79c50dff9bdc34410badbe
parentMerge branch 'main' of https://github.com/EpicGames/zen (diff)
downloadzen-4c08a6f32966a94cf925b7b1503623a3953340b3.tar.xz
zen-4c08a6f32966a94cf925b7b1503623a3953340b3.zip
Change Value propagation to Zen or Jupiter
This change ensures we retain the right content type of kCompressedBinary when propagating values from Zen->UpstreamZen. This is done via an RPC that posts a CbPackage. Furthermore when propagating from Zen->Jupiter, it composes its own referencing CbObject for them instead of sending a octet content type and Jupiter defining the referencing CbObject. When fetching Values from Jupiter, this new composed CbObject is still interpreted correctly by Zen.
-rw-r--r--zenserver/cache/structuredcache.cpp2
-rw-r--r--zenserver/upstream/upstreamcache.cpp303
-rw-r--r--zenserver/upstream/zen.cpp30
-rw-r--r--zenserver/upstream/zen.h1
4 files changed, 230 insertions, 106 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 49e5896d1..9ad465506 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1527,7 +1527,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = Key});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key});
}
Results.push_back(Succeeded);
ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid");
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 5990536a9..b165c3e7e 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -402,150 +402,193 @@ namespace detail {
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- else
+ else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
{
- int64_t TotalBytes = 0ull;
- double TotalElapsedSeconds = 0.0;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue));
+ if (!Compressed)
+ {
+ return {.Reason = std::string("Invalid compressed value buffer"), .Success = false};
+ }
- const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
- for (const IoHash& ValueContentId : ValueContentIds)
- {
- const auto It =
- std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId);
+ IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
- if (It == std::end(CacheRecord.ValueContentIds))
- {
- OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId);
- return false;
- }
+ CbObjectWriter ReferencingObject;
+ ReferencingObject.AddBinaryAttachment("RawHash", RawHash);
+ ReferencingObject.AddInteger("RawSize", Compressed.GetRawSize());
- const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It);
+ return PerformStructuredPut(Session, CacheRecord.Key, ReferencingObject.Save().GetBuffer().AsIoBuffer(), MaxAttempts,
+ [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) {
- CloudCacheResult BlobResult;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
+ if (ValueContentId != RawHash)
{
- BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]);
+ OutReason = fmt::format("Value '{}' MISMATCHED from compressed buffer raw hash {}", ValueContentId, RawHash);
+ return false;
}
- m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
+ OutBuffer = RecordValue;
+ return true;
+ });
+ }
+ else
+ {
+ return PerformStructuredPut(Session, CacheRecord.Key, RecordValue, MaxAttempts, [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) {
- if (!BlobResult.Success)
- {
- OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
- return false;
- }
+ const auto It =
+ std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId);
- TotalBytes += BlobResult.Bytes;
- TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ if (It == std::end(CacheRecord.ValueContentIds))
+ {
+ OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId);
+ return false;
}
+ const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It);
+
+ OutBuffer = Values[Idx];
return true;
- };
+ });
+ }
+ }
+ catch (std::exception& Err)
+ {
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
- PutRefResult RefResult;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
- {
- RefResult = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject);
- }
+ return {.Reason = std::string(Err.what()), .Success = false};
+ }
+ }
- m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
+ virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
- if (!RefResult.Success)
- {
- return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- RefResult.Reason),
- .Success = false};
- }
+ private:
+ static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
+ {
+ Out.Success &= Result.Success;
+ Out.Bytes += Result.Bytes;
+ Out.ElapsedSeconds += Result.ElapsedSeconds;
+
+ if (Result.ErrorCode)
+ {
+ Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ }
+ };
- TotalBytes += RefResult.Bytes;
- TotalElapsedSeconds += RefResult.ElapsedSeconds;
+ PutUpstreamCacheResult PerformStructuredPut(CloudCacheSession& Session, const CacheKey& Key, IoBuffer ObjectBuffer, const int32_t MaxAttempts, std::function<bool(const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason)>&& BlobFetchFn)
+ {
+ int64_t TotalBytes = 0ull;
+ double TotalElapsedSeconds = 0.0;
- std::string Reason;
- if (!PutBlobs(RefResult.Needs, Reason))
+ const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
+ for (const IoHash& ValueContentId : ValueContentIds)
+ {
+ IoBuffer BlobBuffer;
+ if (!BlobFetchFn(ValueContentId, BlobBuffer, OutReason))
{
- return {.Reason = std::move(Reason), .Success = false};
+ return false;
}
- const IoHash RefHash = IoHash::HashBuffer(RecordValue);
- FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
+ CloudCacheResult BlobResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
+ {
+ BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer);
+ }
- m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
+ m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
- if (!FinalizeResult.Success)
+ if (!BlobResult.Success)
{
- return {.Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- FinalizeResult.Reason),
- .Success = false};
+ OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
+ return false;
}
- if (!FinalizeResult.Needs.empty())
- {
- if (!PutBlobs(FinalizeResult.Needs, Reason))
- {
- return {.Reason = std::move(Reason), .Success = false};
- }
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ }
- FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
+ return true;
+ };
- m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
+ PutRefResult RefResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
+ {
+ RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject);
+ }
- if (!FinalizeResult.Success)
- {
- return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- FinalizeResult.Reason),
- .Success = false};
- }
+ m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
- if (!FinalizeResult.Needs.empty())
- {
- ExtendableStringBuilder<256> Sb;
- for (const IoHash& MissingHash : FinalizeResult.Needs)
- {
- Sb << MissingHash.ToHexString() << ",";
- }
+ if (!RefResult.Success)
+ {
+ return { .Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'",
+ Key.Bucket,
+ Key.Hash,
+ RefResult.Reason),
+ .Success = false };
+ }
- return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- Sb.ToString()),
- .Success = false};
- }
- }
+ TotalBytes += RefResult.Bytes;
+ TotalElapsedSeconds += RefResult.ElapsedSeconds;
+
+ std::string Reason;
+ if (!PutBlobs(RefResult.Needs, Reason))
+ {
+ return { .Reason = std::move(Reason), .Success = false };
+ }
- TotalBytes += FinalizeResult.Bytes;
- TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+ const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer);
+ FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash);
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true};
- }
+ m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
+
+ if (!FinalizeResult.Success)
+ {
+ return { .Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'",
+ Key.Bucket,
+ Key.Hash,
+ FinalizeResult.Reason),
+ .Success = false };
}
- catch (std::exception& Err)
+
+ if (!FinalizeResult.Needs.empty())
{
- m_Status.Set(UpstreamEndpointState::kError, Err.what());
+ if (!PutBlobs(FinalizeResult.Needs, Reason))
+ {
+ return { .Reason = std::move(Reason), .Success = false };
+ }
- return {.Reason = std::string(Err.what()), .Success = false};
- }
- }
+ FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash);
- virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
+ m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
- private:
- static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
- {
- Out.Success &= Result.Success;
- Out.Bytes += Result.Bytes;
- Out.ElapsedSeconds += Result.ElapsedSeconds;
+ if (!FinalizeResult.Success)
+ {
+ return { .Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'",
+ Key.Bucket,
+ Key.Hash,
+ FinalizeResult.Reason),
+ .Success = false };
+ }
- if (Result.ErrorCode)
- {
- Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ if (!FinalizeResult.Needs.empty())
+ {
+ ExtendableStringBuilder<256> Sb;
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
+ {
+ Sb << MissingHash.ToHexString() << ",";
+ }
+
+ return { .Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'",
+ Key.Bucket,
+ Key.Hash,
+ Sb.ToString()),
+ .Success = false };
+ }
}
- };
+
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+
+ return { .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true };
+
+ }
spdlog::logger& Log() { return m_Log; }
@@ -921,7 +964,7 @@ namespace detail {
}
else
{
- return {.Reason = std::string("invalid value buffer"), .Success = false};
+ return {.Reason = std::string("Invalid value buffer"), .Success = false};
}
}
@@ -939,6 +982,56 @@ namespace detail {
TotalBytes = Result.Bytes;
TotalElapsedSeconds = Result.ElapsedSeconds;
}
+ else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue));
+ if (!Compressed)
+ {
+ return {.Reason = std::string("Invalid value compressed buffer"), .Success = false};
+ }
+
+ CbPackage BatchPackage;
+ CbObjectWriter BatchWriter;
+ BatchWriter << "Method"sv
+ << "PutCacheValues";
+
+ BatchWriter.BeginObject("Params"sv);
+ {
+ // DefaultPolicy unspecified and expected to be Default
+
+ BatchWriter.BeginArray("Requests"sv);
+ {
+ BatchWriter.BeginObject();
+ {
+ const CacheKey& Key = CacheRecord.Key;
+ BatchWriter.BeginObject("Key"sv);
+ {
+ BatchWriter << "Bucket"sv << Key.Bucket;
+ BatchWriter << "Hash"sv << Key.Hash;
+ }
+ BatchWriter.EndObject();
+ // Policy unspecified and expected to be Default
+ BatchWriter.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Compressed.GetRawHash()));
+ BatchPackage.AddAttachment(CbAttachment(Compressed));
+ }
+ BatchWriter.EndObject();
+ }
+ BatchWriter.EndArray();
+ }
+ BatchWriter.EndObject();
+ BatchPackage.SetObject(BatchWriter.Save());
+
+ Result.Success = false;
+ for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.InvokeRpc(BatchPackage);
+ }
+
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ }
else
{
for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++)
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 0570dd316..098f53bb0 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -3,10 +3,12 @@
#include "zen.h"
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
#include <zencore/session.h>
#include <zencore/stream.h>
+#include <zenhttp/httpshared.h>
#include "cache/structuredcachestore.h"
#include "diag/formatters.h"
@@ -539,4 +541,32 @@ ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
+ZenCacheResult
+ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/$rpc";
+
+ SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten();
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index bc8fd3c56..f70d9d06f 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -133,6 +133,7 @@ public:
ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type);
ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload);
ZenCacheResult InvokeRpc(const CbObjectView& Request);
+ ZenCacheResult InvokeRpc(const CbPackage& Package);
private:
inline spdlog::logger& Log() { return m_Log; }