aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-08-12 13:53:58 +0200
committerGitHub Enterprise <[email protected]>2025-08-12 13:53:58 +0200
commit3a9bc3071b9a9452a5aef23c438233fc9e86fb47 (patch)
treeb0a1d67fe765f2ddc96772db088d781be159d627 /src
parentadd filtering to builds download (#463) (diff)
downloadzen-3a9bc3071b9a9452a5aef23c438233fc9e86fb47.tar.xz
zen-3a9bc3071b9a9452a5aef23c438233fc9e86fb47.zip
use new builds api for oplogs (#464)
- Improvement: Refactored jupiter oplog export code to reuse builds jupiter wrapper classes - Improvement: If `zen builds`, `zen oplog-import` or `zen oplog-import` command fails due to a http error, the return code for the program will be set to the error/status code
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp9
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp95
-rw-r--r--src/zencore/include/zencore/jobqueue.h11
-rw-r--r--src/zencore/jobqueue.cpp22
-rw-r--r--src/zenhttp/httpclient.cpp7
-rw-r--r--src/zenhttp/include/zenhttp/httpclient.h23
-rw-r--r--src/zenserver/admin/admin.cpp1
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.cpp589
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.h6
-rw-r--r--src/zenserver/projectstore/projectstore.cpp8
-rw-r--r--src/zenutil/buildstoragecache.cpp7
-rw-r--r--src/zenutil/filebuildstorage.cpp276
-rw-r--r--src/zenutil/include/zenutil/buildstorage.h3
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h3
-rw-r--r--src/zenutil/jupiter/jupiterbuildstorage.cpp76
15 files changed, 665 insertions, 471 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index aa6ae5ea4..cf5e8d9da 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -10785,9 +10785,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
DiscoveryHttpClient.Get("/api/v1/status/servers", HttpClient::Accept(HttpContentType::kJSON));
if (!ServerInfoResponse.IsSuccess())
{
- throw std::runtime_error(fmt::format("Failed to get list of servers from discovery url '{}'. Reason: '{}'",
- m_Host,
- ServerInfoResponse.ErrorMessage("")));
+ ServerInfoResponse.ThrowError(fmt::format("Failed to get list of servers from discovery url '{}'", m_Host));
}
std::string_view JsonResponse = ServerInfoResponse.AsText();
@@ -12166,6 +12164,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 3;
}
}
+ catch (const HttpClientError& HttpEx)
+ {
+ ZEN_CONSOLE("Operation failed: {}", HttpEx.what());
+ return HttpEx.m_Error != 0 ? HttpEx.m_Error : (int)HttpEx.m_ResponseCode;
+ }
catch (const std::exception& Ex)
{
ZEN_ERROR("{}", Ex.what());
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 58af0577e..f919edc87 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -149,6 +149,16 @@ namespace {
}
}
+ class AsyncJobError : public std::runtime_error
+ {
+ public:
+ using _Mybase = runtime_error;
+
+ AsyncJobError(const std::string& Message, int ReturnCode) : _Mybase(Message), m_ReturnCode(ReturnCode) {}
+
+ const int m_ReturnCode = 0;
+ };
+
void ExecuteAsyncOperation(HttpClient& Http, std::string_view Url, IoBuffer&& Payload, bool PlainProgress)
{
signal(SIGINT, SignalCallbackHandler);
@@ -252,13 +262,14 @@ namespace {
if (Status == "Aborted")
{
std::string_view AbortReason = StatusObject["AbortReason"].AsString();
+ int ReturnCode = StatusObject["ReturnCode"].AsInt32(-1);
if (!AbortReason.empty())
{
- throw std::runtime_error(std::string(AbortReason));
+ throw AsyncJobError(std::string(AbortReason), ReturnCode);
}
else
{
- throw std::runtime_error("Aborted");
+ throw AsyncJobError("Aborted", ReturnCode);
}
break;
}
@@ -1335,24 +1346,44 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
ZEN_CONSOLE("Saving oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, m_HostName, TargetDescription);
- if (m_Async)
+ try
{
- if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName),
- std::move(Payload),
- HttpClient::Accept(ZenContentType::kJSON));
- Result)
+ if (m_Async)
{
- ZEN_CONSOLE("{}", Result.ToText());
+ if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName),
+ std::move(Payload),
+ HttpClient::Accept(ZenContentType::kJSON));
+ Result)
+ {
+ ZEN_CONSOLE("{}", Result.ToText());
+ }
+ else
+ {
+ Result.ThrowError("failed requesting loading oplog export"sv);
+ }
}
else
{
- Result.ThrowError("failed requesting loading oplog export"sv);
- return 1;
+ ExecuteAsyncOperation(Http,
+ fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName),
+ std::move(Payload),
+ m_PlainProgress);
}
}
- else
+ catch (const HttpClientError& Ex)
{
- ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload), m_PlainProgress);
+ ZEN_CONSOLE("Oplog export failed: '{}'", Ex.what());
+ return Ex.m_Error != 0 ? Ex.m_Error : (int)Ex.m_ResponseCode;
+ }
+ catch (const AsyncJobError& Ex)
+ {
+ ZEN_CONSOLE("Oplog export failed: '{}'", Ex.what());
+ return Ex.m_ReturnCode;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_CONSOLE("Oplog export failed: '{}'", Ex.what());
+ return 1;
}
return 0;
}
@@ -1668,24 +1699,44 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
ZEN_CONSOLE("Loading oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, SourceDescription, m_HostName);
- if (m_Async)
+ try
{
- if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName),
- std::move(Payload),
- HttpClient::Accept(ZenContentType::kJSON));
- Result)
+ if (m_Async)
{
- ZEN_CONSOLE("{}", Result.ToText());
+ if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName),
+ std::move(Payload),
+ HttpClient::Accept(ZenContentType::kJSON));
+ Result)
+ {
+ ZEN_CONSOLE("{}", Result.ToText());
+ }
+ else
+ {
+ Result.ThrowError("failed requesting loading oplog import"sv);
+ }
}
else
{
- Result.ThrowError("failed requesting loading oplog import"sv);
- return 1;
+ ExecuteAsyncOperation(Http,
+ fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName),
+ std::move(Payload),
+ m_PlainProgress);
}
}
- else
+ catch (const HttpClientError& Ex)
+ {
+ ZEN_CONSOLE("Oplog import failed: '{}'", Ex.what());
+ return Ex.m_Error != 0 ? Ex.m_Error : (int)Ex.m_ResponseCode;
+ }
+ catch (const AsyncJobError& Ex)
{
- ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload), m_PlainProgress);
+ ZEN_CONSOLE("Oplog export failed: '{}'", Ex.what());
+ return Ex.m_ReturnCode;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_CONSOLE("Oplog import failed: '{}'", Ex.what());
+ return 1;
}
return 0;
}
diff --git a/src/zencore/include/zencore/jobqueue.h b/src/zencore/include/zencore/jobqueue.h
index d5ec6255a..470ed3fc6 100644
--- a/src/zencore/include/zencore/jobqueue.h
+++ b/src/zencore/include/zencore/jobqueue.h
@@ -28,6 +28,16 @@ public:
virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount) = 0;
};
+class JobError : public std::runtime_error
+{
+public:
+ using _Mybase = runtime_error;
+
+ JobError(const std::string& Message, int ReturnCode) : _Mybase(Message), m_ReturnCode(ReturnCode) {}
+
+ const int m_ReturnCode = 0;
+};
+
class JobQueue
{
public:
@@ -73,6 +83,7 @@ public:
std::chrono::system_clock::time_point StartTime;
std::chrono::system_clock::time_point EndTime;
int WorkerThreadId;
+ int ReturnCode;
};
// Will only respond once when status is Complete or Aborted
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index b97484458..5d727b69c 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -50,6 +50,7 @@ public:
JobClock::Tick StartTick;
JobClock::Tick EndTick;
int WorkerThreadId;
+ int ReturnCode;
virtual bool IsCancelled() const override { return CancelFlag.load(); }
virtual void ReportMessage(std::string_view Message) override { Queue->ReportMessage(Id, Message); }
@@ -101,6 +102,7 @@ public:
NewJob->StartTick = JobClock::Never();
NewJob->EndTick = JobClock::Never();
NewJob->WorkerThreadId = 0;
+ NewJob->ReturnCode = -1;
ZEN_DEBUG("Scheduling background job {}:'{}'", NewJob->Id.Id, NewJob->Name);
QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); });
@@ -274,7 +276,8 @@ public:
.CreateTime = JobClock::TimePointFromTick(Job.CreateTick),
.StartTime = JobClock::TimePointFromTick(Job.StartTick),
.EndTime = JobClock::TimePointFromTick(Job.EndTick),
- .WorkerThreadId = Job.WorkerThreadId};
+ .WorkerThreadId = Job.WorkerThreadId,
+ .ReturnCode = Job.ReturnCode};
};
std::optional<JobDetails> Result;
@@ -365,6 +368,7 @@ public:
ZEN_DEBUG("Executing background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name);
CurrentJob->Callback(*CurrentJob);
ZEN_DEBUG("Completed background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name);
+ CurrentJob->ReturnCode = 0;
QueueLock.WithExclusiveLock([&]() {
CurrentJob->EndTick = JobClock::Now();
CurrentJob->WorkerThreadId = 0;
@@ -383,6 +387,22 @@ public:
AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
});
}
+ catch (const JobError& Ex)
+ {
+ ZEN_DEBUG("Background job {}:'{}' failed. Reason: '{}'. Return code {}",
+ CurrentJob->Id.Id,
+ CurrentJob->Name,
+ Ex.what(),
+ Ex.m_ReturnCode);
+ QueueLock.WithExclusiveLock([&]() {
+ CurrentJob->State.AbortReason = Ex.what();
+ CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->WorkerThreadId = 0;
+ CurrentJob->ReturnCode = Ex.m_ReturnCode;
+ RunningJobs.erase(CurrentJob->Id.Id);
+ AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
+ });
+ }
catch (const std::exception& Ex)
{
ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what());
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index a2d323b5e..30a2bfc65 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -1628,12 +1628,13 @@ HttpClient::Response::ErrorMessage(std::string_view Prefix) const
}
else if (StatusCode != HttpResponseCode::ImATeapot && (int)StatusCode)
{
- return fmt::format("{}{}HTTP error {} {} ({})",
+ std::string TextResponse = ToText();
+ return fmt::format("{}{}HTTP error {} {}{}",
Prefix,
Prefix.empty() ? ""sv : ": "sv,
(int)StatusCode,
zen::ToString(StatusCode),
- ToText());
+ TextResponse.empty() ? ""sv : fmt::format(" ({})", TextResponse));
}
else
{
@@ -1646,7 +1647,7 @@ HttpClient::Response::ThrowError(std::string_view ErrorPrefix)
{
if (!IsSuccess())
{
- throw std::runtime_error(ErrorMessage(ErrorPrefix));
+ throw HttpClientError(ErrorMessage(ErrorPrefix), Error.has_value() ? Error.value().ErrorCode : 0, StatusCode);
}
}
diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h
index c991a71ea..50bd5b53a 100644
--- a/src/zenhttp/include/zenhttp/httpclient.h
+++ b/src/zenhttp/include/zenhttp/httpclient.h
@@ -57,6 +57,29 @@ struct HttpClientSettings
uint8_t RetryCount = 0;
};
+class HttpClientError : public std::runtime_error
+{
+public:
+ using _Mybase = runtime_error;
+
+ HttpClientError(const std::string& Message, int Error, HttpResponseCode ResponseCode)
+ : _Mybase(Message)
+ , m_Error(Error)
+ , m_ResponseCode(ResponseCode)
+ {
+ }
+
+ HttpClientError(const char* Message, int Error, HttpResponseCode ResponseCode)
+ : _Mybase(Message)
+ , m_Error(Error)
+ , m_ResponseCode(ResponseCode)
+ {
+ }
+
+ const int m_Error = 0;
+ const HttpResponseCode m_ResponseCode = HttpResponseCode::ImATeapot;
+};
+
class HttpClient
{
public:
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 73166e608..9d982678a 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -247,6 +247,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime));
Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now));
+ Obj.AddInteger("ReturnCode", CurrentState->ReturnCode);
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
}
break;
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
index eab687db2..c9a01e56a 100644
--- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
@@ -3,14 +3,12 @@
#include "buildsremoteprojectstore.h"
#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
#include <zenhttp/httpclientauth.h>
-
-#include <zenutil/jupiter/jupiterclient.h>
-#include <zenutil/jupiter/jupitersession.h>
+#include <zenutil/jupiter/jupiterbuildstorage.h>
namespace zen {
@@ -21,20 +19,24 @@ static const std::string_view OplogContainerPartName = "oplogcontainer"sv;
class BuildsRemoteStore : public RemoteProjectStore
{
public:
- BuildsRemoteStore(Ref<JupiterClient>&& InJupiterClient,
- std::string_view Namespace,
- std::string_view Bucket,
- const Oid& BuildId,
- const IoBuffer& MetaData,
- bool ForceDisableBlocks,
- bool ForceDisableTempBlocks,
- const std::filesystem::path& TempFilePath)
- : m_JupiterClient(std::move(InJupiterClient))
+ BuildsRemoteStore(std::unique_ptr<BuildStorage::Statistics>&& BuildStorageStats,
+ std::unique_ptr<HttpClient>&& BuildStorageHttp,
+ std::unique_ptr<BuildStorage>&& BuildStorage,
+ std::string_view Url,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const Oid& BuildId,
+ const IoBuffer& MetaData,
+ bool ForceDisableBlocks,
+ bool ForceDisableTempBlocks)
+ : m_BuildStorageStats(std::move(BuildStorageStats))
+ , m_BuildStorageHttp(std::move(BuildStorageHttp))
+ , m_BuildStorage(std::move(BuildStorage))
+ , m_Url(Url)
, m_Namespace(Namespace)
, m_Bucket(Bucket)
, m_BuildId(BuildId)
, m_MetaData(MetaData)
- , m_TempFilePath(TempFilePath)
, m_EnableBlocks(!ForceDisableBlocks)
, m_UseTempBlocks(!ForceDisableTempBlocks)
{
@@ -47,63 +49,91 @@ public:
.UseTempBlockFiles = m_UseTempBlocks,
.AllowChunking = true,
.ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId),
- .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)};
+ .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_Url, m_Namespace, m_Bucket, m_BuildId)};
}
virtual Stats GetStats() const override
{
- return {.m_SentBytes = m_SentBytes.load(),
- .m_ReceivedBytes = m_ReceivedBytes.load(),
- .m_RequestTimeNS = m_RequestTimeNS.load(),
- .m_RequestCount = m_RequestCount.load(),
- .m_PeakSentBytes = m_PeakSentBytes.load(),
- .m_PeakReceivedBytes = m_PeakReceivedBytes.load(),
- .m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
+ return {
+ .m_SentBytes = m_BuildStorageStats->TotalBytesWritten.load(),
+ .m_ReceivedBytes = m_BuildStorageStats->TotalBytesRead.load(),
+ .m_RequestTimeNS = m_BuildStorageStats->TotalRequestTimeUs.load() * 1000,
+ .m_RequestCount = m_BuildStorageStats->TotalRequestCount.load(),
+ .m_PeakSentBytes = m_BuildStorageStats->PeakSentBytes.load(),
+ .m_PeakReceivedBytes = m_BuildStorageStats->PeakReceivedBytes.load(),
+ .m_PeakBytesPerSec = m_BuildStorageStats->PeakBytesPerSec.load(),
+ };
}
virtual CreateContainerResult CreateContainer() override
{
ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
-
- IoBuffer Payload = m_MetaData;
- Payload.SetContentType(ZenContentType::kCbObject);
- JupiterResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload);
- AddStats(PutResult);
+ CreateContainerResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
- CreateContainerResult Result{ConvertResult(PutResult)};
- if (Result.ErrorCode)
+ CbObject Payload = LoadCompactBinaryObject(m_MetaData);
+ try
+ {
+ CbObject PutBuildResult = m_BuildStorage->PutBuild(m_BuildId, Payload);
+ ZEN_UNUSED(PutBuildResult);
+ m_OplogBuildPartId = Oid::NewOid();
+ }
+ catch (const HttpClientError& Ex)
{
- Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Result.Reason);
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason =
+ fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
}
- m_OplogBuildPartId = Oid::NewOid();
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason =
+ fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
+ }
+
return Result;
}
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- PutBuildPartResult PutResult =
- Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload);
- AddStats(PutResult);
- SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
- if (Result.ErrorCode)
+ SaveResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
+
+ try
+ {
+ CbObject ObjectPayload = LoadCompactBinaryObject(Payload);
+
+ std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
+ m_BuildStorage->PutBuildPart(m_BuildId, m_OplogBuildPartId, OplogContainerPartName, ObjectPayload);
+ Result.RawHash = PutBuildPartResult.first;
+ Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(PutBuildPartResult.second.begin(), PutBuildPartResult.second.end());
+ }
+ catch (const HttpClientError& Ex)
{
- Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- m_OplogBuildPartId,
- Result.Reason);
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Ex.what());
}
return Result;
@@ -114,52 +144,84 @@ public:
ChunkBlockDescription&& Block) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult PutResult =
- Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
- AddStats(PutResult);
+ SaveAttachmentResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
- SaveAttachmentResult Result{ConvertResult(PutResult)};
- if (Result.ErrorCode)
+ try
{
- Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- RawHash,
- Result.Reason);
- return Result;
- }
+ m_BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
- if (Block.BlockHash == RawHash)
- {
- CbObjectWriter BlockMetaData;
- BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string());
-
- IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer();
- MetaPayload.SetContentType(ZenContentType::kCbObject);
- JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, RawHash, MetaPayload);
- AddStats(PutMetaResult);
- RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult);
- if (MetaDataResult.ErrorCode)
+ if (Block.BlockHash == RawHash)
{
- ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- RawHash,
- MetaDataResult.Reason);
+ try
+ {
+ CbObjectWriter BlockMetaData;
+ BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string());
+ CbObject MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save());
+ if (!m_BuildStorage->PutBlockMetadata(m_BuildId, RawHash, MetaPayload))
+ {
+ ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ RawHash,
+ "not found");
+ }
+ }
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
}
}
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+
return Result;
}
virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
{
SaveAttachmentsResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
for (const SharedBuffer& Chunk : Chunks)
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
@@ -177,37 +239,36 @@ public:
ZEN_UNUSED(RawHash);
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- FinalizeBuildPartResult FinalizeRefResult =
- Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash);
- AddStats(FinalizeRefResult);
+ FinalizeResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
- FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
- if (Result.ErrorCode)
+ try
{
- Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- m_OplogBuildPartId,
- Result.Reason);
+ std::vector<IoHash> Needs = m_BuildStorage->FinalizeBuildPart(m_BuildId, m_OplogBuildPartId, RawHash);
+ Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(Needs.begin(), Needs.end());
}
- else if (Result.Needs.empty())
+ catch (const HttpClientError& Ex)
{
- JupiterResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId);
- AddStats(FinalizeBuildResult);
- FinalizeBuildResult.ElapsedSeconds += FinalizeRefResult.ElapsedSeconds;
- Result = {ConvertResult(FinalizeBuildResult)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- FinalizeBuildResult.Reason);
- }
+ Result.ErrorCode = Ex.m_Error != 0 ? Ex.m_Error : Ex.m_ResponseCode != HttpResponseCode::ImATeapot ? (int)Ex.m_ResponseCode : 0;
+ Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Ex.what());
}
return Result;
}
@@ -216,161 +277,128 @@ public:
{
ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId);
- AddStats(GetBuildResult);
- LoadContainerResult Result{ConvertResult(GetBuildResult)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching oplog container build from {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Result.Reason);
- return Result;
- }
- CbObject BuildObject = LoadCompactBinaryObject(GetBuildResult.Response);
- if (!BuildObject)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The build {}/{}/{}/{} payload is not formatted as a compact binary object"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId);
- return Result;
- }
- CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
- if (!PartsObject)
+ LoadContainerResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
+
+ try
{
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId);
- return Result;
+ CbObject BuildObject = m_BuildStorage->GetBuild(m_BuildId);
+
+ CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
+ if (!PartsObject)
+ {
+ throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv,
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId));
+ }
+ m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId();
+ if (m_OplogBuildPartId == Oid::Zero)
+ {
+ throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv,
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ OplogContainerPartName));
+ }
+
+ Result.ContainerObject = m_BuildStorage->GetBuildPart(m_BuildId, m_OplogBuildPartId);
}
- m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId();
- if (m_OplogBuildPartId == Oid::Zero)
+ catch (const HttpClientError& Ex)
{
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv,
- m_JupiterClient->ServiceUrl(),
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
m_Namespace,
m_Bucket,
m_BuildId,
- OplogContainerPartName);
- return Result;
+ Ex.what());
}
-
- JupiterResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId);
- AddStats(GetBuildPartResult);
- Result = {ConvertResult(GetBuildResult)};
- Result.ElapsedSeconds += GetBuildResult.ElapsedSeconds;
- if (Result.ErrorCode)
+ catch (const std::exception& Ex)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed fetching oplog build part from {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
+ Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
m_Namespace,
m_Bucket,
m_BuildId,
- m_OplogBuildPartId,
- Result.Reason);
- return Result;
+ Ex.what());
}
- CbObject ContainerObject = LoadCompactBinaryObject(GetBuildPartResult.Response);
- if (!ContainerObject)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The build part for oplog container {}/{}/{}/{}/{} is not formatted as a compact binary object"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- m_OplogBuildPartId);
- return Result;
- }
- Result.ContainerObject = std::move(ContainerObject);
return Result;
}
virtual GetKnownBlocksResult GetKnownBlocks() override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, (uint64_t)-1);
- AddStats(FindResult);
- GetKnownBlocksResult Result{ConvertResult(FindResult)};
- if (Result.ErrorCode)
+
+ GetKnownBlocksResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
+
+ try
{
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Result.Reason);
- return Result;
+ CbObject KnownBlocks = m_BuildStorage->FindBlocks(m_BuildId, 10000u);
+ std::optional<std::vector<ChunkBlockDescription>> Blocks = ParseChunkBlockDescriptionList(KnownBlocks);
+ Result.Blocks.reserve(Blocks.value().size());
+ for (ChunkBlockDescription& BlockDescription : Blocks.value())
+ {
+ Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash,
+ .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)});
+ }
}
- if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None)
+ catch (const HttpClientError& Ex)
{
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a compact binary object"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId);
- return Result;
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason =
+ fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
}
- std::optional<std::vector<ChunkBlockDescription>> Blocks =
- ParseChunkBlockDescriptionList(LoadCompactBinaryObject(FindResult.Response));
- if (!Blocks)
+ catch (const std::exception& Ex)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a list of blocks"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId);
- return Result;
- }
- Result.Blocks.reserve(Blocks.value().size());
- for (ChunkBlockDescription& BlockDescription : Blocks.value())
- {
- Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash,
- .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)});
+ Result.Reason =
+ fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
}
+
return Result;
}
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, m_TempFilePath);
- AddStats(GetResult);
- LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
- if (GetResult.ErrorCode)
+ LoadAttachmentResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
+
+ try
+ {
+ Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash);
+ }
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason =
+ fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
+ }
+ catch (const std::exception& Ex)
{
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- RawHash,
- Result.Reason);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason =
+ fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
}
+
return Result;
}
virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
{
LoadAttachmentsResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
for (const IoHash& Hash : RawHashes)
{
LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
@@ -386,81 +414,27 @@ public:
}
private:
- void AddStats(const JupiterResult& Result)
+ static int MakeErrorCode(const HttpClientError& Ex)
{
- m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes));
- m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes));
- m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000));
- SetAtomicMax(m_PeakSentBytes, Result.SentBytes);
- SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes);
- if (Result.ElapsedSeconds > 0.0)
- {
- uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds);
- SetAtomicMax(m_PeakBytesPerSec, BytesPerSec);
- }
-
- m_RequestCount.fetch_add(1);
+ return Ex.m_Error != 0 ? Ex.m_Error : Ex.m_ResponseCode != HttpResponseCode::ImATeapot ? (int)Ex.m_ResponseCode : 0;
}
- static Result ConvertResult(const JupiterResult& Response)
- {
- std::string Text;
- int32_t ErrorCode = 0;
- if (Response.ErrorCode != 0 || !Response.Success)
- {
- if (Response.Response)
- {
- HttpContentType ContentType = Response.Response.GetContentType();
- if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON)
- {
- ExtendableStringBuilder<256> SB;
- SB.Append("\n");
- SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()),
- Response.Response.GetSize()));
- Text = SB.ToString();
- }
- else if (ContentType == ZenContentType::kCbObject)
- {
- ExtendableStringBuilder<256> SB;
- SB.Append("\n");
- CompactBinaryToJson(Response.Response.GetView(), SB);
- Text = SB.ToString();
- }
- }
- }
- if (Response.ErrorCode != 0)
- {
- ErrorCode = Response.ErrorCode;
- }
- else if (!Response.Success)
- {
- ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- }
- return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text};
- }
-
- Ref<JupiterClient> m_JupiterClient;
- const std::string m_Namespace;
- const std::string m_Bucket;
- const Oid m_BuildId;
- IoBuffer m_MetaData;
- Oid m_OplogBuildPartId = Oid::Zero;
- std::filesystem::path m_TempFilePath;
- const bool m_EnableBlocks = true;
- const bool m_UseTempBlocks = true;
- const bool m_AllowRedirect = false;
-
- std::atomic_uint64_t m_SentBytes = {};
- std::atomic_uint64_t m_ReceivedBytes = {};
- std::atomic_uint64_t m_RequestTimeNS = {};
- std::atomic_uint64_t m_RequestCount = {};
- std::atomic_uint64_t m_PeakSentBytes = {};
- std::atomic_uint64_t m_PeakReceivedBytes = {};
- std::atomic_uint64_t m_PeakBytesPerSec = {};
+ std::unique_ptr<BuildStorage::Statistics> m_BuildStorageStats;
+ std::unique_ptr<HttpClient> m_BuildStorageHttp;
+ std::unique_ptr<BuildStorage> m_BuildStorage;
+ const std::string m_Url;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const Oid m_BuildId;
+ IoBuffer m_MetaData;
+ Oid m_OplogBuildPartId = Oid::Zero;
+ const bool m_EnableBlocks = true;
+ const bool m_UseTempBlocks = true;
+ const bool m_AllowRedirect = false;
};
std::shared_ptr<RemoteProjectStore>
-CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet)
+CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet)
{
std::string Url = Options.Url;
if (Url.find("://"sv) == std::string::npos)
@@ -468,13 +442,7 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file
// Assume https URL
Url = fmt::format("https://{}"sv, Url);
}
- JupiterClientOptions ClientOptions{.Name = "Remote store"sv,
- .ServiceUrl = Url,
- .ConnectTimeout = std::chrono::milliseconds(2000),
- .Timeout = std::chrono::milliseconds(1800000),
- .AssumeHttp2 = Options.AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 4};
+
// 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
// 2) Access token as parameter in request
// 3) Environment variable (different win vs linux/mac)
@@ -502,16 +470,31 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file
TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager);
}
- Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider)));
+ HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient",
+ .ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(1800000),
+ .AccessTokenProvider = std::move(TokenProvider),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 4};
+
+ std::unique_ptr<BuildStorage::Statistics> BuildStorageStats(std::make_unique<BuildStorage::Statistics>());
+
+ std::unique_ptr<HttpClient> BuildStorageHttp = std::make_unique<HttpClient>(Url, ClientSettings);
+
+ std::unique_ptr<BuildStorage> BuildStorage =
+ CreateJupiterBuildStorage(Log(), *BuildStorageHttp, *BuildStorageStats, Options.Namespace, Options.Bucket, false, TempFilePath);
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(Client),
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(BuildStorageStats),
+ std::move(BuildStorageHttp),
+ std::move(BuildStorage),
+ Url,
Options.Namespace,
Options.Bucket,
Options.BuildId,
Options.MetaData,
Options.ForceDisableBlocks,
- Options.ForceDisableTempBlocks,
- TempFilePath);
+ Options.ForceDisableTempBlocks);
return RemoteStore;
}
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.h b/src/zenserver/projectstore/buildsremoteprojectstore.h
index 6f33ac89b..60b6caef7 100644
--- a/src/zenserver/projectstore/buildsremoteprojectstore.h
+++ b/src/zenserver/projectstore/buildsremoteprojectstore.h
@@ -24,8 +24,8 @@ struct BuildsRemoteStoreOptions : RemoteStoreOptions
IoBuffer MetaData;
};
-std::shared_ptr<RemoteProjectStore> CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options,
- const std::filesystem::path& TempFilePath,
- bool Quiet);
+std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options,
+ const std::filesystem::path& TempFilePath,
+ bool Quiet);
} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index a8c970323..07fd6e908 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -364,7 +364,7 @@ namespace {
ForceDisableTempBlocks,
AssumeHttp2,
MetaData};
- RemoteStore = CreateBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false);
+ RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false);
}
if (!RemoteStore)
@@ -5801,7 +5801,8 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
if (!IsHttpSuccessCode(Response.first))
{
- throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second);
+ throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
+ (int)Response.first);
}
});
@@ -5842,7 +5843,8 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
if (!IsHttpSuccessCode(Response.first))
{
- throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second);
+ throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
+ (int)Response.first);
}
});
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
index 88238effd..2171f4d62 100644
--- a/src/zenutil/buildstoragecache.cpp
+++ b/src/zenutil/buildstoragecache.cpp
@@ -378,6 +378,13 @@ private:
m_Stats.TotalBytesRead += Result.DownloadedBytes;
m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0);
m_Stats.TotalRequestCount++;
+ SetAtomicMax(m_Stats.PeakSentBytes, Result.UploadedBytes);
+ SetAtomicMax(m_Stats.PeakReceivedBytes, Result.DownloadedBytes);
+ if (Result.ElapsedSeconds > 0.0)
+ {
+ uint64_t BytesPerSec = uint64_t((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds);
+ SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec);
+ }
}
HttpClient& m_HttpClient;
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index c2cc5ab3c..f75fe403f 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -40,19 +40,23 @@ public:
ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces");
ZEN_UNUSED(bRecursive);
- SimulateLatency(0, 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
CbObjectWriter Writer;
Writer.BeginArray("results");
{
}
Writer.EndArray(); // results
- Writer.Save();
- SimulateLatency(Writer.GetSaveSize(), 0);
+
+ Writer.Finalize();
+ ReceivedBytes = Writer.GetSaveSize();
return Writer.Save();
}
@@ -61,11 +65,14 @@ public:
ZEN_TRACE_CPU("FileBuildStorage::ListBuilds");
ZEN_UNUSED(Query);
- SimulateLatency(Query.GetSize(), 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = Query.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BuildFolder = GetBuildsFolder();
DirectoryContent Content;
@@ -88,19 +95,23 @@ public:
}
}
Writer.EndArray(); // results
- Writer.Save();
- SimulateLatency(Writer.GetSaveSize(), 0);
+
+ Writer.Finalize();
+ ReceivedBytes = Writer.GetSaveSize();
return Writer.Save();
}
virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
{
ZEN_TRACE_CPU("FileBuildStorage::PutBuild");
- SimulateLatency(MetaData.GetSize(), 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = MetaData.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
CbObjectWriter BuildObject;
BuildObject.AddObject("metadata", MetaData);
@@ -109,35 +120,41 @@ public:
CbObjectWriter BuildResponse;
BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u);
- BuildResponse.Save();
-
- SimulateLatency(0, BuildResponse.GetSaveSize());
+ BuildResponse.Finalize();
+ ReceivedBytes = BuildResponse.GetSaveSize();
return BuildResponse.Save();
}
virtual CbObject GetBuild(const Oid& BuildId) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetBuild");
- SimulateLatency(0, 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
CbObject Build = ReadBuild(BuildId);
- SimulateLatency(0, Build.GetSize());
+ ReceivedBytes = Build.GetSize();
return Build;
}
virtual void FinalizeBuild(const Oid& BuildId) override
{
ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild");
- SimulateLatency(0, 0);
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
-
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
}
virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId,
@@ -146,10 +163,14 @@ public:
const CbObject& MetaData) override
{
ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart");
- SimulateLatency(MetaData.GetSize(), 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = MetaData.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
CreateDirectories(BuildPartDataPath.parent_path());
@@ -184,7 +205,7 @@ public:
std::vector<IoHash> NeededAttachments = GetNeededAttachments(MetaData);
- SimulateLatency(0, sizeof(IoHash) * NeededAttachments.size());
+ ReceivedBytes = sizeof(IoHash) * NeededAttachments.size();
return std::make_pair(RawHash, std::move(NeededAttachments));
}
@@ -192,22 +213,24 @@ public:
virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart");
- SimulateLatency(0, 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten();
- m_Stats.TotalBytesRead += Payload.GetSize();
ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None);
CbObject BuildPartObject = CbObject(SharedBuffer(Payload));
- SimulateLatency(0, BuildPartObject.GetSize());
+ ReceivedBytes = BuildPartObject.GetSize();
return BuildPartObject;
}
@@ -215,15 +238,18 @@ public:
virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
{
ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart");
- SimulateLatency(0, 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten();
- m_Stats.TotalBytesRead += Payload.GetSize();
+
IoHash RawHash = IoHash::HashBuffer(Payload.GetView());
if (RawHash != PartHash)
{
@@ -234,7 +260,7 @@ public:
CbObject BuildPartObject = CbObject(SharedBuffer(Payload));
std::vector<IoHash> NeededAttachments(GetNeededAttachments(BuildPartObject));
- SimulateLatency(0, NeededAttachments.size() * sizeof(IoHash));
+ ReceivedBytes = NeededAttachments.size() * sizeof(IoHash);
return NeededAttachments;
}
@@ -247,13 +273,14 @@ public:
ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob");
ZEN_UNUSED(BuildId);
ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
- SimulateLatency(Payload.GetSize(), 0);
ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload));
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = Payload.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
if (!IsFile(BlockPath))
@@ -261,8 +288,8 @@ public:
CreateDirectories(BlockPath.parent_path());
TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView());
}
- m_Stats.TotalBytesWritten += Payload.GetSize();
- SimulateLatency(0, 0);
+
+ ReceivedBytes = Payload.GetSize();
}
virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId,
@@ -275,10 +302,15 @@ public:
ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob");
ZEN_UNUSED(BuildId);
ZEN_UNUSED(ContentType);
- SimulateLatency(0, 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
if (!IsFile(BlockPath))
@@ -314,7 +346,15 @@ public:
WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() {
ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work");
IoBuffer PartPayload = Workload->Transmitter(Offset, Size);
- SimulateLatency(PartPayload.GetSize(), 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = PartPayload.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
std::error_code Ec;
Workload->TempFile.Write(PartPayload, Offset, Ec);
@@ -325,8 +365,7 @@ public:
Ec.message(),
Ec.value()));
}
- uint64_t BytesWritten = PartPayload.GetSize();
- m_Stats.TotalBytesWritten += BytesWritten;
+
const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1;
if (IsLastPart)
{
@@ -342,18 +381,14 @@ public:
Ec.value()));
}
}
- Workload->OnSentBytes(BytesWritten, IsLastPart);
- SimulateLatency(0, 0);
+ Workload->OnSentBytes(SentBytes, IsLastPart);
});
Offset += Size;
}
Workload->PartsLeft.store(WorkItems.size());
-
- SimulateLatency(0, 0);
return WorkItems;
}
- SimulateLatency(0, 0);
return {};
}
@@ -361,10 +396,15 @@ public:
{
ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
if (IsFile(BlockPath))
@@ -382,11 +422,9 @@ public:
ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
}
Payload.SetContentType(ZenContentType::kCompressedBinary);
- m_Stats.TotalBytesRead += Payload.GetSize();
- SimulateLatency(0, Payload.GetSize());
+ ReceivedBytes = Payload.GetSize();
return Payload;
}
- SimulateLatency(0, 0);
return IoBuffer{};
}
@@ -398,10 +436,15 @@ public:
{
ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
if (IsFile(BlockPath))
@@ -429,22 +472,29 @@ public:
uint64_t Size = Min(ChunkSize, BlobSize - Offset);
WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() {
ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work");
- SimulateLatency(0, 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
+
IoBuffer PartPayload(Size);
Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset);
- m_Stats.TotalBytesRead += PartPayload.GetSize();
+ ReceivedBytes = PartPayload.GetSize();
Workload->OnReceive(Offset, PartPayload);
uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size);
if (ByteRemaning == Size)
{
Workload->OnComplete();
}
- SimulateLatency(Size, PartPayload.GetSize());
});
Offset += Size;
}
- SimulateLatency(0, 0);
return WorkItems;
}
return {};
@@ -455,18 +505,19 @@ public:
ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata");
ZEN_UNUSED(BuildId);
- SimulateLatency(MetaData.GetSize(), 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = MetaData.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash);
CreateDirectories(BlockMetaDataPath.parent_path());
TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView());
- m_Stats.TotalBytesWritten += MetaData.GetSize();
WriteAsJson(BlockMetaDataPath, MetaData);
- SimulateLatency(0, 0);
return true;
}
@@ -474,10 +525,15 @@ public:
{
ZEN_TRACE_CPU("FileBuildStorage::FindBlocks");
ZEN_UNUSED(BuildId);
- SimulateLatency(sizeof(BuildId), 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
uint64_t FoundCount = 0;
@@ -495,8 +551,6 @@ public:
{
IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
- m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
-
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
Writer.AddObject(BlockObject);
FoundCount++;
@@ -508,19 +562,25 @@ public:
}
}
Writer.EndArray(); // blocks
- CbObject Result = Writer.Save();
- SimulateLatency(0, Result.GetSize());
- return Result;
+
+ Writer.Finalize();
+ ReceivedBytes = Writer.GetSaveSize();
+ return Writer.Save();
}
virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata");
ZEN_UNUSED(BuildId);
- SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = sizeof(Oid) + sizeof(IoHash) * BlockHashes.size();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
CbObjectWriter Writer;
Writer.BeginArray("blocks");
@@ -532,22 +592,29 @@ public:
{
IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
- m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
-
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
Writer.AddObject(BlockObject);
}
}
Writer.EndArray(); // blocks
- CbObject Result = Writer.Save();
- SimulateLatency(0, Result.GetSize());
- return Result;
+ Writer.Finalize();
+ ReceivedBytes = Writer.GetSaveSize();
+ return Writer.Save();
}
virtual void PutBuildPartStats(const Oid& BuildId,
const Oid& BuildPartId,
const tsl::robin_map<std::string, double>& FloatStats) override
{
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
+
CbObjectWriter Request;
Request.BeginObject("floatStats"sv);
for (auto It : FloatStats)
@@ -555,17 +622,16 @@ public:
Request.AddFloat(It.first, It.second);
}
Request.EndObject();
- CbObject Payload = Request.Save();
-
- SimulateLatency(Payload.GetSize(), 0);
+ Request.Finalize();
+ SentBytes = Request.GetSaveSize();
const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId);
CreateDirectories(BuildPartStatsDataPath.parent_path());
+ CbObject Payload = Request.Save();
+
TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView());
WriteAsJson(BuildPartStatsDataPath, Payload);
-
- SimulateLatency(0, 0);
}
protected:
@@ -629,7 +695,6 @@ protected:
const std::filesystem::path BuildDataPath = GetBuildPath(BuildId);
CreateDirectories(BuildDataPath.parent_path());
TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView());
- m_Stats.TotalBytesWritten += Data.GetSize();
WriteAsJson(BuildDataPath, Data);
}
@@ -646,7 +711,6 @@ protected:
Content.ErrorCode.value()));
}
IoBuffer Payload = Content.Flatten();
- m_Stats.TotalBytesRead += Payload.GetSize();
ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None);
CbObject BuildObject = CbObject(SharedBuffer(Payload));
return BuildObject;
@@ -704,6 +768,22 @@ protected:
}
private:
+ void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes)
+ {
+ const uint64_t ElapsedUs = ExecutionTimer.GetElapsedTimeUs();
+ m_Stats.TotalBytesWritten += UploadedBytes;
+ m_Stats.TotalBytesRead += DownloadedBytes;
+ m_Stats.TotalRequestTimeUs += ElapsedUs;
+ m_Stats.TotalRequestCount++;
+ SetAtomicMax(m_Stats.PeakSentBytes, UploadedBytes);
+ SetAtomicMax(m_Stats.PeakReceivedBytes, DownloadedBytes);
+ if (ElapsedUs > 0)
+ {
+ uint64_t BytesPerSec = ((UploadedBytes + DownloadedBytes) * 1000000u) / ElapsedUs;
+ SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec);
+ }
+ }
+
const std::filesystem::path m_StoragePath;
BuildStorage::Statistics& m_Stats;
const bool m_EnableJsonOutput = false;
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
index f49d4b42a..46ecd0a11 100644
--- a/src/zenutil/include/zenutil/buildstorage.h
+++ b/src/zenutil/include/zenutil/buildstorage.h
@@ -21,6 +21,9 @@ public:
std::atomic<uint64_t> TotalRequestCount = 0;
std::atomic<uint64_t> TotalRequestTimeUs = 0;
std::atomic<uint64_t> TotalExecutionTimeUs = 0;
+ std::atomic<uint64_t> PeakSentBytes = 0;
+ std::atomic<uint64_t> PeakReceivedBytes = 0;
+ std::atomic<uint64_t> PeakBytesPerSec = 0;
};
virtual ~BuildStorage() {}
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
index e1fb73fd4..a0690a16a 100644
--- a/src/zenutil/include/zenutil/buildstoragecache.h
+++ b/src/zenutil/include/zenutil/buildstoragecache.h
@@ -22,6 +22,9 @@ public:
std::atomic<uint64_t> TotalRequestCount = 0;
std::atomic<uint64_t> TotalRequestTimeUs = 0;
std::atomic<uint64_t> TotalExecutionTimeUs = 0;
+ std::atomic<uint64_t> PeakSentBytes = 0;
+ std::atomic<uint64_t> PeakReceivedBytes = 0;
+ std::atomic<uint64_t> PeakBytesPerSec = 0;
};
virtual ~BuildStorageCache() {}
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp
index 9974725ff..c9278acb4 100644
--- a/src/zenutil/jupiter/jupiterbuildstorage.cpp
+++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp
@@ -17,6 +17,16 @@ namespace zen {
using namespace std::literals;
+namespace {
+ void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix)
+ {
+ int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0;
+ HttpResponseCode Status =
+ Result.ErrorCode >= int(HttpResponseCode::Continue) ? HttpResponseCode(Result.ErrorCode) : HttpResponseCode::ImATeapot;
+ throw HttpClientError(fmt::format("{}: {} ({})", Prefix, Result.Reason, Result.ErrorCode), Error, Status);
+ }
+} // namespace
+
class JupiterBuildStorage : public BuildStorage
{
public:
@@ -46,7 +56,7 @@ public:
AddStatistic(ListResult);
if (!ListResult.Success)
{
- throw std::runtime_error(fmt::format("Failed listing namespaces: {} ({})", ListResult.Reason, ListResult.ErrorCode));
+ ThrowFromJupiterResult(ListResult, "Failed listing namespaces");
}
CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response);
@@ -66,10 +76,10 @@ public:
AddStatistic(BucketsResult);
if (!BucketsResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed listing namespaces: {} ({})", BucketsResult.Reason, BucketsResult.ErrorCode));
+ ThrowFromJupiterResult(BucketsResult, fmt::format("Failed listing buckets in namespace {}", Namespace));
}
- CbObject BucketResponse = PayloadToCbObject("Failed listing namespaces"sv, BucketsResult.Response);
+ CbObject BucketResponse =
+ PayloadToCbObject(fmt::format("Failed listing buckets in namespace {}", Namespace), BucketsResult.Response);
Response.BeginArray("items");
for (CbFieldView BucketField : BucketResponse["buckets"])
@@ -103,7 +113,7 @@ public:
AddStatistic(ListResult);
if (!ListResult.Success)
{
- throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode));
+ ThrowFromJupiterResult(ListResult, "Failed listing builds"sv);
}
return PayloadToCbObject("Failed listing builds"sv, ListResult.Response);
}
@@ -120,7 +130,7 @@ public:
AddStatistic(PutResult);
if (!PutResult.Success)
{
- throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode));
+ ThrowFromJupiterResult(PutResult, "Failed creating build"sv);
}
return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response);
}
@@ -135,7 +145,7 @@ public:
AddStatistic(GetBuildResult);
if (!GetBuildResult.Success)
{
- throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode));
+ ThrowFromJupiterResult(GetBuildResult, "Failed fetching build"sv);
}
return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response);
}
@@ -150,8 +160,7 @@ public:
AddStatistic(FinalizeBuildResult);
if (!FinalizeBuildResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed finalizing build part: {} ({})", FinalizeBuildResult.Reason, FinalizeBuildResult.ErrorCode));
+ ThrowFromJupiterResult(FinalizeBuildResult, "Failed finalizing build"sv);
}
}
@@ -170,7 +179,7 @@ public:
AddStatistic(PutPartResult);
if (!PutPartResult.Success)
{
- throw std::runtime_error(fmt::format("Failed creating build part: {} ({})", PutPartResult.Reason, PutPartResult.ErrorCode));
+ ThrowFromJupiterResult(PutPartResult, "Failed creating build part"sv);
}
return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs));
}
@@ -185,10 +194,7 @@ public:
AddStatistic(GetBuildPartResult);
if (!GetBuildPartResult.Success)
{
- throw std::runtime_error(fmt::format("Failed fetching build part {}: {} ({})",
- BuildPartId,
- GetBuildPartResult.Reason,
- GetBuildPartResult.ErrorCode));
+ ThrowFromJupiterResult(GetBuildPartResult, "Failed fetching build part"sv);
}
return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response);
}
@@ -203,8 +209,7 @@ public:
AddStatistic(FinalizePartResult);
if (!FinalizePartResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed finalizing build part: {} ({})", FinalizePartResult.Reason, FinalizePartResult.ErrorCode));
+ ThrowFromJupiterResult(FinalizePartResult, "Failed finalizing build part"sv);
}
return std::move(FinalizePartResult.Needs);
}
@@ -222,7 +227,7 @@ public:
AddStatistic(PutBlobResult);
if (!PutBlobResult.Success)
{
- throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PutBlobResult.Reason, PutBlobResult.ErrorCode));
+ ThrowFromJupiterResult(PutBlobResult, "Failed putting build part"sv);
}
}
@@ -249,8 +254,7 @@ public:
AddStatistic(PutMultipartBlobResult);
if (!PutMultipartBlobResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed putting build part: {} ({})", PutMultipartBlobResult.Reason, PutMultipartBlobResult.ErrorCode));
+ ThrowFromJupiterResult(PutMultipartBlobResult, "Failed putting large build blob"sv);
}
OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty());
@@ -265,7 +269,7 @@ public:
AddStatistic(PartResult);
if (!PartResult.Success)
{
- throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode));
+ ThrowFromJupiterResult(PartResult, "Failed putting large build blob"sv);
}
OnSentBytes(PartResult.SentBytes, IsComplete);
});
@@ -285,8 +289,7 @@ public:
AddStatistic(GetBuildBlobResult);
if (!GetBuildBlobResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed fetching build blob {}: {} ({})", RawHash, GetBuildBlobResult.Reason, GetBuildBlobResult.ErrorCode));
+ ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob"sv);
}
return std::move(GetBuildBlobResult.Response);
}
@@ -314,8 +317,7 @@ public:
AddStatistic(GetMultipartBlobResult);
if (!GetMultipartBlobResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed getting build part: {} ({})", GetMultipartBlobResult.Reason, GetMultipartBlobResult.ErrorCode));
+ ThrowFromJupiterResult(GetMultipartBlobResult, "Failed getting large build part"sv);
}
std::vector<std::function<void()>> WorkList;
for (auto& WorkItem : WorkItems)
@@ -327,7 +329,7 @@ public:
AddStatistic(PartResult);
if (!PartResult.Success)
{
- throw std::runtime_error(fmt::format("Failed getting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode));
+ ThrowFromJupiterResult(PartResult, "Failed getting large build part"sv);
}
});
}
@@ -350,8 +352,7 @@ public:
{
return false;
}
- throw std::runtime_error(
- fmt::format("Failed putting build block metadata: {} ({})", PutMetaResult.Reason, PutMetaResult.ErrorCode));
+ ThrowFromJupiterResult(PutMetaResult, "Failed putting build block metadata"sv);
}
return true;
}
@@ -366,7 +367,7 @@ public:
AddStatistic(FindResult);
if (!FindResult.Success)
{
- throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode));
+ ThrowFromJupiterResult(FindResult, "Failed fetching known blocks"sv);
}
return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response);
}
@@ -392,8 +393,7 @@ public:
AddStatistic(GetBlockMetadataResult);
if (!GetBlockMetadataResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode));
+ ThrowFromJupiterResult(GetBlockMetadataResult, "Failed fetching block metadatas"sv);
}
return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response);
}
@@ -416,14 +416,12 @@ public:
AddStatistic(PutBuildPartStatsResult);
if (!PutBuildPartStatsResult.Success)
{
- throw std::runtime_error(fmt::format("Failed posting build part statistics: {} ({})",
- PutBuildPartStatsResult.Reason,
- PutBuildPartStatsResult.ErrorCode));
+ ThrowFromJupiterResult(PutBuildPartStatsResult, "Failed posting build part statistics"sv);
}
}
private:
- static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload)
+ static CbObject PayloadToCbObject(std::string_view ErrorContext, const IoBuffer& Payload)
{
if (Payload.GetContentType() == ZenContentType::kJSON)
{
@@ -443,7 +441,7 @@ private:
else
{
throw std::runtime_error(
- fmt::format("{}: {} ({})", "Unsupported response format", Context, ToString(Payload.GetContentType())));
+ fmt::format("{}: {} ({})", "Unsupported response format", ErrorContext, ToString(Payload.GetContentType())));
}
}
@@ -453,6 +451,14 @@ private:
m_Stats.TotalBytesRead += Result.ReceivedBytes;
m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0);
m_Stats.TotalRequestCount++;
+
+ SetAtomicMax(m_Stats.PeakSentBytes, Result.SentBytes);
+ SetAtomicMax(m_Stats.PeakReceivedBytes, Result.ReceivedBytes);
+ if (Result.ElapsedSeconds > 0.0)
+ {
+ uint64_t BytesPerSec = uint64_t((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds);
+ SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec);
+ }
}
JupiterSession m_Session;