diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 12:38:35 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 12:38:35 +0200 |
| commit | 5361ee1c77b68bb14237169660840d6d63a74892 (patch) | |
| tree | 3ad259133e09485a14506be38e43ec5b62a050f2 /src/zenremotestore | |
| parent | move chunking code to zenremotestore lib (#545) (diff) | |
| download | zen-5361ee1c77b68bb14237169660840d6d63a74892.tar.xz zen-5361ee1c77b68bb14237169660840d6d63a74892.zip | |
remove zenutil dependency in zenremotestore (#547)
* remove dependency to zenutil/workerpools.h from remoteprojectstore.cpp
* remove dependency to zenutil/workerpools.h from buildstoragecache.cpp
* remove unneded include
* move jupiter helpers to zenremotestore
* move parallelwork to zencore
* remove zenutil dependency from zenremotestore
* clean up test project dependencies - use indirect dependencies
Diffstat (limited to 'src/zenremotestore')
14 files changed, 1270 insertions, 24 deletions
diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp index 694e364ea..d36d75480 100644 --- a/src/zenremotestore/builds/buildstoragecache.cpp +++ b/src/zenremotestore/builds/buildstoragecache.cpp @@ -11,7 +11,6 @@ #include <zencore/workthreadpool.h> #include <zenhttp/httpclient.h> #include <zenhttp/packageformat.h> -#include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_set.h> @@ -29,15 +28,13 @@ public: std::string_view Namespace, std::string_view Bucket, const std::filesystem::path& TempFolderPath, - bool BoostBackgroundThreadCount) + WorkerThreadPool& BackgroundWorkerPool) : m_HttpClient(HttpClient) , m_Stats(Stats) , m_Namespace(Namespace.empty() ? "none" : Namespace) , m_Bucket(Bucket.empty() ? "none" : Bucket) , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) - , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount) - , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background) - : GetTinyWorkerPool(EWorkloadType::Background)) + , m_BackgroundWorkPool(BackgroundWorkerPool) , m_PendingBackgroundWorkCount(1) , m_CancelBackgroundWork(false) { @@ -394,7 +391,6 @@ private: const std::string m_Namespace; const std::string m_Bucket; const std::filesystem::path m_TempFolderPath; - const bool m_BoostBackgroundThreadCount; bool IsFlushed = false; WorkerThreadPool& m_BackgroundWorkPool; @@ -408,9 +404,9 @@ CreateZenBuildStorageCache(HttpClient& HttpClient, std::string_view Namespace, std::string_view Bucket, const std::filesystem::path& TempFolderPath, - bool BoostBackgroundThreadCount) + WorkerThreadPool& BackgroundWorkerPool) { - return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount); + return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BackgroundWorkerPool); } ZenCacheEndpointTestResult diff --git a/src/zenremotestore/builds/jupiterbuildstorage.cpp b/src/zenremotestore/builds/jupiterbuildstorage.cpp index f1f9b3555..14a5ecc85 100644 --- a/src/zenremotestore/builds/jupiterbuildstorage.cpp +++ b/src/zenremotestore/builds/jupiterbuildstorage.cpp @@ -8,7 +8,7 @@ #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> -#include <zenutil/jupiter/jupitersession.h> +#include <zenremotestore/jupiter/jupitersession.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> diff --git a/src/zenremotestore/chunking/chunkedcontent.cpp b/src/zenremotestore/chunking/chunkedcontent.cpp index e97dcff15..9df7725db 100644 --- a/src/zenremotestore/chunking/chunkedcontent.cpp +++ b/src/zenremotestore/chunking/chunkedcontent.cpp @@ -5,14 +5,12 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/parallelwork.h> #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> - #include <zenremotestore/chunking/chunkedfile.h> #include <zenremotestore/chunking/chunkingcontroller.h> -#include <zenutil/parallelwork.h> -#include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_set.h> diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h index 2e8024915..e30270848 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h @@ -11,6 +11,7 @@ namespace zen { class HttpClient; +class WorkerThreadPool; class BuildStorageCache { @@ -56,7 +57,7 @@ std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& H std::string_view Namespace, std::string_view Bucket, const std::filesystem::path& TempFolderPath, - bool BoostBackgroundThreadCount); + WorkerThreadPool& BackgroundWorkerPool); struct ZenCacheEndpointTestResult { diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupiterclient.h b/src/zenremotestore/include/zenremotestore/jupiter/jupiterclient.h new file mode 100644 index 000000000..8a51bd60a --- /dev/null +++ b/src/zenremotestore/include/zenremotestore/jupiter/jupiterclient.h @@ -0,0 +1,56 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenbase/refcount.h> +#include <zencore/logging.h> +#include <zenhttp/httpclient.h> + +#include <chrono> + +namespace zen { + +class IoBuffer; + +struct JupiterClientOptions +{ + std::string_view Name; + std::string_view ServiceUrl; + std::string_view DdcNamespace; + std::string_view BlobStoreNamespace; + std::string_view ComputeCluster; + std::chrono::milliseconds ConnectTimeout{5000}; + std::chrono::milliseconds Timeout{}; + bool AssumeHttp2 = false; + bool AllowResume = false; + uint8_t RetryCount = 0; +}; + +/** + * Jupiter upstream cache client + */ +class JupiterClient : public RefCounted +{ +public: + JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider); + ~JupiterClient(); + + std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } + std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } + std::string_view ComputeCluster() const { return m_ComputeCluster; } + std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); } + + LoggerRef Logger() { return m_Log; } + HttpClient& Client() { return m_HttpClient; } + +private: + LoggerRef m_Log; + const std::string m_DefaultDdcNamespace; + const std::string m_DefaultBlobStoreNamespace; + const std::string m_ComputeCluster; + HttpClient m_HttpClient; + + friend class JupiterSession; +}; + +} // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h new file mode 100644 index 000000000..3bbc700b8 --- /dev/null +++ b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h @@ -0,0 +1,35 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <string> +#include <string_view> +#include <vector> + +namespace zen { + +struct HttpClientSettings; + +struct JupiterServerDiscovery +{ + struct EndPoint + { + std::string Name; + std::string BaseUrl; + bool AssumeHttp2 = false; + }; + std::vector<EndPoint> ServerEndPoints; + std::vector<EndPoint> CacheEndPoints; +}; + +JupiterServerDiscovery DiscoverJupiterEndpoints(std::string_view Host, const HttpClientSettings& ClientSettings); + +struct JupiterEndpointTestResult +{ + bool Success = false; + std::string FailureReason; +}; + +JupiterEndpointTestResult TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2); + +} // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h new file mode 100644 index 000000000..b79790f25 --- /dev/null +++ b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h @@ -0,0 +1,179 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> +#include <zencore/logging.h> +#include <zenhttp/httpclient.h> + +#include <set> + +namespace zen { + +class IoBuffer; + +struct JupiterResult +{ + IoBuffer Response; + uint64_t SentBytes{}; + uint64_t ReceivedBytes{}; + double ElapsedSeconds{}; + int32_t ErrorCode{}; + std::string Reason; + bool Success = false; +}; + +struct PutRefResult : JupiterResult +{ + std::vector<IoHash> Needs; + IoHash RawHash; +}; + +struct FinalizeRefResult : JupiterResult +{ + std::vector<IoHash> Needs; +}; + +struct JupiterExistsResult : JupiterResult +{ + std::set<IoHash> Needs; +}; + +struct GetObjectReferencesResult : JupiterResult +{ + std::set<IoHash> References; +}; + +struct PutBuildPartResult : JupiterResult +{ + std::vector<IoHash> Needs; + IoHash RawHash; +}; + +struct FinalizeBuildPartResult : JupiterResult +{ + std::vector<IoHash> Needs; +}; + +/** + * Context for performing Jupiter operations + * + * Maintains an HTTP connection so that subsequent operations don't need to go + * through the whole connection setup process + * + */ +class JupiterSession +{ +public: + JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect); + ~JupiterSession(); + + JupiterResult Authenticate(); + + JupiterResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); + JupiterResult GetBlob(std::string_view Namespace, const IoHash& Key); + JupiterResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); + JupiterResult GetObject(std::string_view Namespace, const IoHash& Key); + JupiterResult GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath = {}); + + PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + JupiterResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob); + JupiterResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); + + FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + + JupiterResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); + + GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key); + + JupiterResult BlobExists(std::string_view Namespace, const IoHash& Key); + JupiterResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key); + JupiterResult ObjectExists(std::string_view Namespace, const IoHash& Key); + + JupiterExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + JupiterExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + JupiterExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); + + std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + + JupiterResult ListBuildNamespaces(); + JupiterResult ListBuildBuckets(std::string_view Namespace); + JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload); + JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); + JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + PutBuildPartResult PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload); + JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + JupiterResult PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload); + JupiterResult GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + std::filesystem::path TempFolderPath, + uint64_t Offset = 0, + uint64_t Size = (uint64_t)-1); + + JupiterResult PutMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + ZenContentType ContentType, + uint64_t PayloadSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, + std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems); + JupiterResult GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete, + std::vector<std::function<JupiterResult()>>& OutWorkItems); + JupiterResult PutBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + const IoBuffer& Payload); + FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& RawHash); + JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount); + JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload); + + JupiterResult PutBuildPartStats(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& BuildPartId, + IoBuffer Payload); + +private: + inline LoggerRef Log() { return m_Log; } + + JupiterResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); + + JupiterExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); + + LoggerRef m_Log; + HttpClient& m_HttpClient; + const bool m_AllowRedirect = false; +}; + +} // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h index 7e5af5e6b..fbcdde955 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h @@ -126,6 +126,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer( CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, + WorkerThreadPool& WorkerPool, size_t MaxBlockSize, size_t MaxChunksPerBlock, size_t MaxChunkEmbedSize, @@ -153,6 +154,8 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, size_t MaxBlockSize, size_t MaxChunksPerBlock, size_t MaxChunkEmbedSize, @@ -165,6 +168,8 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, bool ForceDownload, bool IgnoreMissingAttachments, bool CleanOplog, diff --git a/src/zenremotestore/jupiter/jupiterclient.cpp b/src/zenremotestore/jupiter/jupiterclient.cpp new file mode 100644 index 000000000..bf9d8e346 --- /dev/null +++ b/src/zenremotestore/jupiter/jupiterclient.cpp @@ -0,0 +1,28 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/jupiter/jupiterclient.h> + +namespace zen { + +using namespace std::literals; + +JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider) +: m_Log(zen::logging::Get("jupiter"sv)) +, m_DefaultDdcNamespace(Options.DdcNamespace) +, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) +, m_ComputeCluster(Options.ComputeCluster) +, m_HttpClient(Options.ServiceUrl, + HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, + .Timeout = Options.Timeout, + .AccessTokenProvider = std::move(TokenProvider), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = Options.AllowResume, + .RetryCount = Options.RetryCount}) +{ +} + +JupiterClient::~JupiterClient() +{ +} + +} // namespace zen diff --git a/src/zenremotestore/jupiter/jupiterhost.cpp b/src/zenremotestore/jupiter/jupiterhost.cpp new file mode 100644 index 000000000..df6be10c9 --- /dev/null +++ b/src/zenremotestore/jupiter/jupiterhost.cpp @@ -0,0 +1,66 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/jupiter/jupiterhost.h> + +#include <zencore/compactbinary.h> +#include <zencore/fmtutils.h> +#include <zenhttp/httpclient.h> + +namespace zen { + +JupiterServerDiscovery +DiscoverJupiterEndpoints(std::string_view Host, const HttpClientSettings& ClientSettings) +{ + JupiterServerDiscovery Result; + + HttpClient DiscoveryHttpClient(Host, ClientSettings); + HttpClient::Response ServerInfoResponse = DiscoveryHttpClient.Get("/api/v1/status/servers", HttpClient::Accept(HttpContentType::kJSON)); + if (!ServerInfoResponse.IsSuccess()) + { + ServerInfoResponse.ThrowError(fmt::format("Failed to get list of servers from discovery url '{}'", Host)); + } + std::string_view JsonResponse = ServerInfoResponse.AsText(); + CbObject CbPayload = LoadCompactBinaryFromJson(JsonResponse).AsObject(); + CbArrayView ServerEndpoints = CbPayload["serverEndpoints"].AsArrayView(); + Result.ServerEndPoints.reserve(ServerEndpoints.Num()); + + auto ParseEndPoints = [](CbArrayView ServerEndpoints) -> std::vector<JupiterServerDiscovery::EndPoint> { + std::vector<JupiterServerDiscovery::EndPoint> Result; + + Result.reserve(ServerEndpoints.Num()); + for (CbFieldView ServerEndpointView : ServerEndpoints) + { + CbObjectView ServerEndPoint = ServerEndpointView.AsObjectView(); + Result.push_back(JupiterServerDiscovery::EndPoint{.Name = std::string(ServerEndPoint["name"].AsString()), + .BaseUrl = std::string(ServerEndPoint["baseUrl"].AsString()), + .AssumeHttp2 = ServerEndPoint["baseUrl"].AsBool(false)}); + } + return Result; + }; + + Result.ServerEndPoints = ParseEndPoints(CbPayload["serverEndpoints"].AsArrayView()); + Result.CacheEndPoints = ParseEndPoints(CbPayload["cacheEndpoints"].AsArrayView()); + + return Result; +} + +JupiterEndpointTestResult +TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2) +{ + HttpClientSettings TestClientSettings{.LogCategory = "httpbuildsclient", + .ConnectTimeout = std::chrono::milliseconds{1000}, + .Timeout = std::chrono::milliseconds{2000}, + .AssumeHttp2 = AssumeHttp2, + .AllowResume = true, + .RetryCount = 0}; + + HttpClient TestHttpClient(BaseUrl, TestClientSettings); + HttpClient::Response TestResponse = TestHttpClient.Get("/health/live"); + if (TestResponse.IsSuccess()) + { + return {.Success = true}; + } + return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")}; +} + +} // namespace zen diff --git a/src/zenremotestore/jupiter/jupitersession.cpp b/src/zenremotestore/jupiter/jupitersession.cpp new file mode 100644 index 000000000..942e2a9dc --- /dev/null +++ b/src/zenremotestore/jupiter/jupitersession.cpp @@ -0,0 +1,871 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/jupiter/jupitersession.h> + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryutil.h> +#include <zencore/compositebuffer.h> +#include <zencore/compress.h> +#include <zencore/fmtutils.h> +#include <zencore/trace.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +using namespace std::literals; + +namespace zen { + +namespace detail { + JupiterResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) + { + if (Response.Error) + { + return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = Response.Error.value().ErrorCode, + .Reason = Response.ErrorMessage(ErrorPrefix), + .Success = false}; + } + if (!Response.IsSuccess()) + { + return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = static_cast<int32_t>(Response.StatusCode), + .Reason = Response.ErrorMessage(ErrorPrefix), + .Success = false}; + } + return {.Response = Response.ResponsePayload, + .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = 0, + .Success = true}; + } +} // namespace detail + +JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect) +: m_Log(InLog) +, m_HttpClient(InHttpClient) +, m_AllowRedirect(AllowRedirect) +{ +} + +JupiterSession::~JupiterSession() +{ +} + +JupiterResult +JupiterSession::Authenticate() +{ + bool OK = m_HttpClient.Authenticate(); + return {.Success = OK}; +} + +JupiterResult +JupiterSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +{ + ZEN_TRACE_CPU("JupiterClient::GetRef"); + + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)}); + + return detail::ConvertResponse(Response, "JupiterSession::GetRef"sv); +} + +JupiterResult +JupiterSession::GetBlob(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetBlob"); + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)}); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) +{ + ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); + + HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + TempFolderPath, + {HttpClient::Accept(ZenContentType::kCompressedBinary)}); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath) +{ + ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); + + HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + TempFolderPath, + {{"Accept", "application/x-jupiter-inline"}}); + + JupiterResult Result = detail::ConvertResponse(Response); + + if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) + { + const std::string& PayloadHashHeader = It->second; + if (PayloadHashHeader.length() == IoHash::StringLength) + { + OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); + } + } + + return Result; +} + +JupiterResult +JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetObject"); + + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); + + return detail::ConvertResponse(Response); +} + +PutRefResult +JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) +{ + ZEN_TRACE_CPU("JupiterClient::PutRef"); + + Ref.SetContentType(RefType); + + IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); + + HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + Ref, + {{"X-Jupiter-IoHash", Hash.ToHexString()}}); + + PutRefResult Result = {detail::ConvertResponse(Response)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + Result.RawHash = Hash; + } + return Result; +} + +FinalizeRefResult +JupiterSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +{ + ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), + {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + + FinalizeRefResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + return Result; +} + +JupiterResult +JupiterSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) +{ + ZEN_TRACE_CPU("JupiterClient::PutBlob"); + + HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) +{ + ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); + + Blob.SetContentType(ZenContentType::kCompressedBinary); + HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) +{ + ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); + + HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + Payload, + ZenContentType::kCompressedBinary); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) +{ + ZEN_TRACE_CPU("JupiterClient::PutObject"); + + Object.SetContentType(ZenContentType::kCbObject); + HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::RefExists"); + + HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); + + return detail::ConvertResponse(Response); +} + +GetObjectReferencesResult +JupiterSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); + + HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); + + GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + const CbObject ReferencesResponse = Response.AsObject(); + for (auto& Item : ReferencesResponse["references"sv]) + { + Result.References.insert(Item.AsHash()); + } + } + return Result; +} + +JupiterResult +JupiterSession::BlobExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "blobs"sv, Key); +} + +JupiterResult +JupiterSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); +} + +JupiterResult +JupiterSession::ObjectExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "objects"sv, Key); +} + +JupiterExistsResult +JupiterSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "blobs"sv, Keys); +} + +JupiterExistsResult +JupiterSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); +} + +JupiterExistsResult +JupiterSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "objects"sv, Keys); +} + +std::vector<IoHash> +JupiterSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) +{ + // ExtendableStringBuilder<256> Uri; + // Uri << m_CacheClient->ServiceUrl(); + // Uri << "/api/v1/s/" << Namespace; + + ZEN_UNUSED(Namespace, BucketId, ChunkHashes); + + return {}; +} + +JupiterResult +JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); + + HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); + + return detail::ConvertResponse(Response); +} + +JupiterExistsResult +JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) +{ + ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); + + ExtendableStringBuilder<256> Body; + Body << "["; + for (const auto& Key : Keys) + { + Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; + } + Body << "]"; + IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); + Payload.SetContentType(ZenContentType::kJSON); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)}); + + JupiterExistsResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + const CbObject ExistsResponse = Response.AsObject(); + for (auto& Item : ExistsResponse["needs"sv]) + { + Result.Needs.insert(Item.AsHash()); + } + } + return Result; +} + +JupiterResult +JupiterSession::ListBuildNamespaces() +{ + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv); +} + +JupiterResult +JupiterSession::ListBuildBuckets(std::string_view Namespace) +{ + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv); +} + +JupiterResult +JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId); + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath), + Payload, + {HttpClient::Accept(ZenContentType::kCbObject)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv); +} + +JupiterResult +JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload); + return detail::ConvertResponse(Response, "JupiterSession::PutBuild"sv); +} + +JupiterResult +JupiterSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +{ + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::GetBuild"sv); +} + +JupiterResult +JupiterSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +{ + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId)); + return detail::ConvertResponse(Response, "JupiterSession::FinalizeBuild"sv); +} + +PutBuildPartResult +JupiterSession::PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + + IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); + + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName), + Payload, + {{"X-Jupiter-IoHash", Hash.ToHexString()}}); + + PutBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::PutBuildPart"sv)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + Result.RawHash = Hash; + } + return Result; +} + +JupiterResult +JupiterSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +{ + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::GetBuildPart"sv); +} + +JupiterResult +JupiterSession::PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload) +{ + HttpClient::Response Response = + m_HttpClient.Upload(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), + Payload, + ContentType); + return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv); +} + +JupiterResult +JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + ZenContentType ContentType, + uint64_t PayloadSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, + std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems) +{ + struct MultipartUploadResponse + { + struct Part + { + uint64_t FirstByte; + uint64_t LastByte; + std::string PartId; + std::string QueryString; + }; + + std::string UploadId; + std::string BlobName; + std::vector<Part> Parts; + + static MultipartUploadResponse Parse(CbObject& Payload) + { + MultipartUploadResponse Result; + Result.UploadId = Payload["uploadId"sv].AsString(); + Result.BlobName = Payload["blobName"sv].AsString(); + CbArrayView PartsArray = Payload["parts"sv].AsArrayView(); + Result.Parts.reserve(PartsArray.Num()); + for (CbFieldView PartView : PartsArray) + { + CbObjectView PartObject = PartView.AsObjectView(); + Result.Parts.emplace_back(Part{ + .FirstByte = PartObject["firstByte"sv].AsUInt64(), + .LastByte = PartObject["lastByte"sv].AsUInt64(), + .PartId = std::string(PartObject["partId"sv].AsString()), + .QueryString = std::string(PartObject["queryString"sv].AsString()), + }); + } + return Result; + } + }; + + CbObjectWriter StartMultipartPayloadWriter; + StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize); + CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save(); + + std::string StartMultipartResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/startMultipartUpload", Namespace, BucketId, BuildId, Hash.ToHexString()); + // ZEN_INFO("POST: {}", StartMultipartResponseRequestString); + HttpClient::Response StartMultipartResponse = + m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject)); + if (!StartMultipartResponse.IsSuccess()) + { + ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: ")); + return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); + } + CbValidateError ValidateResult = CbValidateError::None; + CbObject ResponseObject = ValidateAndReadCompactBinaryObject(IoBuffer(StartMultipartResponse.ResponsePayload), ValidateResult); + if (ValidateResult != CbValidateError::None) + { + JupiterResult Result = detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); + Result.ErrorCode = (int32)HttpResponseCode::UnsupportedMediaType; + Result.Reason = fmt::format("Invalid multipart response object format: '{}'", ToString(ValidateResult)); + return Result; + } + + struct WorkloadData + { + MultipartUploadResponse PartDescription; + std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter; + std::atomic<size_t> PartsLeft; + }; + + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + + Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject); + Workload->Transmitter = std::move(Transmitter); + Workload->PartsLeft = Workload->PartDescription.Parts.size(); + + for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++) + { + OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, Hash, ContentType, Workload, PartIndex]( + bool& OutIsComplete) -> JupiterResult { + const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex]; + IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte); + std::string MultipartUploadResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + Part.QueryString, + m_AllowRedirect ? "true"sv : "false"sv); + // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString); + HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload); + if (!MultipartUploadResponse.IsSuccess()) + { + ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString)); + } + OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1; + if (OutIsComplete) + { + int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; + int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; + double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; + HttpClient::Response MultipartEndResponse = MultipartUploadResponse; + while (MultipartEndResponse.IsSuccess()) + { + CbObjectWriter CompletePayloadWriter; + CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName); + CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId); + CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary); + CompletePayloadWriter.BeginArray("partIds"sv); + std::unordered_map<std::string, size_t> PartNameToIndex; + for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++) + { + const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex]; + PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex}); + CompletePayloadWriter.AddString(PartDescription.PartId); + } + CompletePayloadWriter.EndArray(); // "partIds" + CbObject CompletePayload = CompletePayloadWriter.Save(); + + std::string MultipartEndResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/completeMultipart", Namespace, BucketId, BuildId, Hash.ToHexString()); + + MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString, + CompletePayload, + HttpClient::Accept(ZenContentType::kCbObject)); + TotalUploadedBytes += MultipartEndResponse.UploadedBytes; + TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes; + TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds; + if (MultipartEndResponse.IsSuccess()) + { + CbObject ResponseObject = MultipartEndResponse.AsObject(); + CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView(); + if (MissingPartsArrayView.Num() == 0) + { + break; + } + else + { + for (CbFieldView PartIdView : MissingPartsArrayView) + { + std::string RetryPartId(PartIdView.AsString()); + size_t RetryPartIndex = PartNameToIndex.at(RetryPartId); + const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex]; + IoBuffer RetryPartPayload = + Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1); + std::string RetryMultipartUploadResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + RetryPart.QueryString, + m_AllowRedirect ? "true"sv : "false"sv); + + MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload); + TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; + TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; + TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; + if (!MultipartUploadResponse.IsSuccess()) + { + ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString)); + MultipartEndResponse = MultipartUploadResponse; + } + } + } + } + else + { + ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString)); + } + } + MultipartEndResponse.UploadedBytes = TotalUploadedBytes; + MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes; + MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds; + return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv); + } + return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv); + }); + } + return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); +} + +JupiterResult +JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete, + std::vector<std::function<JupiterResult()>>& OutWorkItems) +{ + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv); + HttpClient::Response Response = + m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}})); + if (Response.IsSuccess()) + { + if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty()) + { + if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos) + { + if (std::optional<uint64_t> TotalSizeMaybe = ParseInt<uint64_t>(ContentRange.substr(SizeDelimiterPos + 1)); + TotalSizeMaybe.has_value()) + { + uint64_t TotalSize = TotalSizeMaybe.value(); + uint64_t PayloadSize = Response.ResponsePayload.GetSize(); + + OnReceive(0, Response.ResponsePayload); + + if (TotalSize > PayloadSize) + { + struct WorkloadData + { + std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; + std::function<void()> OnComplete; + std::atomic<uint64_t> BytesRemaining; + }; + + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + Workload->OnReceive = std::move(OnReceive); + Workload->OnComplete = std::move(OnComplete); + Workload->BytesRemaining = TotalSize - PayloadSize; + + uint64_t Offset = PayloadSize; + while (Offset < TotalSize) + { + uint64_t PartSize = Min(ChunkSize, TotalSize - Offset); + OutWorkItems.emplace_back([this, + Namespace = std::string(Namespace), + BucketId = std::string(BucketId), + BuildId = Oid(BuildId), + Hash = IoHash(Hash), + TotalSize, + Workload, + Offset, + PartSize]() -> JupiterResult { + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv); + HttpClient::Response Response = m_HttpClient.Get( + RequestUrl, + HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); + if (Response.IsSuccess()) + { + Workload->OnReceive(Offset, Response.ResponsePayload); + uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); + if (ByteRemaning == Response.ResponsePayload.GetSize()) + { + Workload->OnComplete(); + } + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); + }); + Offset += PartSize; + } + } + else + { + OnComplete(); + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); + } + } + } + OnReceive(0, Response.ResponsePayload); + OnComplete(); + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); +} + +JupiterResult +JupiterSession::GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + std::filesystem::path TempFolderPath, + uint64_t Offset, + uint64_t Size) +{ + HttpClient::KeyValueMap Headers; + if (Offset != 0 || Size != (uint64_t)-1) + { + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)}); + } + HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv), + TempFolderPath, + Headers); + if (Response.IsSuccess()) + { + // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it + if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary)) + { + IoHash ValidateRawHash; + uint64_t ValidateRawSize = 0; + ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize)); + ZEN_ASSERT_SLOW(ValidateRawHash == Hash); + ZEN_ASSERT_SLOW(ValidateRawSize > 0); + ZEN_UNUSED(ValidateRawHash, ValidateRawSize); + Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary); + } + } + return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); +} + +JupiterResult +JupiterSession::PutBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, Hash.ToHexString()), + Payload); + return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv); +} + +FinalizeBuildPartResult +JupiterSession::FinalizeBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& RawHash) +{ + HttpClient::Response Response = m_HttpClient.Post( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()), + HttpClient::Accept(ZenContentType::kCbObject)); + + FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::FinalizeBuildPart"sv)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + return Result; +} + +JupiterResult +JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount) +{ + const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount); + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); +} + +JupiterResult +JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv); +} + +JupiterResult +JupiterSession::PutBuildPartStats(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& BuildPartId, + IoBuffer Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv); +} + +} // namespace zen diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp index bfd4e433c..b198b7c99 100644 --- a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp @@ -8,8 +8,8 @@ #include <zenhttp/httpclientauth.h> -#include <zenutil/jupiter/jupiterclient.h> -#include <zenutil/jupiter/jupitersession.h> +#include <zenremotestore/jupiter/jupiterclient.h> +#include <zenremotestore/jupiter/jupitersession.h> namespace zen { diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index c2e270909..d008d1452 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -15,7 +15,6 @@ #include <zenhttp/httpcommon.h> #include <zenremotestore/chunking/chunkedfile.h> #include <zenstore/cidstore.h> -#include <zenutil/workerpools.h> #include <unordered_map> @@ -2307,6 +2306,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer(CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, + WorkerThreadPool& WorkerPool, size_t MaxBlockSize, size_t MaxChunksPerBlock, size_t MaxChunkEmbedSize, @@ -2319,7 +2319,7 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) { - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); + // WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); remotestore_impl::AsyncRemoteResult RemoteResult; CbObject ContainerObject = BuildContainer(ChunkStore, @@ -2348,6 +2348,8 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, size_t MaxBlockSize, size_t MaxChunksPerBlock, size_t MaxChunkEmbedSize, @@ -2363,9 +2365,6 @@ SaveOplog(CidStore& ChunkStore, remotestore_impl::UploadInfo Info; - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background); - const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); std::filesystem::path AttachmentTempPath; @@ -2887,6 +2886,8 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, bool ForceDownload, bool IgnoreMissingAttachments, bool CleanOplog, @@ -2898,9 +2899,6 @@ LoadOplog(CidStore& ChunkStore, Stopwatch Timer; - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(EWorkloadType::Background); - std::unordered_set<IoHash, IoHash::Hasher> Attachments; uint64_t BlockCountToDownload = 0; @@ -3454,10 +3452,15 @@ TEST_CASE_TEMPLATE("project.store.export", std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + WorkerThreadPool WorkerPool(4); + WorkerThreadPool NetworkPool(2); + RemoteProjectStore::Result ExportResult = SaveOplog(CidStore, *RemoteStore, *Project.Get(), *Oplog, + NetworkPool, + WorkerPool, Options.MaxBlockSize, Options.MaxChunksPerBlock, Options.MaxChunkEmbedSize, @@ -3475,6 +3478,8 @@ TEST_CASE_TEMPLATE("project.store.export", RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, + NetworkPool, + WorkerPool, /*Force*/ false, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ false, @@ -3484,6 +3489,8 @@ TEST_CASE_TEMPLATE("project.store.export", RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, + NetworkPool, + WorkerPool, /*Force*/ true, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ false, @@ -3493,6 +3500,8 @@ TEST_CASE_TEMPLATE("project.store.export", RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, + NetworkPool, + WorkerPool, /*Force*/ false, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ true, @@ -3502,6 +3511,8 @@ TEST_CASE_TEMPLATE("project.store.export", RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, + NetworkPool, + WorkerPool, /*Force*/ true, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ true, diff --git a/src/zenremotestore/xmake.lua b/src/zenremotestore/xmake.lua index 35d554710..0818dda7b 100644 --- a/src/zenremotestore/xmake.lua +++ b/src/zenremotestore/xmake.lua @@ -6,6 +6,6 @@ target('zenremotestore') add_headerfiles("**.h") add_files("**.cpp") add_includedirs("include", {public=true}) - add_deps("zencore", "zenstore", "zenutil") + add_deps("zencore", "zenstore") add_packages("vcpkg::robin-map") add_packages("vcpkg::eastl", {public=true}); |