diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-04 10:22:53 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-04 10:22:53 +0100 |
| commit | 7cb1de5a966aa23e0c098d2bbf9009d0f82a6477 (patch) | |
| tree | 5c340a3947e27b2710968442cee7f009bc6ab439 /src/zen/cmds/builds_cmd.cpp | |
| parent | limit and validate responses before logging the text (#292) (diff) | |
| download | archived-zen-7cb1de5a966aa23e0c098d2bbf9009d0f82a6477.tar.xz archived-zen-7cb1de5a966aa23e0c098d2bbf9009d0f82a6477.zip | |
stream decompress (#293)
* clean up latency parameters and slow down rate updates
* add DecompressToStream
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 198 |
1 files changed, 141 insertions, 57 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 219d01240..a9852151f 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -78,15 +78,19 @@ namespace { const ChunksBlockParameters DefaultChunksBlockParams{.MaxBlockSize = 32u * 1024u * 1024u, .MaxChunkEmbedSize = DefaultChunkedParams.MaxSize}; - const std::string ZenFolderName = ".zen"; - const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); - const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); - const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); - const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName); - const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName); - const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); - const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName); - const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; + + const double DefaultLatency = 0; // .0010; + const double DefaultDelayPerKBSec = 0; // 0.00005; + + const std::string ZenFolderName = ".zen"; + const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); + const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); + const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); + const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName); + const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName); + const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); + const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName); + const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; const std::string UnsyncFolderName = ".unsync"; @@ -271,7 +275,7 @@ namespace { } uint64_t TimeUS = Timer.GetElapsedTimeUs(); uint64_t TimeDeltaUS = TimeUS - LastTimeUS; - if (TimeDeltaUS >= 1000000) + if (TimeDeltaUS >= 2000000) { uint64_t Delta = Count - LastCount; uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS; @@ -3464,6 +3468,61 @@ namespace { } } + bool CanStreamDecompress(const ChunkedFolderContent& RemoteContent, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) + { + if (Locations.size() == 1) + { + const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; + if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) + { + return true; + } + } + return false; + } + + void StreamDecompress(const std::filesystem::path& CacheFolderPath, + const IoHash& SequenceRawHash, + CompositeBuffer&& CompressedPart, + std::atomic<uint64_t>& WriteToDiskBytes) + { + const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); + TemporaryFile DecompressedTemp; + std::error_code Ec; + DecompressedTemp.CreateTemporary(CacheFolderPath, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed creating temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); + } + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); + } + if (RawHash != SequenceRawHash) + { + throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); + } + bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { + DecompressedTemp.Write(RangeBuffer, Offset); + WriteToDiskBytes += RangeBuffer.GetSize(); + }); + if (!CouldDecompress) + { + throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); + } + DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); + } + } + void DownloadLargeBlob(BuildStorage& Storage, const std::filesystem::path& TempFolderPath, const std::filesystem::path& CacheFolderPath, @@ -3527,7 +3586,7 @@ namespace { DownloadedChunks++; Work.ScheduleWork( - WritePool, + WritePool, // GetSyncWorkerPool(),// [&CacheFolderPath, &RemoteContent, &RemoteLookup, @@ -3555,33 +3614,44 @@ namespace { } CompressedPart.SetDeleteOnClose(true); - uint64_t TotalBytesWritten = 0; - auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); - uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; - - SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), - ChunkHash, - RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) + { + const IoHash& SequenceRawHash = + RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; + StreamDecompress(CacheFolderPath, + SequenceRawHash, + CompositeBuffer(std::move(CompressedPart)), + WriteToDiskBytes); + } + else + { + const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; + SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), + ChunkHash, + RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); - // ZEN_ASSERT_SLOW(ChunkHash == - // IoHash::HashBuffer(Chunk.AsIoBuffer())); + // ZEN_ASSERT_SLOW(ChunkHash == + // IoHash::HashBuffer(Chunk.AsIoBuffer())); - if (!AbortFlag) - { - WriteFileCache OpenFileCache; - - WriteChunkToDisk(CacheFolderPath, - RemoteContent, - RemoteLookup, - ChunkTargetPtrs, - CompositeBuffer(Chunk), - OpenFileCache, - TotalBytesWritten); - ChunksComplete++; - WriteToDiskBytes += TotalBytesWritten; + if (!AbortFlag) + { + WriteFileCache OpenFileCache; + + uint64_t TotalBytesWritten = 0; + WriteChunkToDisk(CacheFolderPath, + RemoteContent, + RemoteLookup, + ChunkTargetPtrs, + CompositeBuffer(Chunk), + OpenFileCache, + TotalBytesWritten); + ChunksComplete++; + WriteToDiskBytes += TotalBytesWritten; + } } + if (!AbortFlag) { // Write tracking, updating this must be done without any files open (WriteFileCache) @@ -3619,7 +3689,7 @@ namespace { for (auto& WorkItem : WorkItems) { Work.ScheduleWork( - NetworkPool, + NetworkPool, // GetSyncWorkerPool(),// [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { if (!AbortFlag) { @@ -3830,7 +3900,7 @@ namespace { else { Work.ScheduleWork( - NetworkPool, + NetworkPool, // GetSyncWorkerPool(),// [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { if (!AbortFlag) { @@ -3880,21 +3950,38 @@ namespace { if (!AbortFlag) { FilteredWrittenBytesPerSecond.Start(); - uint64_t TotalBytesWritten = 0; - SharedBuffer Chunk = - Decompress(CompressedPart, - ChunkHash, - RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]); + if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) { - WriteFileCache OpenFileCache; - WriteChunkToDisk(CacheFolderPath, - RemoteContent, - RemoteLookup, - ChunkTargetPtrs, - CompositeBuffer(Chunk), - OpenFileCache, - TotalBytesWritten); + const IoHash& SequenceRawHash = + RemoteContent.ChunkedContent + .SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; + StreamDecompress(CacheFolderPath, + SequenceRawHash, + CompositeBuffer(CompressedPart), + WriteToDiskBytes); + ChunkCountWritten++; + } + else + { + uint64_t TotalBytesWritten = 0; + SharedBuffer Chunk = + Decompress(CompressedPart, + ChunkHash, + RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]); + + { + WriteFileCache OpenFileCache; + WriteChunkToDisk(CacheFolderPath, + RemoteContent, + RemoteLookup, + ChunkTargetPtrs, + CompositeBuffer(Chunk), + OpenFileCache, + TotalBytesWritten); + ChunkCountWritten++; + WriteToDiskBytes += TotalBytesWritten; + } } if (!AbortFlag) { @@ -3925,9 +4012,6 @@ namespace { GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); } } - - ChunkCountWritten++; - WriteToDiskBytes += TotalBytesWritten; } } }, @@ -5707,7 +5791,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Querying builds in folder '{}'.", m_StoragePath); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); } else { @@ -5819,7 +5903,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_StoragePath, GeneratedBuildId ? "Generated " : "", BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -5952,7 +6036,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, m_Path, m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -6056,7 +6140,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Path, m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -6326,7 +6410,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -6394,7 +6478,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else |