diff options
| author | Dan Engelbrecht <[email protected]> | 2025-08-12 13:53:58 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-12 13:53:58 +0200 |
| commit | 3a9bc3071b9a9452a5aef23c438233fc9e86fb47 (patch) | |
| tree | b0a1d67fe765f2ddc96772db088d781be159d627 /src | |
| parent | add filtering to builds download (#463) (diff) | |
| download | zen-3a9bc3071b9a9452a5aef23c438233fc9e86fb47.tar.xz zen-3a9bc3071b9a9452a5aef23c438233fc9e86fb47.zip | |
use new builds api for oplogs (#464)
- Improvement: Refactored jupiter oplog export code to reuse builds jupiter wrapper classes
- Improvement: If `zen builds`, `zen oplog-import` or `zen oplog-import` command fails due to a http error, the return code for the program will be set to the error/status code
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 9 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 95 | ||||
| -rw-r--r-- | src/zencore/include/zencore/jobqueue.h | 11 | ||||
| -rw-r--r-- | src/zencore/jobqueue.cpp | 22 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 7 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclient.h | 23 | ||||
| -rw-r--r-- | src/zenserver/admin/admin.cpp | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.cpp | 589 | ||||
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.h | 6 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 8 | ||||
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 7 | ||||
| -rw-r--r-- | src/zenutil/filebuildstorage.cpp | 276 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstorage.h | 3 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstoragecache.h | 3 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterbuildstorage.cpp | 76 |
15 files changed, 665 insertions, 471 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index aa6ae5ea4..cf5e8d9da 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -10785,9 +10785,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) DiscoveryHttpClient.Get("/api/v1/status/servers", HttpClient::Accept(HttpContentType::kJSON)); if (!ServerInfoResponse.IsSuccess()) { - throw std::runtime_error(fmt::format("Failed to get list of servers from discovery url '{}'. Reason: '{}'", - m_Host, - ServerInfoResponse.ErrorMessage(""))); + ServerInfoResponse.ThrowError(fmt::format("Failed to get list of servers from discovery url '{}'", m_Host)); } std::string_view JsonResponse = ServerInfoResponse.AsText(); @@ -12166,6 +12164,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 3; } } + catch (const HttpClientError& HttpEx) + { + ZEN_CONSOLE("Operation failed: {}", HttpEx.what()); + return HttpEx.m_Error != 0 ? HttpEx.m_Error : (int)HttpEx.m_ResponseCode; + } catch (const std::exception& Ex) { ZEN_ERROR("{}", Ex.what()); diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 58af0577e..f919edc87 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -149,6 +149,16 @@ namespace { } } + class AsyncJobError : public std::runtime_error + { + public: + using _Mybase = runtime_error; + + AsyncJobError(const std::string& Message, int ReturnCode) : _Mybase(Message), m_ReturnCode(ReturnCode) {} + + const int m_ReturnCode = 0; + }; + void ExecuteAsyncOperation(HttpClient& Http, std::string_view Url, IoBuffer&& Payload, bool PlainProgress) { signal(SIGINT, SignalCallbackHandler); @@ -252,13 +262,14 @@ namespace { if (Status == "Aborted") { std::string_view AbortReason = StatusObject["AbortReason"].AsString(); + int ReturnCode = StatusObject["ReturnCode"].AsInt32(-1); if (!AbortReason.empty()) { - throw std::runtime_error(std::string(AbortReason)); + throw AsyncJobError(std::string(AbortReason), ReturnCode); } else { - throw std::runtime_error("Aborted"); + throw AsyncJobError("Aborted", ReturnCode); } break; } @@ -1335,24 +1346,44 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg ZEN_CONSOLE("Saving oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, m_HostName, TargetDescription); - if (m_Async) + try { - if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), - std::move(Payload), - HttpClient::Accept(ZenContentType::kJSON)); - Result) + if (m_Async) { - ZEN_CONSOLE("{}", Result.ToText()); + if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), + std::move(Payload), + HttpClient::Accept(ZenContentType::kJSON)); + Result) + { + ZEN_CONSOLE("{}", Result.ToText()); + } + else + { + Result.ThrowError("failed requesting loading oplog export"sv); + } } else { - Result.ThrowError("failed requesting loading oplog export"sv); - return 1; + ExecuteAsyncOperation(Http, + fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), + std::move(Payload), + m_PlainProgress); } } - else + catch (const HttpClientError& Ex) { - ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload), m_PlainProgress); + ZEN_CONSOLE("Oplog export failed: '{}'", Ex.what()); + return Ex.m_Error != 0 ? Ex.m_Error : (int)Ex.m_ResponseCode; + } + catch (const AsyncJobError& Ex) + { + ZEN_CONSOLE("Oplog export failed: '{}'", Ex.what()); + return Ex.m_ReturnCode; + } + catch (const std::exception& Ex) + { + ZEN_CONSOLE("Oplog export failed: '{}'", Ex.what()); + return 1; } return 0; } @@ -1668,24 +1699,44 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg ZEN_CONSOLE("Loading oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, SourceDescription, m_HostName); - if (m_Async) + try { - if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), - std::move(Payload), - HttpClient::Accept(ZenContentType::kJSON)); - Result) + if (m_Async) { - ZEN_CONSOLE("{}", Result.ToText()); + if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), + std::move(Payload), + HttpClient::Accept(ZenContentType::kJSON)); + Result) + { + ZEN_CONSOLE("{}", Result.ToText()); + } + else + { + Result.ThrowError("failed requesting loading oplog import"sv); + } } else { - Result.ThrowError("failed requesting loading oplog import"sv); - return 1; + ExecuteAsyncOperation(Http, + fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), + std::move(Payload), + m_PlainProgress); } } - else + catch (const HttpClientError& Ex) + { + ZEN_CONSOLE("Oplog import failed: '{}'", Ex.what()); + return Ex.m_Error != 0 ? Ex.m_Error : (int)Ex.m_ResponseCode; + } + catch (const AsyncJobError& Ex) { - ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload), m_PlainProgress); + ZEN_CONSOLE("Oplog export failed: '{}'", Ex.what()); + return Ex.m_ReturnCode; + } + catch (const std::exception& Ex) + { + ZEN_CONSOLE("Oplog import failed: '{}'", Ex.what()); + return 1; } return 0; } diff --git a/src/zencore/include/zencore/jobqueue.h b/src/zencore/include/zencore/jobqueue.h index d5ec6255a..470ed3fc6 100644 --- a/src/zencore/include/zencore/jobqueue.h +++ b/src/zencore/include/zencore/jobqueue.h @@ -28,6 +28,16 @@ public: virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount) = 0; }; +class JobError : public std::runtime_error +{ +public: + using _Mybase = runtime_error; + + JobError(const std::string& Message, int ReturnCode) : _Mybase(Message), m_ReturnCode(ReturnCode) {} + + const int m_ReturnCode = 0; +}; + class JobQueue { public: @@ -73,6 +83,7 @@ public: std::chrono::system_clock::time_point StartTime; std::chrono::system_clock::time_point EndTime; int WorkerThreadId; + int ReturnCode; }; // Will only respond once when status is Complete or Aborted diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index b97484458..5d727b69c 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -50,6 +50,7 @@ public: JobClock::Tick StartTick; JobClock::Tick EndTick; int WorkerThreadId; + int ReturnCode; virtual bool IsCancelled() const override { return CancelFlag.load(); } virtual void ReportMessage(std::string_view Message) override { Queue->ReportMessage(Id, Message); } @@ -101,6 +102,7 @@ public: NewJob->StartTick = JobClock::Never(); NewJob->EndTick = JobClock::Never(); NewJob->WorkerThreadId = 0; + NewJob->ReturnCode = -1; ZEN_DEBUG("Scheduling background job {}:'{}'", NewJob->Id.Id, NewJob->Name); QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); }); @@ -274,7 +276,8 @@ public: .CreateTime = JobClock::TimePointFromTick(Job.CreateTick), .StartTime = JobClock::TimePointFromTick(Job.StartTick), .EndTime = JobClock::TimePointFromTick(Job.EndTick), - .WorkerThreadId = Job.WorkerThreadId}; + .WorkerThreadId = Job.WorkerThreadId, + .ReturnCode = Job.ReturnCode}; }; std::optional<JobDetails> Result; @@ -365,6 +368,7 @@ public: ZEN_DEBUG("Executing background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name); CurrentJob->Callback(*CurrentJob); ZEN_DEBUG("Completed background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name); + CurrentJob->ReturnCode = 0; QueueLock.WithExclusiveLock([&]() { CurrentJob->EndTick = JobClock::Now(); CurrentJob->WorkerThreadId = 0; @@ -383,6 +387,22 @@ public: AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); }); } + catch (const JobError& Ex) + { + ZEN_DEBUG("Background job {}:'{}' failed. Reason: '{}'. Return code {}", + CurrentJob->Id.Id, + CurrentJob->Name, + Ex.what(), + Ex.m_ReturnCode); + QueueLock.WithExclusiveLock([&]() { + CurrentJob->State.AbortReason = Ex.what(); + CurrentJob->EndTick = JobClock::Now(); + CurrentJob->WorkerThreadId = 0; + CurrentJob->ReturnCode = Ex.m_ReturnCode; + RunningJobs.erase(CurrentJob->Id.Id); + AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); + }); + } catch (const std::exception& Ex) { ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what()); diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index a2d323b5e..30a2bfc65 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -1628,12 +1628,13 @@ HttpClient::Response::ErrorMessage(std::string_view Prefix) const } else if (StatusCode != HttpResponseCode::ImATeapot && (int)StatusCode) { - return fmt::format("{}{}HTTP error {} {} ({})", + std::string TextResponse = ToText(); + return fmt::format("{}{}HTTP error {} {}{}", Prefix, Prefix.empty() ? ""sv : ": "sv, (int)StatusCode, zen::ToString(StatusCode), - ToText()); + TextResponse.empty() ? ""sv : fmt::format(" ({})", TextResponse)); } else { @@ -1646,7 +1647,7 @@ HttpClient::Response::ThrowError(std::string_view ErrorPrefix) { if (!IsSuccess()) { - throw std::runtime_error(ErrorMessage(ErrorPrefix)); + throw HttpClientError(ErrorMessage(ErrorPrefix), Error.has_value() ? Error.value().ErrorCode : 0, StatusCode); } } diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index c991a71ea..50bd5b53a 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -57,6 +57,29 @@ struct HttpClientSettings uint8_t RetryCount = 0; }; +class HttpClientError : public std::runtime_error +{ +public: + using _Mybase = runtime_error; + + HttpClientError(const std::string& Message, int Error, HttpResponseCode ResponseCode) + : _Mybase(Message) + , m_Error(Error) + , m_ResponseCode(ResponseCode) + { + } + + HttpClientError(const char* Message, int Error, HttpResponseCode ResponseCode) + : _Mybase(Message) + , m_Error(Error) + , m_ResponseCode(ResponseCode) + { + } + + const int m_Error = 0; + const HttpResponseCode m_ResponseCode = HttpResponseCode::ImATeapot; +}; + class HttpClient { public: diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 73166e608..9d982678a 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -247,6 +247,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime)); Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now)); + Obj.AddInteger("ReturnCode", CurrentState->ReturnCode); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); } break; diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp index eab687db2..c9a01e56a 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp @@ -3,14 +3,12 @@ #include "buildsremoteprojectstore.h" #include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> #include <zenhttp/httpclientauth.h> - -#include <zenutil/jupiter/jupiterclient.h> -#include <zenutil/jupiter/jupitersession.h> +#include <zenutil/jupiter/jupiterbuildstorage.h> namespace zen { @@ -21,20 +19,24 @@ static const std::string_view OplogContainerPartName = "oplogcontainer"sv; class BuildsRemoteStore : public RemoteProjectStore { public: - BuildsRemoteStore(Ref<JupiterClient>&& InJupiterClient, - std::string_view Namespace, - std::string_view Bucket, - const Oid& BuildId, - const IoBuffer& MetaData, - bool ForceDisableBlocks, - bool ForceDisableTempBlocks, - const std::filesystem::path& TempFilePath) - : m_JupiterClient(std::move(InJupiterClient)) + BuildsRemoteStore(std::unique_ptr<BuildStorage::Statistics>&& BuildStorageStats, + std::unique_ptr<HttpClient>&& BuildStorageHttp, + std::unique_ptr<BuildStorage>&& BuildStorage, + std::string_view Url, + std::string_view Namespace, + std::string_view Bucket, + const Oid& BuildId, + const IoBuffer& MetaData, + bool ForceDisableBlocks, + bool ForceDisableTempBlocks) + : m_BuildStorageStats(std::move(BuildStorageStats)) + , m_BuildStorageHttp(std::move(BuildStorageHttp)) + , m_BuildStorage(std::move(BuildStorage)) + , m_Url(Url) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_BuildId(BuildId) , m_MetaData(MetaData) - , m_TempFilePath(TempFilePath) , m_EnableBlocks(!ForceDisableBlocks) , m_UseTempBlocks(!ForceDisableTempBlocks) { @@ -47,63 +49,91 @@ public: .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId), - .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)}; + .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_Url, m_Namespace, m_Bucket, m_BuildId)}; } virtual Stats GetStats() const override { - return {.m_SentBytes = m_SentBytes.load(), - .m_ReceivedBytes = m_ReceivedBytes.load(), - .m_RequestTimeNS = m_RequestTimeNS.load(), - .m_RequestCount = m_RequestCount.load(), - .m_PeakSentBytes = m_PeakSentBytes.load(), - .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), - .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; + return { + .m_SentBytes = m_BuildStorageStats->TotalBytesWritten.load(), + .m_ReceivedBytes = m_BuildStorageStats->TotalBytesRead.load(), + .m_RequestTimeNS = m_BuildStorageStats->TotalRequestTimeUs.load() * 1000, + .m_RequestCount = m_BuildStorageStats->TotalRequestCount.load(), + .m_PeakSentBytes = m_BuildStorageStats->PeakSentBytes.load(), + .m_PeakReceivedBytes = m_BuildStorageStats->PeakReceivedBytes.load(), + .m_PeakBytesPerSec = m_BuildStorageStats->PeakBytesPerSec.load(), + }; } virtual CreateContainerResult CreateContainer() override { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - - IoBuffer Payload = m_MetaData; - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload); - AddStats(PutResult); + CreateContainerResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); - CreateContainerResult Result{ConvertResult(PutResult)}; - if (Result.ErrorCode) + CbObject Payload = LoadCompactBinaryObject(m_MetaData); + try + { + CbObject PutBuildResult = m_BuildStorage->PutBuild(m_BuildId, Payload); + ZEN_UNUSED(PutBuildResult); + m_OplogBuildPartId = Oid::NewOid(); + } + catch (const HttpClientError& Ex) { - Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - Result.Reason); + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = + fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); } - m_OplogBuildPartId = Oid::NewOid(); + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = + fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); + } + return Result; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - PutBuildPartResult PutResult = - Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload); - AddStats(PutResult); - SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; - if (Result.ErrorCode) + SaveResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); + + try + { + CbObject ObjectPayload = LoadCompactBinaryObject(Payload); + + std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = + m_BuildStorage->PutBuildPart(m_BuildId, m_OplogBuildPartId, OplogContainerPartName, ObjectPayload); + Result.RawHash = PutBuildPartResult.first; + Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(PutBuildPartResult.second.begin(), PutBuildPartResult.second.end()); + } + catch (const HttpClientError& Ex) { - Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - m_OplogBuildPartId, - Result.Reason); + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Ex.what()); } return Result; @@ -114,52 +144,84 @@ public: ChunkBlockDescription&& Block) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult PutResult = - Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); - AddStats(PutResult); + SaveAttachmentResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); - SaveAttachmentResult Result{ConvertResult(PutResult)}; - if (Result.ErrorCode) + try { - Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - RawHash, - Result.Reason); - return Result; - } + m_BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); - if (Block.BlockHash == RawHash) - { - CbObjectWriter BlockMetaData; - BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string()); - - IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer(); - MetaPayload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, RawHash, MetaPayload); - AddStats(PutMetaResult); - RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult); - if (MetaDataResult.ErrorCode) + if (Block.BlockHash == RawHash) { - ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - RawHash, - MetaDataResult.Reason); + try + { + CbObjectWriter BlockMetaData; + BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string()); + CbObject MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()); + if (!m_BuildStorage->PutBlockMetadata(m_BuildId, RawHash, MetaPayload)) + { + ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + RawHash, + "not found"); + } + } + catch (const HttpClientError& Ex) + { + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } } } + catch (const HttpClientError& Ex) + { + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } + return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override { SaveAttachmentsResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); for (const SharedBuffer& Chunk : Chunks) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); @@ -177,37 +239,36 @@ public: ZEN_UNUSED(RawHash); ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - FinalizeBuildPartResult FinalizeRefResult = - Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash); - AddStats(FinalizeRefResult); + FinalizeResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); - FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; - if (Result.ErrorCode) + try { - Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - m_OplogBuildPartId, - Result.Reason); + std::vector<IoHash> Needs = m_BuildStorage->FinalizeBuildPart(m_BuildId, m_OplogBuildPartId, RawHash); + Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(Needs.begin(), Needs.end()); } - else if (Result.Needs.empty()) + catch (const HttpClientError& Ex) { - JupiterResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId); - AddStats(FinalizeBuildResult); - FinalizeBuildResult.ElapsedSeconds += FinalizeRefResult.ElapsedSeconds; - Result = {ConvertResult(FinalizeBuildResult)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - FinalizeBuildResult.Reason); - } + Result.ErrorCode = Ex.m_Error != 0 ? Ex.m_Error : Ex.m_ResponseCode != HttpResponseCode::ImATeapot ? (int)Ex.m_ResponseCode : 0; + Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Ex.what()); } return Result; } @@ -216,161 +277,128 @@ public: { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId); - AddStats(GetBuildResult); - LoadContainerResult Result{ConvertResult(GetBuildResult)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching oplog container build from {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - Result.Reason); - return Result; - } - CbObject BuildObject = LoadCompactBinaryObject(GetBuildResult.Response); - if (!BuildObject) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The build {}/{}/{}/{} payload is not formatted as a compact binary object"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId); - return Result; - } - CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); - if (!PartsObject) + LoadContainerResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); + + try { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId); - return Result; + CbObject BuildObject = m_BuildStorage->GetBuild(m_BuildId); + + CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); + if (!PartsObject) + { + throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, + m_Url, + m_Namespace, + m_Bucket, + m_BuildId)); + } + m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); + if (m_OplogBuildPartId == Oid::Zero) + { + throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, + m_Url, + m_Namespace, + m_Bucket, + m_BuildId, + OplogContainerPartName)); + } + + Result.ContainerObject = m_BuildStorage->GetBuildPart(m_BuildId, m_OplogBuildPartId); } - m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); - if (m_OplogBuildPartId == Oid::Zero) + catch (const HttpClientError& Ex) { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, - m_JupiterClient->ServiceUrl(), + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'", + m_Url, m_Namespace, m_Bucket, m_BuildId, - OplogContainerPartName); - return Result; + Ex.what()); } - - JupiterResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); - AddStats(GetBuildPartResult); - Result = {ConvertResult(GetBuildResult)}; - Result.ElapsedSeconds += GetBuildResult.ElapsedSeconds; - if (Result.ErrorCode) + catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed fetching oplog build part from {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), + Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'", + m_Url, m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, - Result.Reason); - return Result; + Ex.what()); } - CbObject ContainerObject = LoadCompactBinaryObject(GetBuildPartResult.Response); - if (!ContainerObject) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The build part for oplog container {}/{}/{}/{}/{} is not formatted as a compact binary object"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - m_OplogBuildPartId); - return Result; - } - Result.ContainerObject = std::move(ContainerObject); return Result; } virtual GetKnownBlocksResult GetKnownBlocks() override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, (uint64_t)-1); - AddStats(FindResult); - GetKnownBlocksResult Result{ConvertResult(FindResult)}; - if (Result.ErrorCode) + + GetKnownBlocksResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); + + try { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - Result.Reason); - return Result; + CbObject KnownBlocks = m_BuildStorage->FindBlocks(m_BuildId, 10000u); + std::optional<std::vector<ChunkBlockDescription>> Blocks = ParseChunkBlockDescriptionList(KnownBlocks); + Result.Blocks.reserve(Blocks.value().size()); + for (ChunkBlockDescription& BlockDescription : Blocks.value()) + { + Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash, + .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)}); + } } - if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None) + catch (const HttpClientError& Ex) { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a compact binary object"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId); - return Result; + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = + fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); } - std::optional<std::vector<ChunkBlockDescription>> Blocks = - ParseChunkBlockDescriptionList(LoadCompactBinaryObject(FindResult.Response)); - if (!Blocks) + catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a list of blocks"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId); - return Result; - } - Result.Blocks.reserve(Blocks.value().size()); - for (ChunkBlockDescription& BlockDescription : Blocks.value()) - { - Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash, - .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)}); + Result.Reason = + fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); } + return Result; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, m_TempFilePath); - AddStats(GetResult); - LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; - if (GetResult.ErrorCode) + LoadAttachmentResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); + + try + { + Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash); + } + catch (const HttpClientError& Ex) + { + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = + fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); + } + catch (const std::exception& Ex) { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_BuildId, - RawHash, - Result.Reason); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = + fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); } + return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override { LoadAttachmentsResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; }); for (const IoHash& Hash : RawHashes) { LoadAttachmentResult ChunkResult = LoadAttachment(Hash); @@ -386,81 +414,27 @@ public: } private: - void AddStats(const JupiterResult& Result) + static int MakeErrorCode(const HttpClientError& Ex) { - m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes)); - m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes)); - m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000)); - SetAtomicMax(m_PeakSentBytes, Result.SentBytes); - SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes); - if (Result.ElapsedSeconds > 0.0) - { - uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); - SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); - } - - m_RequestCount.fetch_add(1); + return Ex.m_Error != 0 ? Ex.m_Error : Ex.m_ResponseCode != HttpResponseCode::ImATeapot ? (int)Ex.m_ResponseCode : 0; } - static Result ConvertResult(const JupiterResult& Response) - { - std::string Text; - int32_t ErrorCode = 0; - if (Response.ErrorCode != 0 || !Response.Success) - { - if (Response.Response) - { - HttpContentType ContentType = Response.Response.GetContentType(); - if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON) - { - ExtendableStringBuilder<256> SB; - SB.Append("\n"); - SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), - Response.Response.GetSize())); - Text = SB.ToString(); - } - else if (ContentType == ZenContentType::kCbObject) - { - ExtendableStringBuilder<256> SB; - SB.Append("\n"); - CompactBinaryToJson(Response.Response.GetView(), SB); - Text = SB.ToString(); - } - } - } - if (Response.ErrorCode != 0) - { - ErrorCode = Response.ErrorCode; - } - else if (!Response.Success) - { - ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - } - return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; - } - - Ref<JupiterClient> m_JupiterClient; - const std::string m_Namespace; - const std::string m_Bucket; - const Oid m_BuildId; - IoBuffer m_MetaData; - Oid m_OplogBuildPartId = Oid::Zero; - std::filesystem::path m_TempFilePath; - const bool m_EnableBlocks = true; - const bool m_UseTempBlocks = true; - const bool m_AllowRedirect = false; - - std::atomic_uint64_t m_SentBytes = {}; - std::atomic_uint64_t m_ReceivedBytes = {}; - std::atomic_uint64_t m_RequestTimeNS = {}; - std::atomic_uint64_t m_RequestCount = {}; - std::atomic_uint64_t m_PeakSentBytes = {}; - std::atomic_uint64_t m_PeakReceivedBytes = {}; - std::atomic_uint64_t m_PeakBytesPerSec = {}; + std::unique_ptr<BuildStorage::Statistics> m_BuildStorageStats; + std::unique_ptr<HttpClient> m_BuildStorageHttp; + std::unique_ptr<BuildStorage> m_BuildStorage; + const std::string m_Url; + const std::string m_Namespace; + const std::string m_Bucket; + const Oid m_BuildId; + IoBuffer m_MetaData; + Oid m_OplogBuildPartId = Oid::Zero; + const bool m_EnableBlocks = true; + const bool m_UseTempBlocks = true; + const bool m_AllowRedirect = false; }; std::shared_ptr<RemoteProjectStore> -CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet) +CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet) { std::string Url = Options.Url; if (Url.find("://"sv) == std::string::npos) @@ -468,13 +442,7 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file // Assume https URL Url = fmt::format("https://{}"sv, Url); } - JupiterClientOptions ClientOptions{.Name = "Remote store"sv, - .ServiceUrl = Url, - .ConnectTimeout = std::chrono::milliseconds(2000), - .Timeout = std::chrono::milliseconds(1800000), - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = true, - .RetryCount = 4}; + // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider // 2) Access token as parameter in request // 3) Environment variable (different win vs linux/mac) @@ -502,16 +470,31 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); } - Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider))); + HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", + .ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(1800000), + .AccessTokenProvider = std::move(TokenProvider), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = true, + .RetryCount = 4}; + + std::unique_ptr<BuildStorage::Statistics> BuildStorageStats(std::make_unique<BuildStorage::Statistics>()); + + std::unique_ptr<HttpClient> BuildStorageHttp = std::make_unique<HttpClient>(Url, ClientSettings); + + std::unique_ptr<BuildStorage> BuildStorage = + CreateJupiterBuildStorage(Log(), *BuildStorageHttp, *BuildStorageStats, Options.Namespace, Options.Bucket, false, TempFilePath); - std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(Client), + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(BuildStorageStats), + std::move(BuildStorageHttp), + std::move(BuildStorage), + Url, Options.Namespace, Options.Bucket, Options.BuildId, Options.MetaData, Options.ForceDisableBlocks, - Options.ForceDisableTempBlocks, - TempFilePath); + Options.ForceDisableTempBlocks); return RemoteStore; } diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.h b/src/zenserver/projectstore/buildsremoteprojectstore.h index 6f33ac89b..60b6caef7 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.h +++ b/src/zenserver/projectstore/buildsremoteprojectstore.h @@ -24,8 +24,8 @@ struct BuildsRemoteStoreOptions : RemoteStoreOptions IoBuffer MetaData; }; -std::shared_ptr<RemoteProjectStore> CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, - const std::filesystem::path& TempFilePath, - bool Quiet); +std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, + const std::filesystem::path& TempFilePath, + bool Quiet); } // namespace zen diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index a8c970323..07fd6e908 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -364,7 +364,7 @@ namespace { ForceDisableTempBlocks, AssumeHttp2, MetaData}; - RemoteStore = CreateBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false); + RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false); } if (!RemoteStore) @@ -5801,7 +5801,8 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); if (!IsHttpSuccessCode(Response.first)) { - throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second); + throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, + (int)Response.first); } }); @@ -5842,7 +5843,8 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); if (!IsHttpSuccessCode(Response.first)) { - throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second); + throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, + (int)Response.first); } }); diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp index 88238effd..2171f4d62 100644 --- a/src/zenutil/buildstoragecache.cpp +++ b/src/zenutil/buildstoragecache.cpp @@ -378,6 +378,13 @@ private: m_Stats.TotalBytesRead += Result.DownloadedBytes; m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); m_Stats.TotalRequestCount++; + SetAtomicMax(m_Stats.PeakSentBytes, Result.UploadedBytes); + SetAtomicMax(m_Stats.PeakReceivedBytes, Result.DownloadedBytes); + if (Result.ElapsedSeconds > 0.0) + { + uint64_t BytesPerSec = uint64_t((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds); + SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); + } } HttpClient& m_HttpClient; diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index c2cc5ab3c..f75fe403f 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -40,19 +40,23 @@ public: ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces"); ZEN_UNUSED(bRecursive); - SimulateLatency(0, 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObjectWriter Writer; Writer.BeginArray("results"); { } Writer.EndArray(); // results - Writer.Save(); - SimulateLatency(Writer.GetSaveSize(), 0); + + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); return Writer.Save(); } @@ -61,11 +65,14 @@ public: ZEN_TRACE_CPU("FileBuildStorage::ListBuilds"); ZEN_UNUSED(Query); - SimulateLatency(Query.GetSize(), 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = Query.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildFolder = GetBuildsFolder(); DirectoryContent Content; @@ -88,19 +95,23 @@ public: } } Writer.EndArray(); // results - Writer.Save(); - SimulateLatency(Writer.GetSaveSize(), 0); + + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); return Writer.Save(); } virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override { ZEN_TRACE_CPU("FileBuildStorage::PutBuild"); - SimulateLatency(MetaData.GetSize(), 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = MetaData.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObjectWriter BuildObject; BuildObject.AddObject("metadata", MetaData); @@ -109,35 +120,41 @@ public: CbObjectWriter BuildResponse; BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u); - BuildResponse.Save(); - - SimulateLatency(0, BuildResponse.GetSaveSize()); + BuildResponse.Finalize(); + ReceivedBytes = BuildResponse.GetSaveSize(); return BuildResponse.Save(); } virtual CbObject GetBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("FileBuildStorage::GetBuild"); - SimulateLatency(0, 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObject Build = ReadBuild(BuildId); - SimulateLatency(0, Build.GetSize()); + ReceivedBytes = Build.GetSize(); return Build; } virtual void FinalizeBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild"); - SimulateLatency(0, 0); - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; - ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); } virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId, @@ -146,10 +163,14 @@ public: const CbObject& MetaData) override { ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart"); - SimulateLatency(MetaData.GetSize(), 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = MetaData.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); CreateDirectories(BuildPartDataPath.parent_path()); @@ -184,7 +205,7 @@ public: std::vector<IoHash> NeededAttachments = GetNeededAttachments(MetaData); - SimulateLatency(0, sizeof(IoHash) * NeededAttachments.size()); + ReceivedBytes = sizeof(IoHash) * NeededAttachments.size(); return std::make_pair(RawHash, std::move(NeededAttachments)); } @@ -192,22 +213,24 @@ public: virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override { ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart"); - SimulateLatency(0, 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); - m_Stats.TotalBytesRead += Payload.GetSize(); ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); - SimulateLatency(0, BuildPartObject.GetSize()); + ReceivedBytes = BuildPartObject.GetSize(); return BuildPartObject; } @@ -215,15 +238,18 @@ public: virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override { ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart"); - SimulateLatency(0, 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); - m_Stats.TotalBytesRead += Payload.GetSize(); + IoHash RawHash = IoHash::HashBuffer(Payload.GetView()); if (RawHash != PartHash) { @@ -234,7 +260,7 @@ public: CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); std::vector<IoHash> NeededAttachments(GetNeededAttachments(BuildPartObject)); - SimulateLatency(0, NeededAttachments.size() * sizeof(IoHash)); + ReceivedBytes = NeededAttachments.size() * sizeof(IoHash); return NeededAttachments; } @@ -247,13 +273,14 @@ public: ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob"); ZEN_UNUSED(BuildId); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - SimulateLatency(Payload.GetSize(), 0); ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload)); - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = Payload.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (!IsFile(BlockPath)) @@ -261,8 +288,8 @@ public: CreateDirectories(BlockPath.parent_path()); TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView()); } - m_Stats.TotalBytesWritten += Payload.GetSize(); - SimulateLatency(0, 0); + + ReceivedBytes = Payload.GetSize(); } virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId, @@ -275,10 +302,15 @@ public: ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob"); ZEN_UNUSED(BuildId); ZEN_UNUSED(ContentType); - SimulateLatency(0, 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (!IsFile(BlockPath)) @@ -314,7 +346,15 @@ public: WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() { ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work"); IoBuffer PartPayload = Workload->Transmitter(Offset, Size); - SimulateLatency(PartPayload.GetSize(), 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = PartPayload.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); std::error_code Ec; Workload->TempFile.Write(PartPayload, Offset, Ec); @@ -325,8 +365,7 @@ public: Ec.message(), Ec.value())); } - uint64_t BytesWritten = PartPayload.GetSize(); - m_Stats.TotalBytesWritten += BytesWritten; + const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1; if (IsLastPart) { @@ -342,18 +381,14 @@ public: Ec.value())); } } - Workload->OnSentBytes(BytesWritten, IsLastPart); - SimulateLatency(0, 0); + Workload->OnSentBytes(SentBytes, IsLastPart); }); Offset += Size; } Workload->PartsLeft.store(WorkItems.size()); - - SimulateLatency(0, 0); return WorkItems; } - SimulateLatency(0, 0); return {}; } @@ -361,10 +396,15 @@ public: { ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob"); ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (IsFile(BlockPath)) @@ -382,11 +422,9 @@ public: ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); } Payload.SetContentType(ZenContentType::kCompressedBinary); - m_Stats.TotalBytesRead += Payload.GetSize(); - SimulateLatency(0, Payload.GetSize()); + ReceivedBytes = Payload.GetSize(); return Payload; } - SimulateLatency(0, 0); return IoBuffer{}; } @@ -398,10 +436,15 @@ public: { ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob"); ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (IsFile(BlockPath)) @@ -429,22 +472,29 @@ public: uint64_t Size = Min(ChunkSize, BlobSize - Offset); WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() { ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work"); - SimulateLatency(0, 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + IoBuffer PartPayload(Size); Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset); - m_Stats.TotalBytesRead += PartPayload.GetSize(); + ReceivedBytes = PartPayload.GetSize(); Workload->OnReceive(Offset, PartPayload); uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size); if (ByteRemaning == Size) { Workload->OnComplete(); } - SimulateLatency(Size, PartPayload.GetSize()); }); Offset += Size; } - SimulateLatency(0, 0); return WorkItems; } return {}; @@ -455,18 +505,19 @@ public: ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata"); ZEN_UNUSED(BuildId); - SimulateLatency(MetaData.GetSize(), 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = MetaData.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash); CreateDirectories(BlockMetaDataPath.parent_path()); TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView()); - m_Stats.TotalBytesWritten += MetaData.GetSize(); WriteAsJson(BlockMetaDataPath, MetaData); - SimulateLatency(0, 0); return true; } @@ -474,10 +525,15 @@ public: { ZEN_TRACE_CPU("FileBuildStorage::FindBlocks"); ZEN_UNUSED(BuildId); - SimulateLatency(sizeof(BuildId), 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); uint64_t FoundCount = 0; @@ -495,8 +551,6 @@ public: { IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); - m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize(); - CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); Writer.AddObject(BlockObject); FoundCount++; @@ -508,19 +562,25 @@ public: } } Writer.EndArray(); // blocks - CbObject Result = Writer.Save(); - SimulateLatency(0, Result.GetSize()); - return Result; + + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); + return Writer.Save(); } virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override { ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata"); ZEN_UNUSED(BuildId); - SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObjectWriter Writer; Writer.BeginArray("blocks"); @@ -532,22 +592,29 @@ public: { IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); - m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize(); - CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); Writer.AddObject(BlockObject); } } Writer.EndArray(); // blocks - CbObject Result = Writer.Save(); - SimulateLatency(0, Result.GetSize()); - return Result; + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); + return Writer.Save(); } virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) override { + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + CbObjectWriter Request; Request.BeginObject("floatStats"sv); for (auto It : FloatStats) @@ -555,17 +622,16 @@ public: Request.AddFloat(It.first, It.second); } Request.EndObject(); - CbObject Payload = Request.Save(); - - SimulateLatency(Payload.GetSize(), 0); + Request.Finalize(); + SentBytes = Request.GetSaveSize(); const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId); CreateDirectories(BuildPartStatsDataPath.parent_path()); + CbObject Payload = Request.Save(); + TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView()); WriteAsJson(BuildPartStatsDataPath, Payload); - - SimulateLatency(0, 0); } protected: @@ -629,7 +695,6 @@ protected: const std::filesystem::path BuildDataPath = GetBuildPath(BuildId); CreateDirectories(BuildDataPath.parent_path()); TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView()); - m_Stats.TotalBytesWritten += Data.GetSize(); WriteAsJson(BuildDataPath, Data); } @@ -646,7 +711,6 @@ protected: Content.ErrorCode.value())); } IoBuffer Payload = Content.Flatten(); - m_Stats.TotalBytesRead += Payload.GetSize(); ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); CbObject BuildObject = CbObject(SharedBuffer(Payload)); return BuildObject; @@ -704,6 +768,22 @@ protected: } private: + void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes) + { + const uint64_t ElapsedUs = ExecutionTimer.GetElapsedTimeUs(); + m_Stats.TotalBytesWritten += UploadedBytes; + m_Stats.TotalBytesRead += DownloadedBytes; + m_Stats.TotalRequestTimeUs += ElapsedUs; + m_Stats.TotalRequestCount++; + SetAtomicMax(m_Stats.PeakSentBytes, UploadedBytes); + SetAtomicMax(m_Stats.PeakReceivedBytes, DownloadedBytes); + if (ElapsedUs > 0) + { + uint64_t BytesPerSec = ((UploadedBytes + DownloadedBytes) * 1000000u) / ElapsedUs; + SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); + } + } + const std::filesystem::path m_StoragePath; BuildStorage::Statistics& m_Stats; const bool m_EnableJsonOutput = false; diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h index f49d4b42a..46ecd0a11 100644 --- a/src/zenutil/include/zenutil/buildstorage.h +++ b/src/zenutil/include/zenutil/buildstorage.h @@ -21,6 +21,9 @@ public: std::atomic<uint64_t> TotalRequestCount = 0; std::atomic<uint64_t> TotalRequestTimeUs = 0; std::atomic<uint64_t> TotalExecutionTimeUs = 0; + std::atomic<uint64_t> PeakSentBytes = 0; + std::atomic<uint64_t> PeakReceivedBytes = 0; + std::atomic<uint64_t> PeakBytesPerSec = 0; }; virtual ~BuildStorage() {} diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h index e1fb73fd4..a0690a16a 100644 --- a/src/zenutil/include/zenutil/buildstoragecache.h +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -22,6 +22,9 @@ public: std::atomic<uint64_t> TotalRequestCount = 0; std::atomic<uint64_t> TotalRequestTimeUs = 0; std::atomic<uint64_t> TotalExecutionTimeUs = 0; + std::atomic<uint64_t> PeakSentBytes = 0; + std::atomic<uint64_t> PeakReceivedBytes = 0; + std::atomic<uint64_t> PeakBytesPerSec = 0; }; virtual ~BuildStorageCache() {} diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp index 9974725ff..c9278acb4 100644 --- a/src/zenutil/jupiter/jupiterbuildstorage.cpp +++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp @@ -17,6 +17,16 @@ namespace zen { using namespace std::literals; +namespace { + void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix) + { + int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0; + HttpResponseCode Status = + Result.ErrorCode >= int(HttpResponseCode::Continue) ? HttpResponseCode(Result.ErrorCode) : HttpResponseCode::ImATeapot; + throw HttpClientError(fmt::format("{}: {} ({})", Prefix, Result.Reason, Result.ErrorCode), Error, Status); + } +} // namespace + class JupiterBuildStorage : public BuildStorage { public: @@ -46,7 +56,7 @@ public: AddStatistic(ListResult); if (!ListResult.Success) { - throw std::runtime_error(fmt::format("Failed listing namespaces: {} ({})", ListResult.Reason, ListResult.ErrorCode)); + ThrowFromJupiterResult(ListResult, "Failed listing namespaces"); } CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response); @@ -66,10 +76,10 @@ public: AddStatistic(BucketsResult); if (!BucketsResult.Success) { - throw std::runtime_error( - fmt::format("Failed listing namespaces: {} ({})", BucketsResult.Reason, BucketsResult.ErrorCode)); + ThrowFromJupiterResult(BucketsResult, fmt::format("Failed listing buckets in namespace {}", Namespace)); } - CbObject BucketResponse = PayloadToCbObject("Failed listing namespaces"sv, BucketsResult.Response); + CbObject BucketResponse = + PayloadToCbObject(fmt::format("Failed listing buckets in namespace {}", Namespace), BucketsResult.Response); Response.BeginArray("items"); for (CbFieldView BucketField : BucketResponse["buckets"]) @@ -103,7 +113,7 @@ public: AddStatistic(ListResult); if (!ListResult.Success) { - throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode)); + ThrowFromJupiterResult(ListResult, "Failed listing builds"sv); } return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); } @@ -120,7 +130,7 @@ public: AddStatistic(PutResult); if (!PutResult.Success) { - throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode)); + ThrowFromJupiterResult(PutResult, "Failed creating build"sv); } return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); } @@ -135,7 +145,7 @@ public: AddStatistic(GetBuildResult); if (!GetBuildResult.Success) { - throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode)); + ThrowFromJupiterResult(GetBuildResult, "Failed fetching build"sv); } return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); } @@ -150,8 +160,7 @@ public: AddStatistic(FinalizeBuildResult); if (!FinalizeBuildResult.Success) { - throw std::runtime_error( - fmt::format("Failed finalizing build part: {} ({})", FinalizeBuildResult.Reason, FinalizeBuildResult.ErrorCode)); + ThrowFromJupiterResult(FinalizeBuildResult, "Failed finalizing build"sv); } } @@ -170,7 +179,7 @@ public: AddStatistic(PutPartResult); if (!PutPartResult.Success) { - throw std::runtime_error(fmt::format("Failed creating build part: {} ({})", PutPartResult.Reason, PutPartResult.ErrorCode)); + ThrowFromJupiterResult(PutPartResult, "Failed creating build part"sv); } return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs)); } @@ -185,10 +194,7 @@ public: AddStatistic(GetBuildPartResult); if (!GetBuildPartResult.Success) { - throw std::runtime_error(fmt::format("Failed fetching build part {}: {} ({})", - BuildPartId, - GetBuildPartResult.Reason, - GetBuildPartResult.ErrorCode)); + ThrowFromJupiterResult(GetBuildPartResult, "Failed fetching build part"sv); } return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); } @@ -203,8 +209,7 @@ public: AddStatistic(FinalizePartResult); if (!FinalizePartResult.Success) { - throw std::runtime_error( - fmt::format("Failed finalizing build part: {} ({})", FinalizePartResult.Reason, FinalizePartResult.ErrorCode)); + ThrowFromJupiterResult(FinalizePartResult, "Failed finalizing build part"sv); } return std::move(FinalizePartResult.Needs); } @@ -222,7 +227,7 @@ public: AddStatistic(PutBlobResult); if (!PutBlobResult.Success) { - throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PutBlobResult.Reason, PutBlobResult.ErrorCode)); + ThrowFromJupiterResult(PutBlobResult, "Failed putting build part"sv); } } @@ -249,8 +254,7 @@ public: AddStatistic(PutMultipartBlobResult); if (!PutMultipartBlobResult.Success) { - throw std::runtime_error( - fmt::format("Failed putting build part: {} ({})", PutMultipartBlobResult.Reason, PutMultipartBlobResult.ErrorCode)); + ThrowFromJupiterResult(PutMultipartBlobResult, "Failed putting large build blob"sv); } OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty()); @@ -265,7 +269,7 @@ public: AddStatistic(PartResult); if (!PartResult.Success) { - throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode)); + ThrowFromJupiterResult(PartResult, "Failed putting large build blob"sv); } OnSentBytes(PartResult.SentBytes, IsComplete); }); @@ -285,8 +289,7 @@ public: AddStatistic(GetBuildBlobResult); if (!GetBuildBlobResult.Success) { - throw std::runtime_error( - fmt::format("Failed fetching build blob {}: {} ({})", RawHash, GetBuildBlobResult.Reason, GetBuildBlobResult.ErrorCode)); + ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob"sv); } return std::move(GetBuildBlobResult.Response); } @@ -314,8 +317,7 @@ public: AddStatistic(GetMultipartBlobResult); if (!GetMultipartBlobResult.Success) { - throw std::runtime_error( - fmt::format("Failed getting build part: {} ({})", GetMultipartBlobResult.Reason, GetMultipartBlobResult.ErrorCode)); + ThrowFromJupiterResult(GetMultipartBlobResult, "Failed getting large build part"sv); } std::vector<std::function<void()>> WorkList; for (auto& WorkItem : WorkItems) @@ -327,7 +329,7 @@ public: AddStatistic(PartResult); if (!PartResult.Success) { - throw std::runtime_error(fmt::format("Failed getting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode)); + ThrowFromJupiterResult(PartResult, "Failed getting large build part"sv); } }); } @@ -350,8 +352,7 @@ public: { return false; } - throw std::runtime_error( - fmt::format("Failed putting build block metadata: {} ({})", PutMetaResult.Reason, PutMetaResult.ErrorCode)); + ThrowFromJupiterResult(PutMetaResult, "Failed putting build block metadata"sv); } return true; } @@ -366,7 +367,7 @@ public: AddStatistic(FindResult); if (!FindResult.Success) { - throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode)); + ThrowFromJupiterResult(FindResult, "Failed fetching known blocks"sv); } return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); } @@ -392,8 +393,7 @@ public: AddStatistic(GetBlockMetadataResult); if (!GetBlockMetadataResult.Success) { - throw std::runtime_error( - fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode)); + ThrowFromJupiterResult(GetBlockMetadataResult, "Failed fetching block metadatas"sv); } return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); } @@ -416,14 +416,12 @@ public: AddStatistic(PutBuildPartStatsResult); if (!PutBuildPartStatsResult.Success) { - throw std::runtime_error(fmt::format("Failed posting build part statistics: {} ({})", - PutBuildPartStatsResult.Reason, - PutBuildPartStatsResult.ErrorCode)); + ThrowFromJupiterResult(PutBuildPartStatsResult, "Failed posting build part statistics"sv); } } private: - static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload) + static CbObject PayloadToCbObject(std::string_view ErrorContext, const IoBuffer& Payload) { if (Payload.GetContentType() == ZenContentType::kJSON) { @@ -443,7 +441,7 @@ private: else { throw std::runtime_error( - fmt::format("{}: {} ({})", "Unsupported response format", Context, ToString(Payload.GetContentType()))); + fmt::format("{}: {} ({})", "Unsupported response format", ErrorContext, ToString(Payload.GetContentType()))); } } @@ -453,6 +451,14 @@ private: m_Stats.TotalBytesRead += Result.ReceivedBytes; m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); m_Stats.TotalRequestCount++; + + SetAtomicMax(m_Stats.PeakSentBytes, Result.SentBytes); + SetAtomicMax(m_Stats.PeakReceivedBytes, Result.ReceivedBytes); + if (Result.ElapsedSeconds > 0.0) + { + uint64_t BytesPerSec = uint64_t((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); + SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); + } } JupiterSession m_Session; |