// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include namespace zen { using namespace std::literals; class JupiterRemoteStore : public RemoteProjectStore { public: JupiterRemoteStore(LoggerRef InLog, Ref&& InJupiterClient, std::string_view Namespace, std::string_view Bucket, const IoHash& Key, const IoHash& OptionalBaseKey, bool ForceDisableBlocks, bool ForceDisableTempBlocks, const std::filesystem::path& TempFilePath) : m_Log(InLog) , m_JupiterClient(std::move(InJupiterClient)) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_Key(Key) , m_OptionalBaseKey(OptionalBaseKey) , m_TempFilePath(TempFilePath) , m_EnableBlocks(!ForceDisableBlocks) , m_UseTempBlocks(!ForceDisableTempBlocks) { } virtual RemoteStoreInfo GetInfo() const override { return {.CreateBlocks = m_EnableBlocks, .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key), .Description = fmt::format("[cloud] {}. SessionId: {}. {}/{}/{}{}"sv, m_JupiterClient->ServiceUrl(), m_JupiterClient->Client().GetSessionId(), m_Namespace, m_Bucket, m_Key, m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format(" Base {}", m_OptionalBaseKey))}; } virtual Stats GetStats() const override { return {.m_SentBytes = m_SentBytes.load(), .m_ReceivedBytes = m_ReceivedBytes.load(), .m_RequestTimeNS = m_RequestTimeNS.load(), .m_RequestCount = m_RequestCount.load(), .m_PeakSentBytes = m_PeakSentBytes.load(), .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } virtual bool GetExtendedStats(ExtendedStats& OutStats) const override { ZEN_UNUSED(OutStats); return false; } virtual CreateContainerResult CreateContainer() override { // Nothing to do here return {}; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override { JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, /*Overwrite*/ false, Payload, ZenContentType::kCbObject); AddStats(PutResult); SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key, Result.Reason); } return Result; } virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); AddStats(PutResult); SaveAttachmentResult Result{ConvertResult(PutResult)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, RawHash, Result.Reason); } return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector& Chunks) override { SaveAttachmentsResult Result; for (const SharedBuffer& Chunk : Chunks) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); if (ChunkResult.ErrorCode) { return SaveAttachmentsResult{ChunkResult}; } } return Result; } virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override { JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); AddStats(FinalizeRefResult); FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key, Result.Reason); } return Result; } virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Key); } virtual GetKnownBlocksResult GetKnownBlocks() override { if (m_OptionalBaseKey == IoHash::Zero) { return GetKnownBlocksResult{{.ErrorCode = static_cast(HttpResponseCode::NoContent)}}; } LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseKey); if (LoadResult.ErrorCode) { return GetKnownBlocksResult{LoadResult}; } std::vector BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject); if (BlockHashes.empty()) { return GetKnownBlocksResult{ {.ErrorCode = static_cast(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}}; } JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterExistsResult ExistsResult = Session.CompressedBlobExists(m_Namespace, std::set(BlockHashes.begin(), BlockHashes.end())); AddStats(ExistsResult); if (ExistsResult.ErrorCode) { return GetKnownBlocksResult{{.ErrorCode = ExistsResult.ErrorCode, .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds, .Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, ExistsResult.Reason)}}; } Stopwatch Timer; std::vector ExistingBlockHashes; for (const IoHash& RawHash : BlockHashes) { if (!ExistsResult.Needs.contains(RawHash)) { ExistingBlockHashes.push_back(RawHash); } } if (ExistingBlockHashes.empty()) { return GetKnownBlocksResult{{.ErrorCode = static_cast(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}}; } std::vector ThinKnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); GetKnownBlocksResult Result{ {.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}}; const size_t KnowBlockCount = ThinKnownBlocks.size(); Result.Blocks.resize(KnowBlockCount); for (size_t BlockIndex = 0; BlockIndex < KnowBlockCount; BlockIndex++) { Result.Blocks[BlockIndex].BlockHash = ThinKnownBlocks[BlockIndex].BlockHash; Result.Blocks[BlockIndex].ChunkRawHashes = std::move(ThinKnownBlocks[BlockIndex].ChunkRawHashes); } return Result; } virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span BlockHashes) override { ZEN_UNUSED(BlockHashes); return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}}; } virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span RawHashes) override { return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector(RawHashes.size(), false)}; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override { JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); AddStats(GetResult); LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; if (GetResult.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, RawHash, Result.Reason); } if (!Result.ErrorCode && Range) { Result.Bytes = IoBuffer(Result.Bytes, Range.Offset, Range.Bytes); } return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector& RawHashes) override { LoadAttachmentsResult Result; for (const IoHash& Hash : RawHashes) { LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {}); if (ChunkResult.ErrorCode) { return LoadAttachmentsResult{ChunkResult}; } ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast(ChunkResult.ElapsedSeconds * 1000))); Result.Chunks.emplace_back( std::pair{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); } return Result; } virtual void Flush() override {} private: LoadContainerResult LoadContainer(const IoHash& Key) { JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject); AddStats(GetResult); if (GetResult.ErrorCode || !GetResult.Success) { LoadContainerResult Result{ConvertResult(GetResult)}; Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, Key, Result.Reason); return Result; } CbValidateError ValidateResult = CbValidateError::None; if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(IoBuffer(GetResult.Response), ValidateResult); ValidateResult != CbValidateError::None || !ContainerObject) { return LoadContainerResult{ RemoteProjectStore::Result{.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError), .ElapsedSeconds = GetResult.ElapsedSeconds, .Reason = fmt::format("The ref {}/{}/{}/{} is not formatted as a compact binary object"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, Key)}, {}}; } else { return LoadContainerResult{ConvertResult(GetResult), std::move(ContainerObject)}; } } void AddStats(const JupiterResult& Result) { m_SentBytes.fetch_add(gsl::narrow(Result.SentBytes)); m_ReceivedBytes.fetch_add(gsl::narrow(Result.ReceivedBytes)); m_RequestTimeNS.fetch_add(static_cast(Result.ElapsedSeconds * 1000000000)); SetAtomicMax(m_PeakSentBytes, Result.SentBytes); SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes); if (Result.ElapsedSeconds > 0.0) { uint64_t BytesPerSec = static_cast((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); } m_RequestCount.fetch_add(1); } static Result ConvertResult(const JupiterResult& Response) { std::string Text; int32_t ErrorCode = 0; if (Response.ErrorCode != 0 || !Response.Success) { if (Response.Response) { HttpContentType ContentType = Response.Response.GetContentType(); if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON) { ExtendableStringBuilder<256> SB; SB.Append("\n"); SB.Append(std::string_view(reinterpret_cast(Response.Response.GetData()), Response.Response.GetSize())); Text = SB.ToString(); } else if (ContentType == ZenContentType::kCbObject) { ExtendableStringBuilder<256> SB; SB.Append("\n"); CompactBinaryToJson(Response.Response.GetView(), SB); Text = SB.ToString(); } } } if (Response.ErrorCode != 0) { ErrorCode = Response.ErrorCode; } else if (!Response.Success) { ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); } return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; } inline LoggerRef Log() const { return m_Log; } LoggerRef m_Log; Ref m_JupiterClient; const std::string m_Namespace; const std::string m_Bucket; const IoHash m_Key; const IoHash m_OptionalBaseKey; std::filesystem::path m_TempFilePath; const bool m_EnableBlocks = true; const bool m_UseTempBlocks = true; const bool m_AllowRedirect = false; std::atomic_uint64_t m_SentBytes = {}; std::atomic_uint64_t m_ReceivedBytes = {}; std::atomic_uint64_t m_RequestTimeNS = {}; std::atomic_uint64_t m_RequestCount = {}; std::atomic_uint64_t m_PeakSentBytes = {}; std::atomic_uint64_t m_PeakReceivedBytes = {}; std::atomic_uint64_t m_PeakBytesPerSec = {}; }; std::shared_ptr CreateJupiterRemoteStore(LoggerRef InLog, const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet, bool Unattended, bool Hidden) { std::string Url = Options.Url; if (Url.find("://"sv) == std::string::npos) { // Assume https URL Url = fmt::format("https://{}"sv, Url); } JupiterClientOptions ClientOptions{.Name = "Remote store"sv, .ServiceUrl = Url, .ConnectTimeout = std::chrono::milliseconds(3000), .Timeout = std::chrono::milliseconds(1800000), .AssumeHttp2 = Options.AssumeHttp2, .AllowResume = true, .RetryCount = 4}; // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider // 2) Access token as parameter in request // 3) Environment variable (different win vs linux/mac) // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider std::function TokenProvider; if (!Options.OpenIdProvider.empty()) { TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider); } else if (!Options.AccessToken.empty()) { TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); } else if (!Options.OidcExePath.empty()) { if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url, Quiet, Unattended, Hidden); TokenProviderMaybe) { TokenProvider = TokenProviderMaybe.value(); } } if (!TokenProvider) { TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); } Ref Client(new JupiterClient(ClientOptions, std::move(TokenProvider))); std::shared_ptr RemoteStore = std::make_shared(InLog, std::move(Client), Options.Namespace, Options.Bucket, Options.Key, Options.OptionalBaseKey, Options.ForceDisableBlocks, Options.ForceDisableTempBlocks, TempFilePath); return RemoteStore; } } // namespace zen