aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-03 09:38:05 +0200
committerGitHub Enterprise <[email protected]>2025-10-03 09:38:05 +0200
commitf5727a1e4d6bfb833e37e1210691d351456dbe3a (patch)
treecbf7688c4e31ba7db429a98c7c9f813fa0a826be /src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
parentmove projectstore to zenstore (#541) (diff)
downloadzen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.tar.xz
zen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.zip
move remoteproject to remotestorelib (#542)
* move remoteproject code to remotestorelib
Diffstat (limited to 'src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp')
-rw-r--r--src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp402
1 files changed, 402 insertions, 0 deletions
diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
new file mode 100644
index 000000000..bfd4e433c
--- /dev/null
+++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
@@ -0,0 +1,402 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/projectstore/jupiterremoteprojectstore.h>
+
+#include <zencore/compactbinaryutil.h>
+#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+
+#include <zenhttp/httpclientauth.h>
+
+#include <zenutil/jupiter/jupiterclient.h>
+#include <zenutil/jupiter/jupitersession.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+class JupiterRemoteStore : public RemoteProjectStore
+{
+public:
+ JupiterRemoteStore(Ref<JupiterClient>&& InJupiterClient,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& Key,
+ const IoHash& OptionalBaseKey,
+ bool ForceDisableBlocks,
+ bool ForceDisableTempBlocks,
+ const std::filesystem::path& TempFilePath)
+ : m_JupiterClient(std::move(InJupiterClient))
+ , m_Namespace(Namespace)
+ , m_Bucket(Bucket)
+ , m_Key(Key)
+ , m_OptionalBaseKey(OptionalBaseKey)
+ , m_TempFilePath(TempFilePath)
+ , m_EnableBlocks(!ForceDisableBlocks)
+ , m_UseTempBlocks(!ForceDisableTempBlocks)
+ {
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = m_EnableBlocks,
+ .UseTempBlockFiles = m_UseTempBlocks,
+ .AllowChunking = true,
+ .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key),
+ .Description = fmt::format("[cloud] {}. SessionId: {}. {}/{}/{}{}"sv,
+ m_JupiterClient->ServiceUrl(),
+ m_JupiterClient->Client().GetSessionId(),
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format(" Base {}", m_OptionalBaseKey))};
+ }
+
+ virtual Stats GetStats() const override
+ {
+ return {.m_SentBytes = m_SentBytes.load(),
+ .m_ReceivedBytes = m_ReceivedBytes.load(),
+ .m_RequestTimeNS = m_RequestTimeNS.load(),
+ .m_RequestCount = m_RequestCount.load(),
+ .m_PeakSentBytes = m_PeakSentBytes.load(),
+ .m_PeakReceivedBytes = m_PeakReceivedBytes.load(),
+ .m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
+ }
+
+ virtual CreateContainerResult CreateContainer() override
+ {
+ // Nothing to do here
+ return {};
+ }
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ {
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
+ PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
+ AddStats(PutResult);
+
+ SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}/{}. Reason: '{}'",
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ Result.Reason);
+ }
+ return Result;
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override
+ {
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
+ JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
+ AddStats(PutResult);
+
+ SaveAttachmentResult Result{ConvertResult(PutResult)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}. Reason: '{}'",
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ RawHash,
+ Result.Reason);
+ }
+ return Result;
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ SaveAttachmentsResult Result;
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
+ SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {});
+ if (ChunkResult.ErrorCode)
+ {
+ return SaveAttachmentsResult{ChunkResult};
+ }
+ }
+ return Result;
+ }
+
+ virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override
+ {
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
+ FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
+ AddStats(FinalizeRefResult);
+
+ FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'",
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ Result.Reason);
+ }
+ return Result;
+ }
+
+ virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Key); }
+
+ virtual GetKnownBlocksResult GetKnownBlocks() override
+ {
+ if (m_OptionalBaseKey == IoHash::Zero)
+ {
+ return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
+ }
+ LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseKey);
+ if (LoadResult.ErrorCode)
+ {
+ return GetKnownBlocksResult{LoadResult};
+ }
+ std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject);
+ if (BlockHashes.empty())
+ {
+ return GetKnownBlocksResult{
+ {.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}};
+ }
+
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
+ JupiterExistsResult ExistsResult =
+ Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(BlockHashes.begin(), BlockHashes.end()));
+ AddStats(ExistsResult);
+
+ if (ExistsResult.ErrorCode)
+ {
+ return GetKnownBlocksResult{{.ErrorCode = ExistsResult.ErrorCode,
+ .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds,
+ .Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'",
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ ExistsResult.Reason)}};
+ }
+
+ Stopwatch Timer;
+ std::vector<IoHash> ExistingBlockHashes;
+ for (const IoHash& RawHash : BlockHashes)
+ {
+ if (!ExistsResult.Needs.contains(RawHash))
+ {
+ ExistingBlockHashes.push_back(RawHash);
+ }
+ }
+ if (ExistingBlockHashes.empty())
+ {
+ return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
+ .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}};
+ }
+ std::vector<ThinChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes);
+
+ GetKnownBlocksResult Result{
+ {.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}};
+ Result.Blocks = std::move(KnownBlocks);
+ return Result;
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
+ JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
+ AddStats(GetResult);
+
+ LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
+ if (GetResult.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ RawHash,
+ Result.Reason);
+ }
+ return Result;
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ LoadAttachmentsResult Result;
+ for (const IoHash& Hash : RawHashes)
+ {
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
+ if (ChunkResult.ErrorCode)
+ {
+ return LoadAttachmentsResult{ChunkResult};
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
+ }
+ return Result;
+ }
+
+private:
+ LoadContainerResult LoadContainer(const IoHash& Key)
+ {
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
+ JupiterResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject);
+ AddStats(GetResult);
+ if (GetResult.ErrorCode || !GetResult.Success)
+ {
+ LoadContainerResult Result{ConvertResult(GetResult)};
+ Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}/{}. Reason: '{}'",
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ Key,
+ Result.Reason);
+ return Result;
+ }
+
+ CbValidateError ValidateResult = CbValidateError::None;
+ if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(IoBuffer(GetResult.Response), ValidateResult);
+ ValidateResult != CbValidateError::None || !ContainerObject)
+ {
+ return LoadContainerResult{
+ RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ .ElapsedSeconds = GetResult.ElapsedSeconds,
+ .Reason = fmt::format("The ref {}/{}/{}/{} is not formatted as a compact binary object"sv,
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ Key)},
+ {}};
+ }
+ else
+ {
+ return LoadContainerResult{ConvertResult(GetResult), std::move(ContainerObject)};
+ }
+ }
+
+ void AddStats(const JupiterResult& Result)
+ {
+ m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes));
+ m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes));
+ m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000));
+ SetAtomicMax(m_PeakSentBytes, Result.SentBytes);
+ SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes);
+ if (Result.ElapsedSeconds > 0.0)
+ {
+ uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds);
+ SetAtomicMax(m_PeakBytesPerSec, BytesPerSec);
+ }
+
+ m_RequestCount.fetch_add(1);
+ }
+
+ static Result ConvertResult(const JupiterResult& Response)
+ {
+ std::string Text;
+ int32_t ErrorCode = 0;
+ if (Response.ErrorCode != 0 || !Response.Success)
+ {
+ if (Response.Response)
+ {
+ HttpContentType ContentType = Response.Response.GetContentType();
+ if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON)
+ {
+ ExtendableStringBuilder<256> SB;
+ SB.Append("\n");
+ SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()),
+ Response.Response.GetSize()));
+ Text = SB.ToString();
+ }
+ else if (ContentType == ZenContentType::kCbObject)
+ {
+ ExtendableStringBuilder<256> SB;
+ SB.Append("\n");
+ CompactBinaryToJson(Response.Response.GetView(), SB);
+ Text = SB.ToString();
+ }
+ }
+ }
+ if (Response.ErrorCode != 0)
+ {
+ ErrorCode = Response.ErrorCode;
+ }
+ else if (!Response.Success)
+ {
+ ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ }
+ return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text};
+ }
+
+ Ref<JupiterClient> m_JupiterClient;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const IoHash m_Key;
+ const IoHash m_OptionalBaseKey;
+ std::filesystem::path m_TempFilePath;
+ const bool m_EnableBlocks = true;
+ const bool m_UseTempBlocks = true;
+ const bool m_AllowRedirect = false;
+
+ std::atomic_uint64_t m_SentBytes = {};
+ std::atomic_uint64_t m_ReceivedBytes = {};
+ std::atomic_uint64_t m_RequestTimeNS = {};
+ std::atomic_uint64_t m_RequestCount = {};
+ std::atomic_uint64_t m_PeakSentBytes = {};
+ std::atomic_uint64_t m_PeakReceivedBytes = {};
+ std::atomic_uint64_t m_PeakBytesPerSec = {};
+};
+
+std::shared_ptr<RemoteProjectStore>
+CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet, bool Unattended)
+{
+ std::string Url = Options.Url;
+ if (Url.find("://"sv) == std::string::npos)
+ {
+ // Assume https URL
+ Url = fmt::format("https://{}"sv, Url);
+ }
+ JupiterClientOptions ClientOptions{.Name = "Remote store"sv,
+ .ServiceUrl = Url,
+ .ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(1800000),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 4};
+ // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
+ // 2) Access token as parameter in request
+ // 3) Environment variable (different win vs linux/mac)
+ // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
+
+ std::function<HttpClientAccessToken()> TokenProvider;
+ if (!Options.OpenIdProvider.empty())
+ {
+ TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider);
+ }
+ else if (!Options.AccessToken.empty())
+ {
+ TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken);
+ }
+ else if (!Options.OidcExePath.empty())
+ {
+ if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url, Quiet, Unattended);
+ TokenProviderMaybe)
+ {
+ TokenProvider = TokenProviderMaybe.value();
+ }
+ }
+
+ if (!TokenProvider)
+ {
+ TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager);
+ }
+
+ Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider)));
+
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(Client),
+ Options.Namespace,
+ Options.Bucket,
+ Options.Key,
+ Options.OptionalBaseKey,
+ Options.ForceDisableBlocks,
+ Options.ForceDisableTempBlocks,
+ TempFilePath);
+ return RemoteStore;
+}
+
+} // namespace zen