aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-08-12 13:53:58 +0200
committerGitHub Enterprise <[email protected]>2025-08-12 13:53:58 +0200
commit3a9bc3071b9a9452a5aef23c438233fc9e86fb47 (patch)
treeb0a1d67fe765f2ddc96772db088d781be159d627 /src/zenutil
parentadd filtering to builds download (#463) (diff)
downloadzen-3a9bc3071b9a9452a5aef23c438233fc9e86fb47.tar.xz
zen-3a9bc3071b9a9452a5aef23c438233fc9e86fb47.zip
use new builds api for oplogs (#464)
- Improvement: Refactored jupiter oplog export code to reuse builds jupiter wrapper classes - Improvement: If `zen builds`, `zen oplog-import` or `zen oplog-import` command fails due to a http error, the return code for the program will be set to the error/status code
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/buildstoragecache.cpp7
-rw-r--r--src/zenutil/filebuildstorage.cpp276
-rw-r--r--src/zenutil/include/zenutil/buildstorage.h3
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h3
-rw-r--r--src/zenutil/jupiter/jupiterbuildstorage.cpp76
5 files changed, 232 insertions, 133 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
index 88238effd..2171f4d62 100644
--- a/src/zenutil/buildstoragecache.cpp
+++ b/src/zenutil/buildstoragecache.cpp
@@ -378,6 +378,13 @@ private:
m_Stats.TotalBytesRead += Result.DownloadedBytes;
m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0);
m_Stats.TotalRequestCount++;
+ SetAtomicMax(m_Stats.PeakSentBytes, Result.UploadedBytes);
+ SetAtomicMax(m_Stats.PeakReceivedBytes, Result.DownloadedBytes);
+ if (Result.ElapsedSeconds > 0.0)
+ {
+ uint64_t BytesPerSec = uint64_t((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds);
+ SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec);
+ }
}
HttpClient& m_HttpClient;
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index c2cc5ab3c..f75fe403f 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -40,19 +40,23 @@ public:
ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces");
ZEN_UNUSED(bRecursive);
- SimulateLatency(0, 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
CbObjectWriter Writer;
Writer.BeginArray("results");
{
}
Writer.EndArray(); // results
- Writer.Save();
- SimulateLatency(Writer.GetSaveSize(), 0);
+
+ Writer.Finalize();
+ ReceivedBytes = Writer.GetSaveSize();
return Writer.Save();
}
@@ -61,11 +65,14 @@ public:
ZEN_TRACE_CPU("FileBuildStorage::ListBuilds");
ZEN_UNUSED(Query);
- SimulateLatency(Query.GetSize(), 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = Query.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BuildFolder = GetBuildsFolder();
DirectoryContent Content;
@@ -88,19 +95,23 @@ public:
}
}
Writer.EndArray(); // results
- Writer.Save();
- SimulateLatency(Writer.GetSaveSize(), 0);
+
+ Writer.Finalize();
+ ReceivedBytes = Writer.GetSaveSize();
return Writer.Save();
}
virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
{
ZEN_TRACE_CPU("FileBuildStorage::PutBuild");
- SimulateLatency(MetaData.GetSize(), 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = MetaData.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
CbObjectWriter BuildObject;
BuildObject.AddObject("metadata", MetaData);
@@ -109,35 +120,41 @@ public:
CbObjectWriter BuildResponse;
BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u);
- BuildResponse.Save();
-
- SimulateLatency(0, BuildResponse.GetSaveSize());
+ BuildResponse.Finalize();
+ ReceivedBytes = BuildResponse.GetSaveSize();
return BuildResponse.Save();
}
virtual CbObject GetBuild(const Oid& BuildId) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetBuild");
- SimulateLatency(0, 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
CbObject Build = ReadBuild(BuildId);
- SimulateLatency(0, Build.GetSize());
+ ReceivedBytes = Build.GetSize();
return Build;
}
virtual void FinalizeBuild(const Oid& BuildId) override
{
ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild");
- SimulateLatency(0, 0);
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
-
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
}
virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId,
@@ -146,10 +163,14 @@ public:
const CbObject& MetaData) override
{
ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart");
- SimulateLatency(MetaData.GetSize(), 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = MetaData.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
CreateDirectories(BuildPartDataPath.parent_path());
@@ -184,7 +205,7 @@ public:
std::vector<IoHash> NeededAttachments = GetNeededAttachments(MetaData);
- SimulateLatency(0, sizeof(IoHash) * NeededAttachments.size());
+ ReceivedBytes = sizeof(IoHash) * NeededAttachments.size();
return std::make_pair(RawHash, std::move(NeededAttachments));
}
@@ -192,22 +213,24 @@ public:
virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart");
- SimulateLatency(0, 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten();
- m_Stats.TotalBytesRead += Payload.GetSize();
ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None);
CbObject BuildPartObject = CbObject(SharedBuffer(Payload));
- SimulateLatency(0, BuildPartObject.GetSize());
+ ReceivedBytes = BuildPartObject.GetSize();
return BuildPartObject;
}
@@ -215,15 +238,18 @@ public:
virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
{
ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart");
- SimulateLatency(0, 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten();
- m_Stats.TotalBytesRead += Payload.GetSize();
+
IoHash RawHash = IoHash::HashBuffer(Payload.GetView());
if (RawHash != PartHash)
{
@@ -234,7 +260,7 @@ public:
CbObject BuildPartObject = CbObject(SharedBuffer(Payload));
std::vector<IoHash> NeededAttachments(GetNeededAttachments(BuildPartObject));
- SimulateLatency(0, NeededAttachments.size() * sizeof(IoHash));
+ ReceivedBytes = NeededAttachments.size() * sizeof(IoHash);
return NeededAttachments;
}
@@ -247,13 +273,14 @@ public:
ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob");
ZEN_UNUSED(BuildId);
ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
- SimulateLatency(Payload.GetSize(), 0);
ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload));
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = Payload.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
if (!IsFile(BlockPath))
@@ -261,8 +288,8 @@ public:
CreateDirectories(BlockPath.parent_path());
TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView());
}
- m_Stats.TotalBytesWritten += Payload.GetSize();
- SimulateLatency(0, 0);
+
+ ReceivedBytes = Payload.GetSize();
}
virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId,
@@ -275,10 +302,15 @@ public:
ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob");
ZEN_UNUSED(BuildId);
ZEN_UNUSED(ContentType);
- SimulateLatency(0, 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
if (!IsFile(BlockPath))
@@ -314,7 +346,15 @@ public:
WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() {
ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work");
IoBuffer PartPayload = Workload->Transmitter(Offset, Size);
- SimulateLatency(PartPayload.GetSize(), 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = PartPayload.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
std::error_code Ec;
Workload->TempFile.Write(PartPayload, Offset, Ec);
@@ -325,8 +365,7 @@ public:
Ec.message(),
Ec.value()));
}
- uint64_t BytesWritten = PartPayload.GetSize();
- m_Stats.TotalBytesWritten += BytesWritten;
+
const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1;
if (IsLastPart)
{
@@ -342,18 +381,14 @@ public:
Ec.value()));
}
}
- Workload->OnSentBytes(BytesWritten, IsLastPart);
- SimulateLatency(0, 0);
+ Workload->OnSentBytes(SentBytes, IsLastPart);
});
Offset += Size;
}
Workload->PartsLeft.store(WorkItems.size());
-
- SimulateLatency(0, 0);
return WorkItems;
}
- SimulateLatency(0, 0);
return {};
}
@@ -361,10 +396,15 @@ public:
{
ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
if (IsFile(BlockPath))
@@ -382,11 +422,9 @@ public:
ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
}
Payload.SetContentType(ZenContentType::kCompressedBinary);
- m_Stats.TotalBytesRead += Payload.GetSize();
- SimulateLatency(0, Payload.GetSize());
+ ReceivedBytes = Payload.GetSize();
return Payload;
}
- SimulateLatency(0, 0);
return IoBuffer{};
}
@@ -398,10 +436,15 @@ public:
{
ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
if (IsFile(BlockPath))
@@ -429,22 +472,29 @@ public:
uint64_t Size = Min(ChunkSize, BlobSize - Offset);
WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() {
ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work");
- SimulateLatency(0, 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
+
IoBuffer PartPayload(Size);
Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset);
- m_Stats.TotalBytesRead += PartPayload.GetSize();
+ ReceivedBytes = PartPayload.GetSize();
Workload->OnReceive(Offset, PartPayload);
uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size);
if (ByteRemaning == Size)
{
Workload->OnComplete();
}
- SimulateLatency(Size, PartPayload.GetSize());
});
Offset += Size;
}
- SimulateLatency(0, 0);
return WorkItems;
}
return {};
@@ -455,18 +505,19 @@ public:
ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata");
ZEN_UNUSED(BuildId);
- SimulateLatency(MetaData.GetSize(), 0);
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = MetaData.GetSize();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash);
CreateDirectories(BlockMetaDataPath.parent_path());
TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView());
- m_Stats.TotalBytesWritten += MetaData.GetSize();
WriteAsJson(BlockMetaDataPath, MetaData);
- SimulateLatency(0, 0);
return true;
}
@@ -474,10 +525,15 @@ public:
{
ZEN_TRACE_CPU("FileBuildStorage::FindBlocks");
ZEN_UNUSED(BuildId);
- SimulateLatency(sizeof(BuildId), 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
uint64_t FoundCount = 0;
@@ -495,8 +551,6 @@ public:
{
IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
- m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
-
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
Writer.AddObject(BlockObject);
FoundCount++;
@@ -508,19 +562,25 @@ public:
}
}
Writer.EndArray(); // blocks
- CbObject Result = Writer.Save();
- SimulateLatency(0, Result.GetSize());
- return Result;
+
+ Writer.Finalize();
+ ReceivedBytes = Writer.GetSaveSize();
+ return Writer.Save();
}
virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata");
ZEN_UNUSED(BuildId);
- SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0);
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = sizeof(Oid) + sizeof(IoHash) * BlockHashes.size();
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- m_Stats.TotalRequestCount++;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
CbObjectWriter Writer;
Writer.BeginArray("blocks");
@@ -532,22 +592,29 @@ public:
{
IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
- m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
-
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
Writer.AddObject(BlockObject);
}
}
Writer.EndArray(); // blocks
- CbObject Result = Writer.Save();
- SimulateLatency(0, Result.GetSize());
- return Result;
+ Writer.Finalize();
+ ReceivedBytes = Writer.GetSaveSize();
+ return Writer.Save();
}
virtual void PutBuildPartStats(const Oid& BuildId,
const Oid& BuildPartId,
const tsl::robin_map<std::string, double>& FloatStats) override
{
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = 0;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
+
CbObjectWriter Request;
Request.BeginObject("floatStats"sv);
for (auto It : FloatStats)
@@ -555,17 +622,16 @@ public:
Request.AddFloat(It.first, It.second);
}
Request.EndObject();
- CbObject Payload = Request.Save();
-
- SimulateLatency(Payload.GetSize(), 0);
+ Request.Finalize();
+ SentBytes = Request.GetSaveSize();
const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId);
CreateDirectories(BuildPartStatsDataPath.parent_path());
+ CbObject Payload = Request.Save();
+
TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView());
WriteAsJson(BuildPartStatsDataPath, Payload);
-
- SimulateLatency(0, 0);
}
protected:
@@ -629,7 +695,6 @@ protected:
const std::filesystem::path BuildDataPath = GetBuildPath(BuildId);
CreateDirectories(BuildDataPath.parent_path());
TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView());
- m_Stats.TotalBytesWritten += Data.GetSize();
WriteAsJson(BuildDataPath, Data);
}
@@ -646,7 +711,6 @@ protected:
Content.ErrorCode.value()));
}
IoBuffer Payload = Content.Flatten();
- m_Stats.TotalBytesRead += Payload.GetSize();
ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None);
CbObject BuildObject = CbObject(SharedBuffer(Payload));
return BuildObject;
@@ -704,6 +768,22 @@ protected:
}
private:
+ void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes)
+ {
+ const uint64_t ElapsedUs = ExecutionTimer.GetElapsedTimeUs();
+ m_Stats.TotalBytesWritten += UploadedBytes;
+ m_Stats.TotalBytesRead += DownloadedBytes;
+ m_Stats.TotalRequestTimeUs += ElapsedUs;
+ m_Stats.TotalRequestCount++;
+ SetAtomicMax(m_Stats.PeakSentBytes, UploadedBytes);
+ SetAtomicMax(m_Stats.PeakReceivedBytes, DownloadedBytes);
+ if (ElapsedUs > 0)
+ {
+ uint64_t BytesPerSec = ((UploadedBytes + DownloadedBytes) * 1000000u) / ElapsedUs;
+ SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec);
+ }
+ }
+
const std::filesystem::path m_StoragePath;
BuildStorage::Statistics& m_Stats;
const bool m_EnableJsonOutput = false;
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
index f49d4b42a..46ecd0a11 100644
--- a/src/zenutil/include/zenutil/buildstorage.h
+++ b/src/zenutil/include/zenutil/buildstorage.h
@@ -21,6 +21,9 @@ public:
std::atomic<uint64_t> TotalRequestCount = 0;
std::atomic<uint64_t> TotalRequestTimeUs = 0;
std::atomic<uint64_t> TotalExecutionTimeUs = 0;
+ std::atomic<uint64_t> PeakSentBytes = 0;
+ std::atomic<uint64_t> PeakReceivedBytes = 0;
+ std::atomic<uint64_t> PeakBytesPerSec = 0;
};
virtual ~BuildStorage() {}
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
index e1fb73fd4..a0690a16a 100644
--- a/src/zenutil/include/zenutil/buildstoragecache.h
+++ b/src/zenutil/include/zenutil/buildstoragecache.h
@@ -22,6 +22,9 @@ public:
std::atomic<uint64_t> TotalRequestCount = 0;
std::atomic<uint64_t> TotalRequestTimeUs = 0;
std::atomic<uint64_t> TotalExecutionTimeUs = 0;
+ std::atomic<uint64_t> PeakSentBytes = 0;
+ std::atomic<uint64_t> PeakReceivedBytes = 0;
+ std::atomic<uint64_t> PeakBytesPerSec = 0;
};
virtual ~BuildStorageCache() {}
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp
index 9974725ff..c9278acb4 100644
--- a/src/zenutil/jupiter/jupiterbuildstorage.cpp
+++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp
@@ -17,6 +17,16 @@ namespace zen {
using namespace std::literals;
+namespace {
+ void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix)
+ {
+ int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0;
+ HttpResponseCode Status =
+ Result.ErrorCode >= int(HttpResponseCode::Continue) ? HttpResponseCode(Result.ErrorCode) : HttpResponseCode::ImATeapot;
+ throw HttpClientError(fmt::format("{}: {} ({})", Prefix, Result.Reason, Result.ErrorCode), Error, Status);
+ }
+} // namespace
+
class JupiterBuildStorage : public BuildStorage
{
public:
@@ -46,7 +56,7 @@ public:
AddStatistic(ListResult);
if (!ListResult.Success)
{
- throw std::runtime_error(fmt::format("Failed listing namespaces: {} ({})", ListResult.Reason, ListResult.ErrorCode));
+ ThrowFromJupiterResult(ListResult, "Failed listing namespaces");
}
CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response);
@@ -66,10 +76,10 @@ public:
AddStatistic(BucketsResult);
if (!BucketsResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed listing namespaces: {} ({})", BucketsResult.Reason, BucketsResult.ErrorCode));
+ ThrowFromJupiterResult(BucketsResult, fmt::format("Failed listing buckets in namespace {}", Namespace));
}
- CbObject BucketResponse = PayloadToCbObject("Failed listing namespaces"sv, BucketsResult.Response);
+ CbObject BucketResponse =
+ PayloadToCbObject(fmt::format("Failed listing buckets in namespace {}", Namespace), BucketsResult.Response);
Response.BeginArray("items");
for (CbFieldView BucketField : BucketResponse["buckets"])
@@ -103,7 +113,7 @@ public:
AddStatistic(ListResult);
if (!ListResult.Success)
{
- throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode));
+ ThrowFromJupiterResult(ListResult, "Failed listing builds"sv);
}
return PayloadToCbObject("Failed listing builds"sv, ListResult.Response);
}
@@ -120,7 +130,7 @@ public:
AddStatistic(PutResult);
if (!PutResult.Success)
{
- throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode));
+ ThrowFromJupiterResult(PutResult, "Failed creating build"sv);
}
return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response);
}
@@ -135,7 +145,7 @@ public:
AddStatistic(GetBuildResult);
if (!GetBuildResult.Success)
{
- throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode));
+ ThrowFromJupiterResult(GetBuildResult, "Failed fetching build"sv);
}
return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response);
}
@@ -150,8 +160,7 @@ public:
AddStatistic(FinalizeBuildResult);
if (!FinalizeBuildResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed finalizing build part: {} ({})", FinalizeBuildResult.Reason, FinalizeBuildResult.ErrorCode));
+ ThrowFromJupiterResult(FinalizeBuildResult, "Failed finalizing build"sv);
}
}
@@ -170,7 +179,7 @@ public:
AddStatistic(PutPartResult);
if (!PutPartResult.Success)
{
- throw std::runtime_error(fmt::format("Failed creating build part: {} ({})", PutPartResult.Reason, PutPartResult.ErrorCode));
+ ThrowFromJupiterResult(PutPartResult, "Failed creating build part"sv);
}
return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs));
}
@@ -185,10 +194,7 @@ public:
AddStatistic(GetBuildPartResult);
if (!GetBuildPartResult.Success)
{
- throw std::runtime_error(fmt::format("Failed fetching build part {}: {} ({})",
- BuildPartId,
- GetBuildPartResult.Reason,
- GetBuildPartResult.ErrorCode));
+ ThrowFromJupiterResult(GetBuildPartResult, "Failed fetching build part"sv);
}
return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response);
}
@@ -203,8 +209,7 @@ public:
AddStatistic(FinalizePartResult);
if (!FinalizePartResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed finalizing build part: {} ({})", FinalizePartResult.Reason, FinalizePartResult.ErrorCode));
+ ThrowFromJupiterResult(FinalizePartResult, "Failed finalizing build part"sv);
}
return std::move(FinalizePartResult.Needs);
}
@@ -222,7 +227,7 @@ public:
AddStatistic(PutBlobResult);
if (!PutBlobResult.Success)
{
- throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PutBlobResult.Reason, PutBlobResult.ErrorCode));
+ ThrowFromJupiterResult(PutBlobResult, "Failed putting build part"sv);
}
}
@@ -249,8 +254,7 @@ public:
AddStatistic(PutMultipartBlobResult);
if (!PutMultipartBlobResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed putting build part: {} ({})", PutMultipartBlobResult.Reason, PutMultipartBlobResult.ErrorCode));
+ ThrowFromJupiterResult(PutMultipartBlobResult, "Failed putting large build blob"sv);
}
OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty());
@@ -265,7 +269,7 @@ public:
AddStatistic(PartResult);
if (!PartResult.Success)
{
- throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode));
+ ThrowFromJupiterResult(PartResult, "Failed putting large build blob"sv);
}
OnSentBytes(PartResult.SentBytes, IsComplete);
});
@@ -285,8 +289,7 @@ public:
AddStatistic(GetBuildBlobResult);
if (!GetBuildBlobResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed fetching build blob {}: {} ({})", RawHash, GetBuildBlobResult.Reason, GetBuildBlobResult.ErrorCode));
+ ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob"sv);
}
return std::move(GetBuildBlobResult.Response);
}
@@ -314,8 +317,7 @@ public:
AddStatistic(GetMultipartBlobResult);
if (!GetMultipartBlobResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed getting build part: {} ({})", GetMultipartBlobResult.Reason, GetMultipartBlobResult.ErrorCode));
+ ThrowFromJupiterResult(GetMultipartBlobResult, "Failed getting large build part"sv);
}
std::vector<std::function<void()>> WorkList;
for (auto& WorkItem : WorkItems)
@@ -327,7 +329,7 @@ public:
AddStatistic(PartResult);
if (!PartResult.Success)
{
- throw std::runtime_error(fmt::format("Failed getting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode));
+ ThrowFromJupiterResult(PartResult, "Failed getting large build part"sv);
}
});
}
@@ -350,8 +352,7 @@ public:
{
return false;
}
- throw std::runtime_error(
- fmt::format("Failed putting build block metadata: {} ({})", PutMetaResult.Reason, PutMetaResult.ErrorCode));
+ ThrowFromJupiterResult(PutMetaResult, "Failed putting build block metadata"sv);
}
return true;
}
@@ -366,7 +367,7 @@ public:
AddStatistic(FindResult);
if (!FindResult.Success)
{
- throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode));
+ ThrowFromJupiterResult(FindResult, "Failed fetching known blocks"sv);
}
return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response);
}
@@ -392,8 +393,7 @@ public:
AddStatistic(GetBlockMetadataResult);
if (!GetBlockMetadataResult.Success)
{
- throw std::runtime_error(
- fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode));
+ ThrowFromJupiterResult(GetBlockMetadataResult, "Failed fetching block metadatas"sv);
}
return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response);
}
@@ -416,14 +416,12 @@ public:
AddStatistic(PutBuildPartStatsResult);
if (!PutBuildPartStatsResult.Success)
{
- throw std::runtime_error(fmt::format("Failed posting build part statistics: {} ({})",
- PutBuildPartStatsResult.Reason,
- PutBuildPartStatsResult.ErrorCode));
+ ThrowFromJupiterResult(PutBuildPartStatsResult, "Failed posting build part statistics"sv);
}
}
private:
- static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload)
+ static CbObject PayloadToCbObject(std::string_view ErrorContext, const IoBuffer& Payload)
{
if (Payload.GetContentType() == ZenContentType::kJSON)
{
@@ -443,7 +441,7 @@ private:
else
{
throw std::runtime_error(
- fmt::format("{}: {} ({})", "Unsupported response format", Context, ToString(Payload.GetContentType())));
+ fmt::format("{}: {} ({})", "Unsupported response format", ErrorContext, ToString(Payload.GetContentType())));
}
}
@@ -453,6 +451,14 @@ private:
m_Stats.TotalBytesRead += Result.ReceivedBytes;
m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0);
m_Stats.TotalRequestCount++;
+
+ SetAtomicMax(m_Stats.PeakSentBytes, Result.SentBytes);
+ SetAtomicMax(m_Stats.PeakReceivedBytes, Result.ReceivedBytes);
+ if (Result.ElapsedSeconds > 0.0)
+ {
+ uint64_t BytesPerSec = uint64_t((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds);
+ SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec);
+ }
}
JupiterSession m_Session;