aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-04-11 19:22:21 +0200
committerGitHub Enterprise <[email protected]>2025-04-11 19:22:21 +0200
commitdec3f27c488a1dda8a2f1133361e2fda9315e0d2 (patch)
tree0aec5d833245bb5d65a84a65a1e4deda3241d222 /src
parentxmake updatefrontend (diff)
downloadzen-dec3f27c488a1dda8a2f1133361e2fda9315e0d2.tar.xz
zen-dec3f27c488a1dda8a2f1133361e2fda9315e0d2.zip
fix race condition in multipart download (#358)
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp26
-rw-r--r--src/zenutil/filebuildstorage.cpp26
-rw-r--r--src/zenutil/include/zenutil/buildstorage.h12
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h15
-rw-r--r--src/zenutil/jupiter/jupiterbuildstorage.cpp20
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp38
6 files changed, 81 insertions, 56 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index f7f9e3abb..cdcd79f58 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -1843,25 +1843,25 @@ namespace {
BuildId,
ChunkHash,
PreferredMultipartChunkSize,
- [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset,
- const IoBuffer& Chunk,
- uint64_t BytesRemaining) {
+ [Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) {
DownloadStats.DownloadedChunkByteCount += Chunk.GetSize();
if (!AbortFlag.load())
{
ZEN_TRACE_CPU("DownloadLargeBlob_Save");
Workload->TempFile.Write(Chunk.GetView(), Offset);
- if (Chunk.GetSize() == BytesRemaining)
- {
- DownloadStats.DownloadedChunkCount++;
- uint64_t PayloadSize = Workload->TempFile.FileSize();
- void* FileHandle = Workload->TempFile.Detach();
- ZEN_ASSERT(FileHandle != nullptr);
- IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true);
- Payload.SetDeleteOnClose(true);
- OnDownloadComplete(std::move(Payload));
- }
+ }
+ },
+ [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() {
+ DownloadStats.DownloadedChunkCount++;
+ if (!AbortFlag.load())
+ {
+ uint64_t PayloadSize = Workload->TempFile.FileSize();
+ void* FileHandle = Workload->TempFile.Detach();
+ ZEN_ASSERT(FileHandle != nullptr);
+ IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true);
+ Payload.SetDeleteOnClose(true);
+ OnDownloadComplete(std::move(Payload));
}
});
if (!WorkItems.empty())
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index 7aa252e44..f335a03a3 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -369,11 +369,11 @@ public:
return IoBuffer{};
}
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(
- const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob");
ZEN_UNUSED(BuildId);
@@ -387,16 +387,18 @@ public:
{
struct WorkloadData
{
- std::atomic<uint64_t> BytesRemaining;
- BasicFile BlobFile;
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver;
+ std::atomic<uint64_t> BytesRemaining;
+ BasicFile BlobFile;
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
+ std::function<void()> OnComplete;
};
std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead);
const uint64_t BlobSize = Workload->BlobFile.FileSize();
- Workload->Receiver = std::move(Receiver);
+ Workload->OnReceive = std::move(OnReceive);
+ Workload->OnComplete = std::move(OnComplete);
Workload->BytesRemaining = BlobSize;
std::vector<std::function<void()>> WorkItems;
@@ -410,8 +412,12 @@ public:
IoBuffer PartPayload(Size);
Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset);
m_Stats.TotalBytesRead += PartPayload.GetSize();
+ Workload->OnReceive(Offset, PartPayload);
uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size);
- Workload->Receiver(Offset, PartPayload, ByteRemaning);
+ if (ByteRemaning == Size)
+ {
+ Workload->OnComplete();
+ }
SimulateLatency(Size, PartPayload.GetSize());
});
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
index b0665dbf8..05e3ca22d 100644
--- a/src/zenutil/include/zenutil/buildstorage.h
+++ b/src/zenutil/include/zenutil/buildstorage.h
@@ -47,12 +47,12 @@ public:
virtual IoBuffer GetBuildBlob(const Oid& BuildId,
const IoHash& RawHash,
uint64_t RangeOffset = 0,
- uint64_t RangeBytes = (uint64_t)-1) = 0;
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(
- const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0;
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete) = 0;
virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0;
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
index 417ed7384..c2886ca4c 100644
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -135,13 +135,14 @@ public:
uint64_t PayloadSize,
std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems);
- JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
- std::vector<std::function<JupiterResult()>>& OutWorkItems);
+ JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems);
JupiterResult PutBlockMetadata(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp
index f2d190408..24e062c7b 100644
--- a/src/zenutil/jupiter/jupiterbuildstorage.cpp
+++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp
@@ -235,19 +235,25 @@ public:
return std::move(GetBuildBlobResult.Response);
}
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(
- const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete) override
{
ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob");
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
std::vector<std::function<JupiterResult()>> WorkItems;
- JupiterResult GetMultipartBlobResult =
- m_Session.GetMultipartBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ChunkSize, std::move(Receiver), WorkItems);
+ JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace,
+ m_Bucket,
+ BuildId,
+ RawHash,
+ ChunkSize,
+ std::move(OnReceive),
+ std::move(OnComplete),
+ WorkItems);
AddStatistic(GetMultipartBlobResult);
if (!GetMultipartBlobResult.Success)
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index fde86a478..1f71c29b7 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -626,13 +626,14 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
}
JupiterResult
-JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
- std::vector<std::function<JupiterResult()>>& OutWorkItems)
+JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems)
{
std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString());
HttpClient::Response Response =
@@ -649,18 +650,20 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa
uint64_t TotalSize = TotalSizeMaybe.value();
uint64_t PayloadSize = Response.ResponsePayload.GetSize();
- Receiver(0, Response.ResponsePayload, TotalSize);
+ OnReceive(0, Response.ResponsePayload);
if (TotalSize > PayloadSize)
{
struct WorkloadData
{
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver;
- std::atomic<uint64_t> BytesRemaining;
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
+ std::function<void()> OnComplete;
+ std::atomic<uint64_t> BytesRemaining;
};
std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
- Workload->Receiver = std::move(Receiver);
+ Workload->OnReceive = std::move(OnReceive);
+ Workload->OnComplete = std::move(OnComplete);
Workload->BytesRemaining = TotalSize - PayloadSize;
uint64_t Offset = PayloadSize;
@@ -676,19 +679,28 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa
HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
if (Response.IsSuccess())
{
+ Workload->OnReceive(Offset, Response.ResponsePayload);
uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
- Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning);
+ if (ByteRemaning == Response.ResponsePayload.GetSize())
+ {
+ Workload->OnComplete();
+ }
}
return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
});
Offset += PartSize;
}
}
+ else
+ {
+ OnComplete();
+ }
return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
}
}
}
- Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize());
+ OnReceive(0, Response.ResponsePayload);
+ OnComplete();
}
return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
}