aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-19 22:25:58 +0200
committerGitHub Enterprise <[email protected]>2025-05-19 22:25:58 +0200
commit49701314f570da3622f11eb37cc889c7d39d9a93 (patch)
tree6159bfc2ba7974a453ded7a58813134e523e9a62 /src
parentparallel work handle dispatch exception (#400) (diff)
downloadzen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz
zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip
handle exception with batch work (#401)
* use ParallelWork in rpc playback * use ParallelWork in projectstore * use ParallelWork in buildstore * use ParallelWork in cachedisklayer * use ParallelWork in compactcas * use ParallelWork in filecas * don't set abort flag in ParallelWork destructor * add PrepareFileForScatteredWrite for temp files in httpclient * Use PrepareFileForScatteredWrite when stream-decompressing files * be more relaxed when deleting temp files * allow explicit zen-cache when using direct host url without resolving * fix lambda capture when writing loose chunks * no delay when attempting to remove temp files
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp542
-rw-r--r--src/zenhttp/httpclient.cpp6
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp17
-rw-r--r--src/zenserver/projectstore/projectstore.cpp40
-rw-r--r--src/zenstore/buildstore/buildstore.cpp58
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp25
-rw-r--r--src/zenstore/compactcas.cpp128
-rw-r--r--src/zenstore/filecas.cpp70
-rw-r--r--src/zenutil/parallelwork.cpp1
9 files changed, 463 insertions, 424 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 815bb7597..78d362fdb 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -244,6 +244,24 @@ namespace {
}
}
+ void TryRemoveFile(const std::filesystem::path& Path)
+ {
+ std::error_code Ec;
+ RemoveFile(Path, Ec);
+ if (Ec)
+ {
+ if (IsFile(Path, Ec))
+ {
+ Ec.clear();
+ RemoveFile(Path, Ec);
+ if (Ec)
+ {
+ ZEN_DEBUG("Failed removing file '{}', reason: {}", Path, Ec.message());
+ }
+ }
+ }
+ }
+
void RemoveFileWithRetry(const std::filesystem::path& Path)
{
std::error_code Ec;
@@ -4864,6 +4882,7 @@ namespace {
{
throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
}
+ PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize);
IoHashStream Hash;
bool CouldDecompress = Compressed.DecompressToStream(
@@ -5055,7 +5074,10 @@ namespace {
FilteredWrittenBytesPerSecond.Stop();
}
- RemoveFileWithRetry(CompressedChunkPath);
+ if (!CompressedChunkPath.empty())
+ {
+ TryRemoveFile(CompressedChunkPath);
+ }
std::vector<uint32_t> CompletedSequences =
CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
@@ -5304,7 +5326,7 @@ namespace {
}
}
}
- RemoveFileWithRetry(CacheDirContent.Files[Index]);
+ TryRemoveFile(CacheDirContent.Files[Index]);
}
CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs();
}
@@ -5353,7 +5375,7 @@ namespace {
}
}
}
- RemoveFileWithRetry(BlockDirContent.Files[Index]);
+ TryRemoveFile(BlockDirContent.Files[Index]);
}
CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs();
@@ -6227,279 +6249,284 @@ namespace {
continue;
}
- Work.ScheduleWork(WritePool, [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded");
- std::filesystem::path ExistingCompressedChunkPath;
- if (!PrimeCacheOnly)
+ Work.ScheduleWork(
+ WritePool,
+ [&, RemoteChunkIndex, ChunkTargetPtrs, BuildId, TotalRequestCount, TotalPartWriteCount](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
{
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- std::filesystem::path CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
- if (IsFile(CompressedChunkPath))
+ ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded");
+ std::filesystem::path ExistingCompressedChunkPath;
+ if (!PrimeCacheOnly)
{
- IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath);
- if (ExistingCompressedPart)
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ std::filesystem::path CompressedChunkPath =
+ ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
+ if (IsFile(CompressedChunkPath))
{
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
+ IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath);
+ if (ExistingCompressedPart)
{
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
{
- FilteredDownloadedBytesPerSecond.Stop();
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ ExistingCompressedChunkPath = std::move(CompressedChunkPath);
+ }
+ else
+ {
+ std::error_code DummyEc;
+ RemoveFile(CompressedChunkPath, DummyEc);
}
- ExistingCompressedChunkPath = std::move(CompressedChunkPath);
- }
- else
- {
- std::error_code DummyEc;
- RemoveFile(CompressedChunkPath, DummyEc);
}
}
}
- }
- if (!AbortFlag)
+ if (!AbortFlag)
- {
- if (!ExistingCompressedChunkPath.empty())
{
- Work.ScheduleWork(
- WritePool,
- [&Path,
- &ZenFolderPath,
- &RemoteContent,
- &RemoteLookup,
- &CacheFolderPath,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &WritePool,
- &DiskStats,
- &WriteChunkStats,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
-
- FilteredWrittenBytesPerSecond.Start();
-
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
-
- IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
- if (!CompressedPart)
+ if (!ExistingCompressedChunkPath.empty())
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&Path,
+ &ZenFolderPath,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &DiskStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
{
- throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}",
- ChunkHash,
- CompressedChunkPath));
- }
+ ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
- std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath);
- bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkHash,
- ChunkTargetPtrs,
- std::move(CompressedPart),
- DiskStats);
- WritePartsComplete++;
+ FilteredWrittenBytesPerSecond.Start();
- if (!AbortFlag)
- {
- if (WritePartsComplete == TotalPartWriteCount)
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+
+ IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
{
- FilteredWrittenBytesPerSecond.Stop();
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded compressed chunk {} from {}",
+ ChunkHash,
+ CompressedChunkPath));
}
- RemoveFileWithRetry(CompressedChunkPath);
+ std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath);
+ bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkHash,
+ ChunkTargetPtrs,
+ std::move(CompressedPart),
+ DiskStats);
+ WritePartsComplete++;
- std::vector<uint32_t> CompletedSequences =
- CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
- if (NeedHashVerify)
- {
- VerifyAndCompleteChunkSequencesAsync(TargetFolder,
- RemoteContent,
- RemoteLookup,
- CompletedSequences,
- Work,
- WritePool);
- }
- else
+ if (!AbortFlag)
{
- FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ TryRemoveFile(CompressedChunkPath);
+
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ CompletedSequences,
+ Work,
+ WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ }
}
}
- }
- });
- }
- else
- {
- Work.ScheduleWork(
- NetworkPool,
- [&Path,
- &ZenFolderPath,
- &Storage,
- BuildId,
- &PrimeCacheOnly,
- &RemoteContent,
- &RemoteLookup,
- &ExistsResult,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &WritePool,
- &NetworkPool,
- &DiskStats,
- &WriteChunkStats,
- &WritePartsComplete,
- TotalPartWriteCount,
- TotalRequestCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond,
- LargeAttachmentSize,
- PreferredMultipartChunkSize,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- &DownloadStats](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BuildBlob;
- const bool ExistsInCache =
- Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
- if (ExistsInCache)
- {
- BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash);
- }
- if (BuildBlob)
+ });
+ }
+ else
+ {
+ Work.ScheduleWork(
+ NetworkPool,
+ [&Path,
+ &ZenFolderPath,
+ &Storage,
+ BuildId,
+ &PrimeCacheOnly,
+ &RemoteContent,
+ &RemoteLookup,
+ &ExistsResult,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &NetworkPool,
+ &DiskStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ LargeAttachmentSize,
+ PreferredMultipartChunkSize,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ &DownloadStats](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
{
- uint64_t BlobSize = BuildBlob.GetSize();
- DownloadStats.DownloadedChunkCount++;
- DownloadStats.DownloadedChunkByteCount += BlobSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BuildBlob;
+ const bool ExistsInCache =
+ Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+ if (ExistsInCache)
{
- FilteredDownloadedBytesPerSecond.Stop();
+ BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash);
}
- AsyncWriteDownloadedChunk(ZenFolderPath,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
- }
- else
- {
- if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
+ if (BuildBlob)
{
- ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
- DownloadLargeBlob(
- *Storage.BuildStorage,
- ZenTempDownloadFolderPath(ZenFolderPath),
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs](
- IoBuffer&& Payload) mutable {
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (Payload && Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(
- BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
- }
- if (!PrimeCacheOnly)
- {
- if (!AbortFlag)
- {
- AsyncWriteDownloadedChunk(ZenFolderPath,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(Payload),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
- }
- }
- });
+ uint64_t BlobSize = BuildBlob.GetSize();
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(ZenFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
}
else
{
- ZEN_TRACE_CPU("UpdateFolder_GetChunk");
- BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash);
- if (BuildBlob && Storage.BuildCacheStorage)
+ if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
- Storage.BuildCacheStorage->PutBuildBlob(BuildId,
- ChunkHash,
- BuildBlob.GetContentType(),
- CompositeBuffer(SharedBuffer(BuildBlob)));
- }
- if (!BuildBlob)
- {
- throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
+ DownloadLargeBlob(
+ *Storage.BuildStorage,
+ ZenTempDownloadFolderPath(ZenFolderPath),
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ DownloadStats,
+ [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs](
+ IoBuffer&& Payload) mutable {
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (Payload && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(
+ BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
+ if (!PrimeCacheOnly)
+ {
+ if (!AbortFlag)
+ {
+ AsyncWriteDownloadedChunk(ZenFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
+ }
+ });
}
- if (!PrimeCacheOnly)
+ else
{
- if (!AbortFlag)
+ ZEN_TRACE_CPU("UpdateFolder_GetChunk");
+ BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash);
+ if (BuildBlob && Storage.BuildCacheStorage)
{
- uint64_t BlobSize = BuildBlob.GetSize();
- DownloadStats.DownloadedChunkCount++;
- DownloadStats.DownloadedChunkByteCount += BlobSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ Storage.BuildCacheStorage->PutBuildBlob(
+ BuildId,
+ ChunkHash,
+ BuildBlob.GetContentType(),
+ CompositeBuffer(SharedBuffer(BuildBlob)));
+ }
+ if (!BuildBlob)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ }
+ if (!PrimeCacheOnly)
+ {
+ if (!AbortFlag)
{
- FilteredDownloadedBytesPerSecond.Stop();
+ uint64_t BlobSize = BuildBlob.GetSize();
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(ZenFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
}
- AsyncWriteDownloadedChunk(ZenFolderPath,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
}
}
}
}
- }
- });
+ });
+ }
}
}
- }
- });
+ });
}
for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
@@ -6728,8 +6755,11 @@ namespace {
RemoveFile(BlockChunkPath, DummyEc);
throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
}
+
+ TryRemoveFile(BlockChunkPath);
+
WritePartsComplete++;
- RemoveFileWithRetry(BlockChunkPath);
+
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -6884,7 +6914,7 @@ namespace {
if (!BlockChunkPath.empty())
{
- RemoveFileWithRetry(BlockChunkPath);
+ TryRemoveFile(BlockChunkPath);
}
WritePartsComplete++;
@@ -7059,7 +7089,7 @@ namespace {
if (!BlockChunkPath.empty())
{
- RemoveFileWithRetry(BlockChunkPath);
+ TryRemoveFile(BlockChunkPath);
}
WritePartsComplete++;
@@ -9456,6 +9486,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::string CloudHost;
+ auto TestCacheEndpoint = [](std::string_view BaseUrl, const bool AssumeHttp2) -> std::pair<bool, std::string> {
+ HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient",
+ .ConnectTimeout = std::chrono::milliseconds{1000},
+ .Timeout = std::chrono::milliseconds{2000},
+ .AssumeHttp2 = AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 0};
+ HttpClient TestHttpClient(BaseUrl, TestClientSettings);
+ HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds");
+ if (TestResponse.IsSuccess())
+ {
+ return {true, ""};
+ }
+ return {false, TestResponse.ErrorMessage("")};
+ };
+
if (!m_Host.empty())
{
if (m_OverrideHost.empty() || m_ZenCacheHost.empty())
@@ -9535,22 +9581,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw std::runtime_error(fmt::format("Host {} could not be reached. Reason: {}", m_OverrideHost, TestResult.second));
}
- auto TestCacheEndpoint = [](std::string_view BaseUrl, const bool AssumeHttp2) -> std::pair<bool, std::string> {
- HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient",
- .ConnectTimeout = std::chrono::milliseconds{1000},
- .Timeout = std::chrono::milliseconds{2000},
- .AssumeHttp2 = AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 0};
- HttpClient TestHttpClient(BaseUrl, TestClientSettings);
- HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds");
- if (TestResponse.IsSuccess())
- {
- return {true, ""};
- }
- return {false, TestResponse.ErrorMessage("")};
- };
-
if (m_ZenCacheHost.empty())
{
CbArrayView CacheEndpointsArray = ResponseObjectView["cacheEndpoints"sv].AsArrayView();
@@ -9680,7 +9710,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
CacheDescription += fmt::format(" Bucket '{}'", m_Bucket);
}
- Result.CacheName = BuildCacheName;
+ Result.CacheName = BuildCacheName.empty() ? m_ZenCacheHost : BuildCacheName;
}
ZEN_CONSOLE("Remote: {}", StorageDescription);
if (!Result.CacheName.empty())
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index ca1b820c9..a6e4d9290 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -86,7 +86,7 @@ namespace detail {
}
}
- std::error_code Open(const std::filesystem::path& TempFolderPath)
+ std::error_code Open(const std::filesystem::path& TempFolderPath, uint64_t FinalSize)
{
ZEN_TRACE_CPU("TempPayloadFile::Open");
ZEN_ASSERT(m_FileHandle == nullptr);
@@ -128,6 +128,8 @@ namespace detail {
#endif // ZEN_PLATFORM_WINDOWS
m_FileHandle = FileHandle;
+ PrepareFileForScatteredWrite(m_FileHandle, FinalSize);
+
return {};
}
@@ -1211,7 +1213,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
if (ContentLength.value() > 1024 * 1024)
{
PayloadFile = std::make_unique<detail::TempPayloadFile>();
- std::error_code Ec = PayloadFile->Open(TempFolderPath);
+ std::error_code Ec = PayloadFile->Open(TempFolderPath, ContentLength.value());
if (Ec)
{
ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index b9a9ca380..f7e63433b 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -25,6 +25,7 @@
#include <zenutil/cache/cacherequests.h>
#include <zenutil/cache/rpcrecording.h>
#include <zenutil/jupiter/jupiterclient.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/workerpools.h>
#include "upstream/upstreamcache.h"
@@ -1585,12 +1586,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
WorkerThreadPool WorkerPool(ThreadCount);
uint64_t RequestCount = Replayer.GetRequestCount();
Stopwatch Timer;
- auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
- Latch JobLatch(RequestCount);
+ auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
ZEN_INFO("Replaying {} requests", RequestCount);
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
- WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() {
+ Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>&) {
IoBuffer Body;
zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body);
@@ -1634,16 +1636,15 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
}
}
}
- JobLatch.CountDown();
});
}
- while (!JobLatch.Wait(10000))
- {
+ Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted);
ZEN_INFO("Replayed {} of {} requests, elapsed {}",
- RequestCount - JobLatch.Remaining(),
+ RequestCount - PendingWork,
RequestCount,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- }
+ });
}
void
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 6a55efdb7..7d22da717 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -22,6 +22,7 @@
#include <zenstore/scrubcontext.h>
#include <zenutil/cache/rpcrecording.h>
#include <zenutil/openprocesscache.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
@@ -1588,17 +1589,14 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
}
};
- Latch WorkLatch(1);
-
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
{
if (OptionalWorkerPool)
{
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([&, Index = OpIndex]() {
+ Work.ScheduleWork(*OptionalWorkerPool, [&, Index = OpIndex](std::atomic<bool>&) {
ZEN_MEMSCOPE(GetProjectstoreTag());
-
- auto _ = MakeGuard([&WorkLatch] { WorkLatch.CountDown(); });
ValidateOne(Index);
});
}
@@ -1607,9 +1605,7 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
ValidateOne(OpIndex);
}
}
-
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
{
// Check if we were deleted while we were checking the references without a lock...
@@ -2106,8 +2102,9 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
}
if (OptionalWorkerPool)
{
- std::atomic_bool Result = true;
- Latch WorkLatch(1);
+ std::atomic_bool Result = true;
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
{
@@ -2115,10 +2112,10 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
{
break;
}
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork(
- [this, &WorkLatch, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result](
+ std::atomic<bool>&) {
if (Result.load() == false)
{
return;
@@ -2162,8 +2159,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
LargeSizeLimit);
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
return Result.load();
}
@@ -3886,13 +3882,12 @@ ProjectStore::Flush()
}
WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (const Ref<Project>& Project : Projects)
{
- WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([this, &WorkLatch, Project]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) {
try
{
Project->Flush();
@@ -3904,8 +3899,7 @@ ProjectStore::Flush()
});
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
void
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index b4891a742..6eb01dfc4 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -9,6 +9,7 @@
#include <zencore/scopeguard.h>
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
+#include <zenutil/parallelwork.h>
#include <zencore/uid.h>
#include <zencore/xxhash.h>
@@ -22,6 +23,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/compress.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zenutil/workerpools.h>
#endif // ZEN_WITH_TESTS
namespace zen {
@@ -480,11 +482,12 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
if (!MetaLocations.empty())
{
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
m_MetadataBlockStore.IterateChunks(
MetaLocations,
- [this, OptionalWorkerPool, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock, &WorkLatch](
+ [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock](
uint32_t BlockIndex,
std::span<const size_t> ChunkIndexes) -> bool {
ZEN_UNUSED(BlockIndex);
@@ -496,40 +499,31 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
{
ZEN_ASSERT(OptionalWorkerPool != nullptr);
std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- WorkLatch.AddCount(1);
- try
- {
- OptionalWorkerPool->ScheduleWork([this,
- &Result,
- &MetaLocations,
- &MetaLocationResultIndexes,
- DoOneBlock,
- &WorkLatch,
- ChunkIndexes = std::move(TmpChunkIndexes)]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &Result, &MetaLocations, &MetaLocationResultIndexes, DoOneBlock, ChunkIndexes = std::move(TmpChunkIndexes)](
+ std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
try
{
- DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result);
+ if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result))
+ {
+ AbortFlag.store(true);
+ }
}
catch (const std::exception& Ex)
{
ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
}
});
- }
- catch (const std::exception& Ex)
- {
- WorkLatch.CountDown();
- ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}",
- ChunkIndexes.size(),
- Ex.what());
- }
- return true;
+ return !Work.IsAborted();
}
});
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
for (size_t Index = 0; Index < Result.size(); Index++)
{
@@ -1661,6 +1655,8 @@ TEST_CASE("BuildStore.Metadata")
ScopedTemporaryDirectory _;
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
+
BuildStoreConfig Config;
Config.RootDirectory = _.Path() / "build_store";
@@ -1678,7 +1674,7 @@ TEST_CASE("BuildStore.Metadata")
}
Store.PutMetadatas(BlobHashes, MetaPayloads);
- std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr);
+ std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool);
CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
{
@@ -1689,7 +1685,7 @@ TEST_CASE("BuildStore.Metadata")
{
GcManager Gc;
BuildStore Store(Config, Gc);
- std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr);
+ std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool);
CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
{
@@ -1715,7 +1711,7 @@ TEST_CASE("BuildStore.Metadata")
Store.PutBlob(CompressedBlobsHashes.back(), Payload);
}
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
for (const auto& MetadataIt : MetadataPayloads)
{
CHECK(!MetadataIt);
@@ -1741,7 +1737,7 @@ TEST_CASE("BuildStore.Metadata")
}
Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
for (size_t I = 0; I < MetadataPayloads.size(); I++)
{
@@ -1754,7 +1750,7 @@ TEST_CASE("BuildStore.Metadata")
GcManager Gc;
BuildStore Store(Config, Gc);
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
for (size_t I = 0; I < MetadataPayloads.size(); I++)
{
@@ -1783,7 +1779,7 @@ TEST_CASE("BuildStore.Metadata")
GcManager Gc;
BuildStore Store(Config, Gc);
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
for (size_t I = 0; I < MetadataPayloads.size(); I++)
{
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 91bd9cba8..d80da6ea6 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zencore/xxhash.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
@@ -3936,14 +3937,13 @@ ZenCacheDiskLayer::DiscoverBuckets()
RwLock SyncLock;
WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (auto& BucketPath : FoundBucketDirectories)
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() {
+ Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
ZEN_MEMSCOPE(GetCacheDiskTag());
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
const std::string BucketName = PathToUtf8(BucketPath.stem());
try
{
@@ -3984,8 +3984,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
}
});
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
bool
@@ -4062,16 +4061,15 @@ ZenCacheDiskLayer::Flush()
}
{
WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
try
{
for (auto& Bucket : Buckets)
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([&WorkLatch, Bucket]() {
+ Work.ScheduleWork(Pool, [Bucket](std::atomic<bool>&) {
ZEN_MEMSCOPE(GetCacheDiskTag());
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
try
{
Bucket->Flush();
@@ -4087,11 +4085,8 @@ ZenCacheDiskLayer::Flush()
{
ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
}
- WorkLatch.CountDown();
- while (!WorkLatch.Wait(1000))
- {
- ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
- }
+ Work.Wait(1000,
+ [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); });
}
}
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 8cf241e34..15bea272b 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -15,6 +15,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/parallelwork.h>
#include <gsl/gsl-lite.hpp>
@@ -393,64 +394,75 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
LargeSizeLimit);
};
- Latch WorkLatch(1);
- std::atomic_bool AsyncContinue = true;
- bool Continue = m_BlockStore.IterateChunks(
- FoundChunkLocations,
- [this,
- &AsyncContinue,
- &WorkLatch,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
- if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
- {
- std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([this,
- &AsyncContinue,
- &WorkLatch,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- BlockIndex,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- ChunkIndexes = std::move(TmpChunkIndexes)]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (!AsyncContinue)
- {
- return;
- }
- try
- {
- bool Continue = DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- if (!Continue)
- {
- AsyncContinue.store(false);
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
- m_RootDirectory,
- BlockIndex,
- Ex.what());
- }
- });
- return AsyncContinue.load();
- }
- else
- {
- return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- }
- });
- WorkLatch.CountDown();
- WorkLatch.Wait();
- return AsyncContinue.load() && Continue;
+ std::atomic<bool> AsyncContinue = true;
+ {
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ const bool Continue = m_BlockStore.IterateChunks(
+ FoundChunkLocations,
+ [this,
+ &Work,
+ &AsyncContinue,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
+ if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
+ {
+ std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this,
+ &AsyncContinue,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ BlockIndex,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ AsyncContinue.store(false);
+ }
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ try
+ {
+ bool Continue =
+ DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ if (!Continue)
+ {
+ AsyncContinue.store(false);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
+ m_RootDirectory,
+ BlockIndex,
+ Ex.what());
+ AsyncContinue.store(false);
+ }
+ });
+ return AsyncContinue.load();
+ }
+ else
+ {
+ return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ }
+ });
+ if (!Continue)
+ {
+ AsyncContinue.store(false);
+ }
+ Work.Wait();
+ }
+ return AsyncContinue.load();
}
void
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 4911bffb9..6354edf70 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -20,6 +20,7 @@
#include <zencore/workthreadpool.h>
#include <zenstore/gc.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/parallelwork.h>
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
@@ -632,10 +633,11 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
}
}
}
- std::atomic_bool Continue = true;
+ std::atomic<bool> AsyncContinue = true;
if (!FoundChunkIndexes.empty())
{
- auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) {
+ auto ProcessOne = [this, &ChunkHashes, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) {
+ ZEN_ASSERT(ChunkIndex < ChunkHashes.size());
const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize);
if (!AsyncCallback(ChunkIndex, std::move(Payload)))
@@ -645,49 +647,57 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
return true;
};
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
{
- size_t ChunkIndex = FoundChunkIndexes[Index];
- uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
- if (!Continue)
+ if (!AsyncContinue)
{
break;
}
+ size_t ChunkIndex = FoundChunkIndexes[Index];
+ uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
if (OptionalWorkerPool)
{
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (!Continue)
- {
- return;
- }
- try
- {
- if (!ProcessOne(ChunkIndex, ExpectedSize))
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
- Continue = false;
+ AsyncContinue.store(false);
}
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
- m_RootDirectory,
- ChunkHashes[ChunkIndex],
- Ex.what());
- }
- });
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ try
+ {
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
+ {
+ AsyncContinue.store(false);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
+ m_RootDirectory,
+ ChunkHashes[ChunkIndex],
+ Ex.what());
+ AsyncContinue.store(false);
+ }
+ });
}
else
{
- Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize);
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
+ {
+ AsyncContinue.store(false);
+ }
}
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
- return Continue;
+ return AsyncContinue.load();
}
void
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
index 516d70e28..91591375a 100644
--- a/src/zenutil/parallelwork.cpp
+++ b/src/zenutil/parallelwork.cpp
@@ -30,7 +30,6 @@ ParallelWork::~ParallelWork()
"to complete");
m_PendingWork.CountDown();
}
- m_AbortFlag.store(true);
m_PendingWork.Wait();
ZEN_ASSERT(m_PendingWork.Remaining() == 0);
}