aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/builds_cmd.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-04 10:22:53 +0100
committerGitHub Enterprise <[email protected]>2025-03-04 10:22:53 +0100
commit7cb1de5a966aa23e0c098d2bbf9009d0f82a6477 (patch)
tree5c340a3947e27b2710968442cee7f009bc6ab439 /src/zen/cmds/builds_cmd.cpp
parentlimit and validate responses before logging the text (#292) (diff)
downloadarchived-zen-7cb1de5a966aa23e0c098d2bbf9009d0f82a6477.tar.xz
archived-zen-7cb1de5a966aa23e0c098d2bbf9009d0f82a6477.zip
stream decompress (#293)
* clean up latency parameters and slow down rate updates * add DecompressToStream
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
-rw-r--r--src/zen/cmds/builds_cmd.cpp198
1 files changed, 141 insertions, 57 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 219d01240..a9852151f 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -78,15 +78,19 @@ namespace {
const ChunksBlockParameters DefaultChunksBlockParams{.MaxBlockSize = 32u * 1024u * 1024u,
.MaxChunkEmbedSize = DefaultChunkedParams.MaxSize};
- const std::string ZenFolderName = ".zen";
- const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName);
- const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName);
- const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName);
- const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName);
- const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName);
- const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName);
- const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName);
- const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt";
+
+ const double DefaultLatency = 0; // .0010;
+ const double DefaultDelayPerKBSec = 0; // 0.00005;
+
+ const std::string ZenFolderName = ".zen";
+ const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName);
+ const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName);
+ const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName);
+ const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName);
+ const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName);
+ const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName);
+ const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName);
+ const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt";
const std::string UnsyncFolderName = ".unsync";
@@ -271,7 +275,7 @@ namespace {
}
uint64_t TimeUS = Timer.GetElapsedTimeUs();
uint64_t TimeDeltaUS = TimeUS - LastTimeUS;
- if (TimeDeltaUS >= 1000000)
+ if (TimeDeltaUS >= 2000000)
{
uint64_t Delta = Count - LastCount;
uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS;
@@ -3464,6 +3468,61 @@ namespace {
}
}
+ // Returns true when a chunk can be decompressed directly into its target sequence file:
+ // the chunk has exactly one target location, that location starts at offset 0, and the
+ // ChunkCounts entry for the target sequence is 1. Returns false in every other case.
+ // Locations is taken by const reference so the pointer vector is not copied on each call.
+ bool CanStreamDecompress(const ChunkedFolderContent& RemoteContent,
+                          const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& Locations)
+ {
+     if (Locations.size() == 1)
+     {
+         const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex;
+         if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1)
+         {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ // Decompresses CompressedPart into a temporary file created in CacheFolderPath and then
+ // moves that file to the path returned by
+ // GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash).
+ // WriteToDiskBytes is increased by the size of each decompressed range as it is written.
+ // Throws std::runtime_error when the temporary file cannot be created, the compressed
+ // header cannot be parsed, the raw hash in the header differs from SequenceRawHash,
+ // decompression fails, or the temporary file cannot be moved into place.
+ void StreamDecompress(const std::filesystem::path& CacheFolderPath,
+                       const IoHash& SequenceRawHash,
+                       CompositeBuffer&& CompressedPart,
+                       std::atomic<uint64_t>& WriteToDiskBytes)
+ {
+     const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
+     TemporaryFile DecompressedTemp;
+     std::error_code Ec;
+     DecompressedTemp.CreateTemporary(CacheFolderPath, Ec);
+     if (Ec)
+     {
+         throw std::runtime_error(
+             fmt::format("Failed creating temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
+     }
+     IoHash RawHash;
+     uint64_t RawSize = 0;
+     CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize);
+     if (!Compressed)
+     {
+         throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash));
+     }
+     if (RawHash != SequenceRawHash)
+     {
+         // This branch is the mismatch case, so the message must say the hashes do NOT match.
+         throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does not match.", RawHash, SequenceRawHash));
+     }
+     // Each decompressed range is written at its own offset in the temporary file.
+     bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+         DecompressedTemp.Write(RangeBuffer, Offset);
+         WriteToDiskBytes += RangeBuffer.GetSize();
+     });
+     if (!CouldDecompress)
+     {
+         throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash));
+     }
+     DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec);
+     if (Ec)
+     {
+         throw std::runtime_error(
+             fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
+     }
+ }
+
void DownloadLargeBlob(BuildStorage& Storage,
const std::filesystem::path& TempFolderPath,
const std::filesystem::path& CacheFolderPath,
@@ -3527,7 +3586,7 @@ namespace {
DownloadedChunks++;
Work.ScheduleWork(
- WritePool,
+ WritePool, // GetSyncWorkerPool(),//
[&CacheFolderPath,
&RemoteContent,
&RemoteLookup,
@@ -3555,33 +3614,44 @@ namespace {
}
CompressedPart.SetDeleteOnClose(true);
- uint64_t TotalBytesWritten = 0;
-
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
- uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
-
- SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)),
- ChunkHash,
- RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs))
+ {
+ const IoHash& SequenceRawHash =
+ RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex];
+ StreamDecompress(CacheFolderPath,
+ SequenceRawHash,
+ CompositeBuffer(std::move(CompressedPart)),
+ WriteToDiskBytes);
+ }
+ else
+ {
+ const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
+ SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)),
+ ChunkHash,
+ RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
- // ZEN_ASSERT_SLOW(ChunkHash ==
- // IoHash::HashBuffer(Chunk.AsIoBuffer()));
+ // ZEN_ASSERT_SLOW(ChunkHash ==
+ // IoHash::HashBuffer(Chunk.AsIoBuffer()));
- if (!AbortFlag)
- {
- WriteFileCache OpenFileCache;
-
- WriteChunkToDisk(CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- ChunkTargetPtrs,
- CompositeBuffer(Chunk),
- OpenFileCache,
- TotalBytesWritten);
- ChunksComplete++;
- WriteToDiskBytes += TotalBytesWritten;
+ if (!AbortFlag)
+ {
+ WriteFileCache OpenFileCache;
+
+ uint64_t TotalBytesWritten = 0;
+ WriteChunkToDisk(CacheFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ ChunkTargetPtrs,
+ CompositeBuffer(Chunk),
+ OpenFileCache,
+ TotalBytesWritten);
+ ChunksComplete++;
+ WriteToDiskBytes += TotalBytesWritten;
+ }
}
+
if (!AbortFlag)
{
// Write tracking, updating this must be done without any files open (WriteFileCache)
@@ -3619,7 +3689,7 @@ namespace {
for (auto& WorkItem : WorkItems)
{
Work.ScheduleWork(
- NetworkPool,
+ NetworkPool, // GetSyncWorkerPool(),//
[WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -3830,7 +3900,7 @@ namespace {
else
{
Work.ScheduleWork(
- NetworkPool,
+ NetworkPool, // GetSyncWorkerPool(),//
[&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -3880,21 +3950,38 @@ namespace {
if (!AbortFlag)
{
FilteredWrittenBytesPerSecond.Start();
- uint64_t TotalBytesWritten = 0;
- SharedBuffer Chunk =
- Decompress(CompressedPart,
- ChunkHash,
- RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]);
+ if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs))
{
- WriteFileCache OpenFileCache;
- WriteChunkToDisk(CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- ChunkTargetPtrs,
- CompositeBuffer(Chunk),
- OpenFileCache,
- TotalBytesWritten);
+ const IoHash& SequenceRawHash =
+ RemoteContent.ChunkedContent
+ .SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex];
+ StreamDecompress(CacheFolderPath,
+ SequenceRawHash,
+ CompositeBuffer(CompressedPart),
+ WriteToDiskBytes);
+ ChunkCountWritten++;
+ }
+ else
+ {
+ uint64_t TotalBytesWritten = 0;
+ SharedBuffer Chunk =
+ Decompress(CompressedPart,
+ ChunkHash,
+ RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]);
+
+ {
+ WriteFileCache OpenFileCache;
+ WriteChunkToDisk(CacheFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ ChunkTargetPtrs,
+ CompositeBuffer(Chunk),
+ OpenFileCache,
+ TotalBytesWritten);
+ ChunkCountWritten++;
+ WriteToDiskBytes += TotalBytesWritten;
+ }
}
if (!AbortFlag)
{
@@ -3925,9 +4012,6 @@ namespace {
GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
}
}
-
- ChunkCountWritten++;
- WriteToDiskBytes += TotalBytesWritten;
}
}
},
@@ -5707,7 +5791,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
else if (!m_StoragePath.empty())
{
ZEN_CONSOLE("Querying builds in folder '{}'.", m_StoragePath);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
}
else
{
@@ -5819,7 +5903,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_StoragePath,
GeneratedBuildId ? "Generated " : "",
BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec);
StorageName = fmt::format("Disk {}", m_StoragePath.stem());
}
else
@@ -5952,7 +6036,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
else if (!m_StoragePath.empty())
{
ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, m_Path, m_StoragePath, BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
StorageName = fmt::format("Disk {}", m_StoragePath.stem());
}
else
@@ -6056,7 +6140,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Path,
m_StoragePath,
BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
StorageName = fmt::format("Disk {}", m_StoragePath.stem());
}
else
@@ -6326,7 +6410,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
else if (!m_StoragePath.empty())
{
ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
StorageName = fmt::format("Disk {}", m_StoragePath.stem());
}
else
@@ -6394,7 +6478,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
else if (!m_StoragePath.empty())
{
ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
StorageName = fmt::format("Disk {}", m_StoragePath.stem());
}
else