aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-03 12:38:35 +0200
committerGitHub Enterprise <[email protected]>2025-10-03 12:38:35 +0200
commit5361ee1c77b68bb14237169660840d6d63a74892 (patch)
tree3ad259133e09485a14506be38e43ec5b62a050f2 /src/zenremotestore
parentmove chunking code to zenremotestore lib (#545) (diff)
downloadzen-5361ee1c77b68bb14237169660840d6d63a74892.tar.xz
zen-5361ee1c77b68bb14237169660840d6d63a74892.zip
remove zenutil dependency in zenremotestore (#547)
* remove dependency to zenutil/workerpools.h from remoteprojectstore.cpp * remove dependency to zenutil/workerpools.h from buildstoragecache.cpp * remove unneded include * move jupiter helpers to zenremotestore * move parallelwork to zencore * remove zenutil dependency from zenremotestore * clean up test project dependencies - use indirect dependencies
Diffstat (limited to 'src/zenremotestore')
-rw-r--r--src/zenremotestore/builds/buildstoragecache.cpp12
-rw-r--r--src/zenremotestore/builds/jupiterbuildstorage.cpp2
-rw-r--r--src/zenremotestore/chunking/chunkedcontent.cpp4
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h3
-rw-r--r--src/zenremotestore/include/zenremotestore/jupiter/jupiterclient.h56
-rw-r--r--src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h35
-rw-r--r--src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h179
-rw-r--r--src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h5
-rw-r--r--src/zenremotestore/jupiter/jupiterclient.cpp28
-rw-r--r--src/zenremotestore/jupiter/jupiterhost.cpp66
-rw-r--r--src/zenremotestore/jupiter/jupitersession.cpp871
-rw-r--r--src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp4
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp27
-rw-r--r--src/zenremotestore/xmake.lua2
14 files changed, 1270 insertions, 24 deletions
diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp
index 694e364ea..d36d75480 100644
--- a/src/zenremotestore/builds/buildstoragecache.cpp
+++ b/src/zenremotestore/builds/buildstoragecache.cpp
@@ -11,7 +11,6 @@
#include <zencore/workthreadpool.h>
#include <zenhttp/httpclient.h>
#include <zenhttp/packageformat.h>
-#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
@@ -29,15 +28,13 @@ public:
std::string_view Namespace,
std::string_view Bucket,
const std::filesystem::path& TempFolderPath,
- bool BoostBackgroundThreadCount)
+ WorkerThreadPool& BackgroundWorkerPool)
: m_HttpClient(HttpClient)
, m_Stats(Stats)
, m_Namespace(Namespace.empty() ? "none" : Namespace)
, m_Bucket(Bucket.empty() ? "none" : Bucket)
, m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred())
- , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount)
- , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background)
- : GetTinyWorkerPool(EWorkloadType::Background))
+ , m_BackgroundWorkPool(BackgroundWorkerPool)
, m_PendingBackgroundWorkCount(1)
, m_CancelBackgroundWork(false)
{
@@ -394,7 +391,6 @@ private:
const std::string m_Namespace;
const std::string m_Bucket;
const std::filesystem::path m_TempFolderPath;
- const bool m_BoostBackgroundThreadCount;
bool IsFlushed = false;
WorkerThreadPool& m_BackgroundWorkPool;
@@ -408,9 +404,9 @@ CreateZenBuildStorageCache(HttpClient& HttpClient,
std::string_view Namespace,
std::string_view Bucket,
const std::filesystem::path& TempFolderPath,
- bool BoostBackgroundThreadCount)
+ WorkerThreadPool& BackgroundWorkerPool)
{
- return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount);
+ return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BackgroundWorkerPool);
}
ZenCacheEndpointTestResult
diff --git a/src/zenremotestore/builds/jupiterbuildstorage.cpp b/src/zenremotestore/builds/jupiterbuildstorage.cpp
index f1f9b3555..14a5ecc85 100644
--- a/src/zenremotestore/builds/jupiterbuildstorage.cpp
+++ b/src/zenremotestore/builds/jupiterbuildstorage.cpp
@@ -8,7 +8,7 @@
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenutil/jupiter/jupitersession.h>
+#include <zenremotestore/jupiter/jupitersession.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
diff --git a/src/zenremotestore/chunking/chunkedcontent.cpp b/src/zenremotestore/chunking/chunkedcontent.cpp
index e97dcff15..9df7725db 100644
--- a/src/zenremotestore/chunking/chunkedcontent.cpp
+++ b/src/zenremotestore/chunking/chunkedcontent.cpp
@@ -5,14 +5,12 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/parallelwork.h>
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-
#include <zenremotestore/chunking/chunkedfile.h>
#include <zenremotestore/chunking/chunkingcontroller.h>
-#include <zenutil/parallelwork.h>
-#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
index 2e8024915..e30270848 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
@@ -11,6 +11,7 @@
namespace zen {
class HttpClient;
+class WorkerThreadPool;
class BuildStorageCache
{
@@ -56,7 +57,7 @@ std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& H
std::string_view Namespace,
std::string_view Bucket,
const std::filesystem::path& TempFolderPath,
- bool BoostBackgroundThreadCount);
+ WorkerThreadPool& BackgroundWorkerPool);
struct ZenCacheEndpointTestResult
{
diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupiterclient.h b/src/zenremotestore/include/zenremotestore/jupiter/jupiterclient.h
new file mode 100644
index 000000000..8a51bd60a
--- /dev/null
+++ b/src/zenremotestore/include/zenremotestore/jupiter/jupiterclient.h
@@ -0,0 +1,56 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenbase/refcount.h>
+#include <zencore/logging.h>
+#include <zenhttp/httpclient.h>
+
+#include <chrono>
+
+namespace zen {
+
+class IoBuffer;
+
+struct JupiterClientOptions
+{
+ std::string_view Name;
+ std::string_view ServiceUrl;
+ std::string_view DdcNamespace;
+ std::string_view BlobStoreNamespace;
+ std::string_view ComputeCluster;
+ std::chrono::milliseconds ConnectTimeout{5000};
+ std::chrono::milliseconds Timeout{};
+ bool AssumeHttp2 = false;
+ bool AllowResume = false;
+ uint8_t RetryCount = 0;
+};
+
+/**
+ * Jupiter upstream cache client
+ */
+class JupiterClient : public RefCounted
+{
+public:
+ JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider);
+ ~JupiterClient();
+
+ std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
+ std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
+ std::string_view ComputeCluster() const { return m_ComputeCluster; }
+ std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); }
+
+ LoggerRef Logger() { return m_Log; }
+ HttpClient& Client() { return m_HttpClient; }
+
+private:
+ LoggerRef m_Log;
+ const std::string m_DefaultDdcNamespace;
+ const std::string m_DefaultBlobStoreNamespace;
+ const std::string m_ComputeCluster;
+ HttpClient m_HttpClient;
+
+ friend class JupiterSession;
+};
+
+} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h
new file mode 100644
index 000000000..3bbc700b8
--- /dev/null
+++ b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h
@@ -0,0 +1,35 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <string>
+#include <string_view>
+#include <vector>
+
+namespace zen {
+
+struct HttpClientSettings;
+
+struct JupiterServerDiscovery
+{
+ struct EndPoint
+ {
+ std::string Name;
+ std::string BaseUrl;
+ bool AssumeHttp2 = false;
+ };
+ std::vector<EndPoint> ServerEndPoints;
+ std::vector<EndPoint> CacheEndPoints;
+};
+
+JupiterServerDiscovery DiscoverJupiterEndpoints(std::string_view Host, const HttpClientSettings& ClientSettings);
+
+struct JupiterEndpointTestResult
+{
+ bool Success = false;
+ std::string FailureReason;
+};
+
+JupiterEndpointTestResult TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2);
+
+} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h
new file mode 100644
index 000000000..b79790f25
--- /dev/null
+++ b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h
@@ -0,0 +1,179 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+#include <zencore/logging.h>
+#include <zenhttp/httpclient.h>
+
+#include <set>
+
+namespace zen {
+
+class IoBuffer;
+
+struct JupiterResult
+{
+ IoBuffer Response;
+ uint64_t SentBytes{};
+ uint64_t ReceivedBytes{};
+ double ElapsedSeconds{};
+ int32_t ErrorCode{};
+ std::string Reason;
+ bool Success = false;
+};
+
+struct PutRefResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+ IoHash RawHash;
+};
+
+struct FinalizeRefResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+};
+
+struct JupiterExistsResult : JupiterResult
+{
+ std::set<IoHash> Needs;
+};
+
+struct GetObjectReferencesResult : JupiterResult
+{
+ std::set<IoHash> References;
+};
+
+struct PutBuildPartResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+ IoHash RawHash;
+};
+
+struct FinalizeBuildPartResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+};
+
+/**
+ * Context for performing Jupiter operations
+ *
+ * Maintains an HTTP connection so that subsequent operations don't need to go
+ * through the whole connection setup process
+ *
+ */
+class JupiterSession
+{
+public:
+ JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect);
+ ~JupiterSession();
+
+ JupiterResult Authenticate();
+
+ JupiterResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
+ JupiterResult GetBlob(std::string_view Namespace, const IoHash& Key);
+ JupiterResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
+ JupiterResult GetObject(std::string_view Namespace, const IoHash& Key);
+ JupiterResult GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath = {});
+
+ PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
+ JupiterResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob);
+ JupiterResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object);
+
+ FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
+
+ JupiterResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key);
+
+ GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key);
+
+ JupiterResult BlobExists(std::string_view Namespace, const IoHash& Key);
+ JupiterResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key);
+ JupiterResult ObjectExists(std::string_view Namespace, const IoHash& Key);
+
+ JupiterExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ JupiterExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ JupiterExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+
+ std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
+
+ JupiterResult ListBuildNamespaces();
+ JupiterResult ListBuildBuckets(std::string_view Namespace);
+ JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload);
+ JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
+ JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ PutBuildPartResult PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload);
+ JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ JupiterResult PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload);
+ JupiterResult GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath,
+ uint64_t Offset = 0,
+ uint64_t Size = (uint64_t)-1);
+
+ JupiterResult PutMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems);
+ JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems);
+ JupiterResult PutBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ const IoBuffer& Payload);
+ FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& RawHash);
+ JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount);
+ JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload);
+
+ JupiterResult PutBuildPartStats(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ IoBuffer Payload);
+
+private:
+ inline LoggerRef Log() { return m_Log; }
+
+ JupiterResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
+
+ JupiterExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
+
+ LoggerRef m_Log;
+ HttpClient& m_HttpClient;
+ const bool m_AllowRedirect = false;
+};
+
+} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
index 7e5af5e6b..fbcdde955 100644
--- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
+++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
@@ -126,6 +126,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer(
CidStore& ChunkStore,
ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
+ WorkerThreadPool& WorkerPool,
size_t MaxBlockSize,
size_t MaxChunksPerBlock,
size_t MaxChunkEmbedSize,
@@ -153,6 +154,8 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
size_t MaxBlockSize,
size_t MaxChunksPerBlock,
size_t MaxChunkEmbedSize,
@@ -165,6 +168,8 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
ProjectStore::Oplog& Oplog,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
bool ForceDownload,
bool IgnoreMissingAttachments,
bool CleanOplog,
diff --git a/src/zenremotestore/jupiter/jupiterclient.cpp b/src/zenremotestore/jupiter/jupiterclient.cpp
new file mode 100644
index 000000000..bf9d8e346
--- /dev/null
+++ b/src/zenremotestore/jupiter/jupiterclient.cpp
@@ -0,0 +1,28 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/jupiter/jupiterclient.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider)
+: m_Log(zen::logging::Get("jupiter"sv))
+, m_DefaultDdcNamespace(Options.DdcNamespace)
+, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
+, m_ComputeCluster(Options.ComputeCluster)
+, m_HttpClient(Options.ServiceUrl,
+ HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
+ .Timeout = Options.Timeout,
+ .AccessTokenProvider = std::move(TokenProvider),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = Options.AllowResume,
+ .RetryCount = Options.RetryCount})
+{
+}
+
+JupiterClient::~JupiterClient()
+{
+}
+
+} // namespace zen
diff --git a/src/zenremotestore/jupiter/jupiterhost.cpp b/src/zenremotestore/jupiter/jupiterhost.cpp
new file mode 100644
index 000000000..df6be10c9
--- /dev/null
+++ b/src/zenremotestore/jupiter/jupiterhost.cpp
@@ -0,0 +1,66 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/jupiter/jupiterhost.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/fmtutils.h>
+#include <zenhttp/httpclient.h>
+
+namespace zen {
+
+JupiterServerDiscovery
+DiscoverJupiterEndpoints(std::string_view Host, const HttpClientSettings& ClientSettings)
+{
+ JupiterServerDiscovery Result;
+
+ HttpClient DiscoveryHttpClient(Host, ClientSettings);
+ HttpClient::Response ServerInfoResponse = DiscoveryHttpClient.Get("/api/v1/status/servers", HttpClient::Accept(HttpContentType::kJSON));
+ if (!ServerInfoResponse.IsSuccess())
+ {
+ ServerInfoResponse.ThrowError(fmt::format("Failed to get list of servers from discovery url '{}'", Host));
+ }
+ std::string_view JsonResponse = ServerInfoResponse.AsText();
+ CbObject CbPayload = LoadCompactBinaryFromJson(JsonResponse).AsObject();
+ CbArrayView ServerEndpoints = CbPayload["serverEndpoints"].AsArrayView();
+ Result.ServerEndPoints.reserve(ServerEndpoints.Num());
+
+ auto ParseEndPoints = [](CbArrayView ServerEndpoints) -> std::vector<JupiterServerDiscovery::EndPoint> {
+ std::vector<JupiterServerDiscovery::EndPoint> Result;
+
+ Result.reserve(ServerEndpoints.Num());
+ for (CbFieldView ServerEndpointView : ServerEndpoints)
+ {
+ CbObjectView ServerEndPoint = ServerEndpointView.AsObjectView();
+ Result.push_back(JupiterServerDiscovery::EndPoint{.Name = std::string(ServerEndPoint["name"].AsString()),
+ .BaseUrl = std::string(ServerEndPoint["baseUrl"].AsString()),
+ .AssumeHttp2 = ServerEndPoint["baseUrl"].AsBool(false)});
+ }
+ return Result;
+ };
+
+ Result.ServerEndPoints = ParseEndPoints(CbPayload["serverEndpoints"].AsArrayView());
+ Result.CacheEndPoints = ParseEndPoints(CbPayload["cacheEndpoints"].AsArrayView());
+
+ return Result;
+}
+
+JupiterEndpointTestResult
+TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2)
+{
+ HttpClientSettings TestClientSettings{.LogCategory = "httpbuildsclient",
+ .ConnectTimeout = std::chrono::milliseconds{1000},
+ .Timeout = std::chrono::milliseconds{2000},
+ .AssumeHttp2 = AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 0};
+
+ HttpClient TestHttpClient(BaseUrl, TestClientSettings);
+ HttpClient::Response TestResponse = TestHttpClient.Get("/health/live");
+ if (TestResponse.IsSuccess())
+ {
+ return {.Success = true};
+ }
+ return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")};
+}
+
+} // namespace zen
diff --git a/src/zenremotestore/jupiter/jupitersession.cpp b/src/zenremotestore/jupiter/jupitersession.cpp
new file mode 100644
index 000000000..942e2a9dc
--- /dev/null
+++ b/src/zenremotestore/jupiter/jupitersession.cpp
@@ -0,0 +1,871 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/jupiter/jupitersession.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryutil.h>
+#include <zencore/compositebuffer.h>
+#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+using namespace std::literals;
+
+namespace zen {
+
+namespace detail {
+ JupiterResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
+ {
+ if (Response.Error)
+ {
+ return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = Response.Error.value().ErrorCode,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Success = false};
+ }
+ if (!Response.IsSuccess())
+ {
+ return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Success = false};
+ }
+ return {.Response = Response.ResponsePayload,
+ .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = 0,
+ .Success = true};
+ }
+} // namespace detail
+
+JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect)
+: m_Log(InLog)
+, m_HttpClient(InHttpClient)
+, m_AllowRedirect(AllowRedirect)
+{
+}
+
+JupiterSession::~JupiterSession()
+{
+}
+
+JupiterResult
+JupiterSession::Authenticate()
+{
+ bool OK = m_HttpClient.Authenticate();
+ return {.Success = OK};
+}
+
+JupiterResult
+JupiterSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetRef");
+
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)});
+
+ return detail::ConvertResponse(Response, "JupiterSession::GetRef"sv);
+}
+
+JupiterResult
+JupiterSession::GetBlob(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetBlob");
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)});
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
+
+ HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ TempFolderPath,
+ {HttpClient::Accept(ZenContentType::kCompressedBinary)});
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
+
+ HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ TempFolderPath,
+ {{"Accept", "application/x-jupiter-inline"}});
+
+ JupiterResult Result = detail::ConvertResponse(Response);
+
+ if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
+ {
+ const std::string& PayloadHashHeader = It->second;
+ if (PayloadHashHeader.length() == IoHash::StringLength)
+ {
+ OutPayloadHash = IoHash::FromHexString(PayloadHashHeader);
+ }
+ }
+
+ return Result;
+}
+
+JupiterResult
+JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetObject");
+
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ return detail::ConvertResponse(Response);
+}
+
+PutRefResult
+JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutRef");
+
+ Ref.SetContentType(RefType);
+
+ IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
+
+ HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ Ref,
+ {{"X-Jupiter-IoHash", Hash.ToHexString()}});
+
+ PutRefResult Result = {detail::ConvertResponse(Response)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ Result.RawHash = Hash;
+ }
+ return Result;
+}
+
+FinalizeRefResult
+JupiterSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
+{
+ ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
+
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
+ {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
+
+ FinalizeRefResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutBlob");
+
+ HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
+
+ Blob.SetContentType(ZenContentType::kCompressedBinary);
+ HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
+
+ HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ Payload,
+ ZenContentType::kCompressedBinary);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutObject");
+
+ Object.SetContentType(ZenContentType::kCbObject);
+ HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::RefExists");
+
+ HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));
+
+ return detail::ConvertResponse(Response);
+}
+
+GetObjectReferencesResult
+JupiterSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
+
+ HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ const CbObject ReferencesResponse = Response.AsObject();
+ for (auto& Item : ReferencesResponse["references"sv])
+ {
+ Result.References.insert(Item.AsHash());
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::BlobExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "blobs"sv, Key);
+}
+
+JupiterResult
+JupiterSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "compressed-blobs"sv, Key);
+}
+
+JupiterResult
+JupiterSession::ObjectExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "objects"sv, Key);
+}
+
+JupiterExistsResult
+JupiterSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "blobs"sv, Keys);
+}
+
+JupiterExistsResult
+JupiterSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys);
+}
+
+JupiterExistsResult
+JupiterSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "objects"sv, Keys);
+}
+
+std::vector<IoHash>
+JupiterSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
+{
+ // ExtendableStringBuilder<256> Uri;
+ // Uri << m_CacheClient->ServiceUrl();
+ // Uri << "/api/v1/s/" << Namespace;
+
+ ZEN_UNUSED(Namespace, BucketId, ChunkHashes);
+
+ return {};
+}
+
+JupiterResult
+JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
+
+ HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterExistsResult
+JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys)
+{
+ ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
+
+ ExtendableStringBuilder<256> Body;
+ Body << "[";
+ for (const auto& Key : Keys)
+ {
+ Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
+ }
+ Body << "]";
+ IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
+ Payload.SetContentType(ZenContentType::kJSON);
+
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ JupiterExistsResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ const CbObject ExistsResponse = Response.AsObject();
+ for (auto& Item : ExistsResponse["needs"sv])
+ {
+ Result.Needs.insert(Item.AsHash());
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::ListBuildNamespaces()
+{
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv);
+}
+
+JupiterResult
+JupiterSession::ListBuildBuckets(std::string_view Namespace)
+{
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv);
+}
+
+JupiterResult
+JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId);
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv);
+}
+
+JupiterResult
+JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload);
+ return detail::ConvertResponse(Response, "JupiterSession::PutBuild"sv);
+}
+
+JupiterResult
+JupiterSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+{
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::GetBuild"sv);
+}
+
+JupiterResult
+JupiterSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+{
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId));
+ return detail::ConvertResponse(Response, "JupiterSession::FinalizeBuild"sv);
+}
+
+PutBuildPartResult
+JupiterSession::PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+
+ IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName),
+ Payload,
+ {{"X-Jupiter-IoHash", Hash.ToHexString()}});
+
+ PutBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::PutBuildPart"sv)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ Result.RawHash = Hash;
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+{
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::GetBuildPart"sv);
+}
+
+JupiterResult
+JupiterSession::PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload)
+{
+ HttpClient::Response Response =
+ m_HttpClient.Upload(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()),
+ Payload,
+ ContentType);
+ return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems)
+{
+ struct MultipartUploadResponse
+ {
+ struct Part
+ {
+ uint64_t FirstByte;
+ uint64_t LastByte;
+ std::string PartId;
+ std::string QueryString;
+ };
+
+ std::string UploadId;
+ std::string BlobName;
+ std::vector<Part> Parts;
+
+ static MultipartUploadResponse Parse(CbObject& Payload)
+ {
+ MultipartUploadResponse Result;
+ Result.UploadId = Payload["uploadId"sv].AsString();
+ Result.BlobName = Payload["blobName"sv].AsString();
+ CbArrayView PartsArray = Payload["parts"sv].AsArrayView();
+ Result.Parts.reserve(PartsArray.Num());
+ for (CbFieldView PartView : PartsArray)
+ {
+ CbObjectView PartObject = PartView.AsObjectView();
+ Result.Parts.emplace_back(Part{
+ .FirstByte = PartObject["firstByte"sv].AsUInt64(),
+ .LastByte = PartObject["lastByte"sv].AsUInt64(),
+ .PartId = std::string(PartObject["partId"sv].AsString()),
+ .QueryString = std::string(PartObject["queryString"sv].AsString()),
+ });
+ }
+ return Result;
+ }
+ };
+
+ CbObjectWriter StartMultipartPayloadWriter;
+ StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize);
+ CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save();
+
+ std::string StartMultipartResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/startMultipartUpload", Namespace, BucketId, BuildId, Hash.ToHexString());
+ // ZEN_INFO("POST: {}", StartMultipartResponseRequestString);
+ HttpClient::Response StartMultipartResponse =
+ m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject));
+ if (!StartMultipartResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: "));
+ return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ }
+ CbValidateError ValidateResult = CbValidateError::None;
+ CbObject ResponseObject = ValidateAndReadCompactBinaryObject(IoBuffer(StartMultipartResponse.ResponsePayload), ValidateResult);
+ if (ValidateResult != CbValidateError::None)
+ {
+ JupiterResult Result = detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ Result.ErrorCode = (int32)HttpResponseCode::UnsupportedMediaType;
+ Result.Reason = fmt::format("Invalid multipart response object format: '{}'", ToString(ValidateResult));
+ return Result;
+ }
+
+ struct WorkloadData
+ {
+ MultipartUploadResponse PartDescription;
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter;
+ std::atomic<size_t> PartsLeft;
+ };
+
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+
+ Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject);
+ Workload->Transmitter = std::move(Transmitter);
+ Workload->PartsLeft = Workload->PartDescription.Parts.size();
+
+ for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++)
+ {
+ OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, Hash, ContentType, Workload, PartIndex](
+ bool& OutIsComplete) -> JupiterResult {
+ const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex];
+ IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte);
+ std::string MultipartUploadResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ Part.QueryString,
+ m_AllowRedirect ? "true"sv : "false"sv);
+ // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString);
+ HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload);
+ if (!MultipartUploadResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString));
+ }
+ OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1;
+ if (OutIsComplete)
+ {
+ int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
+ int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes;
+ double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds;
+ HttpClient::Response MultipartEndResponse = MultipartUploadResponse;
+ while (MultipartEndResponse.IsSuccess())
+ {
+ CbObjectWriter CompletePayloadWriter;
+ CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName);
+ CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId);
+ CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary);
+ CompletePayloadWriter.BeginArray("partIds"sv);
+ std::unordered_map<std::string, size_t> PartNameToIndex;
+ for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++)
+ {
+ const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex];
+ PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex});
+ CompletePayloadWriter.AddString(PartDescription.PartId);
+ }
+ CompletePayloadWriter.EndArray(); // "partIds"
+ CbObject CompletePayload = CompletePayloadWriter.Save();
+
+ std::string MultipartEndResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/completeMultipart", Namespace, BucketId, BuildId, Hash.ToHexString());
+
+ MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString,
+ CompletePayload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ TotalUploadedBytes += MultipartEndResponse.UploadedBytes;
+ TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes;
+ TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds;
+ if (MultipartEndResponse.IsSuccess())
+ {
+ CbObject ResponseObject = MultipartEndResponse.AsObject();
+ CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView();
+ if (MissingPartsArrayView.Num() == 0)
+ {
+ break;
+ }
+ else
+ {
+ for (CbFieldView PartIdView : MissingPartsArrayView)
+ {
+ std::string RetryPartId(PartIdView.AsString());
+ size_t RetryPartIndex = PartNameToIndex.at(RetryPartId);
+ const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex];
+ IoBuffer RetryPartPayload =
+ Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1);
+ std::string RetryMultipartUploadResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ RetryPart.QueryString,
+ m_AllowRedirect ? "true"sv : "false"sv);
+
+ MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload);
+ TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
+ TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes;
+ TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds;
+ if (!MultipartUploadResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString));
+ MultipartEndResponse = MultipartUploadResponse;
+ }
+ }
+ }
+ }
+ else
+ {
+ ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString));
+ }
+ }
+ MultipartEndResponse.UploadedBytes = TotalUploadedBytes;
+ MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes;
+ MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds;
+ return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ }
+ return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ });
+ }
+ return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems)
+{
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv);
+ HttpClient::Response Response =
+ m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty())
+ {
+ if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos)
+ {
+ if (std::optional<uint64_t> TotalSizeMaybe = ParseInt<uint64_t>(ContentRange.substr(SizeDelimiterPos + 1));
+ TotalSizeMaybe.has_value())
+ {
+ uint64_t TotalSize = TotalSizeMaybe.value();
+ uint64_t PayloadSize = Response.ResponsePayload.GetSize();
+
+ OnReceive(0, Response.ResponsePayload);
+
+ if (TotalSize > PayloadSize)
+ {
+ struct WorkloadData
+ {
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
+ std::function<void()> OnComplete;
+ std::atomic<uint64_t> BytesRemaining;
+ };
+
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+ Workload->OnReceive = std::move(OnReceive);
+ Workload->OnComplete = std::move(OnComplete);
+ Workload->BytesRemaining = TotalSize - PayloadSize;
+
+ uint64_t Offset = PayloadSize;
+ while (Offset < TotalSize)
+ {
+ uint64_t PartSize = Min(ChunkSize, TotalSize - Offset);
+ OutWorkItems.emplace_back([this,
+ Namespace = std::string(Namespace),
+ BucketId = std::string(BucketId),
+ BuildId = Oid(BuildId),
+ Hash = IoHash(Hash),
+ TotalSize,
+ Workload,
+ Offset,
+ PartSize]() -> JupiterResult {
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv);
+ HttpClient::Response Response = m_HttpClient.Get(
+ RequestUrl,
+ HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ Workload->OnReceive(Offset, Response.ResponsePayload);
+ uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
+ if (ByteRemaning == Response.ResponsePayload.GetSize())
+ {
+ Workload->OnComplete();
+ }
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ });
+ Offset += PartSize;
+ }
+ }
+ else
+ {
+ OnComplete();
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ }
+ }
+ }
+ OnReceive(0, Response.ResponsePayload);
+ OnComplete();
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath,
+ uint64_t Offset,
+ uint64_t Size)
+{
+ HttpClient::KeyValueMap Headers;
+ if (Offset != 0 || Size != (uint64_t)-1)
+ {
+ Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)});
+ }
+ HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv),
+ TempFolderPath,
+ Headers);
+ if (Response.IsSuccess())
+ {
+ // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it
+ if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary))
+ {
+ IoHash ValidateRawHash;
+ uint64_t ValidateRawSize = 0;
+ ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize));
+ ZEN_ASSERT_SLOW(ValidateRawHash == Hash);
+ ZEN_ASSERT_SLOW(ValidateRawSize > 0);
+ ZEN_UNUSED(ValidateRawHash, ValidateRawSize);
+ Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary);
+ }
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::PutBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, Hash.ToHexString()),
+ Payload);
+ return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv);
+}
+
+FinalizeBuildPartResult
+JupiterSession::FinalizeBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& RawHash)
+{
+ HttpClient::Response Response = m_HttpClient.Post(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()),
+ HttpClient::Accept(ZenContentType::kCbObject));
+
+ FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::FinalizeBuildPart"sv)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount)
+{
+ const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount);
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
+}
+
+JupiterResult
+JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv);
+}
+
+JupiterResult
+JupiterSession::PutBuildPartStats(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ IoBuffer Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv);
+}
+
+} // namespace zen
diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
index bfd4e433c..b198b7c99 100644
--- a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
@@ -8,8 +8,8 @@
#include <zenhttp/httpclientauth.h>
-#include <zenutil/jupiter/jupiterclient.h>
-#include <zenutil/jupiter/jupitersession.h>
+#include <zenremotestore/jupiter/jupiterclient.h>
+#include <zenremotestore/jupiter/jupitersession.h>
namespace zen {
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index c2e270909..d008d1452 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -15,7 +15,6 @@
#include <zenhttp/httpcommon.h>
#include <zenremotestore/chunking/chunkedfile.h>
#include <zenstore/cidstore.h>
-#include <zenutil/workerpools.h>
#include <unordered_map>
@@ -2307,6 +2306,7 @@ RemoteProjectStore::LoadContainerResult
BuildContainer(CidStore& ChunkStore,
ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
+ WorkerThreadPool& WorkerPool,
size_t MaxBlockSize,
size_t MaxChunksPerBlock,
size_t MaxChunkEmbedSize,
@@ -2319,7 +2319,7 @@ BuildContainer(CidStore& ChunkStore,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
{
- WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
+ // WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
remotestore_impl::AsyncRemoteResult RemoteResult;
CbObject ContainerObject = BuildContainer(ChunkStore,
@@ -2348,6 +2348,8 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
size_t MaxBlockSize,
size_t MaxChunksPerBlock,
size_t MaxChunkEmbedSize,
@@ -2363,9 +2365,6 @@ SaveOplog(CidStore& ChunkStore,
remotestore_impl::UploadInfo Info;
- WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
- WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
-
const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
std::filesystem::path AttachmentTempPath;
@@ -2887,6 +2886,8 @@ RemoteProjectStore::Result
LoadOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
ProjectStore::Oplog& Oplog,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
bool ForceDownload,
bool IgnoreMissingAttachments,
bool CleanOplog,
@@ -2898,9 +2899,6 @@ LoadOplog(CidStore& ChunkStore,
Stopwatch Timer;
- WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
- WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(EWorkloadType::Background);
-
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
uint64_t BlockCountToDownload = 0;
@@ -3454,10 +3452,15 @@ TEST_CASE_TEMPLATE("project.store.export",
std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+ WorkerThreadPool WorkerPool(4);
+ WorkerThreadPool NetworkPool(2);
+
RemoteProjectStore::Result ExportResult = SaveOplog(CidStore,
*RemoteStore,
*Project.Get(),
*Oplog,
+ NetworkPool,
+ WorkerPool,
Options.MaxBlockSize,
Options.MaxChunksPerBlock,
Options.MaxChunkEmbedSize,
@@ -3475,6 +3478,8 @@ TEST_CASE_TEMPLATE("project.store.export",
RemoteProjectStore::Result ImportResult = LoadOplog(CidStore,
*RemoteStore,
*OplogImport,
+ NetworkPool,
+ WorkerPool,
/*Force*/ false,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ false,
@@ -3484,6 +3489,8 @@ TEST_CASE_TEMPLATE("project.store.export",
RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore,
*RemoteStore,
*OplogImport,
+ NetworkPool,
+ WorkerPool,
/*Force*/ true,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ false,
@@ -3493,6 +3500,8 @@ TEST_CASE_TEMPLATE("project.store.export",
RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore,
*RemoteStore,
*OplogImport,
+ NetworkPool,
+ WorkerPool,
/*Force*/ false,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ true,
@@ -3502,6 +3511,8 @@ TEST_CASE_TEMPLATE("project.store.export",
RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore,
*RemoteStore,
*OplogImport,
+ NetworkPool,
+ WorkerPool,
/*Force*/ true,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ true,
diff --git a/src/zenremotestore/xmake.lua b/src/zenremotestore/xmake.lua
index 35d554710..0818dda7b 100644
--- a/src/zenremotestore/xmake.lua
+++ b/src/zenremotestore/xmake.lua
@@ -6,6 +6,6 @@ target('zenremotestore')
add_headerfiles("**.h")
add_files("**.cpp")
add_includedirs("include", {public=true})
- add_deps("zencore", "zenstore", "zenutil")
+ add_deps("zencore", "zenstore")
add_packages("vcpkg::robin-map")
add_packages("vcpkg::eastl", {public=true});