diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-19 22:25:58 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-19 22:25:58 +0200 |
| commit | 49701314f570da3622f11eb37cc889c7d39d9a93 (patch) | |
| tree | 6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zen/cmds/builds_cmd.cpp | |
| parent | parallel work handle dispatch exception (#400) (diff) | |
| download | archived-zen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz archived-zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip | |
handle exception with batch work (#401)
* use ParallelWork in rpc playback
* use ParallelWork in projectstore
* use ParallelWork in buildstore
* use ParallelWork in cachedisklayer
* use ParallelWork in compactcas
* use ParallelWork in filecas
* don't set abort flag in ParallelWork destructor
* add PrepareFileForScatteredWrite for temp files in httpclient
* Use PrepareFileForScatteredWrite when stream-decompressing files
* be more relaxed when deleting temp files
* allow explicit zen-cache when using direct host url without resolving
* fix lambda capture when writing loose chunks
* no delay when attempting to remove temp files
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 542 |
1 files changed, 286 insertions, 256 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 815bb7597..78d362fdb 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -244,6 +244,24 @@ namespace { } } + void TryRemoveFile(const std::filesystem::path& Path) + { + std::error_code Ec; + RemoveFile(Path, Ec); + if (Ec) + { + if (IsFile(Path, Ec)) + { + Ec.clear(); + RemoveFile(Path, Ec); + if (Ec) + { + ZEN_DEBUG("Failed removing file '{}', reason: {}", Path, Ec.message()); + } + } + } + } + void RemoveFileWithRetry(const std::filesystem::path& Path) { std::error_code Ec; @@ -4864,6 +4882,7 @@ namespace { { throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); } + PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize); IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream( @@ -5055,7 +5074,10 @@ namespace { FilteredWrittenBytesPerSecond.Stop(); } - RemoveFileWithRetry(CompressedChunkPath); + if (!CompressedChunkPath.empty()) + { + TryRemoveFile(CompressedChunkPath); + } std::vector<uint32_t> CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); @@ -5304,7 +5326,7 @@ namespace { } } } - RemoveFileWithRetry(CacheDirContent.Files[Index]); + TryRemoveFile(CacheDirContent.Files[Index]); } CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); } @@ -5353,7 +5375,7 @@ namespace { } } } - RemoveFileWithRetry(BlockDirContent.Files[Index]); + TryRemoveFile(BlockDirContent.Files[Index]); } CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); @@ -6227,279 +6249,284 @@ namespace { continue; } - Work.ScheduleWork(WritePool, [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); - std::filesystem::path ExistingCompressedChunkPath; - if (!PrimeCacheOnly) + Work.ScheduleWork( + WritePool, + [&, RemoteChunkIndex, ChunkTargetPtrs, BuildId, TotalRequestCount, TotalPartWriteCount](std::atomic<bool>&) mutable { + if (!AbortFlag) { - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - std::filesystem::path CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); - if (IsFile(CompressedChunkPath)) + ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); + std::filesystem::path ExistingCompressedChunkPath; + if (!PrimeCacheOnly) { - IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath); - if (ExistingCompressedPart) + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + std::filesystem::path CompressedChunkPath = + ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + if (IsFile(CompressedChunkPath)) { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) + IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath); + if (ExistingCompressedPart) { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) { - FilteredDownloadedBytesPerSecond.Stop(); + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + ExistingCompressedChunkPath = std::move(CompressedChunkPath); + } + else + { + std::error_code DummyEc; + RemoveFile(CompressedChunkPath, DummyEc); } - ExistingCompressedChunkPath = std::move(CompressedChunkPath); - } - else - { - std::error_code DummyEc; - RemoveFile(CompressedChunkPath, DummyEc); } } } - } - if (!AbortFlag) + if (!AbortFlag) - { - if (!ExistingCompressedChunkPath.empty()) { - Work.ScheduleWork( - WritePool, - [&Path, - &ZenFolderPath, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs, - CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); - - FilteredWrittenBytesPerSecond.Start(); - - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - - IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) + if (!ExistingCompressedChunkPath.empty()) + { + Work.ScheduleWork( + WritePool, + [&Path, + &ZenFolderPath, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs, + CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { + if (!AbortFlag) { - throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", - ChunkHash, - CompressedChunkPath)); - } + ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); - std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - std::move(CompressedPart), - DiskStats); - WritePartsComplete++; + FilteredWrittenBytesPerSecond.Start(); - if (!AbortFlag) - { - if (WritePartsComplete == TotalPartWriteCount) + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) { - FilteredWrittenBytesPerSecond.Stop(); + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", + ChunkHash, + CompressedChunkPath)); } - RemoveFileWithRetry(CompressedChunkPath); + std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); + bool NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkHash, + ChunkTargetPtrs, + std::move(CompressedPart), + DiskStats); + WritePartsComplete++; - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - if (NeedHashVerify) - { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, - RemoteContent, - RemoteLookup, - CompletedSequences, - Work, - WritePool); - } - else + if (!AbortFlag) { - FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + TryRemoveFile(CompressedChunkPath); + + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(TargetFolder, + RemoteContent, + RemoteLookup, + CompletedSequences, + Work, + WritePool); + } + else + { + FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + } } } - } - }); - } - else - { - Work.ScheduleWork( - NetworkPool, - [&Path, - &ZenFolderPath, - &Storage, - BuildId, - &PrimeCacheOnly, - &RemoteContent, - &RemoteLookup, - &ExistsResult, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - &NetworkPool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - TotalPartWriteCount, - TotalRequestCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - LargeAttachmentSize, - PreferredMultipartChunkSize, - RemoteChunkIndex, - ChunkTargetPtrs, - &DownloadStats](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BuildBlob; - const bool ExistsInCache = - Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); - if (ExistsInCache) - { - BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); - } - if (BuildBlob) + }); + } + else + { + Work.ScheduleWork( + NetworkPool, + [&Path, + &ZenFolderPath, + &Storage, + BuildId, + &PrimeCacheOnly, + &RemoteContent, + &RemoteLookup, + &ExistsResult, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &NetworkPool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + TotalRequestCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + LargeAttachmentSize, + PreferredMultipartChunkSize, + RemoteChunkIndex, + ChunkTargetPtrs, + &DownloadStats](std::atomic<bool>&) mutable { + if (!AbortFlag) { - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BuildBlob; + const bool ExistsInCache = + Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + if (ExistsInCache) { - FilteredDownloadedBytesPerSecond.Stop(); + BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); } - AsyncWriteDownloadedChunk(ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); - } - else - { - if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) + if (BuildBlob) { - ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); - DownloadLargeBlob( - *Storage.BuildStorage, - ZenTempDownloadFolderPath(ZenFolderPath), - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs]( - IoBuffer&& Payload) mutable { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (Payload && Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob( - BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - if (!PrimeCacheOnly) - { - if (!AbortFlag) - { - AsyncWriteDownloadedChunk(ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); - } - } - }); + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); } else { - ZEN_TRACE_CPU("UpdateFolder_GetChunk"); - BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); - if (BuildBlob && Storage.BuildCacheStorage) + if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { - Storage.BuildCacheStorage->PutBuildBlob(BuildId, - ChunkHash, - BuildBlob.GetContentType(), - CompositeBuffer(SharedBuffer(BuildBlob))); - } - if (!BuildBlob) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); + DownloadLargeBlob( + *Storage.BuildStorage, + ZenTempDownloadFolderPath(ZenFolderPath), + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadStats, + [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs]( + IoBuffer&& Payload) mutable { + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (Payload && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) + { + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } + } + }); } - if (!PrimeCacheOnly) + else { - if (!AbortFlag) + ZEN_TRACE_CPU("UpdateFolder_GetChunk"); + BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); + if (BuildBlob && Storage.BuildCacheStorage) { - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + BuildBlob.GetContentType(), + CompositeBuffer(SharedBuffer(BuildBlob))); + } + if (!BuildBlob) + { + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) { - FilteredDownloadedBytesPerSecond.Stop(); + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); } - AsyncWriteDownloadedChunk(ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); } } } } - } - }); + }); + } } } - } - }); + }); } for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) @@ -6728,8 +6755,11 @@ namespace { RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } + + TryRemoveFile(BlockChunkPath); + WritePartsComplete++; - RemoveFileWithRetry(BlockChunkPath); + if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); @@ -6884,7 +6914,7 @@ namespace { if (!BlockChunkPath.empty()) { - RemoveFileWithRetry(BlockChunkPath); + TryRemoveFile(BlockChunkPath); } WritePartsComplete++; @@ -7059,7 +7089,7 @@ namespace { if (!BlockChunkPath.empty()) { - RemoveFileWithRetry(BlockChunkPath); + TryRemoveFile(BlockChunkPath); } WritePartsComplete++; @@ -9456,6 +9486,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) std::string CloudHost; + auto TestCacheEndpoint = [](std::string_view BaseUrl, const bool AssumeHttp2) -> std::pair<bool, std::string> { + HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{1000}, + .Timeout = std::chrono::milliseconds{2000}, + .AssumeHttp2 = AssumeHttp2, + .AllowResume = true, + .RetryCount = 0}; + HttpClient TestHttpClient(BaseUrl, TestClientSettings); + HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds"); + if (TestResponse.IsSuccess()) + { + return {true, ""}; + } + return {false, TestResponse.ErrorMessage("")}; + }; + if (!m_Host.empty()) { if (m_OverrideHost.empty() || m_ZenCacheHost.empty()) @@ -9535,22 +9581,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::runtime_error(fmt::format("Host {} could not be reached. Reason: {}", m_OverrideHost, TestResult.second)); } - auto TestCacheEndpoint = [](std::string_view BaseUrl, const bool AssumeHttp2) -> std::pair<bool, std::string> { - HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{1000}, - .Timeout = std::chrono::milliseconds{2000}, - .AssumeHttp2 = AssumeHttp2, - .AllowResume = true, - .RetryCount = 0}; - HttpClient TestHttpClient(BaseUrl, TestClientSettings); - HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds"); - if (TestResponse.IsSuccess()) - { - return {true, ""}; - } - return {false, TestResponse.ErrorMessage("")}; - }; - if (m_ZenCacheHost.empty()) { CbArrayView CacheEndpointsArray = ResponseObjectView["cacheEndpoints"sv].AsArrayView(); @@ -9680,7 +9710,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { CacheDescription += fmt::format(" Bucket '{}'", m_Bucket); } - Result.CacheName = BuildCacheName; + Result.CacheName = BuildCacheName.empty() ? m_ZenCacheHost : BuildCacheName; } ZEN_CONSOLE("Remote: {}", StorageDescription); if (!Result.CacheName.empty()) |