diff options
| author | zousar <[email protected]> | 2022-02-09 11:17:30 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-02-09 11:17:30 -0700 |
| commit | be7c0eb18e98f5edf0ec17b54cf27200c42920f9 (patch) | |
| tree | 2b56085e844a242128af008ff5bdc20f499c8ab1 | |
| parent | Simplify HandleRpcGetCacheChunks (#53) (diff) | |
| parent | prepare_commit to fix formatting (diff) | |
| download | zen-be7c0eb18e98f5edf0ec17b54cf27200c42920f9.tar.xz zen-be7c0eb18e98f5edf0ec17b54cf27200c42920f9.zip | |
Merge pull request #52 from EpicGames/ValuePropagationFix
Change Value propagation to Zen or Jupiter
| -rw-r--r-- | zenhttp/httpasio.cpp | 20 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 2 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 297 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 30 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 1 |
5 files changed, 242 insertions, 108 deletions
diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp index 318f47eff..45994bb67 100644 --- a/zenhttp/httpasio.cpp +++ b/zenhttp/httpasio.cpp @@ -949,10 +949,18 @@ struct HttpAcceptor // This must be used by both the client and server side, and is only effective in the absence of // Windows Filtering Platform (WFP) callouts which can be installed by security software. // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path - SOCKET NativeSocket = m_Acceptor.native_handle(); - int LoopbackOptionValue = 1; - DWORD OptionNumberOfBytesReturned = 0; - WSAIoctl(NativeSocket, SIO_LOOPBACK_FAST_PATH, &LoopbackOptionValue, sizeof(LoopbackOptionValue), NULL, 0, &OptionNumberOfBytesReturned, 0, 0); + SOCKET NativeSocket = m_Acceptor.native_handle(); + int LoopbackOptionValue = 1; + DWORD OptionNumberOfBytesReturned = 0; + WSAIoctl(NativeSocket, + SIO_LOOPBACK_FAST_PATH, + &LoopbackOptionValue, + sizeof(LoopbackOptionValue), + NULL, + 0, + &OptionNumberOfBytesReturned, + 0, + 0); #endif m_Acceptor.listen(); } @@ -983,8 +991,8 @@ struct HttpAcceptor // reference to the callbacks. Socket->set_option(asio::ip::tcp::no_delay(true)); - Socket->set_option(asio::socket_base::receive_buffer_size(128*1024)); - Socket->set_option(asio::socket_base::send_buffer_size(256*1024)); + Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024)); + Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024)); auto Conn = std::make_shared<HttpServerConnection>(m_Server, std::move(Socket)); Conn->HandleNewRequest(); diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 3d5359188..037092eb6 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1381,7 +1381,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = Key}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key}); } Results.push_back(Succeeded); ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid"); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index d02a00fcd..5e0f6a297 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -402,14 +402,45 @@ namespace detail { .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - else + else if (CacheRecord.Type == ZenContentType::kCompressedBinary) { - int64_t TotalBytes = 0ull; - double TotalElapsedSeconds = 0.0; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue)); + if (!Compressed) + { + return {.Reason = std::string("Invalid compressed value buffer"), .Success = false}; + } - const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { - for (const IoHash& ValueContentId : ValueContentIds) - { + IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + + CbObjectWriter ReferencingObject; + ReferencingObject.AddBinaryAttachment("RawHash", RawHash); + ReferencingObject.AddInteger("RawSize", Compressed.GetRawSize()); + + return PerformStructuredPut( + Session, + CacheRecord.Key, + ReferencingObject.Save().GetBuffer().AsIoBuffer(), + MaxAttempts, + [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { + if (ValueContentId != RawHash) + { + OutReason = + fmt::format("Value '{}' MISMATCHED from compressed buffer raw hash {}", ValueContentId, RawHash); + return false; + } + + OutBuffer = RecordValue; + return true; + }); + } + else + { + return PerformStructuredPut( + Session, + CacheRecord.Key, + RecordValue, + MaxAttempts, + [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { const auto It = std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId); @@ -421,131 +452,145 @@ namespace detail { const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It); - CloudCacheResult BlobResult; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) - { - BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]); - } + OutBuffer = Values[Idx]; + return true; + }); + } + } + catch (std::exception& Err) + { + m_Status.Set(UpstreamEndpointState::kError, Err.what()); - m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); + return {.Reason = std::string(Err.what()), .Success = false}; + } + } - if (!BlobResult.Success) - { - OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); - return false; - } + virtual UpstreamEndpointStats& Stats() override { return m_Stats; } - TotalBytes += BlobResult.Bytes; - TotalElapsedSeconds += BlobResult.ElapsedSeconds; - } + private: + static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) + { + Out.Success &= Result.Success; + Out.Bytes += Result.Bytes; + Out.ElapsedSeconds += Result.ElapsedSeconds; + + if (Result.ErrorCode) + { + Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + } + }; - return true; - }; + PutUpstreamCacheResult PerformStructuredPut( + CloudCacheSession& Session, + const CacheKey& Key, + IoBuffer ObjectBuffer, + const int32_t MaxAttempts, + std::function<bool(const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason)>&& BlobFetchFn) + { + int64_t TotalBytes = 0ull; + double TotalElapsedSeconds = 0.0; - PutRefResult RefResult; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { + for (const IoHash& ValueContentId : ValueContentIds) + { + IoBuffer BlobBuffer; + if (!BlobFetchFn(ValueContentId, BlobBuffer, OutReason)) { - RefResult = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject); + return false; } - m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); - - if (!RefResult.Success) + CloudCacheResult BlobResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - RefResult.Reason), - .Success = false}; + BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer); } - TotalBytes += RefResult.Bytes; - TotalElapsedSeconds += RefResult.ElapsedSeconds; + m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); - std::string Reason; - if (!PutBlobs(RefResult.Needs, Reason)) + if (!BlobResult.Success) { - return {.Reason = std::move(Reason), .Success = false}; + OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); + return false; } - const IoHash RefHash = IoHash::HashBuffer(RecordValue); - FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; + } - m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); + return true; + }; - if (!FinalizeResult.Success) - { - return {.Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - FinalizeResult.Reason), - .Success = false}; - } - - if (!FinalizeResult.Needs.empty()) - { - if (!PutBlobs(FinalizeResult.Needs, Reason)) - { - return {.Reason = std::move(Reason), .Success = false}; - } + PutRefResult RefResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + { + RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); + } - FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); + m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); - m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); + if (!RefResult.Success) + { + return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, RefResult.Reason), + .Success = false}; + } - if (!FinalizeResult.Success) - { - return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - FinalizeResult.Reason), - .Success = false}; - } + TotalBytes += RefResult.Bytes; + TotalElapsedSeconds += RefResult.ElapsedSeconds; - if (!FinalizeResult.Needs.empty()) - { - ExtendableStringBuilder<256> Sb; - for (const IoHash& MissingHash : FinalizeResult.Needs) - { - Sb << MissingHash.ToHexString() << ","; - } + std::string Reason; + if (!PutBlobs(RefResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - Sb.ToString()), - .Success = false}; - } - } + const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); - TotalBytes += FinalizeResult.Bytes; - TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; - } + if (!FinalizeResult.Success) + { + return { + .Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason), + .Success = false}; } - catch (std::exception& Err) + + if (!FinalizeResult.Needs.empty()) { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); + if (!PutBlobs(FinalizeResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - return {.Reason = std::string(Err.what()), .Success = false}; - } - } + FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); - virtual UpstreamEndpointStats& Stats() override { return m_Stats; } + m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); - private: - static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) - { - Out.Success &= Result.Success; - Out.Bytes += Result.Bytes; - Out.ElapsedSeconds += Result.ElapsedSeconds; + if (!FinalizeResult.Success) + { + return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason), + .Success = false}; + } - if (Result.ErrorCode) - { - Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + if (!FinalizeResult.Needs.empty()) + { + ExtendableStringBuilder<256> Sb; + for (const IoHash& MissingHash : FinalizeResult.Needs) + { + Sb << MissingHash.ToHexString() << ","; + } + + return { + .Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", Key.Bucket, Key.Hash, Sb.ToString()), + .Success = false}; + } } - }; + + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; + } spdlog::logger& Log() { return m_Log; } @@ -918,7 +963,7 @@ namespace detail { } else { - return {.Reason = std::string("invalid value buffer"), .Success = false}; + return {.Reason = std::string("Invalid value buffer"), .Success = false}; } } @@ -936,6 +981,56 @@ namespace detail { TotalBytes = Result.Bytes; TotalElapsedSeconds = Result.ElapsedSeconds; } + else if (CacheRecord.Type == ZenContentType::kCompressedBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue)); + if (!Compressed) + { + return {.Reason = std::string("Invalid value compressed buffer"), .Success = false}; + } + + CbPackage BatchPackage; + CbObjectWriter BatchWriter; + BatchWriter << "Method"sv + << "PutCacheValues"; + + BatchWriter.BeginObject("Params"sv); + { + // DefaultPolicy unspecified and expected to be Default + + BatchWriter.BeginArray("Requests"sv); + { + BatchWriter.BeginObject(); + { + const CacheKey& Key = CacheRecord.Key; + BatchWriter.BeginObject("Key"sv); + { + BatchWriter << "Bucket"sv << Key.Bucket; + BatchWriter << "Hash"sv << Key.Hash; + } + BatchWriter.EndObject(); + // Policy unspecified and expected to be Default + BatchWriter.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Compressed.GetRawHash())); + BatchPackage.AddAttachment(CbAttachment(Compressed)); + } + BatchWriter.EndObject(); + } + BatchWriter.EndArray(); + } + BatchWriter.EndObject(); + BatchPackage.SetObject(BatchWriter.Save()); + + Result.Success = false; + for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.InvokeRpc(BatchPackage); + } + + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + } else { for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++) diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 0570dd316..098f53bb0 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -3,10 +3,12 @@ #include "zen.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> #include <zencore/session.h> #include <zencore/stream.h> +#include <zenhttp/httpshared.h> #include "cache/structuredcachestore.h" #include "diag/formatters.h" @@ -539,4 +541,32 @@ ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } +ZenCacheResult +ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/$rpc"; + + SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten(); + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}); + Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + } // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index bc8fd3c56..f70d9d06f 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -133,6 +133,7 @@ public: ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload); ZenCacheResult InvokeRpc(const CbObjectView& Request); + ZenCacheResult InvokeRpc(const CbPackage& Package); private: inline spdlog::logger& Log() { return m_Log; } |