diff options
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 41 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 5 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.h | 6 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 72 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 4 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 6 |
6 files changed, 76 insertions, 58 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index f9be068ec..c3480ff56 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -2,7 +2,6 @@ #include "jupiter.h" -#include "cache/structuredcachestore.h" #include "diag/formatters.h" #include "diag/logging.h" @@ -58,7 +57,7 @@ namespace detail { cpr::Session& GetSession() { return Session; } private: - friend class CloudCacheClient; + friend class zen::CloudCacheClient; CloudCacheClient& OwnerClient; CloudCacheAccessToken AccessToken; @@ -119,7 +118,7 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } @@ -168,7 +167,7 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } @@ -209,7 +208,7 @@ CloudCacheSession::GetBlob(const IoHash& Key) return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } @@ -249,7 +248,7 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } @@ -289,7 +288,7 @@ CloudCacheSession::GetObject(const IoHash& Key) return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } @@ -331,7 +330,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } @@ -388,7 +387,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer PutRefResult Result; Result.Success = (Response.status_code == 200 || Response.status_code == 201); - Result.ErrorCode = !Result.Success ? Response.status_code : 0; + Result.ErrorCode = !Result.Success ? int32_t(Response.status_code) : 0; Result.Bytes = Response.uploaded_bytes; Result.ElapsedSeconds = Response.elapsed; @@ -453,7 +452,7 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con FinalizeRefResult Result; Result.Success = (Response.status_code == 200 || Response.status_code == 201); - Result.ErrorCode = !Result.Success ? Response.status_code : 0; + Result.ErrorCode = !Result.Success ? int32_t(Response.status_code) : 0; Result.Bytes = Response.uploaded_bytes; Result.ElapsedSeconds = Response.elapsed; @@ -508,7 +507,7 @@ CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } @@ -546,7 +545,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } @@ -584,7 +583,7 @@ CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } @@ -621,7 +620,11 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) const bool Success = Response.status_code == 200; - return {.ElapsedSeconds = Response.elapsed, .ErrorCode = !Success ? Response.status_code : 0, .Success = Success}; + return { + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, + .Success = Success + }; } CloudCacheResult @@ -692,7 +695,11 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa const bool Success = Response.status_code == 200; - return {.ElapsedSeconds = Response.elapsed, .ErrorCode = !Success ? Response.status_code : 0, .Success = Success}; + return { + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, + .Success = Success + }; } CloudCacheResult @@ -731,7 +738,7 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } @@ -771,7 +778,7 @@ CloudCacheSession::GetObjectTree(const IoHash& Key) return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, - .ErrorCode = !Success ? Response.status_code : 0, + .ErrorCode = !Success ? int32_t(Response.status_code) : 0, .Success = Success}; } diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 05be5f65c..f673ec3b3 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -1,6 +1,9 @@ // Copyright Epic Games, Inc. All Rights Reserved. #include "upstreamapply.h" + +#if ZEN_WITH_COMPUTE_SERVICES + #include "jupiter.h" #include "zen.h" @@ -1569,3 +1572,5 @@ MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, CasStore& CasS } } // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h index 98f193c02..8196c3b40 100644 --- a/zenserver/upstream/upstreamapply.h +++ b/zenserver/upstream/upstreamapply.h @@ -2,6 +2,10 @@ #pragma once +#include "compute/apply.h" + +#if ZEN_WITH_COMPUTE_SERVICES + #include <zencore/compactbinarypackage.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> @@ -170,3 +174,5 @@ std::unique_ptr<UpstreamApplyEndpoint> MakeHordeUpstreamEndpoint(const CloudCach CidStore& CidStore); } // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index e2dc09872..616cd4146 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -195,7 +195,7 @@ namespace detail { } } - OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); + OnComplete({.Key = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); } return Result; @@ -271,16 +271,16 @@ namespace detail { if (CacheRecord.Type == ZenContentType::kBinary) { CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { if (m_UseLegacyDdc) { - Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue); + Result = Session.PutDerivedData(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); } else { - Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, + Result = Session.PutRef(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, RecordValue, ZenContentType::kBinary); } @@ -336,15 +336,15 @@ namespace detail { for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) { RefResult = - Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); + Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject); } m_HealthOk = RefResult.ErrorCode == 0; if (!RefResult.Success) { - return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, + return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, RefResult.Reason), .Success = false}; } @@ -359,13 +359,13 @@ namespace detail { } const IoHash RefHash = IoHash::HashBuffer(RecordValue); - FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); m_HealthOk = FinalizeResult.ErrorCode == 0; if (!FinalizeResult.Success) { - return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, + return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, FinalizeResult.Reason), .Success = false}; } @@ -377,13 +377,13 @@ namespace detail { return {.Reason = std::move(Reason), .Success = false}; } - FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); m_HealthOk = FinalizeResult.ErrorCode == 0; if (!FinalizeResult.Success) { - return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, + return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, FinalizeResult.Reason), .Success = false}; } @@ -396,8 +396,8 @@ namespace detail { Sb << MissingHash.ToHexString() << ","; } - return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, + return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, Sb.ToString()), .Success = false}; } @@ -607,7 +607,7 @@ namespace detail { { const size_t Index = IndexMap[LocalIndex++]; OnComplete( - {.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + {.Key = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; @@ -620,7 +620,7 @@ namespace detail { for (size_t Index : KeyIndex) { - OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; @@ -772,10 +772,10 @@ namespace detail { Package.Save(MemStream); IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, + Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type); @@ -790,10 +790,10 @@ namespace detail { for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, + Result = Session.PutCachePayload(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, CacheRecord.PayloadIds[Idx], Payloads[Idx]); @@ -813,10 +813,10 @@ namespace detail { } Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = - Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type); m_HealthOk = Result.ErrorCode == 0; } @@ -1099,7 +1099,7 @@ public: for (size_t Index : MissingKeys) { - OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); } } @@ -1236,11 +1236,11 @@ private: ZenCacheValue CacheValue; std::vector<IoBuffer> Payloads; - if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) + if (!m_CacheStore.Get(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash); + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash); return; } @@ -1253,8 +1253,8 @@ private: else { ZEN_WARN("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, PayloadId); return; } @@ -1270,8 +1270,8 @@ private: if (!Result.Success) { ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, Endpoint->GetEndpointInfo().Url, Result.Reason); } @@ -1293,8 +1293,8 @@ private: catch (std::exception& Err) { ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, Err.what()); } } diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 12287198d..c463c4996 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -25,7 +25,7 @@ struct ZenStructuredCacheClientOptions; struct UpstreamCacheRecord { ZenContentType Type = ZenContentType::kBinary; - CacheKey CacheKey; + CacheKey Key; std::vector<IoHash> PayloadIds; }; @@ -83,7 +83,7 @@ struct UpstreamEndpointStats struct CacheRecordGetCompleteParams { - const CacheKey& CacheKey; + const CacheKey& Key; size_t KeyIndex = ~size_t(0); const CbObjectView& Record; const CbPackage& Package; diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index d549c2fc4..83e54465a 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -10,14 +10,14 @@ #include <zencore/uid.h> #include <zencore/zencore.h> -#pragma warning(push) -#pragma warning(disable : 4127) +ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> -#pragma warning(pop) +ZEN_THIRD_PARTY_INCLUDES_END #include <asio.hpp> #include <chrono> +#include <list> struct ZenCacheValue; |