diff options
| author | zousar <[email protected]> | 2022-02-09 11:17:30 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-02-09 11:17:30 -0700 |
| commit | be7c0eb18e98f5edf0ec17b54cf27200c42920f9 (patch) | |
| tree | 2b56085e844a242128af008ff5bdc20f499c8ab1 /zenserver/upstream/upstreamcache.cpp | |
| parent | Simplify HandleRpcGetCacheChunks (#53) (diff) | |
| parent | prepare_commit to fix formatting (diff) | |
| download | zen-be7c0eb18e98f5edf0ec17b54cf27200c42920f9.tar.xz zen-be7c0eb18e98f5edf0ec17b54cf27200c42920f9.zip | |
Merge pull request #52 from EpicGames/ValuePropagationFix
Change Value propagation to Zen or Jupiter
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 297 |
1 files changed, 196 insertions, 101 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index d02a00fcd..5e0f6a297 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -402,14 +402,45 @@ namespace detail { .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - else + else if (CacheRecord.Type == ZenContentType::kCompressedBinary) { - int64_t TotalBytes = 0ull; - double TotalElapsedSeconds = 0.0; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue)); + if (!Compressed) + { + return {.Reason = std::string("Invalid compressed value buffer"), .Success = false}; + } - const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { - for (const IoHash& ValueContentId : ValueContentIds) - { + IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + + CbObjectWriter ReferencingObject; + ReferencingObject.AddBinaryAttachment("RawHash", RawHash); + ReferencingObject.AddInteger("RawSize", Compressed.GetRawSize()); + + return PerformStructuredPut( + Session, + CacheRecord.Key, + ReferencingObject.Save().GetBuffer().AsIoBuffer(), + MaxAttempts, + [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { + if (ValueContentId != RawHash) + { + OutReason = + fmt::format("Value '{}' MISMATCHED from compressed buffer raw hash {}", ValueContentId, RawHash); + return false; + } + + OutBuffer = RecordValue; + return true; + }); + } + else + { + return PerformStructuredPut( + Session, + CacheRecord.Key, + RecordValue, + MaxAttempts, + [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { const auto It = std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId); @@ -421,131 +452,145 @@ namespace detail { const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It); - CloudCacheResult BlobResult; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) - { - BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]); - } + OutBuffer = Values[Idx]; + return true; + }); + } + } + catch (std::exception& Err) + { + m_Status.Set(UpstreamEndpointState::kError, Err.what()); - m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); + return {.Reason = std::string(Err.what()), .Success = false}; + } + } - if (!BlobResult.Success) - { - OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); - return false; - } + virtual UpstreamEndpointStats& Stats() override { return m_Stats; } - TotalBytes += BlobResult.Bytes; - TotalElapsedSeconds += BlobResult.ElapsedSeconds; - } + private: + static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) + { + Out.Success &= Result.Success; + Out.Bytes += Result.Bytes; + Out.ElapsedSeconds += Result.ElapsedSeconds; + + if (Result.ErrorCode) + { + Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + } + }; - return true; - }; + PutUpstreamCacheResult PerformStructuredPut( + CloudCacheSession& Session, + const CacheKey& Key, + IoBuffer ObjectBuffer, + const int32_t MaxAttempts, + std::function<bool(const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason)>&& BlobFetchFn) + { + int64_t TotalBytes = 0ull; + double TotalElapsedSeconds = 0.0; - PutRefResult RefResult; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { + for (const IoHash& ValueContentId : ValueContentIds) + { + IoBuffer BlobBuffer; + if (!BlobFetchFn(ValueContentId, BlobBuffer, OutReason)) { - RefResult = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject); + return false; } - m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); - - if (!RefResult.Success) + CloudCacheResult BlobResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - RefResult.Reason), - .Success = false}; + BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer); } - TotalBytes += RefResult.Bytes; - TotalElapsedSeconds += RefResult.ElapsedSeconds; + m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); - std::string Reason; - if (!PutBlobs(RefResult.Needs, Reason)) + if (!BlobResult.Success) { - return {.Reason = std::move(Reason), .Success = false}; + OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); + return false; } - const IoHash RefHash = IoHash::HashBuffer(RecordValue); - FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; + } - m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); + return true; + }; - if (!FinalizeResult.Success) - { - return {.Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - FinalizeResult.Reason), - .Success = false}; - } - - if (!FinalizeResult.Needs.empty()) - { - if (!PutBlobs(FinalizeResult.Needs, Reason)) - { - return {.Reason = std::move(Reason), .Success = false}; - } + PutRefResult RefResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + { + RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); + } - FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); + m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); - m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); + if (!RefResult.Success) + { + return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, RefResult.Reason), + .Success = false}; + } - if (!FinalizeResult.Success) - { - return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - FinalizeResult.Reason), - .Success = false}; - } + TotalBytes += RefResult.Bytes; + TotalElapsedSeconds += RefResult.ElapsedSeconds; - if (!FinalizeResult.Needs.empty()) - { - ExtendableStringBuilder<256> Sb; - for (const IoHash& MissingHash : FinalizeResult.Needs) - { - Sb << MissingHash.ToHexString() << ","; - } + std::string Reason; + if (!PutBlobs(RefResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - Sb.ToString()), - .Success = false}; - } - } + const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); - TotalBytes += FinalizeResult.Bytes; - TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; - } + if (!FinalizeResult.Success) + { + return { + .Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason), + .Success = false}; } - catch (std::exception& Err) + + if (!FinalizeResult.Needs.empty()) { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); + if (!PutBlobs(FinalizeResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - return {.Reason = std::string(Err.what()), .Success = false}; - } - } + FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); - virtual UpstreamEndpointStats& Stats() override { return m_Stats; } + m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); - private: - static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) - { - Out.Success &= Result.Success; - Out.Bytes += Result.Bytes; - Out.ElapsedSeconds += Result.ElapsedSeconds; + if (!FinalizeResult.Success) + { + return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason), + .Success = false}; + } - if (Result.ErrorCode) - { - Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + if (!FinalizeResult.Needs.empty()) + { + ExtendableStringBuilder<256> Sb; + for (const IoHash& MissingHash : FinalizeResult.Needs) + { + Sb << MissingHash.ToHexString() << ","; + } + + return { + .Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", Key.Bucket, Key.Hash, Sb.ToString()), + .Success = false}; + } } - }; + + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; + } spdlog::logger& Log() { return m_Log; } @@ -918,7 +963,7 @@ namespace detail { } else { - return {.Reason = std::string("invalid value buffer"), .Success = false}; + return {.Reason = std::string("Invalid value buffer"), .Success = false}; } } @@ -936,6 +981,56 @@ namespace detail { TotalBytes = Result.Bytes; TotalElapsedSeconds = Result.ElapsedSeconds; } + else if (CacheRecord.Type == ZenContentType::kCompressedBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue)); + if (!Compressed) + { + return {.Reason = std::string("Invalid value compressed buffer"), .Success = false}; + } + + CbPackage BatchPackage; + CbObjectWriter BatchWriter; + BatchWriter << "Method"sv + << "PutCacheValues"; + + BatchWriter.BeginObject("Params"sv); + { + // DefaultPolicy unspecified and expected to be Default + + BatchWriter.BeginArray("Requests"sv); + { + BatchWriter.BeginObject(); + { + const CacheKey& Key = CacheRecord.Key; + BatchWriter.BeginObject("Key"sv); + { + BatchWriter << "Bucket"sv << Key.Bucket; + BatchWriter << "Hash"sv << Key.Hash; + } + BatchWriter.EndObject(); + // Policy unspecified and expected to be Default + BatchWriter.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Compressed.GetRawHash())); + BatchPackage.AddAttachment(CbAttachment(Compressed)); + } + BatchWriter.EndObject(); + } + BatchWriter.EndArray(); + } + BatchWriter.EndObject(); + BatchPackage.SetObject(BatchWriter.Save()); + + Result.Success = false; + for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.InvokeRpc(BatchPackage); + } + + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + } else { for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++) |