diff options
Diffstat (limited to 'src/zenserver/projectstore/buildsremoteprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.cpp | 561 |
1 files changed, 561 insertions, 0 deletions
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp new file mode 100644 index 000000000..6d0d51a60 --- /dev/null +++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp @@ -0,0 +1,561 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "buildsremoteprojectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compress.h> +#include <zencore/fmtutils.h> + +#include <upstream/jupiter.h> +#include <zenhttp/auth/authmgr.h> + +namespace zen { + +using namespace std::literals; + +static const std::string_view OplogContainerPartName = "oplogcontainer"sv; + +class BuildsRemoteStore : public RemoteProjectStore +{ +public: + BuildsRemoteStore(Ref<CloudCacheClient>&& CloudClient, + std::string_view Namespace, + std::string_view Bucket, + const Oid& BuildId, + const IoBuffer& MetaData, + bool ForceDisableBlocks, + bool ForceDisableTempBlocks, + const std::filesystem::path& TempFilePath) + : m_CloudClient(std::move(CloudClient)) + , m_Namespace(Namespace) + , m_Bucket(Bucket) + , m_BuildId(BuildId) + , m_MetaData(MetaData) + , m_TempFilePath(TempFilePath) + { + m_MetaData.MakeOwned(); + if (ForceDisableBlocks) + { + m_EnableBlocks = false; + } + if (ForceDisableTempBlocks) + { + m_UseTempBlocks = false; + } + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = m_EnableBlocks, + .UseTempBlockFiles = m_UseTempBlocks, + .AllowChunking = true, + .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId), + .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)}; + } + + virtual Stats GetStats() const override + { + return {.m_SentBytes = m_SentBytes.load(), + .m_ReceivedBytes = m_ReceivedBytes.load(), + .m_RequestTimeNS = m_RequestTimeNS.load(), + .m_RequestCount = m_RequestCount.load(), + .m_PeakSentBytes = m_PeakSentBytes.load(), + .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), + .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; + } + + virtual CreateContainerResult CreateContainer() override + { + ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); + + CloudCacheSession Session(m_CloudClient.Get()); + + IoBuffer Payload = m_MetaData; + Payload.SetContentType(ZenContentType::kCbObject); + CloudCacheResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload); + AddStats(PutResult); + + CreateContainerResult Result{ConvertResult(PutResult)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + Result.Reason); + } + m_OplogBuildPartId = Oid::NewOid(); + return Result; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + CloudCacheSession Session(m_CloudClient.Get()); + PutBuildPartResult PutResult = + Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload); + AddStats(PutResult); + + SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Result.Reason); + } + + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) override + { + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + CloudCacheSession Session(m_CloudClient.Get()); + + CloudCacheResult PutResult = + Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, ZenContentType::kCompressedBinary, Payload); + AddStats(PutResult); + + SaveAttachmentResult Result{ConvertResult(PutResult)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + RawHash, + Result.Reason); + return Result; + } + + if (Block.BlockHash == RawHash) + { + ZEN_ASSERT(Block.ChunkLengths.size() == Block.ChunkHashes.size()); + CbObjectWriter Writer; + Writer.AddHash("rawHash"sv, RawHash); + Writer.BeginArray("rawHashes"sv); + { + for (const IoHash& ChunkHash : Block.ChunkHashes) + { + Writer.AddHash(ChunkHash); + } + } + Writer.EndArray(); + Writer.BeginArray("chunkLengths"); + { + for (uint32_t ChunkSize : Block.ChunkLengths) + { + Writer.AddInteger(ChunkSize); + } + } + Writer.EndArray(); + Writer.BeginArray("chunkOffsets"); + { + ZEN_ASSERT(Block.FirstChunkOffset != (uint32_t)-1); + uint32_t Offset = Block.FirstChunkOffset; + for (uint32_t ChunkSize : Block.ChunkLengths) + { + Writer.AddInteger(Offset); + Offset += ChunkSize; + } + } + Writer.EndArray(); + + Writer.BeginObject("metadata"sv); + { + Writer.AddString("createdBy", "zenserver"); + } + Writer.EndObject(); + + IoBuffer MetaPayload = Writer.Save().GetBuffer().AsIoBuffer(); + MetaPayload.SetContentType(ZenContentType::kCbObject); + CloudCacheResult PutMetaResult = + Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload); + AddStats(PutMetaResult); + RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult); + if (MetaDataResult.ErrorCode) + { + ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + RawHash, + MetaDataResult.Reason); + } + } + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + SaveAttachmentsResult Result; + for (const SharedBuffer& Chunk : Chunks) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); + if (ChunkResult.ErrorCode) + { + return SaveAttachmentsResult{ChunkResult}; + } + } + return Result; + } + + virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override + { + ZEN_UNUSED(RawHash); + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + + CloudCacheSession Session(m_CloudClient.Get()); + FinalizeBuildPartResult FinalizeRefResult = + Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash); + AddStats(FinalizeRefResult); + + FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Result.Reason); + } + else if (Result.Needs.empty()) + { + CloudCacheResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId); + AddStats(FinalizeBuildResult); + FinalizeBuildResult.ElapsedSeconds += FinalizeRefResult.ElapsedSeconds; + Result = {ConvertResult(FinalizeBuildResult)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + FinalizeBuildResult.Reason); + } + } + return Result; + } + + virtual LoadContainerResult LoadContainer() override + { + ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); + + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId); + AddStats(GetBuildResult); + LoadContainerResult Result{ConvertResult(GetBuildResult)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog container build from {}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + Result.Reason); + return Result; + } + CbObject BuildObject = LoadCompactBinaryObject(GetBuildResult.Response); + if (!BuildObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The build {}/{}/{}/{} payload is not formatted as a compact binary object"sv, + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId); + return Result; + } + CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); + if (!PartsObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId); + return Result; + } + m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); + if (m_OplogBuildPartId == Oid::Zero) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + OplogContainerPartName); + return Result; + } + + CloudCacheResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); + AddStats(GetBuildPartResult); + Result = {ConvertResult(GetBuildResult)}; + Result.ElapsedSeconds += GetBuildResult.ElapsedSeconds; + if (Result.ErrorCode) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed fetching oplog build part from {}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Result.Reason); + return Result; + } + + CbObject ContainerObject = LoadCompactBinaryObject(GetBuildPartResult.Response); + if (!ContainerObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The build part for oplog container {}/{}/{}/{}/{} is not formatted as a compact binary object"sv, + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId); + return Result; + } + Result.ContainerObject = std::move(ContainerObject); + return Result; + } + + virtual GetKnownBlocksResult GetKnownBlocks() override + { + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); + AddStats(FindResult); + GetKnownBlocksResult Result{ConvertResult(FindResult)}; + if (Result.ErrorCode) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Result.Reason); + return Result; + } + CbObject BlocksObject = LoadCompactBinaryObject(FindResult.Response); + if (!BlocksObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv, + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId); + return Result; + } + + CbArrayView Blocks = BlocksObject["blocks"].AsArrayView(); + Result.Blocks.reserve(Blocks.Num()); + for (CbFieldView BlockView : Blocks) + { + CbObjectView BlockObject = BlockView.AsObjectView(); + IoHash BlockHash = BlockObject["rawHash"sv].AsHash(); + if (BlockHash != IoHash::Zero) + { + CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView(); + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkView : ChunksArray) + { + ChunkHashes.push_back(ChunkView.AsHash()); + } + Result.Blocks.emplace_back(Block{.BlockHash = BlockHash, .ChunkHashes = ChunkHashes}); + } + } + return Result; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath); + AddStats(GetResult); + + LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; + if (GetResult.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}&{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + RawHash, + Result.Reason); + } + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + LoadAttachmentsResult Result; + for (const IoHash& Hash : RawHashes) + { + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + if (ChunkResult.ErrorCode) + { + return LoadAttachmentsResult{ChunkResult}; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); + } + return Result; + } + +private: + void AddStats(const CloudCacheResult& Result) + { + m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes)); + m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes)); + m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000)); + SetAtomicMax(m_PeakSentBytes, Result.SentBytes); + SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes); + if (Result.ElapsedSeconds > 0.0) + { + uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); + SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); + } + + m_RequestCount.fetch_add(1); + } + + static Result ConvertResult(const CloudCacheResult& Response) + { + std::string Text; + int32_t ErrorCode = 0; + if (Response.ErrorCode != 0 || !Response.Success) + { + if (Response.Response) + { + HttpContentType ContentType = Response.Response.GetContentType(); + if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON) + { + ExtendableStringBuilder<256> SB; + SB.Append("\n"); + SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), + Response.Response.GetSize())); + Text = SB.ToString(); + } + else if (ContentType == ZenContentType::kCbObject) + { + ExtendableStringBuilder<256> SB; + SB.Append("\n"); + CompactBinaryToJson(Response.Response.GetView(), SB); + Text = SB.ToString(); + } + } + } + if (Response.ErrorCode != 0) + { + ErrorCode = Response.ErrorCode; + } + else if (!Response.Success) + { + ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + } + return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; + } + + Ref<CloudCacheClient> m_CloudClient; + const std::string m_Namespace; + const std::string m_Bucket; + const Oid m_BuildId; + IoBuffer m_MetaData; + Oid m_OplogBuildPartId = Oid::Zero; + std::filesystem::path m_TempFilePath; + bool m_EnableBlocks = true; + bool m_UseTempBlocks = true; + + std::atomic_uint64_t m_SentBytes = {}; + std::atomic_uint64_t m_ReceivedBytes = {}; + std::atomic_uint64_t m_RequestTimeNS = {}; + std::atomic_uint64_t m_RequestCount = {}; + std::atomic_uint64_t m_PeakSentBytes = {}; + std::atomic_uint64_t m_PeakReceivedBytes = {}; + std::atomic_uint64_t m_PeakBytesPerSec = {}; +}; + +std::shared_ptr<RemoteProjectStore> +CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume https URL + Url = fmt::format("https://{}"sv, Url); + } + CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv, + .ServiceUrl = Url, + .ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(1800000), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = true, + .RetryCount = 4}; + // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + // 2) Access token as parameter in request + // 3) Environment variable (different win vs linux/mac) + // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + if (!Options.OpenIdProvider.empty()) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() { + AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else if (!Options.AccessToken.empty()) + { + TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = "Bearer " + Options.AccessToken]() { + return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()}; + }); + } + else + { + TokenProvider = CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager]() { + AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken("Default"); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + + Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); + + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(CloudClient), + Options.Namespace, + Options.Bucket, + Options.BuildId, + Options.MetaData, + Options.ForceDisableBlocks, + Options.ForceDisableTempBlocks, + TempFilePath); + return RemoteStore; +} + +} // namespace zen |