aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupiterclient.h56
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupiterhost.h35
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h179
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h80
-rw-r--r--src/zenutil/jupiter/jupiterclient.cpp28
-rw-r--r--src/zenutil/jupiter/jupiterhost.cpp66
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp871
-rw-r--r--src/zenutil/parallelwork.cpp264
-rw-r--r--src/zenutil/zenutil.cpp2
9 files changed, 0 insertions, 1581 deletions
diff --git a/src/zenutil/include/zenutil/jupiter/jupiterclient.h b/src/zenutil/include/zenutil/jupiter/jupiterclient.h
deleted file mode 100644
index 8a51bd60a..000000000
--- a/src/zenutil/include/zenutil/jupiter/jupiterclient.h
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zenbase/refcount.h>
-#include <zencore/logging.h>
-#include <zenhttp/httpclient.h>
-
-#include <chrono>
-
-namespace zen {
-
-class IoBuffer;
-
-struct JupiterClientOptions
-{
- std::string_view Name;
- std::string_view ServiceUrl;
- std::string_view DdcNamespace;
- std::string_view BlobStoreNamespace;
- std::string_view ComputeCluster;
- std::chrono::milliseconds ConnectTimeout{5000};
- std::chrono::milliseconds Timeout{};
- bool AssumeHttp2 = false;
- bool AllowResume = false;
- uint8_t RetryCount = 0;
-};
-
-/**
- * Jupiter upstream cache client
- */
-class JupiterClient : public RefCounted
-{
-public:
- JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider);
- ~JupiterClient();
-
- std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
- std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
- std::string_view ComputeCluster() const { return m_ComputeCluster; }
- std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); }
-
- LoggerRef Logger() { return m_Log; }
- HttpClient& Client() { return m_HttpClient; }
-
-private:
- LoggerRef m_Log;
- const std::string m_DefaultDdcNamespace;
- const std::string m_DefaultBlobStoreNamespace;
- const std::string m_ComputeCluster;
- HttpClient m_HttpClient;
-
- friend class JupiterSession;
-};
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupiterhost.h b/src/zenutil/include/zenutil/jupiter/jupiterhost.h
deleted file mode 100644
index 3bbc700b8..000000000
--- a/src/zenutil/include/zenutil/jupiter/jupiterhost.h
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <string>
-#include <string_view>
-#include <vector>
-
-namespace zen {
-
-struct HttpClientSettings;
-
-struct JupiterServerDiscovery
-{
- struct EndPoint
- {
- std::string Name;
- std::string BaseUrl;
- bool AssumeHttp2 = false;
- };
- std::vector<EndPoint> ServerEndPoints;
- std::vector<EndPoint> CacheEndPoints;
-};
-
-JupiterServerDiscovery DiscoverJupiterEndpoints(std::string_view Host, const HttpClientSettings& ClientSettings);
-
-struct JupiterEndpointTestResult
-{
- bool Success = false;
- std::string FailureReason;
-};
-
-JupiterEndpointTestResult TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2);
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
deleted file mode 100644
index b79790f25..000000000
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ /dev/null
@@ -1,179 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/iohash.h>
-#include <zencore/logging.h>
-#include <zenhttp/httpclient.h>
-
-#include <set>
-
-namespace zen {
-
-class IoBuffer;
-
-struct JupiterResult
-{
- IoBuffer Response;
- uint64_t SentBytes{};
- uint64_t ReceivedBytes{};
- double ElapsedSeconds{};
- int32_t ErrorCode{};
- std::string Reason;
- bool Success = false;
-};
-
-struct PutRefResult : JupiterResult
-{
- std::vector<IoHash> Needs;
- IoHash RawHash;
-};
-
-struct FinalizeRefResult : JupiterResult
-{
- std::vector<IoHash> Needs;
-};
-
-struct JupiterExistsResult : JupiterResult
-{
- std::set<IoHash> Needs;
-};
-
-struct GetObjectReferencesResult : JupiterResult
-{
- std::set<IoHash> References;
-};
-
-struct PutBuildPartResult : JupiterResult
-{
- std::vector<IoHash> Needs;
- IoHash RawHash;
-};
-
-struct FinalizeBuildPartResult : JupiterResult
-{
- std::vector<IoHash> Needs;
-};
-
-/**
- * Context for performing Jupiter operations
- *
- * Maintains an HTTP connection so that subsequent operations don't need to go
- * through the whole connection setup process
- *
- */
-class JupiterSession
-{
-public:
- JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect);
- ~JupiterSession();
-
- JupiterResult Authenticate();
-
- JupiterResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
- JupiterResult GetBlob(std::string_view Namespace, const IoHash& Key);
- JupiterResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
- JupiterResult GetObject(std::string_view Namespace, const IoHash& Key);
- JupiterResult GetInlineBlob(std::string_view Namespace,
- std::string_view BucketId,
- const IoHash& Key,
- IoHash& OutPayloadHash,
- std::filesystem::path TempFolderPath = {});
-
- PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
- JupiterResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
- JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
- JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob);
- JupiterResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object);
-
- FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
-
- JupiterResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key);
-
- GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key);
-
- JupiterResult BlobExists(std::string_view Namespace, const IoHash& Key);
- JupiterResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key);
- JupiterResult ObjectExists(std::string_view Namespace, const IoHash& Key);
-
- JupiterExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
- JupiterExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
- JupiterExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys);
-
- std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
-
- JupiterResult ListBuildNamespaces();
- JupiterResult ListBuildBuckets(std::string_view Namespace);
- JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload);
- JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
- JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- PutBuildPartResult PutBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- std::string_view PartName,
- const IoBuffer& Payload);
- JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
- JupiterResult PutBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload);
- JupiterResult GetBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- std::filesystem::path TempFolderPath,
- uint64_t Offset = 0,
- uint64_t Size = (uint64_t)-1);
-
- JupiterResult PutMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- ZenContentType ContentType,
- uint64_t PayloadSize,
- std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
- std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems);
- JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
- std::function<void()>&& OnComplete,
- std::vector<std::function<JupiterResult()>>& OutWorkItems);
- JupiterResult PutBlockMetadata(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- const IoBuffer& Payload);
- FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& RawHash);
- JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount);
- JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload);
-
- JupiterResult PutBuildPartStats(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& BuildPartId,
- IoBuffer Payload);
-
-private:
- inline LoggerRef Log() { return m_Log; }
-
- JupiterResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
-
- JupiterExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
-
- LoggerRef m_Log;
- HttpClient& m_HttpClient;
- const bool m_AllowRedirect = false;
-};
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
deleted file mode 100644
index 05146d644..000000000
--- a/src/zenutil/include/zenutil/parallelwork.h
+++ /dev/null
@@ -1,80 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/scopeguard.h>
-#include <zencore/thread.h>
-#include <zencore/workthreadpool.h>
-
-#include <atomic>
-
-namespace zen {
-
-class ParallelWork
-{
-public:
- ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode);
-
- ~ParallelWork();
-
- typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback;
- typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback;
- typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback;
-
- void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {})
- {
- m_PendingWork.AddCount(1);
- try
- {
- WorkerPool.ScheduleWork(
- [this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
- auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); });
- try
- {
- while (m_PauseFlag && !m_AbortFlag)
- {
- Sleep(2000);
- }
- Work(m_AbortFlag);
- }
- catch (...)
- {
- OnError(std::current_exception(), m_AbortFlag);
- }
- },
- m_Mode);
- }
- catch (const std::exception&)
- {
- m_PendingWork.CountDown();
- throw;
- }
- }
-
- void Abort() { m_AbortFlag = true; }
-
- bool IsAborted() const { return m_AbortFlag.load(); }
-
- void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback);
-
- void Wait();
-
- Latch& PendingWork() { return m_PendingWork; }
-
-private:
- ExceptionCallback DefaultErrorFunction();
- void RethrowErrors();
-
- std::atomic<bool>& m_AbortFlag;
- std::atomic<bool>& m_PauseFlag;
- const WorkerThreadPool::EMode m_Mode;
- bool m_DispatchComplete = false;
- Latch m_PendingWork;
-
- RwLock m_ErrorLock;
- std::vector<std::exception_ptr> m_Errors;
-};
-
-void parallellwork_forcelink();
-
-} // namespace zen
diff --git a/src/zenutil/jupiter/jupiterclient.cpp b/src/zenutil/jupiter/jupiterclient.cpp
deleted file mode 100644
index dbac218a4..000000000
--- a/src/zenutil/jupiter/jupiterclient.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/jupiter/jupiterclient.h>
-
-namespace zen {
-
-using namespace std::literals;
-
-JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider)
-: m_Log(zen::logging::Get("jupiter"sv))
-, m_DefaultDdcNamespace(Options.DdcNamespace)
-, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
-, m_ComputeCluster(Options.ComputeCluster)
-, m_HttpClient(Options.ServiceUrl,
- HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
- .Timeout = Options.Timeout,
- .AccessTokenProvider = std::move(TokenProvider),
- .AssumeHttp2 = Options.AssumeHttp2,
- .AllowResume = Options.AllowResume,
- .RetryCount = Options.RetryCount})
-{
-}
-
-JupiterClient::~JupiterClient()
-{
-}
-
-} // namespace zen
diff --git a/src/zenutil/jupiter/jupiterhost.cpp b/src/zenutil/jupiter/jupiterhost.cpp
deleted file mode 100644
index d06229cbf..000000000
--- a/src/zenutil/jupiter/jupiterhost.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/jupiter/jupiterhost.h>
-
-#include <zencore/compactbinary.h>
-#include <zencore/fmtutils.h>
-#include <zenhttp/httpclient.h>
-
-namespace zen {
-
-JupiterServerDiscovery
-DiscoverJupiterEndpoints(std::string_view Host, const HttpClientSettings& ClientSettings)
-{
- JupiterServerDiscovery Result;
-
- HttpClient DiscoveryHttpClient(Host, ClientSettings);
- HttpClient::Response ServerInfoResponse = DiscoveryHttpClient.Get("/api/v1/status/servers", HttpClient::Accept(HttpContentType::kJSON));
- if (!ServerInfoResponse.IsSuccess())
- {
- ServerInfoResponse.ThrowError(fmt::format("Failed to get list of servers from discovery url '{}'", Host));
- }
- std::string_view JsonResponse = ServerInfoResponse.AsText();
- CbObject CbPayload = LoadCompactBinaryFromJson(JsonResponse).AsObject();
- CbArrayView ServerEndpoints = CbPayload["serverEndpoints"].AsArrayView();
- Result.ServerEndPoints.reserve(ServerEndpoints.Num());
-
- auto ParseEndPoints = [](CbArrayView ServerEndpoints) -> std::vector<JupiterServerDiscovery::EndPoint> {
- std::vector<JupiterServerDiscovery::EndPoint> Result;
-
- Result.reserve(ServerEndpoints.Num());
- for (CbFieldView ServerEndpointView : ServerEndpoints)
- {
- CbObjectView ServerEndPoint = ServerEndpointView.AsObjectView();
- Result.push_back(JupiterServerDiscovery::EndPoint{.Name = std::string(ServerEndPoint["name"].AsString()),
- .BaseUrl = std::string(ServerEndPoint["baseUrl"].AsString()),
- .AssumeHttp2 = ServerEndPoint["baseUrl"].AsBool(false)});
- }
- return Result;
- };
-
- Result.ServerEndPoints = ParseEndPoints(CbPayload["serverEndpoints"].AsArrayView());
- Result.CacheEndPoints = ParseEndPoints(CbPayload["cacheEndpoints"].AsArrayView());
-
- return Result;
-}
-
-JupiterEndpointTestResult
-TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2)
-{
- HttpClientSettings TestClientSettings{.LogCategory = "httpbuildsclient",
- .ConnectTimeout = std::chrono::milliseconds{1000},
- .Timeout = std::chrono::milliseconds{2000},
- .AssumeHttp2 = AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 0};
-
- HttpClient TestHttpClient(BaseUrl, TestClientSettings);
- HttpClient::Response TestResponse = TestHttpClient.Get("/health/live");
- if (TestResponse.IsSuccess())
- {
- return {.Success = true};
- }
- return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")};
-}
-
-} // namespace zen
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
deleted file mode 100644
index 4b594239e..000000000
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ /dev/null
@@ -1,871 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/jupiter/jupitersession.h>
-
-#include <zencore/compactbinary.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryutil.h>
-#include <zencore/compositebuffer.h>
-#include <zencore/compress.h>
-#include <zencore/fmtutils.h>
-#include <zencore/trace.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <json11.hpp>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-using namespace std::literals;
-
-namespace zen {
-
-namespace detail {
- JupiterResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
- {
- if (Response.Error)
- {
- return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
- .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
- .ElapsedSeconds = Response.ElapsedSeconds,
- .ErrorCode = Response.Error.value().ErrorCode,
- .Reason = Response.ErrorMessage(ErrorPrefix),
- .Success = false};
- }
- if (!Response.IsSuccess())
- {
- return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
- .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
- .ElapsedSeconds = Response.ElapsedSeconds,
- .ErrorCode = static_cast<int32_t>(Response.StatusCode),
- .Reason = Response.ErrorMessage(ErrorPrefix),
- .Success = false};
- }
- return {.Response = Response.ResponsePayload,
- .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
- .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
- .ElapsedSeconds = Response.ElapsedSeconds,
- .ErrorCode = 0,
- .Success = true};
- }
-} // namespace detail
-
-JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect)
-: m_Log(InLog)
-, m_HttpClient(InHttpClient)
-, m_AllowRedirect(AllowRedirect)
-{
-}
-
-JupiterSession::~JupiterSession()
-{
-}
-
-JupiterResult
-JupiterSession::Authenticate()
-{
- bool OK = m_HttpClient.Authenticate();
- return {.Success = OK};
-}
-
-JupiterResult
-JupiterSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
-{
- ZEN_TRACE_CPU("JupiterClient::GetRef");
-
- HttpClient::Response Response =
- m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)});
-
- return detail::ConvertResponse(Response, "JupiterSession::GetRef"sv);
-}
-
-JupiterResult
-JupiterSession::GetBlob(std::string_view Namespace, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::GetBlob");
- HttpClient::Response Response =
- m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)});
-
- return detail::ConvertResponse(Response);
-}
-
-JupiterResult
-JupiterSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
-{
- ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
-
- HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
- TempFolderPath,
- {HttpClient::Accept(ZenContentType::kCompressedBinary)});
-
- return detail::ConvertResponse(Response);
-}
-
-JupiterResult
-JupiterSession::GetInlineBlob(std::string_view Namespace,
- std::string_view BucketId,
- const IoHash& Key,
- IoHash& OutPayloadHash,
- std::filesystem::path TempFolderPath)
-{
- ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
-
- HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
- TempFolderPath,
- {{"Accept", "application/x-jupiter-inline"}});
-
- JupiterResult Result = detail::ConvertResponse(Response);
-
- if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
- {
- const std::string& PayloadHashHeader = It->second;
- if (PayloadHashHeader.length() == IoHash::StringLength)
- {
- OutPayloadHash = IoHash::FromHexString(PayloadHashHeader);
- }
- }
-
- return Result;
-}
-
-JupiterResult
-JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::GetObject");
-
- HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
- {HttpClient::Accept(ZenContentType::kCbObject)});
-
- return detail::ConvertResponse(Response);
-}
-
-PutRefResult
-JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
-{
- ZEN_TRACE_CPU("JupiterClient::PutRef");
-
- Ref.SetContentType(RefType);
-
- IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
-
- HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
- Ref,
- {{"X-Jupiter-IoHash", Hash.ToHexString()}});
-
- PutRefResult Result = {detail::ConvertResponse(Response)};
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- Result.RawHash = Hash;
- }
- return Result;
-}
-
-FinalizeRefResult
-JupiterSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
-{
- ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
-
- HttpClient::Response Response =
- m_HttpClient.Post(fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
- {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
-
- FinalizeRefResult Result = {detail::ConvertResponse(Response)};
-
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- }
- return Result;
-}
-
-JupiterResult
-JupiterSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
-{
- ZEN_TRACE_CPU("JupiterClient::PutBlob");
-
- HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
-
- return detail::ConvertResponse(Response);
-}
-
-JupiterResult
-JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
-{
- ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
-
- Blob.SetContentType(ZenContentType::kCompressedBinary);
- HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
-
- return detail::ConvertResponse(Response);
-}
-
-JupiterResult
-JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload)
-{
- ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
-
- HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
- Payload,
- ZenContentType::kCompressedBinary);
-
- return detail::ConvertResponse(Response);
-}
-
-JupiterResult
-JupiterSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
-{
- ZEN_TRACE_CPU("JupiterClient::PutObject");
-
- Object.SetContentType(ZenContentType::kCbObject);
- HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);
-
- return detail::ConvertResponse(Response);
-}
-
-JupiterResult
-JupiterSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::RefExists");
-
- HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));
-
- return detail::ConvertResponse(Response);
-}
-
-GetObjectReferencesResult
-JupiterSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
-
- HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
- {HttpClient::Accept(ZenContentType::kCbObject)});
-
- GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};
-
- if (Result.Success)
- {
- const CbObject ReferencesResponse = Response.AsObject();
- for (auto& Item : ReferencesResponse["references"sv])
- {
- Result.References.insert(Item.AsHash());
- }
- }
- return Result;
-}
-
-JupiterResult
-JupiterSession::BlobExists(std::string_view Namespace, const IoHash& Key)
-{
- return CacheTypeExists(Namespace, "blobs"sv, Key);
-}
-
-JupiterResult
-JupiterSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key)
-{
- return CacheTypeExists(Namespace, "compressed-blobs"sv, Key);
-}
-
-JupiterResult
-JupiterSession::ObjectExists(std::string_view Namespace, const IoHash& Key)
-{
- return CacheTypeExists(Namespace, "objects"sv, Key);
-}
-
-JupiterExistsResult
-JupiterSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
-{
- return CacheTypeExists(Namespace, "blobs"sv, Keys);
-}
-
-JupiterExistsResult
-JupiterSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
-{
- return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys);
-}
-
-JupiterExistsResult
-JupiterSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys)
-{
- return CacheTypeExists(Namespace, "objects"sv, Keys);
-}
-
-std::vector<IoHash>
-JupiterSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
-{
- // ExtendableStringBuilder<256> Uri;
- // Uri << m_CacheClient->ServiceUrl();
- // Uri << "/api/v1/s/" << Namespace;
-
- ZEN_UNUSED(Namespace, BucketId, ChunkHashes);
-
- return {};
-}
-
-JupiterResult
-JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
-
- HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));
-
- return detail::ConvertResponse(Response);
-}
-
-JupiterExistsResult
-JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys)
-{
- ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
-
- ExtendableStringBuilder<256> Body;
- Body << "[";
- for (const auto& Key : Keys)
- {
- Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
- }
- Body << "]";
- IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
- Payload.SetContentType(ZenContentType::kJSON);
-
- HttpClient::Response Response =
- m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)});
-
- JupiterExistsResult Result = {detail::ConvertResponse(Response)};
-
- if (Result.Success)
- {
- const CbObject ExistsResponse = Response.AsObject();
- for (auto& Item : ExistsResponse["needs"sv])
- {
- Result.Needs.insert(Item.AsHash());
- }
- }
- return Result;
-}
-
-JupiterResult
-JupiterSession::ListBuildNamespaces()
-{
- HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)});
- return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv);
-}
-
-JupiterResult
-JupiterSession::ListBuildBuckets(std::string_view Namespace)
-{
- HttpClient::Response Response =
- m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)});
- return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv);
-}
-
-JupiterResult
-JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId);
- HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath),
- Payload,
- {HttpClient::Accept(ZenContentType::kCbObject)});
- return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv);
-}
-
-JupiterResult
-JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload);
- return detail::ConvertResponse(Response, "JupiterSession::PutBuild"sv);
-}
-
-JupiterResult
-JupiterSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
-{
- HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId),
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "JupiterSession::GetBuild"sv);
-}
-
-JupiterResult
-JupiterSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
-{
- HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId));
- return detail::ConvertResponse(Response, "JupiterSession::FinalizeBuild"sv);
-}
-
-PutBuildPartResult
-JupiterSession::PutBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- std::string_view PartName,
- const IoBuffer& Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
-
- IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
-
- HttpClient::Response Response =
- m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName),
- Payload,
- {{"X-Jupiter-IoHash", Hash.ToHexString()}});
-
- PutBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::PutBuildPart"sv)};
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- Result.RawHash = Hash;
- }
- return Result;
-}
-
-JupiterResult
-JupiterSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
-{
- HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId),
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "JupiterSession::GetBuildPart"sv);
-}
-
-JupiterResult
-JupiterSession::PutBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload)
-{
- HttpClient::Response Response =
- m_HttpClient.Upload(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()),
- Payload,
- ContentType);
- return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv);
-}
-
-JupiterResult
-JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- ZenContentType ContentType,
- uint64_t PayloadSize,
- std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
- std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems)
-{
- struct MultipartUploadResponse
- {
- struct Part
- {
- uint64_t FirstByte;
- uint64_t LastByte;
- std::string PartId;
- std::string QueryString;
- };
-
- std::string UploadId;
- std::string BlobName;
- std::vector<Part> Parts;
-
- static MultipartUploadResponse Parse(CbObject& Payload)
- {
- MultipartUploadResponse Result;
- Result.UploadId = Payload["uploadId"sv].AsString();
- Result.BlobName = Payload["blobName"sv].AsString();
- CbArrayView PartsArray = Payload["parts"sv].AsArrayView();
- Result.Parts.reserve(PartsArray.Num());
- for (CbFieldView PartView : PartsArray)
- {
- CbObjectView PartObject = PartView.AsObjectView();
- Result.Parts.emplace_back(Part{
- .FirstByte = PartObject["firstByte"sv].AsUInt64(),
- .LastByte = PartObject["lastByte"sv].AsUInt64(),
- .PartId = std::string(PartObject["partId"sv].AsString()),
- .QueryString = std::string(PartObject["queryString"sv].AsString()),
- });
- }
- return Result;
- }
- };
-
- CbObjectWriter StartMultipartPayloadWriter;
- StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize);
- CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save();
-
- std::string StartMultipartResponseRequestString =
- fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/startMultipartUpload", Namespace, BucketId, BuildId, Hash.ToHexString());
- // ZEN_INFO("POST: {}", StartMultipartResponseRequestString);
- HttpClient::Response StartMultipartResponse =
- m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject));
- if (!StartMultipartResponse.IsSuccess())
- {
- ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: "));
- return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
- }
- CbValidateError ValidateResult = CbValidateError::None;
- CbObject ResponseObject = ValidateAndReadCompactBinaryObject(IoBuffer(StartMultipartResponse.ResponsePayload), ValidateResult);
- if (ValidateResult != CbValidateError::None)
- {
- JupiterResult Result = detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
- Result.ErrorCode = (int32)HttpResponseCode::UnsupportedMediaType;
- Result.Reason = fmt::format("Invalid multipart response object format: '{}'", ToString(ValidateResult));
- return Result;
- }
-
- struct WorkloadData
- {
- MultipartUploadResponse PartDescription;
- std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter;
- std::atomic<size_t> PartsLeft;
- };
-
- std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
-
- Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject);
- Workload->Transmitter = std::move(Transmitter);
- Workload->PartsLeft = Workload->PartDescription.Parts.size();
-
- for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++)
- {
- OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, Hash, ContentType, Workload, PartIndex](
- bool& OutIsComplete) -> JupiterResult {
- const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex];
- IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte);
- std::string MultipartUploadResponseRequestString =
- fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
- Namespace,
- BucketId,
- BuildId,
- Hash.ToHexString(),
- Part.QueryString,
- m_AllowRedirect ? "true"sv : "false"sv);
- // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString);
- HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload);
- if (!MultipartUploadResponse.IsSuccess())
- {
- ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString));
- }
- OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1;
- if (OutIsComplete)
- {
- int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
- int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes;
- double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds;
- HttpClient::Response MultipartEndResponse = MultipartUploadResponse;
- while (MultipartEndResponse.IsSuccess())
- {
- CbObjectWriter CompletePayloadWriter;
- CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName);
- CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId);
- CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary);
- CompletePayloadWriter.BeginArray("partIds"sv);
- std::unordered_map<std::string, size_t> PartNameToIndex;
- for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++)
- {
- const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex];
- PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex});
- CompletePayloadWriter.AddString(PartDescription.PartId);
- }
- CompletePayloadWriter.EndArray(); // "partIds"
- CbObject CompletePayload = CompletePayloadWriter.Save();
-
- std::string MultipartEndResponseRequestString =
- fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/completeMultipart", Namespace, BucketId, BuildId, Hash.ToHexString());
-
- MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString,
- CompletePayload,
- HttpClient::Accept(ZenContentType::kCbObject));
- TotalUploadedBytes += MultipartEndResponse.UploadedBytes;
- TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes;
- TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds;
- if (MultipartEndResponse.IsSuccess())
- {
- CbObject ResponseObject = MultipartEndResponse.AsObject();
- CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView();
- if (MissingPartsArrayView.Num() == 0)
- {
- break;
- }
- else
- {
- for (CbFieldView PartIdView : MissingPartsArrayView)
- {
- std::string RetryPartId(PartIdView.AsString());
- size_t RetryPartIndex = PartNameToIndex.at(RetryPartId);
- const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex];
- IoBuffer RetryPartPayload =
- Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1);
- std::string RetryMultipartUploadResponseRequestString =
- fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
- Namespace,
- BucketId,
- BuildId,
- Hash.ToHexString(),
- RetryPart.QueryString,
- m_AllowRedirect ? "true"sv : "false"sv);
-
- MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload);
- TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
- TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes;
- TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds;
- if (!MultipartUploadResponse.IsSuccess())
- {
- ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString));
- MultipartEndResponse = MultipartUploadResponse;
- }
- }
- }
- }
- else
- {
- ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString));
- }
- }
- MultipartEndResponse.UploadedBytes = TotalUploadedBytes;
- MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes;
- MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds;
- return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv);
- }
- return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv);
- });
- }
- return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
-}
-
-JupiterResult
-JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
- std::function<void()>&& OnComplete,
- std::vector<std::function<JupiterResult()>>& OutWorkItems)
-{
- std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
- Namespace,
- BucketId,
- BuildId,
- Hash.ToHexString(),
- m_AllowRedirect ? "true"sv : "false"sv);
- HttpClient::Response Response =
- m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}}));
- if (Response.IsSuccess())
- {
- if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty())
- {
- if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos)
- {
- if (std::optional<uint64_t> TotalSizeMaybe = ParseInt<uint64_t>(ContentRange.substr(SizeDelimiterPos + 1));
- TotalSizeMaybe.has_value())
- {
- uint64_t TotalSize = TotalSizeMaybe.value();
- uint64_t PayloadSize = Response.ResponsePayload.GetSize();
-
- OnReceive(0, Response.ResponsePayload);
-
- if (TotalSize > PayloadSize)
- {
- struct WorkloadData
- {
- std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
- std::function<void()> OnComplete;
- std::atomic<uint64_t> BytesRemaining;
- };
-
- std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
- Workload->OnReceive = std::move(OnReceive);
- Workload->OnComplete = std::move(OnComplete);
- Workload->BytesRemaining = TotalSize - PayloadSize;
-
- uint64_t Offset = PayloadSize;
- while (Offset < TotalSize)
- {
- uint64_t PartSize = Min(ChunkSize, TotalSize - Offset);
- OutWorkItems.emplace_back([this,
- Namespace = std::string(Namespace),
- BucketId = std::string(BucketId),
- BuildId = Oid(BuildId),
- Hash = IoHash(Hash),
- TotalSize,
- Workload,
- Offset,
- PartSize]() -> JupiterResult {
- std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
- Namespace,
- BucketId,
- BuildId,
- Hash.ToHexString(),
- m_AllowRedirect ? "true"sv : "false"sv);
- HttpClient::Response Response = m_HttpClient.Get(
- RequestUrl,
- HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
- if (Response.IsSuccess())
- {
- Workload->OnReceive(Offset, Response.ResponsePayload);
- uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
- if (ByteRemaning == Response.ResponsePayload.GetSize())
- {
- Workload->OnComplete();
- }
- }
- return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
- });
- Offset += PartSize;
- }
- }
- else
- {
- OnComplete();
- }
- return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
- }
- }
- }
- OnReceive(0, Response.ResponsePayload);
- OnComplete();
- }
- return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
-}
-
-JupiterResult
-JupiterSession::GetBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- std::filesystem::path TempFolderPath,
- uint64_t Offset,
- uint64_t Size)
-{
- HttpClient::KeyValueMap Headers;
- if (Offset != 0 || Size != (uint64_t)-1)
- {
- Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)});
- }
- HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
- Namespace,
- BucketId,
- BuildId,
- Hash.ToHexString(),
- m_AllowRedirect ? "true"sv : "false"sv),
- TempFolderPath,
- Headers);
- if (Response.IsSuccess())
- {
- // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it
- if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary))
- {
- IoHash ValidateRawHash;
- uint64_t ValidateRawSize = 0;
- ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize));
- ZEN_ASSERT_SLOW(ValidateRawHash == Hash);
- ZEN_ASSERT_SLOW(ValidateRawSize > 0);
- ZEN_UNUSED(ValidateRawHash, ValidateRawSize);
- Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary);
- }
- }
- return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
-}
-
-JupiterResult
-JupiterSession::PutBlockMetadata(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- const IoBuffer& Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response =
- m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, Hash.ToHexString()),
- Payload);
- return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv);
-}
-
-FinalizeBuildPartResult
-JupiterSession::FinalizeBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& RawHash)
-{
- HttpClient::Response Response = m_HttpClient.Post(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()),
- HttpClient::Accept(ZenContentType::kCbObject));
-
- FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::FinalizeBuildPart"sv)};
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- }
- return Result;
-}
-
-JupiterResult
-JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount)
-{
- const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount);
- HttpClient::Response Response =
- m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters),
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
-}
-
-JupiterResult
-JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response =
- m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId),
- Payload,
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv);
-}
-
-JupiterResult
-JupiterSession::PutBuildPartStats(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& BuildPartId,
- IoBuffer Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response =
- m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId),
- Payload,
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv);
-}
-
-} // namespace zen
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
deleted file mode 100644
index 95417078a..000000000
--- a/src/zenutil/parallelwork.cpp
+++ /dev/null
@@ -1,264 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/parallelwork.h>
-
-#include <zencore/callstack.h>
-#include <zencore/except.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-
-#include <typeinfo>
-
-#if ZEN_WITH_TESTS
-# include <zencore/testing.h>
-#endif // ZEN_WITH_TESTS
-
-namespace zen {
-
-ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode)
-: m_AbortFlag(AbortFlag)
-, m_PauseFlag(PauseFlag)
-, m_Mode(Mode)
-, m_PendingWork(1)
-{
-}
-
-ParallelWork::~ParallelWork()
-{
- try
- {
- if (!m_DispatchComplete)
- {
- ZEN_ASSERT(m_PendingWork.Remaining() > 0);
- ZEN_WARN(
- "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads "
- "to complete");
- m_PendingWork.CountDown();
- m_DispatchComplete = true;
- }
- const bool WaitSucceeded = m_PendingWork.Wait();
- const ptrdiff_t RemainingWork = m_PendingWork.Remaining();
- if (!WaitSucceeded)
- {
- ZEN_ERROR("ParallelWork::~ParallelWork(): waiting for latch failed, pending work count at {}", RemainingWork);
- }
- if (RemainingWork != 0)
- {
- void* Frames[8];
- uint32_t FrameCount = GetCallstack(2, 8, Frames);
- CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames);
- auto _ = MakeGuard([Callstack]() { FreeCallstack(Callstack); });
- ZEN_WARN("ParallelWork::~ParallelWork(): waited for outstanding work but pending work count is {} instead of 0", RemainingWork);
-
- uint32_t WaitedMs = 0;
- while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000)
- {
- Sleep(50);
- WaitedMs += 50;
- }
- ptrdiff_t RemainingWorkAfterSafetyWait = m_PendingWork.Remaining();
- if (RemainingWorkAfterSafetyWait != 0)
- {
- ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks failed, pending work count at {} after {}\n{}",
- RemainingWork,
- RemainingWorkAfterSafetyWait,
- NiceLatencyNs(WaitedMs * 1000000u),
- CallstackToString(Callstack, " "))
- }
- else
- {
- ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks completed after {}\n{}",
- RemainingWork,
- NiceLatencyNs(WaitedMs * 1000000u),
- CallstackToString(Callstack, " "));
- }
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Exception in ParallelWork::~ParallelWork(): {}", Ex.what());
- }
-}
-
-ParallelWork::ExceptionCallback
-ParallelWork::DefaultErrorFunction()
-{
- return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) {
- m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); });
- AbortFlag = true;
- };
-}
-
-void
-ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
-{
- ZEN_ASSERT(!m_DispatchComplete);
- ZEN_ASSERT(m_PendingWork.Remaining() > 0);
- m_PendingWork.CountDown();
- m_DispatchComplete = true;
-
- while (!m_PendingWork.Wait(UpdateIntervalMS))
- {
- UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining());
- }
-
- RethrowErrors();
-}
-
-void
-ParallelWork::Wait()
-{
- ZEN_ASSERT(!m_DispatchComplete);
- ZEN_ASSERT(m_PendingWork.Remaining() > 0);
- m_PendingWork.CountDown();
- m_DispatchComplete = true;
-
- const bool WaitSucceeded = m_PendingWork.Wait();
- const ptrdiff_t RemainingWork = m_PendingWork.Remaining();
- if (!WaitSucceeded)
- {
- ZEN_ERROR("ParallelWork::Wait(): waiting for latch failed, pending work count at {}", RemainingWork);
- }
- else if (RemainingWork != 0)
- {
- ZEN_ERROR("ParallelWork::Wait(): pending work count at {} after successful wait for latch", RemainingWork);
- }
-
- RethrowErrors();
-}
-
-void
-ParallelWork::RethrowErrors()
-{
- if (!m_Errors.empty())
- {
- if (m_Errors.size() > 1)
- {
- ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:");
- auto It = m_Errors.begin() + 1;
- while (It != m_Errors.end())
- {
- try
- {
- std::rethrow_exception(*It);
- }
- catch (const std::exception& Ex)
- {
- ZEN_INFO(" {}", Ex.what());
- }
- It++;
- }
- }
- std::exception_ptr Ex = m_Errors.front();
- m_Errors.clear();
- std::rethrow_exception(Ex);
- }
-}
-
-#if ZEN_WITH_TESTS
-
-TEST_CASE("parallellwork.nowork")
-{
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- Work.Wait();
-}
-
-TEST_CASE("parallellwork.basic")
-{
- WorkerThreadPool WorkerPool(2);
-
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (uint32_t I = 0; I < 5; I++)
- {
- Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
- }
- Work.Wait();
-}
-
-TEST_CASE("parallellwork.throws_in_work")
-{
- WorkerThreadPool WorkerPool(2);
-
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (uint32_t I = 0; I < 10; I++)
- {
- Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
- ZEN_UNUSED(AbortFlag);
- if (I > 3)
- {
- throw std::runtime_error("We throw in async thread");
- }
- else
- {
- Sleep(10);
- }
- });
- }
- CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread");
-}
-
-TEST_CASE("parallellwork.throws_in_dispatch")
-{
- WorkerThreadPool WorkerPool(2);
- std::atomic<uint32_t> ExecutedCount;
- try
- {
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (uint32_t I = 0; I < 5; I++)
- {
- Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
- {
- return;
- }
- ExecutedCount++;
- });
- if (I == 3)
- {
- throw std::runtime_error("We throw in dispatcher thread");
- }
- }
- CHECK(false);
- }
- catch (const std::runtime_error& Ex)
- {
- CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what()));
- CHECK_LE(ExecutedCount.load(), 4);
- }
-}
-
-TEST_CASE("parallellwork.limitqueue")
-{
- WorkerThreadPool WorkerPool(2);
-
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
- for (uint32_t I = 0; I < 5; I++)
- {
- Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
- {
- return;
- }
- Sleep(10);
- });
- }
- Work.Wait();
-}
-
-void
-parallellwork_forcelink()
-{
-}
-#endif // ZEN_WITH_TESTS
-
-} // namespace zen
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index 88be8a244..586fd1513 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -7,7 +7,6 @@
# include <zenutil/cache/cacherequests.h>
# include <zenutil/cache/rpcrecording.h>
# include <zenutil/commandlineoptions.h>
-# include <zenutil/parallelwork.h>
# include <zenutil/wildcard.h>
namespace zen {
@@ -19,7 +18,6 @@ zenutil_forcelinktests()
cache::rpcrecord_forcelink();
cacherequests_forcelink();
commandlineoptions_forcelink();
- parallellwork_forcelink();
wildcard_forcelink();
}