aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorzousar <[email protected]>2022-02-09 11:17:30 -0700
committerGitHub <[email protected]>2022-02-09 11:17:30 -0700
commitbe7c0eb18e98f5edf0ec17b54cf27200c42920f9 (patch)
tree2b56085e844a242128af008ff5bdc20f499c8ab1 /zenserver/upstream/upstreamcache.cpp
parentSimplify HandleRpcGetCacheChunks (#53) (diff)
parentprepare_commit to fix formatting (diff)
downloadzen-be7c0eb18e98f5edf0ec17b54cf27200c42920f9.tar.xz
zen-be7c0eb18e98f5edf0ec17b54cf27200c42920f9.zip
Merge pull request #52 from EpicGames/ValuePropagationFix
Change Value propagation to Zen or Jupiter
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp297
1 files changed, 196 insertions, 101 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index d02a00fcd..5e0f6a297 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -402,14 +402,45 @@ namespace detail {
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- else
+ else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
{
- int64_t TotalBytes = 0ull;
- double TotalElapsedSeconds = 0.0;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue));
+ if (!Compressed)
+ {
+ return {.Reason = std::string("Invalid compressed value buffer"), .Success = false};
+ }
- const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
- for (const IoHash& ValueContentId : ValueContentIds)
- {
+ IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
+
+ CbObjectWriter ReferencingObject;
+ ReferencingObject.AddBinaryAttachment("RawHash", RawHash);
+ ReferencingObject.AddInteger("RawSize", Compressed.GetRawSize());
+
+ return PerformStructuredPut(
+ Session,
+ CacheRecord.Key,
+ ReferencingObject.Save().GetBuffer().AsIoBuffer(),
+ MaxAttempts,
+ [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) {
+ if (ValueContentId != RawHash)
+ {
+ OutReason =
+ fmt::format("Value '{}' MISMATCHED from compressed buffer raw hash {}", ValueContentId, RawHash);
+ return false;
+ }
+
+ OutBuffer = RecordValue;
+ return true;
+ });
+ }
+ else
+ {
+ return PerformStructuredPut(
+ Session,
+ CacheRecord.Key,
+ RecordValue,
+ MaxAttempts,
+ [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) {
const auto It =
std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId);
@@ -421,131 +452,145 @@ namespace detail {
const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It);
- CloudCacheResult BlobResult;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
- {
- BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]);
- }
+ OutBuffer = Values[Idx];
+ return true;
+ });
+ }
+ }
+ catch (std::exception& Err)
+ {
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
- m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
+ return {.Reason = std::string(Err.what()), .Success = false};
+ }
+ }
- if (!BlobResult.Success)
- {
- OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
- return false;
- }
+ virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
- TotalBytes += BlobResult.Bytes;
- TotalElapsedSeconds += BlobResult.ElapsedSeconds;
- }
+ private:
+ static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
+ {
+ Out.Success &= Result.Success;
+ Out.Bytes += Result.Bytes;
+ Out.ElapsedSeconds += Result.ElapsedSeconds;
+
+ if (Result.ErrorCode)
+ {
+ Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ }
+ };
- return true;
- };
+ PutUpstreamCacheResult PerformStructuredPut(
+ CloudCacheSession& Session,
+ const CacheKey& Key,
+ IoBuffer ObjectBuffer,
+ const int32_t MaxAttempts,
+ std::function<bool(const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason)>&& BlobFetchFn)
+ {
+ int64_t TotalBytes = 0ull;
+ double TotalElapsedSeconds = 0.0;
- PutRefResult RefResult;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
+ const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
+ for (const IoHash& ValueContentId : ValueContentIds)
+ {
+ IoBuffer BlobBuffer;
+ if (!BlobFetchFn(ValueContentId, BlobBuffer, OutReason))
{
- RefResult = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject);
+ return false;
}
- m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
-
- if (!RefResult.Success)
+ CloudCacheResult BlobResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
{
- return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- RefResult.Reason),
- .Success = false};
+ BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer);
}
- TotalBytes += RefResult.Bytes;
- TotalElapsedSeconds += RefResult.ElapsedSeconds;
+ m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
- std::string Reason;
- if (!PutBlobs(RefResult.Needs, Reason))
+ if (!BlobResult.Success)
{
- return {.Reason = std::move(Reason), .Success = false};
+ OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
+ return false;
}
- const IoHash RefHash = IoHash::HashBuffer(RecordValue);
- FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ }
- m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
+ return true;
+ };
- if (!FinalizeResult.Success)
- {
- return {.Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- FinalizeResult.Reason),
- .Success = false};
- }
-
- if (!FinalizeResult.Needs.empty())
- {
- if (!PutBlobs(FinalizeResult.Needs, Reason))
- {
- return {.Reason = std::move(Reason), .Success = false};
- }
+ PutRefResult RefResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
+ {
+ RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject);
+ }
- FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
+ m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
- m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
+ if (!RefResult.Success)
+ {
+ return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, RefResult.Reason),
+ .Success = false};
+ }
- if (!FinalizeResult.Success)
- {
- return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- FinalizeResult.Reason),
- .Success = false};
- }
+ TotalBytes += RefResult.Bytes;
+ TotalElapsedSeconds += RefResult.ElapsedSeconds;
- if (!FinalizeResult.Needs.empty())
- {
- ExtendableStringBuilder<256> Sb;
- for (const IoHash& MissingHash : FinalizeResult.Needs)
- {
- Sb << MissingHash.ToHexString() << ",";
- }
+ std::string Reason;
+ if (!PutBlobs(RefResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
- return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- Sb.ToString()),
- .Success = false};
- }
- }
+ const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer);
+ FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash);
- TotalBytes += FinalizeResult.Bytes;
- TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+ m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true};
- }
+ if (!FinalizeResult.Success)
+ {
+ return {
+ .Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason),
+ .Success = false};
}
- catch (std::exception& Err)
+
+ if (!FinalizeResult.Needs.empty())
{
- m_Status.Set(UpstreamEndpointState::kError, Err.what());
+ if (!PutBlobs(FinalizeResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
- return {.Reason = std::string(Err.what()), .Success = false};
- }
- }
+ FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash);
- virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
+ m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
- private:
- static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
- {
- Out.Success &= Result.Success;
- Out.Bytes += Result.Bytes;
- Out.ElapsedSeconds += Result.ElapsedSeconds;
+ if (!FinalizeResult.Success)
+ {
+ return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason),
+ .Success = false};
+ }
- if (Result.ErrorCode)
- {
- Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ if (!FinalizeResult.Needs.empty())
+ {
+ ExtendableStringBuilder<256> Sb;
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
+ {
+ Sb << MissingHash.ToHexString() << ",";
+ }
+
+ return {
+ .Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", Key.Bucket, Key.Hash, Sb.ToString()),
+ .Success = false};
+ }
}
- };
+
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true};
+ }
spdlog::logger& Log() { return m_Log; }
@@ -918,7 +963,7 @@ namespace detail {
}
else
{
- return {.Reason = std::string("invalid value buffer"), .Success = false};
+ return {.Reason = std::string("Invalid value buffer"), .Success = false};
}
}
@@ -936,6 +981,56 @@ namespace detail {
TotalBytes = Result.Bytes;
TotalElapsedSeconds = Result.ElapsedSeconds;
}
+ else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue));
+ if (!Compressed)
+ {
+ return {.Reason = std::string("Invalid value compressed buffer"), .Success = false};
+ }
+
+ CbPackage BatchPackage;
+ CbObjectWriter BatchWriter;
+ BatchWriter << "Method"sv
+ << "PutCacheValues";
+
+ BatchWriter.BeginObject("Params"sv);
+ {
+ // DefaultPolicy unspecified and expected to be Default
+
+ BatchWriter.BeginArray("Requests"sv);
+ {
+ BatchWriter.BeginObject();
+ {
+ const CacheKey& Key = CacheRecord.Key;
+ BatchWriter.BeginObject("Key"sv);
+ {
+ BatchWriter << "Bucket"sv << Key.Bucket;
+ BatchWriter << "Hash"sv << Key.Hash;
+ }
+ BatchWriter.EndObject();
+ // Policy unspecified and expected to be Default
+ BatchWriter.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Compressed.GetRawHash()));
+ BatchPackage.AddAttachment(CbAttachment(Compressed));
+ }
+ BatchWriter.EndObject();
+ }
+ BatchWriter.EndArray();
+ }
+ BatchWriter.EndObject();
+ BatchPackage.SetObject(BatchWriter.Save());
+
+ Result.Success = false;
+ for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.InvokeRpc(BatchPackage);
+ }
+
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ }
else
{
for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++)