diff options
| author | Dan Engelbrecht <[email protected]> | 2025-08-12 13:53:58 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-12 13:53:58 +0200 |
| commit | 3a9bc3071b9a9452a5aef23c438233fc9e86fb47 (patch) | |
| tree | b0a1d67fe765f2ddc96772db088d781be159d627 /src/zenserver | |
| parent | add filtering to builds download (#463) (diff) | |
| download | zen-3a9bc3071b9a9452a5aef23c438233fc9e86fb47.tar.xz zen-3a9bc3071b9a9452a5aef23c438233fc9e86fb47.zip | |
use new builds api for oplogs (#464)
- Improvement: Refactored jupiter oplog export code to reuse builds jupiter wrapper classes
- Improvement: If `zen builds`, `zen oplog-import` or `zen oplog-import` command fails due to a http error, the return code for the program will be set to the error/status code
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/admin/admin.cpp | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.cpp | 589 | ||||
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.h | 6 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 8 |
4 files changed, 295 insertions, 309 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 73166e608..9d982678a 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -247,6 +247,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime)); Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now)); + Obj.AddInteger("ReturnCode", CurrentState->ReturnCode); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); } break; diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp index eab687db2..c9a01e56a 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp @@ -3,14 +3,12 @@ #include "buildsremoteprojectstore.h" #include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> #include <zenhttp/httpclientauth.h> - -#include <zenutil/jupiter/jupiterclient.h> -#include <zenutil/jupiter/jupitersession.h> +#include <zenutil/jupiter/jupiterbuildstorage.h> namespace zen { @@ -21,20 +19,24 @@ static const std::string_view OplogContainerPartName = "oplogcontainer"sv; class BuildsRemoteStore : public RemoteProjectStore { public: - BuildsRemoteStore(Ref<JupiterClient>&& InJupiterClient, - std::string_view Namespace, - std::string_view Bucket, - const Oid& BuildId, - const IoBuffer& MetaData, - bool ForceDisableBlocks, - bool ForceDisableTempBlocks, - const std::filesystem::path& TempFilePath) - : m_JupiterClient(std::move(InJupiterClient)) + BuildsRemoteStore(std::unique_ptr<BuildStorage::Statistics>&& BuildStorageStats, + std::unique_ptr<HttpClient>&& BuildStorageHttp, + std::unique_ptr<BuildStorage>&& BuildStorage, + std::string_view Url, + std::string_view Namespace, + std::string_view Bucket, + const Oid& BuildId, + const IoBuffer& MetaData, + bool ForceDisableBlocks, + bool ForceDisableTempBlocks) + : m_BuildStorageStats(std::move(BuildStorageStats)) + , m_BuildStorageHttp(std::move(BuildStorageHttp)) + , m_BuildStorage(std::move(BuildStorage)) + , m_Url(Url) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_BuildId(BuildId) , m_MetaData(MetaData) - , m_TempFilePath(TempFilePath) , m_EnableBlocks(!ForceDisableBlocks) , m_UseTempBlocks(!ForceDisableTempBlocks) { @@ -47,63 +49,91 @@ public: .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId), - .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)}; + .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_Url, m_Namespace, m_Bucket, m_BuildId)}; } virtual Stats GetStats() const override { - return {.m_SentBytes = m_SentBytes.load(), - .m_ReceivedBytes = m_ReceivedBytes.load(), - .m_RequestTimeNS = m_RequestTimeNS.load(), - .m_RequestCount = m_RequestCount.load(), - .m_PeakSentBytes = m_PeakSentBytes.load(), - .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), - .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; + return { + .m_SentBytes = m_BuildStorageStats->TotalBytesWritten.load(), + .m_ReceivedBytes = m_BuildStorageStats->TotalBytesRead.load(), + .m_RequestTimeNS = m_BuildStorageStats->TotalRequestTimeUs.load() * 1000, + .m_RequestCount = m_BuildStorageStats->TotalRequestCount.load(), + .m_PeakSentBytes = m_BuildStorageStats->PeakSentBytes.load(), + .m_PeakReceivedBytes = m_BuildStorageStats->PeakReceivedBytes.load(), + .m_PeakBytesPerSec = m_BuildStorageStats->PeakBytesPerSec.load(), + }; } virtual CreateContainerResult CreateContainer() override { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - - IoBuffer Payload = m_MetaData; - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload); - AddStats(PutResult); + CreateContainerResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); - CreateContainerResult Result{ConvertResult(PutResult)}; - if (Result.ErrorCode) + CbObject Payload = LoadCompactBinaryObject(m_MetaData); + try + { + CbObject PutBuildResult = m_BuildStorage->PutBuild(m_BuildId, Payload); + ZEN_UNUSED(PutBuildResult); + m_OplogBuildPartId = Oid::NewOid(); + } + catch (const HttpClientError& Ex) { - Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - Result.Reason); + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = + fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); } - m_OplogBuildPartId = Oid::NewOid(); + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = + fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); + } + return Result; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - PutBuildPartResult PutResult = - Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload); - AddStats(PutResult); - SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; - if (Result.ErrorCode) + SaveResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); + + try + { + CbObject ObjectPayload = LoadCompactBinaryObject(Payload); + + std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = + m_BuildStorage->PutBuildPart(m_BuildId, m_OplogBuildPartId, OplogContainerPartName, ObjectPayload); + Result.RawHash = PutBuildPartResult.first; + Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(PutBuildPartResult.second.begin(), PutBuildPartResult.second.end()); + } + catch (const HttpClientError& Ex) { - Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - m_OplogBuildPartId, - Result.Reason); + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Ex.what()); } return Result; @@ -114,52 +144,84 @@ public: ChunkBlockDescription&& Block) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult PutResult = - Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); - AddStats(PutResult); + SaveAttachmentResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); - SaveAttachmentResult Result{ConvertResult(PutResult)}; - if (Result.ErrorCode) + try { - Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - RawHash, - Result.Reason); - return Result; - } + m_BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); - if (Block.BlockHash == RawHash) - { - CbObjectWriter BlockMetaData; - BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string()); - - IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer(); - MetaPayload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, RawHash, MetaPayload); - AddStats(PutMetaResult); - RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult); - if (MetaDataResult.ErrorCode) + if (Block.BlockHash == RawHash) { - ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - RawHash, - MetaDataResult.Reason); + try + { + CbObjectWriter BlockMetaData; + BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string()); + CbObject MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()); + if (!m_BuildStorage->PutBlockMetadata(m_BuildId, RawHash, MetaPayload)) + { + ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + RawHash, + "not found"); + } + } + catch (const HttpClientError& Ex) + { + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } } } + catch (const HttpClientError& Ex) + { + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } + return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override { SaveAttachmentsResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); for (const SharedBuffer& Chunk : Chunks) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); @@ -177,37 +239,36 @@ public: ZEN_UNUSED(RawHash); ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - FinalizeBuildPartResult FinalizeRefResult = - Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash); - AddStats(FinalizeRefResult); + FinalizeResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); - FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; - if (Result.ErrorCode) + try { - Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - m_OplogBuildPartId, - Result.Reason); + std::vector<IoHash> Needs = m_BuildStorage->FinalizeBuildPart(m_BuildId, m_OplogBuildPartId, RawHash); + Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(Needs.begin(), Needs.end()); } - else if (Result.Needs.empty()) + catch (const HttpClientError& Ex) { - JupiterResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId); - AddStats(FinalizeBuildResult); - FinalizeBuildResult.ElapsedSeconds += FinalizeRefResult.ElapsedSeconds; - Result = {ConvertResult(FinalizeBuildResult)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - FinalizeBuildResult.Reason); - } + Result.ErrorCode = Ex.m_Error != 0 ? Ex.m_Error : Ex.m_ResponseCode != HttpResponseCode::ImATeapot ? (int)Ex.m_ResponseCode : 0; + Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Ex.what()); } return Result; } @@ -216,161 +277,128 @@ public: { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId); - AddStats(GetBuildResult); - LoadContainerResult Result{ConvertResult(GetBuildResult)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching oplog container build from {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - Result.Reason); - return Result; - } - CbObject BuildObject = LoadCompactBinaryObject(GetBuildResult.Response); - if (!BuildObject) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The build {}/{}/{}/{} payload is not formatted as a compact binary object"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId); - return Result; - } - CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); - if (!PartsObject) + LoadContainerResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); + + try { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId); - return Result; + CbObject BuildObject = m_BuildStorage->GetBuild(m_BuildId); + + CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); + if (!PartsObject) + { + throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, + m_Url, + m_Namespace, + m_Bucket, + m_BuildId)); + } + m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); + if (m_OplogBuildPartId == Oid::Zero) + { + throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + OplogContainerPartName)); + } + + Result.ContainerObject = m_BuildStorage->GetBuildPart(m_BuildId, m_OplogBuildPartId); } - m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); - if (m_OplogBuildPartId == Oid::Zero) + catch (const HttpClientError& Ex) { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, - m_JupiterClient->ServiceUrl(), + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'", + m_Url, m_Namespace, m_Bucket, m_BuildId, - OplogContainerPartName); - return Result; + Ex.what()); } - - JupiterResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); - AddStats(GetBuildPartResult); - Result = {ConvertResult(GetBuildResult)}; - Result.ElapsedSeconds += GetBuildResult.ElapsedSeconds; - if (Result.ErrorCode) + catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed fetching oplog build part from {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), + Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'", + m_Url, m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, - Result.Reason); - return Result; + Ex.what()); } - CbObject ContainerObject = LoadCompactBinaryObject(GetBuildPartResult.Response); - if (!ContainerObject) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The build part for oplog container {}/{}/{}/{}/{} is not formatted as a compact binary object"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - m_OplogBuildPartId); - return Result; - } - Result.ContainerObject = std::move(ContainerObject); return Result; } virtual GetKnownBlocksResult GetKnownBlocks() override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, (uint64_t)-1); - AddStats(FindResult); - GetKnownBlocksResult Result{ConvertResult(FindResult)}; - if (Result.ErrorCode) + + GetKnownBlocksResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); + + try { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - Result.Reason); - return Result; + CbObject KnownBlocks = m_BuildStorage->FindBlocks(m_BuildId, 10000u); + std::optional<std::vector<ChunkBlockDescription>> Blocks = ParseChunkBlockDescriptionList(KnownBlocks); + Result.Blocks.reserve(Blocks.value().size()); + for (ChunkBlockDescription& BlockDescription : Blocks.value()) + { + Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash, + .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)}); + } } - if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None) + catch (const HttpClientError& Ex) { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a compact binary object"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId); - return Result; + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = + fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); } - std::optional<std::vector<ChunkBlockDescription>> Blocks = - ParseChunkBlockDescriptionList(LoadCompactBinaryObject(FindResult.Response)); - if (!Blocks) + catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a list of blocks"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId); - return Result; - } - Result.Blocks.reserve(Blocks.value().size()); - for (ChunkBlockDescription& BlockDescription : Blocks.value()) - { - Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash, - .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)}); + Result.Reason = + fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); } + return Result; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, m_TempFilePath); - AddStats(GetResult); - LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; - if (GetResult.ErrorCode) + LoadAttachmentResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); + + try + { + Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash); + } + catch (const HttpClientError& Ex) + { + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = + fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); + } + catch (const std::exception& Ex) { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - RawHash, - Result.Reason); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = + fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); } + return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override { LoadAttachmentsResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); for (const IoHash& Hash : RawHashes) { LoadAttachmentResult ChunkResult = LoadAttachment(Hash); @@ -386,81 +414,27 @@ public: } private: - void AddStats(const JupiterResult& Result) + static int MakeErrorCode(const HttpClientError& Ex) { - m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes)); - m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes)); - m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000)); - SetAtomicMax(m_PeakSentBytes, Result.SentBytes); - SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes); - if (Result.ElapsedSeconds > 0.0) - { - uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); - SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); - } - - m_RequestCount.fetch_add(1); + return Ex.m_Error != 0 ? Ex.m_Error : Ex.m_ResponseCode != HttpResponseCode::ImATeapot ? (int)Ex.m_ResponseCode : 0; } - static Result ConvertResult(const JupiterResult& Response) - { - std::string Text; - int32_t ErrorCode = 0; - if (Response.ErrorCode != 0 || !Response.Success) - { - if (Response.Response) - { - HttpContentType ContentType = Response.Response.GetContentType(); - if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON) - { - ExtendableStringBuilder<256> SB; - SB.Append("\n"); - SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), - Response.Response.GetSize())); - Text = SB.ToString(); - } - else if (ContentType == ZenContentType::kCbObject) - { - ExtendableStringBuilder<256> SB; - SB.Append("\n"); - CompactBinaryToJson(Response.Response.GetView(), SB); - Text = SB.ToString(); - } - } - } - if (Response.ErrorCode != 0) - { - ErrorCode = Response.ErrorCode; - } - else if (!Response.Success) - { - ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - } - return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; - } - - Ref<JupiterClient> m_JupiterClient; - const std::string m_Namespace; - const std::string m_Bucket; - const Oid m_BuildId; - IoBuffer m_MetaData; - Oid m_OplogBuildPartId = Oid::Zero; - std::filesystem::path m_TempFilePath; - const bool m_EnableBlocks = true; - const bool m_UseTempBlocks = true; - const bool m_AllowRedirect = false; - - std::atomic_uint64_t m_SentBytes = {}; - std::atomic_uint64_t m_ReceivedBytes = {}; - std::atomic_uint64_t m_RequestTimeNS = {}; - std::atomic_uint64_t m_RequestCount = {}; - std::atomic_uint64_t m_PeakSentBytes = {}; - std::atomic_uint64_t m_PeakReceivedBytes = {}; - std::atomic_uint64_t m_PeakBytesPerSec = {}; + std::unique_ptr<BuildStorage::Statistics> m_BuildStorageStats; + std::unique_ptr<HttpClient> m_BuildStorageHttp; + std::unique_ptr<BuildStorage> m_BuildStorage; + const std::string m_Url; + const std::string m_Namespace; + const std::string m_Bucket; + const Oid m_BuildId; + IoBuffer m_MetaData; + Oid m_OplogBuildPartId = Oid::Zero; + const bool m_EnableBlocks = true; + const bool m_UseTempBlocks = true; + const bool m_AllowRedirect = false; }; std::shared_ptr<RemoteProjectStore> -CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet) +CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet) { std::string Url = Options.Url; if (Url.find("://"sv) == std::string::npos) @@ -468,13 +442,7 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file // Assume https URL Url = fmt::format("https://{}"sv, Url); } - JupiterClientOptions ClientOptions{.Name = "Remote store"sv, - .ServiceUrl = Url, - .ConnectTimeout = std::chrono::milliseconds(2000), - .Timeout = std::chrono::milliseconds(1800000), - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = true, - .RetryCount = 4}; + // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider // 2) Access token as parameter in request // 3) Environment variable (different win vs linux/mac) @@ -502,16 +470,31 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); } - Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider))); + HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", + .ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(1800000), + .AccessTokenProvider = std::move(TokenProvider), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = true, + .RetryCount = 4}; + + std::unique_ptr<BuildStorage::Statistics> BuildStorageStats(std::make_unique<BuildStorage::Statistics>()); + + std::unique_ptr<HttpClient> BuildStorageHttp = std::make_unique<HttpClient>(Url, ClientSettings); + + std::unique_ptr<BuildStorage> BuildStorage = + CreateJupiterBuildStorage(Log(), *BuildStorageHttp, *BuildStorageStats, Options.Namespace, Options.Bucket, false, TempFilePath); - std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(Client), + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(BuildStorageStats), + std::move(BuildStorageHttp), + std::move(BuildStorage), + Url, Options.Namespace, Options.Bucket, Options.BuildId, Options.MetaData, Options.ForceDisableBlocks, - Options.ForceDisableTempBlocks, - TempFilePath); + Options.ForceDisableTempBlocks); return RemoteStore; } diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.h b/src/zenserver/projectstore/buildsremoteprojectstore.h index 6f33ac89b..60b6caef7 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.h +++ b/src/zenserver/projectstore/buildsremoteprojectstore.h @@ -24,8 +24,8 @@ struct BuildsRemoteStoreOptions : RemoteStoreOptions IoBuffer MetaData; }; -std::shared_ptr<RemoteProjectStore> CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, - const std::filesystem::path& TempFilePath, - bool Quiet); +std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, + const std::filesystem::path& TempFilePath, + bool Quiet); } // namespace zen diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index a8c970323..07fd6e908 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -364,7 +364,7 @@ namespace { ForceDisableTempBlocks, AssumeHttp2, MetaData}; - RemoteStore = CreateBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false); + RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false); } if (!RemoteStore) @@ -5801,7 +5801,8 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); if (!IsHttpSuccessCode(Response.first)) { - throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second); + throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, + (int)Response.first); } }); @@ -5842,7 +5843,8 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); if (!IsHttpSuccessCode(Response.first)) { - throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second); + throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, + (int)Response.first); } }); |