diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 09:38:05 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 09:38:05 +0200 |
| commit | f5727a1e4d6bfb833e37e1210691d351456dbe3a (patch) | |
| tree | cbf7688c4e31ba7db429a98c7c9f813fa0a826be /src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp | |
| parent | move projectstore to zenstore (#541) (diff) | |
| download | zen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.tar.xz zen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.zip | |
move remoteproject to remotestorelib (#542)
* move remoteproject code to remotestorelib
Diffstat (limited to 'src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp')
| -rw-r--r-- | src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp | 402 |
1 files changed, 402 insertions, 0 deletions
diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp new file mode 100644 index 000000000..bfd4e433c --- /dev/null +++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp @@ -0,0 +1,402 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/projectstore/jupiterremoteprojectstore.h> + +#include <zencore/compactbinaryutil.h> +#include <zencore/compress.h> +#include <zencore/fmtutils.h> + +#include <zenhttp/httpclientauth.h> + +#include <zenutil/jupiter/jupiterclient.h> +#include <zenutil/jupiter/jupitersession.h> + +namespace zen { + +using namespace std::literals; + +class JupiterRemoteStore : public RemoteProjectStore +{ +public: + JupiterRemoteStore(Ref<JupiterClient>&& InJupiterClient, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& Key, + const IoHash& OptionalBaseKey, + bool ForceDisableBlocks, + bool ForceDisableTempBlocks, + const std::filesystem::path& TempFilePath) + : m_JupiterClient(std::move(InJupiterClient)) + , m_Namespace(Namespace) + , m_Bucket(Bucket) + , m_Key(Key) + , m_OptionalBaseKey(OptionalBaseKey) + , m_TempFilePath(TempFilePath) + , m_EnableBlocks(!ForceDisableBlocks) + , m_UseTempBlocks(!ForceDisableTempBlocks) + { + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = m_EnableBlocks, + .UseTempBlockFiles = m_UseTempBlocks, + .AllowChunking = true, + .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key), + .Description = fmt::format("[cloud] {}. SessionId: {}. {}/{}/{}{}"sv, + m_JupiterClient->ServiceUrl(), + m_JupiterClient->Client().GetSessionId(), + m_Namespace, + m_Bucket, + m_Key, + m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format(" Base {}", m_OptionalBaseKey))}; + } + + virtual Stats GetStats() const override + { + return {.m_SentBytes = m_SentBytes.load(), + .m_ReceivedBytes = m_ReceivedBytes.load(), + .m_RequestTimeNS = m_RequestTimeNS.load(), + .m_RequestCount = m_RequestCount.load(), + .m_PeakSentBytes = m_PeakSentBytes.load(), + .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), + .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; + } + + virtual CreateContainerResult CreateContainer() override + { + // Nothing to do here + return {}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); + AddStats(PutResult); + + SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}/{}. Reason: '{}'", + m_JupiterClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_Key, + Result.Reason); + } + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override + { + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); + AddStats(PutResult); + + SaveAttachmentResult Result{ConvertResult(PutResult)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}. Reason: '{}'", + m_JupiterClient->ServiceUrl(), + m_Namespace, + RawHash, + Result.Reason); + } + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + SaveAttachmentsResult Result; + for (const SharedBuffer& Chunk : Chunks) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); + if (ChunkResult.ErrorCode) + { + return SaveAttachmentsResult{ChunkResult}; + } + } + return Result; + } + + virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override + { + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); + AddStats(FinalizeRefResult); + + FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'", + m_JupiterClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_Key, + Result.Reason); + } + return Result; + } + + virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Key); } + + virtual GetKnownBlocksResult GetKnownBlocks() override + { + if (m_OptionalBaseKey == IoHash::Zero) + { + return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; + } + LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseKey); + if (LoadResult.ErrorCode) + { + return GetKnownBlocksResult{LoadResult}; + } + std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject); + if (BlockHashes.empty()) + { + return GetKnownBlocksResult{ + {.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}}; + } + + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterExistsResult ExistsResult = + Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(BlockHashes.begin(), BlockHashes.end())); + AddStats(ExistsResult); + + if (ExistsResult.ErrorCode) + { + return GetKnownBlocksResult{{.ErrorCode = ExistsResult.ErrorCode, + .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds, + .Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'", + m_JupiterClient->ServiceUrl(), + m_Namespace, + ExistsResult.Reason)}}; + } + + Stopwatch Timer; + std::vector<IoHash> ExistingBlockHashes; + for (const IoHash& RawHash : BlockHashes) + { + if (!ExistsResult.Needs.contains(RawHash)) + { + ExistingBlockHashes.push_back(RawHash); + } + } + if (ExistingBlockHashes.empty()) + { + return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), + .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}}; + } + std::vector<ThinChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); + + GetKnownBlocksResult Result{ + {.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}}; + Result.Blocks = std::move(KnownBlocks); + return Result; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); + AddStats(GetResult); + + LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; + if (GetResult.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'", + m_JupiterClient->ServiceUrl(), + m_Namespace, + RawHash, + Result.Reason); + } + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + LoadAttachmentsResult Result; + for (const IoHash& Hash : RawHashes) + { + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + if (ChunkResult.ErrorCode) + { + return LoadAttachmentsResult{ChunkResult}; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); + } + return Result; + } + +private: + LoadContainerResult LoadContainer(const IoHash& Key) + { + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject); + AddStats(GetResult); + if (GetResult.ErrorCode || !GetResult.Success) + { + LoadContainerResult Result{ConvertResult(GetResult)}; + Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}/{}. Reason: '{}'", + m_JupiterClient->ServiceUrl(), + m_Namespace, + m_Bucket, + Key, + Result.Reason); + return Result; + } + + CbValidateError ValidateResult = CbValidateError::None; + if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(IoBuffer(GetResult.Response), ValidateResult); + ValidateResult != CbValidateError::None || !ContainerObject) + { + return LoadContainerResult{ + RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + .ElapsedSeconds = GetResult.ElapsedSeconds, + .Reason = fmt::format("The ref {}/{}/{}/{} is not formatted as a compact binary object"sv, + m_JupiterClient->ServiceUrl(), + m_Namespace, + m_Bucket, + Key)}, + {}}; + } + else + { + return LoadContainerResult{ConvertResult(GetResult), std::move(ContainerObject)}; + } + } + + void AddStats(const JupiterResult& Result) + { + m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes)); + m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes)); + m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000)); + SetAtomicMax(m_PeakSentBytes, Result.SentBytes); + SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes); + if (Result.ElapsedSeconds > 0.0) + { + uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); + SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); + } + + m_RequestCount.fetch_add(1); + } + + static Result ConvertResult(const JupiterResult& Response) + { + std::string Text; + int32_t ErrorCode = 0; + if (Response.ErrorCode != 0 || !Response.Success) + { + if (Response.Response) + { + HttpContentType ContentType = Response.Response.GetContentType(); + if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON) + { + ExtendableStringBuilder<256> SB; + SB.Append("\n"); + SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), + Response.Response.GetSize())); + Text = SB.ToString(); + } + else if (ContentType == ZenContentType::kCbObject) + { + ExtendableStringBuilder<256> SB; + SB.Append("\n"); + CompactBinaryToJson(Response.Response.GetView(), SB); + Text = SB.ToString(); + } + } + } + if (Response.ErrorCode != 0) + { + ErrorCode = Response.ErrorCode; + } + else if (!Response.Success) + { + ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + } + return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; + } + + Ref<JupiterClient> m_JupiterClient; + const std::string m_Namespace; + const std::string m_Bucket; + const IoHash m_Key; + const IoHash m_OptionalBaseKey; + std::filesystem::path m_TempFilePath; + const bool m_EnableBlocks = true; + const bool m_UseTempBlocks = true; + const bool m_AllowRedirect = false; + + std::atomic_uint64_t m_SentBytes = {}; + std::atomic_uint64_t m_ReceivedBytes = {}; + std::atomic_uint64_t m_RequestTimeNS = {}; + std::atomic_uint64_t m_RequestCount = {}; + std::atomic_uint64_t m_PeakSentBytes = {}; + std::atomic_uint64_t m_PeakReceivedBytes = {}; + std::atomic_uint64_t m_PeakBytesPerSec = {}; +}; + +std::shared_ptr<RemoteProjectStore> +CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet, bool Unattended) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume https URL + Url = fmt::format("https://{}"sv, Url); + } + JupiterClientOptions ClientOptions{.Name = "Remote store"sv, + .ServiceUrl = Url, + .ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(1800000), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = true, + .RetryCount = 4}; + // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + // 2) Access token as parameter in request + // 3) Environment variable (different win vs linux/mac) + // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + + std::function<HttpClientAccessToken()> TokenProvider; + if (!Options.OpenIdProvider.empty()) + { + TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider); + } + else if (!Options.AccessToken.empty()) + { + TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); + } + else if (!Options.OidcExePath.empty()) + { + if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url, Quiet, Unattended); + TokenProviderMaybe) + { + TokenProvider = TokenProviderMaybe.value(); + } + } + + if (!TokenProvider) + { + TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); + } + + Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider))); + + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(Client), + Options.Namespace, + Options.Bucket, + Options.Key, + Options.OptionalBaseKey, + Options.ForceDisableBlocks, + Options.ForceDisableTempBlocks, + TempFilePath); + return RemoteStore; +} + +} // namespace zen |