aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/jupiter
diff options
context:
space:
mode:
authorzousar <[email protected]>2025-06-24 16:26:29 -0600
committerzousar <[email protected]>2025-06-24 16:26:29 -0600
commitbb298631ba35a323827dda0b8cd6158e276b5f61 (patch)
tree7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenutil/jupiter
parentChange to PutResult structure (diff)
parent5.6.14 (diff)
downloadzen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz
zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenutil/jupiter')
-rw-r--r--src/zenutil/jupiter/jupiterbuildstorage.cpp479
-rw-r--r--src/zenutil/jupiter/jupiterclient.cpp1
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp393
3 files changed, 855 insertions, 18 deletions
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp
new file mode 100644
index 000000000..9974725ff
--- /dev/null
+++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp
@@ -0,0 +1,479 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/jupiter/jupiterbuildstorage.h>
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zenutil/jupiter/jupitersession.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+class JupiterBuildStorage : public BuildStorage
+{
+public:
+ JupiterBuildStorage(LoggerRef InLog,
+ HttpClient& InHttpClient,
+ Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ bool AllowRedirect,
+ const std::filesystem::path& TempFolderPath)
+ : m_Session(InLog, InHttpClient, AllowRedirect)
+ , m_Stats(Stats)
+ , m_Namespace(Namespace)
+ , m_Bucket(Bucket)
+ , m_TempFolderPath(TempFolderPath)
+ {
+ }
+ virtual ~JupiterBuildStorage() {}
+
+ virtual CbObject ListNamespaces(bool bRecursive) override
+ {
+ ZEN_TRACE_CPU("Jupiter::ListNamespaces");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ JupiterResult ListResult = m_Session.ListBuildNamespaces();
+ AddStatistic(ListResult);
+ if (!ListResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed listing namespaces: {} ({})", ListResult.Reason, ListResult.ErrorCode));
+ }
+ CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response);
+
+ CbObjectWriter Response;
+ Response.BeginArray("results"sv);
+ for (CbFieldView NamespaceField : NamespaceResponse["namespaces"])
+ {
+ std::string_view Namespace = NamespaceField.AsString();
+ if (!Namespace.empty())
+ {
+ Response.BeginObject();
+ Response.AddString("name", Namespace);
+
+ if (bRecursive)
+ {
+ JupiterResult BucketsResult = m_Session.ListBuildBuckets(Namespace);
+ AddStatistic(BucketsResult);
+ if (!BucketsResult.Success)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed listing namespaces: {} ({})", BucketsResult.Reason, BucketsResult.ErrorCode));
+ }
+ CbObject BucketResponse = PayloadToCbObject("Failed listing namespaces"sv, BucketsResult.Response);
+
+ Response.BeginArray("items");
+ for (CbFieldView BucketField : BucketResponse["buckets"])
+ {
+ std::string_view Bucket = BucketField.AsString();
+ if (!Bucket.empty())
+ {
+ Response.AddString(Bucket);
+ }
+ }
+ Response.EndArray();
+ }
+
+ Response.EndObject();
+ }
+ }
+ Response.EndArray();
+
+ return Response.Save();
+ }
+
+ virtual CbObject ListBuilds(CbObject Query) override
+ {
+ ZEN_TRACE_CPU("Jupiter::ListBuilds");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ IoBuffer Payload = Query.GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+ JupiterResult ListResult = m_Session.ListBuilds(m_Namespace, m_Bucket, Payload);
+ AddStatistic(ListResult);
+ if (!ListResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode));
+ }
+ return PayloadToCbObject("Failed listing builds"sv, ListResult.Response);
+ }
+
+ virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
+ {
+ ZEN_TRACE_CPU("Jupiter::PutBuild");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+ JupiterResult PutResult = m_Session.PutBuild(m_Namespace, m_Bucket, BuildId, Payload);
+ AddStatistic(PutResult);
+ if (!PutResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode));
+ }
+ return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response);
+ }
+
+ virtual CbObject GetBuild(const Oid& BuildId) override
+ {
+ ZEN_TRACE_CPU("Jupiter::GetBuild");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ JupiterResult GetBuildResult = m_Session.GetBuild(m_Namespace, m_Bucket, BuildId);
+ AddStatistic(GetBuildResult);
+ if (!GetBuildResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode));
+ }
+ return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response);
+ }
+
+ virtual void FinalizeBuild(const Oid& BuildId) override
+ {
+ ZEN_TRACE_CPU("Jupiter::FinalizeBuild");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ JupiterResult FinalizeBuildResult = m_Session.FinalizeBuild(m_Namespace, m_Bucket, BuildId);
+ AddStatistic(FinalizeBuildResult);
+ if (!FinalizeBuildResult.Success)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed finalizing build part: {} ({})", FinalizeBuildResult.Reason, FinalizeBuildResult.ErrorCode));
+ }
+ }
+
+ virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId,
+ const Oid& BuildPartId,
+ std::string_view PartName,
+ const CbObject& MetaData) override
+ {
+ ZEN_TRACE_CPU("Jupiter::PutBuildPart");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+ PutBuildPartResult PutPartResult = m_Session.PutBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartName, Payload);
+ AddStatistic(PutPartResult);
+ if (!PutPartResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed creating build part: {} ({})", PutPartResult.Reason, PutPartResult.ErrorCode));
+ }
+ return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs));
+ }
+
+ virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override
+ {
+ ZEN_TRACE_CPU("Jupiter::GetBuildPart");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ JupiterResult GetBuildPartResult = m_Session.GetBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId);
+ AddStatistic(GetBuildPartResult);
+ if (!GetBuildPartResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed fetching build part {}: {} ({})",
+ BuildPartId,
+ GetBuildPartResult.Reason,
+ GetBuildPartResult.ErrorCode));
+ }
+ return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response);
+ }
+
+ virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
+ {
+ ZEN_TRACE_CPU("Jupiter::FinalizeBuildPart");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ FinalizeBuildPartResult FinalizePartResult = m_Session.FinalizeBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartHash);
+ AddStatistic(FinalizePartResult);
+ if (!FinalizePartResult.Success)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed finalizing build part: {} ({})", FinalizePartResult.Reason, FinalizePartResult.ErrorCode));
+ }
+ return std::move(FinalizePartResult.Needs);
+ }
+
+ virtual void PutBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload) override
+ {
+ ZEN_TRACE_CPU("Jupiter::PutBuildBlob");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ JupiterResult PutBlobResult = m_Session.PutBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, Payload);
+ AddStatistic(PutBlobResult);
+ if (!PutBlobResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PutBlobResult.Reason, PutBlobResult.ErrorCode));
+ }
+ }
+
+ virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::function<void(uint64_t, bool)>&& OnSentBytes) override
+ {
+ ZEN_TRACE_CPU("Jupiter::PutLargeBuildBlob");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ std::vector<std::function<JupiterResult(bool&)>> WorkItems;
+ JupiterResult PutMultipartBlobResult = m_Session.PutMultipartBuildBlob(m_Namespace,
+ m_Bucket,
+ BuildId,
+ RawHash,
+ ContentType,
+ PayloadSize,
+ std::move(Transmitter),
+ WorkItems);
+ AddStatistic(PutMultipartBlobResult);
+ if (!PutMultipartBlobResult.Success)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed putting build part: {} ({})", PutMultipartBlobResult.Reason, PutMultipartBlobResult.ErrorCode));
+ }
+ OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty());
+
+ std::vector<std::function<void()>> WorkList;
+ for (auto& WorkItem : WorkItems)
+ {
+ WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes]() {
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ bool IsComplete = false;
+ JupiterResult PartResult = WorkItem(IsComplete);
+ AddStatistic(PartResult);
+ if (!PartResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode));
+ }
+ OnSentBytes(PartResult.SentBytes, IsComplete);
+ });
+ }
+ return WorkList;
+ }
+
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
+ {
+ ZEN_TRACE_CPU("Jupiter::GetBuildBlob");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ CreateDirectories(m_TempFolderPath);
+ JupiterResult GetBuildBlobResult =
+ m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes);
+ AddStatistic(GetBuildBlobResult);
+ if (!GetBuildBlobResult.Success)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed fetching build blob {}: {} ({})", RawHash, GetBuildBlobResult.Reason, GetBuildBlobResult.ErrorCode));
+ }
+ return std::move(GetBuildBlobResult.Response);
+ }
+
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete) override
+ {
+ ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ std::vector<std::function<JupiterResult()>> WorkItems;
+ JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace,
+ m_Bucket,
+ BuildId,
+ RawHash,
+ ChunkSize,
+ std::move(OnReceive),
+ std::move(OnComplete),
+ WorkItems);
+
+ AddStatistic(GetMultipartBlobResult);
+ if (!GetMultipartBlobResult.Success)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed getting build part: {} ({})", GetMultipartBlobResult.Reason, GetMultipartBlobResult.ErrorCode));
+ }
+ std::vector<std::function<void()>> WorkList;
+ for (auto& WorkItem : WorkItems)
+ {
+ WorkList.emplace_back([this, WorkItem = std::move(WorkItem)]() {
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ JupiterResult PartResult = WorkItem();
+ AddStatistic(PartResult);
+ if (!PartResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed getting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode));
+ }
+ });
+ }
+ return WorkList;
+ }
+
+ virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
+ {
+ ZEN_TRACE_CPU("Jupiter::PutBlockMetadata");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+ JupiterResult PutMetaResult = m_Session.PutBlockMetadata(m_Namespace, m_Bucket, BuildId, BlockRawHash, Payload);
+ AddStatistic(PutMetaResult);
+ if (!PutMetaResult.Success)
+ {
+ if (PutMetaResult.ErrorCode == int32_t(HttpResponseCode::NotFound))
+ {
+ return false;
+ }
+ throw std::runtime_error(
+ fmt::format("Failed putting build block metadata: {} ({})", PutMetaResult.Reason, PutMetaResult.ErrorCode));
+ }
+ return true;
+ }
+
+ virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override
+ {
+ ZEN_TRACE_CPU("Jupiter::FindBlocks");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount);
+ AddStatistic(FindResult);
+ if (!FindResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode));
+ }
+ return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response);
+ }
+
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
+ {
+ ZEN_TRACE_CPU("Jupiter::GetBlockMetadata");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ CbObjectWriter Request;
+
+ Request.BeginArray("blocks"sv);
+ for (const IoHash& BlockHash : BlockHashes)
+ {
+ Request.AddHash(BlockHash);
+ }
+ Request.EndArray();
+
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+ JupiterResult GetBlockMetadataResult = m_Session.GetBlockMetadata(m_Namespace, m_Bucket, BuildId, Payload);
+ AddStatistic(GetBlockMetadataResult);
+ if (!GetBlockMetadataResult.Success)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode));
+ }
+ return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response);
+ }
+
+ virtual void PutBuildPartStats(const Oid& BuildId,
+ const Oid& BuildPartId,
+ const tsl::robin_map<std::string, double>& FloatStats) override
+ {
+ ZEN_UNUSED(BuildId, BuildPartId, FloatStats);
+ CbObjectWriter Request;
+ Request.BeginObject("floatStats"sv);
+ for (auto It : FloatStats)
+ {
+ Request.AddFloat(It.first, It.second);
+ }
+ Request.EndObject();
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+ JupiterResult PutBuildPartStatsResult = m_Session.PutBuildPartStats(m_Namespace, m_Bucket, BuildId, BuildPartId, Payload);
+ AddStatistic(PutBuildPartStatsResult);
+ if (!PutBuildPartStatsResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed posting build part statistics: {} ({})",
+ PutBuildPartStatsResult.Reason,
+ PutBuildPartStatsResult.ErrorCode));
+ }
+ }
+
+private:
+ static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload)
+ {
+ if (Payload.GetContentType() == ZenContentType::kJSON)
+ {
+ std::string_view Json(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
+ return LoadCompactBinaryFromJson(Json).AsObject();
+ }
+ else if (Payload.GetContentType() == ZenContentType::kCbObject)
+ {
+ return LoadCompactBinaryObject(Payload);
+ }
+ else if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ return LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize));
+ }
+ else
+ {
+ throw std::runtime_error(
+ fmt::format("{}: {} ({})", "Unsupported response format", Context, ToString(Payload.GetContentType())));
+ }
+ }
+
+ void AddStatistic(const JupiterResult& Result)
+ {
+ m_Stats.TotalBytesWritten += Result.SentBytes;
+ m_Stats.TotalBytesRead += Result.ReceivedBytes;
+ m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0);
+ m_Stats.TotalRequestCount++;
+ }
+
+ JupiterSession m_Session;
+ Statistics& m_Stats;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const std::filesystem::path m_TempFolderPath;
+};
+
+std::unique_ptr<BuildStorage>
+CreateJupiterBuildStorage(LoggerRef InLog,
+ HttpClient& InHttpClient,
+ BuildStorage::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ bool AllowRedirect,
+ const std::filesystem::path& TempFolderPath)
+{
+ ZEN_TRACE_CPU("CreateJupiterBuildStorage");
+
+ return std::make_unique<JupiterBuildStorage>(InLog, InHttpClient, Stats, Namespace, Bucket, AllowRedirect, TempFolderPath);
+}
+
+} // namespace zen
diff --git a/src/zenutil/jupiter/jupiterclient.cpp b/src/zenutil/jupiter/jupiterclient.cpp
index 5e5da3750..dbac218a4 100644
--- a/src/zenutil/jupiter/jupiterclient.cpp
+++ b/src/zenutil/jupiter/jupiterclient.cpp
@@ -11,7 +11,6 @@ JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function<
, m_DefaultDdcNamespace(Options.DdcNamespace)
, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
, m_ComputeCluster(Options.ComputeCluster)
-, m_TokenProvider(std::move(TokenProvider))
, m_HttpClient(Options.ServiceUrl,
HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
.Timeout = Options.Timeout,
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index f706a7efc..1fd59acdf 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -3,6 +3,9 @@
#include <zenutil/jupiter/jupitersession.h>
#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compositebuffer.h>
+#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/trace.h>
@@ -46,7 +49,10 @@ namespace detail {
}
} // namespace detail
-JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient)
+JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect)
+: m_Log(InLog)
+, m_HttpClient(InHttpClient)
+, m_AllowRedirect(AllowRedirect)
{
}
@@ -355,6 +361,32 @@ JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view Typ
}
JupiterResult
+JupiterSession::ListBuildNamespaces()
+{
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv);
+}
+
+JupiterResult
+JupiterSession::ListBuildBuckets(std::string_view Namespace)
+{
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv);
+}
+
+JupiterResult
+JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId);
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv);
+}
+
+JupiterResult
JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
{
ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
@@ -424,29 +456,330 @@ JupiterResult
JupiterSession::PutBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
ZenContentType ContentType,
const CompositeBuffer& Payload)
{
- HttpClient::Response Response = m_HttpClient.Upload(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- Payload,
- ContentType);
+ HttpClient::Response Response =
+ m_HttpClient.Upload(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()),
+ Payload,
+ ContentType);
return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv);
}
JupiterResult
+JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems)
+{
+ struct MultipartUploadResponse
+ {
+ struct Part
+ {
+ uint64_t FirstByte;
+ uint64_t LastByte;
+ std::string PartId;
+ std::string QueryString;
+ };
+
+ std::string UploadId;
+ std::string BlobName;
+ std::vector<Part> Parts;
+
+ static MultipartUploadResponse Parse(CbObject& Payload)
+ {
+ MultipartUploadResponse Result;
+ Result.UploadId = Payload["uploadId"sv].AsString();
+ Result.BlobName = Payload["blobName"sv].AsString();
+ CbArrayView PartsArray = Payload["parts"sv].AsArrayView();
+ Result.Parts.reserve(PartsArray.Num());
+ for (CbFieldView PartView : PartsArray)
+ {
+ CbObjectView PartObject = PartView.AsObjectView();
+ Result.Parts.emplace_back(Part{
+ .FirstByte = PartObject["firstByte"sv].AsUInt64(),
+ .LastByte = PartObject["lastByte"sv].AsUInt64(),
+ .PartId = std::string(PartObject["partId"sv].AsString()),
+ .QueryString = std::string(PartObject["queryString"sv].AsString()),
+ });
+ }
+ return Result;
+ }
+ };
+
+ CbObjectWriter StartMultipartPayloadWriter;
+ StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize);
+ CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save();
+
+ std::string StartMultipartResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/startMultipartUpload", Namespace, BucketId, BuildId, Hash.ToHexString());
+ // ZEN_INFO("POST: {}", StartMultipartResponseRequestString);
+ HttpClient::Response StartMultipartResponse =
+ m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject));
+ if (!StartMultipartResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: "));
+ return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ }
+ CbObject ResponseObject = LoadCompactBinaryObject(StartMultipartResponse.ResponsePayload);
+
+ struct WorkloadData
+ {
+ MultipartUploadResponse PartDescription;
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter;
+ std::atomic<size_t> PartsLeft;
+ };
+
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+
+ Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject);
+ Workload->Transmitter = std::move(Transmitter);
+ Workload->PartsLeft = Workload->PartDescription.Parts.size();
+
+ for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++)
+ {
+ OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, Hash, ContentType, Workload, PartIndex](
+ bool& OutIsComplete) -> JupiterResult {
+ const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex];
+ IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte);
+ std::string MultipartUploadResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ Part.QueryString,
+ m_AllowRedirect ? "true"sv : "false"sv);
+ // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString);
+ HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload);
+ if (!MultipartUploadResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString));
+ }
+ OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1;
+ if (OutIsComplete)
+ {
+ int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
+ int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes;
+ double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds;
+ HttpClient::Response MultipartEndResponse = MultipartUploadResponse;
+ while (MultipartEndResponse.IsSuccess())
+ {
+ CbObjectWriter CompletePayloadWriter;
+ CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName);
+ CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId);
+ CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary);
+ CompletePayloadWriter.BeginArray("partIds"sv);
+ std::unordered_map<std::string, size_t> PartNameToIndex;
+ for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++)
+ {
+ const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex];
+ PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex});
+ CompletePayloadWriter.AddString(PartDescription.PartId);
+ }
+ CompletePayloadWriter.EndArray(); // "partIds"
+ CbObject CompletePayload = CompletePayloadWriter.Save();
+
+ std::string MultipartEndResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/completeMultipart", Namespace, BucketId, BuildId, Hash.ToHexString());
+
+ MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString,
+ CompletePayload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ TotalUploadedBytes += MultipartEndResponse.UploadedBytes;
+ TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes;
+ TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds;
+ if (MultipartEndResponse.IsSuccess())
+ {
+ CbObject ResponseObject = MultipartEndResponse.AsObject();
+ CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView();
+ if (MissingPartsArrayView.Num() == 0)
+ {
+ break;
+ }
+ else
+ {
+ for (CbFieldView PartIdView : MissingPartsArrayView)
+ {
+ std::string RetryPartId(PartIdView.AsString());
+ size_t RetryPartIndex = PartNameToIndex.at(RetryPartId);
+ const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex];
+ IoBuffer RetryPartPayload =
+ Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1);
+ std::string RetryMultipartUploadResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ RetryPart.QueryString,
+ m_AllowRedirect ? "true"sv : "false"sv);
+
+ MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload);
+ TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
+ TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes;
+ TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds;
+ if (!MultipartUploadResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString));
+ MultipartEndResponse = MultipartUploadResponse;
+ }
+ }
+ }
+ }
+ else
+ {
+ ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString));
+ }
+ }
+ MultipartEndResponse.UploadedBytes = TotalUploadedBytes;
+ MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes;
+ MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds;
+ return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ }
+ return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ });
+ }
+ return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems)
+{
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv);
+ HttpClient::Response Response =
+ m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty())
+ {
+ if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos)
+ {
+ if (std::optional<uint64_t> TotalSizeMaybe = ParseInt<uint64_t>(ContentRange.substr(SizeDelimiterPos + 1));
+ TotalSizeMaybe.has_value())
+ {
+ uint64_t TotalSize = TotalSizeMaybe.value();
+ uint64_t PayloadSize = Response.ResponsePayload.GetSize();
+
+ OnReceive(0, Response.ResponsePayload);
+
+ if (TotalSize > PayloadSize)
+ {
+ struct WorkloadData
+ {
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
+ std::function<void()> OnComplete;
+ std::atomic<uint64_t> BytesRemaining;
+ };
+
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+ Workload->OnReceive = std::move(OnReceive);
+ Workload->OnComplete = std::move(OnComplete);
+ Workload->BytesRemaining = TotalSize - PayloadSize;
+
+ uint64_t Offset = PayloadSize;
+ while (Offset < TotalSize)
+ {
+ uint64_t PartSize = Min(ChunkSize, TotalSize - Offset);
+ OutWorkItems.emplace_back([this,
+ Namespace = std::string(Namespace),
+ BucketId = std::string(BucketId),
+ BuildId = Oid(BuildId),
+ Hash = IoHash(Hash),
+ TotalSize,
+ Workload,
+ Offset,
+ PartSize]() -> JupiterResult {
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv);
+ HttpClient::Response Response = m_HttpClient.Get(
+ RequestUrl,
+ HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ Workload->OnReceive(Offset, Response.ResponsePayload);
+ uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
+ if (ByteRemaning == Response.ResponsePayload.GetSize())
+ {
+ Workload->OnComplete();
+ }
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ });
+ Offset += PartSize;
+ }
+ }
+ else
+ {
+ OnComplete();
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ }
+ }
+ }
+ OnReceive(0, Response.ResponsePayload);
+ OnComplete();
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+}
+
+JupiterResult
JupiterSession::GetBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
- std::filesystem::path TempFolderPath)
+ std::filesystem::path TempFolderPath,
+ uint64_t Offset,
+ uint64_t Size)
{
- HttpClient::Response Response = m_HttpClient.Download(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- TempFolderPath);
+ HttpClient::KeyValueMap Headers;
+ if (Offset != 0 || Size != (uint64_t)-1)
+ {
+ Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)});
+ }
+ HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv),
+ TempFolderPath,
+ Headers);
+ if (Response.IsSuccess())
+ {
+ // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it
+ if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary))
+ {
+ IoHash ValidateRawHash;
+ uint64_t ValidateRawSize = 0;
+ ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize));
+ ZEN_ASSERT_SLOW(ValidateRawHash == Hash);
+ ZEN_ASSERT_SLOW(ValidateRawSize > 0);
+ ZEN_UNUSED(ValidateRawHash, ValidateRawSize);
+ Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary);
+ }
+ }
return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
}
@@ -454,14 +787,13 @@ JupiterResult
JupiterSession::PutBlockMetadata(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
const IoBuffer& Payload)
{
ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response = m_HttpClient.Put(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- Payload);
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, Hash.ToHexString()),
+ Payload);
return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv);
}
@@ -494,12 +826,39 @@ JupiterSession::FinalizeBuildPart(std::string_view Namespace,
}
JupiterResult
-JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount)
{
+ const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount);
HttpClient::Response Response =
- m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId),
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters),
HttpClient::Accept(ZenContentType::kCbObject));
return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
}
+JupiterResult
+JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv);
+}
+
+JupiterResult
+JupiterSession::PutBuildPartStats(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ IoBuffer Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv);
+}
+
} // namespace zen