diff options
| author | Dan Engelbrecht <[email protected]> | 2025-04-11 19:22:21 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-04-11 19:22:21 +0200 |
| commit | dec3f27c488a1dda8a2f1133361e2fda9315e0d2 (patch) | |
| tree | 0aec5d833245bb5d65a84a65a1e4deda3241d222 /src/zenutil/jupiter/jupitersession.cpp | |
| parent | xmake updatefrontend (diff) | |
| download | zen-dec3f27c488a1dda8a2f1133361e2fda9315e0d2.tar.xz zen-dec3f27c488a1dda8a2f1133361e2fda9315e0d2.zip | |
fix race condition in multipart download (#358)
Diffstat (limited to 'src/zenutil/jupiter/jupitersession.cpp')
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 38 |
1 files changed, 25 insertions, 13 deletions
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index fde86a478..1f71c29b7 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -626,13 +626,14 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, } JupiterResult -JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, - std::vector<std::function<JupiterResult()>>& OutWorkItems) +JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete, + std::vector<std::function<JupiterResult()>>& OutWorkItems) { std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()); HttpClient::Response Response = @@ -649,18 +650,20 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa uint64_t TotalSize = TotalSizeMaybe.value(); uint64_t PayloadSize = Response.ResponsePayload.GetSize(); - Receiver(0, Response.ResponsePayload, TotalSize); + OnReceive(0, Response.ResponsePayload); if (TotalSize > PayloadSize) { struct WorkloadData { - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver; - std::atomic<uint64_t> BytesRemaining; + std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; + std::function<void()> OnComplete; + std::atomic<uint64_t> BytesRemaining; }; std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->Receiver = std::move(Receiver); + Workload->OnReceive = std::move(OnReceive); + Workload->OnComplete = std::move(OnComplete); Workload->BytesRemaining = TotalSize - PayloadSize; uint64_t Offset = PayloadSize; @@ -676,19 +679,28 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); if (Response.IsSuccess()) { + Workload->OnReceive(Offset, Response.ResponsePayload); uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); - Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning); + if (ByteRemaning == Response.ResponsePayload.GetSize()) + { + Workload->OnComplete(); + } } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); }); Offset += PartSize; } } + else + { + OnComplete(); + } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); } } } - Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize()); + OnReceive(0, Response.ResponsePayload); + OnComplete(); } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); } |