diff options
| author | Dan Engelbrecht <[email protected]> | 2025-04-11 19:22:21 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-04-11 19:22:21 +0200 |
| commit | dec3f27c488a1dda8a2f1133361e2fda9315e0d2 (patch) | |
| tree | 0aec5d833245bb5d65a84a65a1e4deda3241d222 /src | |
| parent | xmake updatefrontend (diff) | |
| download | zen-dec3f27c488a1dda8a2f1133361e2fda9315e0d2.tar.xz zen-dec3f27c488a1dda8a2f1133361e2fda9315e0d2.zip | |
fix race condition in multipart download (#358)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 26 | ||||
| -rw-r--r-- | src/zenutil/filebuildstorage.cpp | 26 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstorage.h | 12 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupitersession.h | 15 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterbuildstorage.cpp | 20 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 38 |
6 files changed, 81 insertions, 56 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index f7f9e3abb..cdcd79f58 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -1843,25 +1843,25 @@ namespace { BuildId, ChunkHash, PreferredMultipartChunkSize, - [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset, - const IoBuffer& Chunk, - uint64_t BytesRemaining) { + [Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) { DownloadStats.DownloadedChunkByteCount += Chunk.GetSize(); if (!AbortFlag.load()) { ZEN_TRACE_CPU("DownloadLargeBlob_Save"); Workload->TempFile.Write(Chunk.GetView(), Offset); - if (Chunk.GetSize() == BytesRemaining) - { - DownloadStats.DownloadedChunkCount++; - uint64_t PayloadSize = Workload->TempFile.FileSize(); - void* FileHandle = Workload->TempFile.Detach(); - ZEN_ASSERT(FileHandle != nullptr); - IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); - Payload.SetDeleteOnClose(true); - OnDownloadComplete(std::move(Payload)); - } + } + }, + [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() { + DownloadStats.DownloadedChunkCount++; + if (!AbortFlag.load()) + { + uint64_t PayloadSize = Workload->TempFile.FileSize(); + void* FileHandle = Workload->TempFile.Detach(); + ZEN_ASSERT(FileHandle != nullptr); + IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); + Payload.SetDeleteOnClose(true); + OnDownloadComplete(std::move(Payload)); } }); if (!WorkItems.empty()) diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index 7aa252e44..f335a03a3 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -369,11 +369,11 @@ public: return IoBuffer{}; } - virtual std::vector<std::function<void()>> GetLargeBuildBlob( - const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete) override { ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob"); ZEN_UNUSED(BuildId); @@ -387,16 +387,18 @@ public: { struct WorkloadData { - std::atomic<uint64_t> BytesRemaining; - BasicFile BlobFile; - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver; + std::atomic<uint64_t> BytesRemaining; + BasicFile BlobFile; + std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; + std::function<void()> OnComplete; }; std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead); const uint64_t BlobSize = Workload->BlobFile.FileSize(); - Workload->Receiver = std::move(Receiver); + Workload->OnReceive = std::move(OnReceive); + Workload->OnComplete = std::move(OnComplete); Workload->BytesRemaining = BlobSize; std::vector<std::function<void()>> WorkItems; @@ -410,8 +412,12 @@ public: IoBuffer PartPayload(Size); Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset); m_Stats.TotalBytesRead += PartPayload.GetSize(); + Workload->OnReceive(Offset, PartPayload); uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size); - Workload->Receiver(Offset, PartPayload, ByteRemaning); + if (ByteRemaning == Size) + { + Workload->OnComplete(); + } SimulateLatency(Size, PartPayload.GetSize()); }); diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h index b0665dbf8..05e3ca22d 100644 --- a/src/zenutil/include/zenutil/buildstorage.h +++ b/src/zenutil/include/zenutil/buildstorage.h @@ -47,12 +47,12 @@ public: virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset = 0, - uint64_t RangeBytes = (uint64_t)-1) = 0; - virtual std::vector<std::function<void()>> GetLargeBuildBlob( - const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0; + uint64_t RangeBytes = (uint64_t)-1) = 0; + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete) = 0; virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0; diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h index 417ed7384..c2886ca4c 100644 --- a/src/zenutil/include/zenutil/jupiter/jupitersession.h +++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h @@ -135,13 +135,14 @@ public: uint64_t PayloadSize, std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems); - JupiterResult GetMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, - std::vector<std::function<JupiterResult()>>& OutWorkItems); + JupiterResult GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete, + std::vector<std::function<JupiterResult()>>& OutWorkItems); JupiterResult PutBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp index f2d190408..24e062c7b 100644 --- a/src/zenutil/jupiter/jupiterbuildstorage.cpp +++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp @@ -235,19 +235,25 @@ public: return std::move(GetBuildBlobResult.Response); } - virtual std::vector<std::function<void()>> GetLargeBuildBlob( - const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete) override { ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); std::vector<std::function<JupiterResult()>> WorkItems; - JupiterResult GetMultipartBlobResult = - m_Session.GetMultipartBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ChunkSize, std::move(Receiver), WorkItems); + JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace, + m_Bucket, + BuildId, + RawHash, + ChunkSize, + std::move(OnReceive), + std::move(OnComplete), + WorkItems); AddStatistic(GetMultipartBlobResult); if (!GetMultipartBlobResult.Success) diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index fde86a478..1f71c29b7 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -626,13 +626,14 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, } JupiterResult -JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, - std::vector<std::function<JupiterResult()>>& OutWorkItems) +JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete, + std::vector<std::function<JupiterResult()>>& OutWorkItems) { std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()); HttpClient::Response Response = @@ -649,18 +650,20 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa uint64_t TotalSize = TotalSizeMaybe.value(); uint64_t PayloadSize = Response.ResponsePayload.GetSize(); - Receiver(0, Response.ResponsePayload, TotalSize); + OnReceive(0, Response.ResponsePayload); if (TotalSize > PayloadSize) { struct WorkloadData { - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver; - std::atomic<uint64_t> BytesRemaining; + std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; + std::function<void()> OnComplete; + std::atomic<uint64_t> BytesRemaining; }; std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->Receiver = std::move(Receiver); + Workload->OnReceive = std::move(OnReceive); + Workload->OnComplete = std::move(OnComplete); Workload->BytesRemaining = TotalSize - PayloadSize; uint64_t Offset = PayloadSize; @@ -676,19 +679,28 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); if (Response.IsSuccess()) { + Workload->OnReceive(Offset, Response.ResponsePayload); uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); - Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning); + if (ByteRemaning == Response.ResponsePayload.GetSize()) + { + Workload->OnComplete(); + } } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); }); Offset += PartSize; } } + else + { + OnComplete(); + } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); } } } - Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize()); + OnReceive(0, Response.ResponsePayload); + OnComplete(); } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); } |