diff options
| author | Dan Engelbrecht <[email protected]> | 2025-08-12 13:53:58 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-12 13:53:58 +0200 |
| commit | 3a9bc3071b9a9452a5aef23c438233fc9e86fb47 (patch) | |
| tree | b0a1d67fe765f2ddc96772db088d781be159d627 /src/zenutil | |
| parent | add filtering to builds download (#463) (diff) | |
| download | zen-3a9bc3071b9a9452a5aef23c438233fc9e86fb47.tar.xz zen-3a9bc3071b9a9452a5aef23c438233fc9e86fb47.zip | |
use new builds api for oplogs (#464)
- Improvement: Refactored jupiter oplog export code to reuse builds jupiter wrapper classes
- Improvement: If `zen builds`, `zen oplog-import` or `zen oplog-import` command fails due to a http error, the return code for the program will be set to the error/status code
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 7 | ||||
| -rw-r--r-- | src/zenutil/filebuildstorage.cpp | 276 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstorage.h | 3 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstoragecache.h | 3 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterbuildstorage.cpp | 76 |
5 files changed, 232 insertions, 133 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp index 88238effd..2171f4d62 100644 --- a/src/zenutil/buildstoragecache.cpp +++ b/src/zenutil/buildstoragecache.cpp @@ -378,6 +378,13 @@ private: m_Stats.TotalBytesRead += Result.DownloadedBytes; m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); m_Stats.TotalRequestCount++; + SetAtomicMax(m_Stats.PeakSentBytes, Result.UploadedBytes); + SetAtomicMax(m_Stats.PeakReceivedBytes, Result.DownloadedBytes); + if (Result.ElapsedSeconds > 0.0) + { + uint64_t BytesPerSec = uint64_t((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds); + SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); + } } HttpClient& m_HttpClient; diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index c2cc5ab3c..f75fe403f 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -40,19 +40,23 @@ public: ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces"); ZEN_UNUSED(bRecursive); - SimulateLatency(0, 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObjectWriter Writer; Writer.BeginArray("results"); { } Writer.EndArray(); // results - Writer.Save(); - SimulateLatency(Writer.GetSaveSize(), 0); + + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); return Writer.Save(); } @@ -61,11 +65,14 @@ public: ZEN_TRACE_CPU("FileBuildStorage::ListBuilds"); ZEN_UNUSED(Query); - SimulateLatency(Query.GetSize(), 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = Query.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildFolder = GetBuildsFolder(); DirectoryContent Content; @@ -88,19 +95,23 @@ public: } } Writer.EndArray(); // results - Writer.Save(); - SimulateLatency(Writer.GetSaveSize(), 0); + + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); return Writer.Save(); } virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override { ZEN_TRACE_CPU("FileBuildStorage::PutBuild"); - SimulateLatency(MetaData.GetSize(), 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = MetaData.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObjectWriter BuildObject; BuildObject.AddObject("metadata", MetaData); @@ -109,35 +120,41 @@ public: CbObjectWriter BuildResponse; BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u); - BuildResponse.Save(); - - SimulateLatency(0, BuildResponse.GetSaveSize()); + BuildResponse.Finalize(); + ReceivedBytes = BuildResponse.GetSaveSize(); return BuildResponse.Save(); } virtual CbObject GetBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("FileBuildStorage::GetBuild"); - SimulateLatency(0, 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObject Build = ReadBuild(BuildId); - SimulateLatency(0, Build.GetSize()); + ReceivedBytes = Build.GetSize(); return Build; } virtual void FinalizeBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild"); - SimulateLatency(0, 0); - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; - ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); } virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId, @@ -146,10 +163,14 @@ public: const CbObject& MetaData) override { ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart"); - SimulateLatency(MetaData.GetSize(), 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = MetaData.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); CreateDirectories(BuildPartDataPath.parent_path()); @@ -184,7 +205,7 @@ public: std::vector<IoHash> NeededAttachments = GetNeededAttachments(MetaData); - SimulateLatency(0, sizeof(IoHash) * NeededAttachments.size()); + ReceivedBytes = sizeof(IoHash) * NeededAttachments.size(); return std::make_pair(RawHash, std::move(NeededAttachments)); } @@ -192,22 +213,24 @@ public: virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override { ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart"); - SimulateLatency(0, 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); - m_Stats.TotalBytesRead += Payload.GetSize(); ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); - SimulateLatency(0, BuildPartObject.GetSize()); + ReceivedBytes = BuildPartObject.GetSize(); return BuildPartObject; } @@ -215,15 +238,18 @@ public: virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override { ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart"); - SimulateLatency(0, 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); - m_Stats.TotalBytesRead += Payload.GetSize(); + IoHash RawHash = IoHash::HashBuffer(Payload.GetView()); if (RawHash != PartHash) { @@ -234,7 +260,7 @@ public: CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); std::vector<IoHash> NeededAttachments(GetNeededAttachments(BuildPartObject)); - SimulateLatency(0, NeededAttachments.size() * sizeof(IoHash)); + ReceivedBytes = NeededAttachments.size() * sizeof(IoHash); return NeededAttachments; } @@ -247,13 +273,14 @@ public: ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob"); ZEN_UNUSED(BuildId); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - SimulateLatency(Payload.GetSize(), 0); ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload)); - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = Payload.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (!IsFile(BlockPath)) @@ -261,8 +288,8 @@ public: CreateDirectories(BlockPath.parent_path()); TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView()); } - m_Stats.TotalBytesWritten += Payload.GetSize(); - SimulateLatency(0, 0); + + ReceivedBytes = Payload.GetSize(); } virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId, @@ -275,10 +302,15 @@ public: ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob"); ZEN_UNUSED(BuildId); ZEN_UNUSED(ContentType); - SimulateLatency(0, 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (!IsFile(BlockPath)) @@ -314,7 +346,15 @@ public: WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() { ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work"); IoBuffer PartPayload = Workload->Transmitter(Offset, Size); - SimulateLatency(PartPayload.GetSize(), 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = PartPayload.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); std::error_code Ec; Workload->TempFile.Write(PartPayload, Offset, Ec); @@ -325,8 +365,7 @@ public: Ec.message(), Ec.value())); } - uint64_t BytesWritten = PartPayload.GetSize(); - m_Stats.TotalBytesWritten += BytesWritten; + const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1; if (IsLastPart) { @@ -342,18 +381,14 @@ public: Ec.value())); } } - Workload->OnSentBytes(BytesWritten, IsLastPart); - SimulateLatency(0, 0); + Workload->OnSentBytes(SentBytes, IsLastPart); }); Offset += Size; } Workload->PartsLeft.store(WorkItems.size()); - - SimulateLatency(0, 0); return WorkItems; } - SimulateLatency(0, 0); return {}; } @@ -361,10 +396,15 @@ public: { ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob"); ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (IsFile(BlockPath)) @@ -382,11 +422,9 @@ public: ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); } Payload.SetContentType(ZenContentType::kCompressedBinary); - m_Stats.TotalBytesRead += Payload.GetSize(); - SimulateLatency(0, Payload.GetSize()); + ReceivedBytes = Payload.GetSize(); return Payload; } - SimulateLatency(0, 0); return IoBuffer{}; } @@ -398,10 +436,15 @@ public: { ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob"); ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (IsFile(BlockPath)) @@ -429,22 +472,29 @@ public: uint64_t Size = Min(ChunkSize, BlobSize - Offset); WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() { ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work"); - SimulateLatency(0, 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + IoBuffer PartPayload(Size); Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset); - m_Stats.TotalBytesRead += PartPayload.GetSize(); + ReceivedBytes = PartPayload.GetSize(); Workload->OnReceive(Offset, PartPayload); uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size); if (ByteRemaning == Size) { Workload->OnComplete(); } - SimulateLatency(Size, PartPayload.GetSize()); }); Offset += Size; } - SimulateLatency(0, 0); return WorkItems; } return {}; @@ -455,18 +505,19 @@ public: ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata"); ZEN_UNUSED(BuildId); - SimulateLatency(MetaData.GetSize(), 0); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = MetaData.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash); CreateDirectories(BlockMetaDataPath.parent_path()); TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView()); - m_Stats.TotalBytesWritten += MetaData.GetSize(); WriteAsJson(BlockMetaDataPath, MetaData); - SimulateLatency(0, 0); return true; } @@ -474,10 +525,15 @@ public: { ZEN_TRACE_CPU("FileBuildStorage::FindBlocks"); ZEN_UNUSED(BuildId); - SimulateLatency(sizeof(BuildId), 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); uint64_t FoundCount = 0; @@ -495,8 +551,6 @@ public: { IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); - m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize(); - CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); Writer.AddObject(BlockObject); FoundCount++; @@ -508,19 +562,25 @@ public: } } Writer.EndArray(); // blocks - CbObject Result = Writer.Save(); - SimulateLatency(0, Result.GetSize()); - return Result; + + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); + return Writer.Save(); } virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override { ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata"); ZEN_UNUSED(BuildId); - SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - m_Stats.TotalRequestCount++; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObjectWriter Writer; Writer.BeginArray("blocks"); @@ -532,22 +592,29 @@ public: { IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); - m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize(); - CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); Writer.AddObject(BlockObject); } } Writer.EndArray(); // blocks - CbObject Result = Writer.Save(); - SimulateLatency(0, Result.GetSize()); - return Result; + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); + return Writer.Save(); } virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) override { + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + CbObjectWriter Request; Request.BeginObject("floatStats"sv); for (auto It : FloatStats) @@ -555,17 +622,16 @@ public: Request.AddFloat(It.first, It.second); } Request.EndObject(); - CbObject Payload = Request.Save(); - - SimulateLatency(Payload.GetSize(), 0); + Request.Finalize(); + SentBytes = Request.GetSaveSize(); const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId); CreateDirectories(BuildPartStatsDataPath.parent_path()); + CbObject Payload = Request.Save(); + TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView()); WriteAsJson(BuildPartStatsDataPath, Payload); - - SimulateLatency(0, 0); } protected: @@ -629,7 +695,6 @@ protected: const std::filesystem::path BuildDataPath = GetBuildPath(BuildId); CreateDirectories(BuildDataPath.parent_path()); TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView()); - m_Stats.TotalBytesWritten += Data.GetSize(); WriteAsJson(BuildDataPath, Data); } @@ -646,7 +711,6 @@ protected: Content.ErrorCode.value())); } IoBuffer Payload = Content.Flatten(); - m_Stats.TotalBytesRead += Payload.GetSize(); ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); CbObject BuildObject = CbObject(SharedBuffer(Payload)); return BuildObject; @@ -704,6 +768,22 @@ protected: } private: + void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes) + { + const uint64_t ElapsedUs = ExecutionTimer.GetElapsedTimeUs(); + m_Stats.TotalBytesWritten += UploadedBytes; + m_Stats.TotalBytesRead += DownloadedBytes; + m_Stats.TotalRequestTimeUs += ElapsedUs; + m_Stats.TotalRequestCount++; + SetAtomicMax(m_Stats.PeakSentBytes, UploadedBytes); + SetAtomicMax(m_Stats.PeakReceivedBytes, DownloadedBytes); + if (ElapsedUs > 0) + { + uint64_t BytesPerSec = ((UploadedBytes + DownloadedBytes) * 1000000u) / ElapsedUs; + SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); + } + } + const std::filesystem::path m_StoragePath; BuildStorage::Statistics& m_Stats; const bool m_EnableJsonOutput = false; diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h index f49d4b42a..46ecd0a11 100644 --- a/src/zenutil/include/zenutil/buildstorage.h +++ b/src/zenutil/include/zenutil/buildstorage.h @@ -21,6 +21,9 @@ public: std::atomic<uint64_t> TotalRequestCount = 0; std::atomic<uint64_t> TotalRequestTimeUs = 0; std::atomic<uint64_t> TotalExecutionTimeUs = 0; + std::atomic<uint64_t> PeakSentBytes = 0; + std::atomic<uint64_t> PeakReceivedBytes = 0; + std::atomic<uint64_t> PeakBytesPerSec = 0; }; virtual ~BuildStorage() {} diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h index e1fb73fd4..a0690a16a 100644 --- a/src/zenutil/include/zenutil/buildstoragecache.h +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -22,6 +22,9 @@ public: std::atomic<uint64_t> TotalRequestCount = 0; std::atomic<uint64_t> TotalRequestTimeUs = 0; std::atomic<uint64_t> TotalExecutionTimeUs = 0; + std::atomic<uint64_t> PeakSentBytes = 0; + std::atomic<uint64_t> PeakReceivedBytes = 0; + std::atomic<uint64_t> PeakBytesPerSec = 0; }; virtual ~BuildStorageCache() {} diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp index 9974725ff..c9278acb4 100644 --- a/src/zenutil/jupiter/jupiterbuildstorage.cpp +++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp @@ -17,6 +17,16 @@ namespace zen { using namespace std::literals; +namespace { + void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix) + { + int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0; + HttpResponseCode Status = + Result.ErrorCode >= int(HttpResponseCode::Continue) ? HttpResponseCode(Result.ErrorCode) : HttpResponseCode::ImATeapot; + throw HttpClientError(fmt::format("{}: {} ({})", Prefix, Result.Reason, Result.ErrorCode), Error, Status); + } +} // namespace + class JupiterBuildStorage : public BuildStorage { public: @@ -46,7 +56,7 @@ public: AddStatistic(ListResult); if (!ListResult.Success) { - throw std::runtime_error(fmt::format("Failed listing namespaces: {} ({})", ListResult.Reason, ListResult.ErrorCode)); + ThrowFromJupiterResult(ListResult, "Failed listing namespaces"); } CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response); @@ -66,10 +76,10 @@ public: AddStatistic(BucketsResult); if (!BucketsResult.Success) { - throw std::runtime_error( - fmt::format("Failed listing namespaces: {} ({})", BucketsResult.Reason, BucketsResult.ErrorCode)); + ThrowFromJupiterResult(BucketsResult, fmt::format("Failed listing buckets in namespace {}", Namespace)); } - CbObject BucketResponse = PayloadToCbObject("Failed listing namespaces"sv, BucketsResult.Response); + CbObject BucketResponse = + PayloadToCbObject(fmt::format("Failed listing buckets in namespace {}", Namespace), BucketsResult.Response); Response.BeginArray("items"); for (CbFieldView BucketField : BucketResponse["buckets"]) @@ -103,7 +113,7 @@ public: AddStatistic(ListResult); if (!ListResult.Success) { - throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode)); + ThrowFromJupiterResult(ListResult, "Failed listing builds"sv); } return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); } @@ -120,7 +130,7 @@ public: AddStatistic(PutResult); if (!PutResult.Success) { - throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode)); + ThrowFromJupiterResult(PutResult, "Failed creating build"sv); } return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); } @@ -135,7 +145,7 @@ public: AddStatistic(GetBuildResult); if (!GetBuildResult.Success) { - throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode)); + ThrowFromJupiterResult(GetBuildResult, "Failed fetching build"sv); } return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); } @@ -150,8 +160,7 @@ public: AddStatistic(FinalizeBuildResult); if (!FinalizeBuildResult.Success) { - throw std::runtime_error( - fmt::format("Failed finalizing build part: {} ({})", FinalizeBuildResult.Reason, FinalizeBuildResult.ErrorCode)); + ThrowFromJupiterResult(FinalizeBuildResult, "Failed finalizing build"sv); } } @@ -170,7 +179,7 @@ public: AddStatistic(PutPartResult); if (!PutPartResult.Success) { - throw std::runtime_error(fmt::format("Failed creating build part: {} ({})", PutPartResult.Reason, PutPartResult.ErrorCode)); + ThrowFromJupiterResult(PutPartResult, "Failed creating build part"sv); } return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs)); } @@ -185,10 +194,7 @@ public: AddStatistic(GetBuildPartResult); if (!GetBuildPartResult.Success) { - throw std::runtime_error(fmt::format("Failed fetching build part {}: {} ({})", - BuildPartId, - GetBuildPartResult.Reason, - GetBuildPartResult.ErrorCode)); + ThrowFromJupiterResult(GetBuildPartResult, "Failed fetching build part"sv); } return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); } @@ -203,8 +209,7 @@ public: AddStatistic(FinalizePartResult); if (!FinalizePartResult.Success) { - throw std::runtime_error( - fmt::format("Failed finalizing build part: {} ({})", FinalizePartResult.Reason, FinalizePartResult.ErrorCode)); + ThrowFromJupiterResult(FinalizePartResult, "Failed finalizing build part"sv); } return std::move(FinalizePartResult.Needs); } @@ -222,7 +227,7 @@ public: AddStatistic(PutBlobResult); if (!PutBlobResult.Success) { - throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PutBlobResult.Reason, PutBlobResult.ErrorCode)); + ThrowFromJupiterResult(PutBlobResult, "Failed putting build part"sv); } } @@ -249,8 +254,7 @@ public: AddStatistic(PutMultipartBlobResult); if (!PutMultipartBlobResult.Success) { - throw std::runtime_error( - fmt::format("Failed putting build part: {} ({})", PutMultipartBlobResult.Reason, PutMultipartBlobResult.ErrorCode)); + ThrowFromJupiterResult(PutMultipartBlobResult, "Failed putting large build blob"sv); } OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty()); @@ -265,7 +269,7 @@ public: AddStatistic(PartResult); if (!PartResult.Success) { - throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode)); + ThrowFromJupiterResult(PartResult, "Failed putting large build blob"sv); } OnSentBytes(PartResult.SentBytes, IsComplete); }); @@ -285,8 +289,7 @@ public: AddStatistic(GetBuildBlobResult); if (!GetBuildBlobResult.Success) { - throw std::runtime_error( - fmt::format("Failed fetching build blob {}: {} ({})", RawHash, GetBuildBlobResult.Reason, GetBuildBlobResult.ErrorCode)); + ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob"sv); } return std::move(GetBuildBlobResult.Response); } @@ -314,8 +317,7 @@ public: AddStatistic(GetMultipartBlobResult); if (!GetMultipartBlobResult.Success) { - throw std::runtime_error( - fmt::format("Failed getting build part: {} ({})", GetMultipartBlobResult.Reason, GetMultipartBlobResult.ErrorCode)); + ThrowFromJupiterResult(GetMultipartBlobResult, "Failed getting large build part"sv); } std::vector<std::function<void()>> WorkList; for (auto& WorkItem : WorkItems) @@ -327,7 +329,7 @@ public: AddStatistic(PartResult); if (!PartResult.Success) { - throw std::runtime_error(fmt::format("Failed getting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode)); + ThrowFromJupiterResult(PartResult, "Failed getting large build part"sv); } }); } @@ -350,8 +352,7 @@ public: { return false; } - throw std::runtime_error( - fmt::format("Failed putting build block metadata: {} ({})", PutMetaResult.Reason, PutMetaResult.ErrorCode)); + ThrowFromJupiterResult(PutMetaResult, "Failed putting build block metadata"sv); } return true; } @@ -366,7 +367,7 @@ public: AddStatistic(FindResult); if (!FindResult.Success) { - throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode)); + ThrowFromJupiterResult(FindResult, "Failed fetching known blocks"sv); } return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); } @@ -392,8 +393,7 @@ public: AddStatistic(GetBlockMetadataResult); if (!GetBlockMetadataResult.Success) { - throw std::runtime_error( - fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode)); + ThrowFromJupiterResult(GetBlockMetadataResult, "Failed fetching block metadatas"sv); } return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); } @@ -416,14 +416,12 @@ public: AddStatistic(PutBuildPartStatsResult); if (!PutBuildPartStatsResult.Success) { - throw std::runtime_error(fmt::format("Failed posting build part statistics: {} ({})", - PutBuildPartStatsResult.Reason, - PutBuildPartStatsResult.ErrorCode)); + ThrowFromJupiterResult(PutBuildPartStatsResult, "Failed posting build part statistics"sv); } } private: - static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload) + static CbObject PayloadToCbObject(std::string_view ErrorContext, const IoBuffer& Payload) { if (Payload.GetContentType() == ZenContentType::kJSON) { @@ -443,7 +441,7 @@ private: else { throw std::runtime_error( - fmt::format("{}: {} ({})", "Unsupported response format", Context, ToString(Payload.GetContentType()))); + fmt::format("{}: {} ({})", "Unsupported response format", ErrorContext, ToString(Payload.GetContentType()))); } } @@ -453,6 +451,14 @@ private: m_Stats.TotalBytesRead += Result.ReceivedBytes; m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); m_Stats.TotalRequestCount++; + + SetAtomicMax(m_Stats.PeakSentBytes, Result.SentBytes); + SetAtomicMax(m_Stats.PeakReceivedBytes, Result.ReceivedBytes); + if (Result.ElapsedSeconds > 0.0) + { + uint64_t BytesPerSec = uint64_t((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); + SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); + } } JupiterSession m_Session; |