aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/jupiter/jupitersession.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-04-11 19:22:21 +0200
committerGitHub Enterprise <[email protected]>2025-04-11 19:22:21 +0200
commitdec3f27c488a1dda8a2f1133361e2fda9315e0d2 (patch)
tree0aec5d833245bb5d65a84a65a1e4deda3241d222 /src/zenutil/jupiter/jupitersession.cpp
parentxmake updatefrontend (diff)
downloadzen-dec3f27c488a1dda8a2f1133361e2fda9315e0d2.tar.xz
zen-dec3f27c488a1dda8a2f1133361e2fda9315e0d2.zip
fix race condition in multipart download (#358)
Diffstat (limited to 'src/zenutil/jupiter/jupitersession.cpp')
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp38
1 files changed, 25 insertions, 13 deletions
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index fde86a478..1f71c29b7 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -626,13 +626,14 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
}
JupiterResult
-JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
- std::vector<std::function<JupiterResult()>>& OutWorkItems)
+JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems)
{
std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString());
HttpClient::Response Response =
@@ -649,18 +650,20 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa
uint64_t TotalSize = TotalSizeMaybe.value();
uint64_t PayloadSize = Response.ResponsePayload.GetSize();
- Receiver(0, Response.ResponsePayload, TotalSize);
+ OnReceive(0, Response.ResponsePayload);
if (TotalSize > PayloadSize)
{
struct WorkloadData
{
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver;
- std::atomic<uint64_t> BytesRemaining;
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
+ std::function<void()> OnComplete;
+ std::atomic<uint64_t> BytesRemaining;
};
std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
- Workload->Receiver = std::move(Receiver);
+ Workload->OnReceive = std::move(OnReceive);
+ Workload->OnComplete = std::move(OnComplete);
Workload->BytesRemaining = TotalSize - PayloadSize;
uint64_t Offset = PayloadSize;
@@ -676,19 +679,28 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa
HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
if (Response.IsSuccess())
{
+ Workload->OnReceive(Offset, Response.ResponsePayload);
uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
- Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning);
+ if (ByteRemaning == Response.ResponsePayload.GetSize())
+ {
+ Workload->OnComplete();
+ }
}
return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
});
Offset += PartSize;
}
}
+ else
+ {
+ OnComplete();
+ }
return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
}
}
}
- Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize());
+ OnReceive(0, Response.ResponsePayload);
+ OnComplete();
}
return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
}