aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/builds_cmd.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-13 13:32:36 +0200
committerGitHub Enterprise <[email protected]>2025-10-13 13:32:36 +0200
commita6925de9bca8579637fa8a4152ab2b77ef5ca90e (patch)
treee2742dcc584e78de7f8921bd655c22929b37da80 /src/zen/cmds/builds_cmd.cpp
parentmove service common code into base class (#567) (diff)
downloadarchived-zen-a6925de9bca8579637fa8a4152ab2b77ef5ca90e.tar.xz
archived-zen-a6925de9bca8579637fa8a4152ab2b77ef5ca90e.zip
refactor builds cmd (#566)
Move builds download code from builds_cmd.cpp to remotestorelib
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
-rw-r--r--src/zen/cmds/builds_cmd.cpp4598
1 files changed, 152 insertions, 4446 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index e40d4366b..00bba6f4f 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -24,6 +24,7 @@
#include <zenhttp/httpclientauth.h>
#include <zenhttp/httpcommon.h>
#include <zenremotestore/builds/buildstoragecache.h>
+#include <zenremotestore/builds/buildstorageoperations.h>
#include <zenremotestore/builds/filebuildstorage.h>
#include <zenremotestore/builds/jupiterbuildstorage.h>
#include <zenremotestore/chunking/chunkblock.h>
@@ -31,6 +32,7 @@
#include <zenremotestore/chunking/chunkedfile.h>
#include <zenremotestore/chunking/chunkingcontroller.h>
#include <zenremotestore/jupiter/jupiterhost.h>
+#include <zenutil/bufferedopenfile.h>
#include <zenutil/bufferedwritefilecache.h>
#include <zenutil/wildcard.h>
#include <zenutil/workerpools.h>
@@ -334,7 +336,6 @@ namespace {
std::filesystem::path ZenStateFilePath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.cbo"; }
// std::filesystem::path ZenStateFileJsonPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.json";
// }
- std::filesystem::path ZenTempFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "tmp"; }
std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath)
{
@@ -356,11 +357,6 @@ namespace {
return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks
}
- // std::filesystem::path ZenTempStorageFolderPath(const std::filesystem::path& ZenFolderPath)
- // {
- // return ZenTempFolderPath(ZenFolderPath) / "storage"; // Temp storage folder for BuildStorage implementations
- // }
-
const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt";
const std::string UnsyncFolderName = ".unsync";
@@ -368,8 +364,8 @@ namespace {
const std::string UGSFolderName = ".ugs";
const std::string LegacyZenTempFolderName = ".zen-tmp";
- const std::vector<std::string_view> DefaultExcludeFolders({UnsyncFolderName, ZenFolderName, UGSFolderName, LegacyZenTempFolderName});
- const std::vector<std::string_view> DefaultExcludeExtensions({});
+ const std::vector<std::string> DefaultExcludeFolders({UnsyncFolderName, ZenFolderName, UGSFolderName, LegacyZenTempFolderName});
+ const std::vector<std::string> DefaultExcludeExtensions({});
static bool IsVerbose = false;
static bool IsQuiet = false;
@@ -408,6 +404,62 @@ namespace {
);
+ class ConsoleOpLogProgressBar : public BuildOpLogOutput::ProgressBar
+ {
+ public:
+ ConsoleOpLogProgressBar(zen::ProgressBar::Mode InMode, std::string_view InSubTask) : m_Inner(InMode, InSubTask) {}
+
+ virtual void UpdateState(const State& NewState, bool DoLinebreak)
+ {
+ zen::ProgressBar::State State = {.Task = NewState.Task,
+ .Details = NewState.Details,
+ .TotalCount = NewState.TotalCount,
+ .RemainingCount = NewState.RemainingCount,
+ .Status = ConvertStatus(NewState.Status)};
+ m_Inner.UpdateState(State, DoLinebreak);
+ }
+ virtual void Finish() { m_Inner.Finish(); }
+
+ private:
+ zen::ProgressBar::State::EStatus ConvertStatus(State::EStatus Status)
+ {
+ switch (Status)
+ {
+ case State::EStatus::Running:
+ return zen::ProgressBar::State::EStatus::Running;
+ case State::EStatus::Aborted:
+ return zen::ProgressBar::State::EStatus::Aborted;
+ case State::EStatus::Paused:
+ return zen::ProgressBar::State::EStatus::Paused;
+ default:
+ return (zen::ProgressBar::State::EStatus)Status;
+ }
+ }
+ zen::ProgressBar m_Inner;
+ };
+
+ class ConsoleOpLogOutput : public BuildOpLogOutput
+ {
+ public:
+ ConsoleOpLogOutput(zen::ProgressBar::Mode InMode) : m_Mode(InMode) {}
+ virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args)
+ {
+ logging::EmitConsoleLogMessage(LogLevel, Format, Args);
+ }
+
+ virtual void SetLogOperationName(std::string_view Name) { zen::ProgressBar::SetLogOperationName(m_Mode, Name); }
+ virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount)
+ {
+ zen::ProgressBar::SetLogOperationProgress(m_Mode, StepIndex, StepCount);
+ }
+ virtual uint32_t GetProgressUpdateDelayMS() { return GetUpdateDelayMS(m_Mode); }
+
+ virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) { return new ConsoleOpLogProgressBar(m_Mode, InSubTask); }
+
+ private:
+ zen::ProgressBar::Mode m_Mode;
+ };
+
bool IncludePath(std::span<const std::string> IncludeWildcards,
std::span<const std::string> ExcludeWildcards,
const std::filesystem::path& Path)
@@ -481,46 +533,6 @@ namespace {
return Result;
}
- void RenameFileWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath)
- {
- std::error_code Ec;
- RenameFile(SourcePath, TargetPath, Ec);
- for (size_t Retries = 0; Ec && Retries < 10; Retries++)
- {
- ZEN_ASSERT_SLOW(IsFile(SourcePath));
- if (Retries > 5)
- {
- ZEN_CONSOLE_WARN("Unable to overwrite file {} ({}: {}), retrying...", TargetPath, Ec.value(), Ec.message());
- }
- Sleep(50 + int(Retries * 150));
- Ec.clear();
- RenameFile(SourcePath, TargetPath, Ec);
- }
- if (Ec)
- {
- throw std::system_error(std::error_code(Ec.value(), std::system_category()),
- fmt::format("Rename from '{}' to '{}' failed with: {}", SourcePath, TargetPath, Ec.message()));
- }
- }
-
- void TryRemoveFile(const std::filesystem::path& Path)
- {
- std::error_code Ec;
- RemoveFile(Path, Ec);
- if (Ec)
- {
- if (IsFile(Path, Ec))
- {
- Ec.clear();
- RemoveFile(Path, Ec);
- if (Ec)
- {
- ZEN_DEBUG("Failed removing file '{}', reason: {}", Path, Ec.message());
- }
- }
- }
- }
-
void RemoveFileWithRetry(const std::filesystem::path& Path)
{
std::error_code Ec;
@@ -563,89 +575,6 @@ namespace {
}
}
- void CopyFile(const std::filesystem::path& SourceFilePath,
- const std::filesystem::path& TargetFilePath,
- uint64_t RawSize,
- std::atomic<uint64_t>& WriteCount,
- std::atomic<uint64_t>& WriteByteCount,
- std::atomic<uint64_t>& CloneCount,
- std::atomic<uint64_t>& CloneByteCount)
- {
- if (AllowFileClone && TryCloneFile(SourceFilePath, TargetFilePath))
- {
- WriteCount += 1;
- WriteByteCount += RawSize;
- CloneCount += 1;
- CloneByteCount += RawSize;
- }
- else
- {
- BasicFile TargetFile(TargetFilePath, BasicFile::Mode::kTruncate);
- if (UseSparseFiles)
- {
- PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize);
- }
- uint64_t Offset = 0;
- if (!ScanFile(SourceFilePath, 512u * 1024u, [&](const void* Data, size_t Size) {
- TargetFile.Write(Data, Size, Offset);
- Offset += Size;
- WriteCount++;
- WriteByteCount += Size;
- }))
- {
- throw std::runtime_error(fmt::format("Failed to copy scavenged file '{}' to '{}'", SourceFilePath, TargetFilePath));
- }
- }
- }
-
- uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes)
- {
-#if ZEN_PLATFORM_WINDOWS
- if (SourcePlatform == SourcePlatform::Windows)
- {
- SetFileAttributes(FilePath, Attributes);
- return Attributes;
- }
- else
- {
- uint32_t CurrentAttributes = GetFileAttributes(FilePath);
- uint32_t NewAttributes = MakeFileAttributeReadOnly(CurrentAttributes, IsFileModeReadOnly(Attributes));
- if (CurrentAttributes != NewAttributes)
- {
- SetFileAttributes(FilePath, NewAttributes);
- }
- return NewAttributes;
- }
-#endif // ZEN_PLATFORM_WINDOWS
-#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- if (SourcePlatform != SourcePlatform::Windows)
- {
- SetFileMode(FilePath, Attributes);
- return Attributes;
- }
- else
- {
- uint32_t CurrentMode = GetFileMode(FilePath);
- uint32_t NewMode = MakeFileModeReadOnly(CurrentMode, IsFileAttributeReadOnly(Attributes));
- if (CurrentMode != NewMode)
- {
- SetFileMode(FilePath, NewMode);
- }
- return NewMode;
- }
-#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- };
-
- uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath)
- {
-#if ZEN_PLATFORM_WINDOWS
- return GetFileAttributes(FilePath);
-#endif // ZEN_PLATFORM_WINDOWS
-#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- return GetFileMode(FilePath);
-#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- }
-
template<typename T>
std::string FormatArray(std::span<const T> Items, std::string_view Prefix)
{
@@ -657,7 +586,7 @@ namespace {
return SB.ToString();
}
- bool CleanDirectory(const std::filesystem::path& Path, std::span<const std::string_view> ExcludeDirectories)
+ bool CleanDirectory(const std::filesystem::path& Path, std::span<const std::string> ExcludeDirectories)
{
ZEN_TRACE_CPU("CleanDirectory");
Stopwatch Timer;
@@ -672,12 +601,12 @@ namespace {
struct AsyncVisitor : public GetDirectoryContentVisitor
{
- AsyncVisitor(const std::filesystem::path& InPath,
- std::atomic<bool>& InCleanWipe,
- std::atomic<uint64_t>& InDiscoveredItemCount,
- std::atomic<uint64_t>& InDeletedItemCount,
- std::atomic<uint64_t>& InDeletedByteCount,
- std::span<const std::string_view> InExcludeDirectories)
+ AsyncVisitor(const std::filesystem::path& InPath,
+ std::atomic<bool>& InCleanWipe,
+ std::atomic<uint64_t>& InDiscoveredItemCount,
+ std::atomic<uint64_t>& InDeletedItemCount,
+ std::atomic<uint64_t>& InDeletedByteCount,
+ std::span<const std::string> InExcludeDirectories)
: Path(InPath)
, CleanWipe(InCleanWipe)
, DiscoveredItemCount(InDiscoveredItemCount)
@@ -742,12 +671,12 @@ namespace {
}
}
}
- const std::filesystem::path& Path;
- std::atomic<bool>& CleanWipe;
- std::atomic<uint64_t>& DiscoveredItemCount;
- std::atomic<uint64_t>& DeletedItemCount;
- std::atomic<uint64_t>& DeletedByteCount;
- std::span<const std::string_view> ExcludeDirectories;
+ const std::filesystem::path& Path;
+ std::atomic<bool>& CleanWipe;
+ std::atomic<uint64_t>& DiscoveredItemCount;
+ std::atomic<uint64_t>& DeletedItemCount;
+ std::atomic<uint64_t>& DeletedByteCount;
+ std::span<const std::string> ExcludeDirectories;
} Visitor(Path, CleanWipe, DiscoveredItemCount, DeletedItemCount, DeletedByteCount, ExcludeDirectories);
GetDirectoryContent(
@@ -765,7 +694,7 @@ namespace {
for (std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories)
{
bool Leave = false;
- for (const std::string_view ExcludeDirectory : ExcludeDirectories)
+ for (const std::string& ExcludeDirectory : ExcludeDirectories)
{
if (LocalDirPath == (Path / ExcludeDirectory))
{
@@ -957,29 +886,6 @@ namespace {
return Count * 1000000 / ElapsedWallTimeUS;
}
- std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash)
- {
- return CacheFolderPath / (RawHash.ToHexString() + ".tmp");
- }
-
- std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash)
- {
- return CacheFolderPath / RawHash.ToHexString();
- }
-
- struct DiskStatistics
- {
- std::atomic<uint64_t> OpenReadCount = 0;
- std::atomic<uint64_t> OpenWriteCount = 0;
- std::atomic<uint64_t> ReadCount = 0;
- std::atomic<uint64_t> ReadByteCount = 0;
- std::atomic<uint64_t> WriteCount = 0;
- std::atomic<uint64_t> WriteByteCount = 0;
- std::atomic<uint64_t> CloneCount = 0;
- std::atomic<uint64_t> CloneByteCount = 0;
- std::atomic<uint64_t> CurrentOpenFileCount = 0;
- };
-
struct FindBlocksStatistics
{
uint64_t FindBlockTimeMS = 0;
@@ -1061,66 +967,6 @@ namespace {
}
};
- struct CacheMappingStatistics
- {
- uint64_t CacheChunkCount = 0;
- uint64_t CacheChunkByteCount = 0;
-
- uint64_t CacheBlockCount = 0;
- uint64_t CacheBlocksByteCount = 0;
-
- uint64_t CacheSequenceHashesCount = 0;
- uint64_t CacheSequenceHashesByteCount = 0;
-
- uint64_t CacheScanElapsedWallTimeUs = 0;
-
- uint32_t LocalPathsMatchingSequencesCount = 0;
- uint64_t LocalPathsMatchingSequencesByteCount = 0;
-
- uint64_t LocalChunkMatchingRemoteCount = 0;
- uint64_t LocalChunkMatchingRemoteByteCount = 0;
-
- uint64_t LocalScanElapsedWallTimeUs = 0;
-
- uint32_t ScavengedPathsMatchingSequencesCount = 0;
- uint64_t ScavengedPathsMatchingSequencesByteCount = 0;
-
- uint64_t ScavengedChunkMatchingRemoteCount = 0;
- uint64_t ScavengedChunkMatchingRemoteByteCount = 0;
-
- uint64_t ScavengeElapsedWallTimeUs = 0;
- };
-
- struct DownloadStatistics
- {
- std::atomic<uint64_t> RequestsCompleteCount = 0;
-
- std::atomic<uint64_t> DownloadedChunkCount = 0;
- std::atomic<uint64_t> DownloadedChunkByteCount = 0;
- std::atomic<uint64_t> MultipartAttachmentCount = 0;
-
- std::atomic<uint64_t> DownloadedBlockCount = 0;
- std::atomic<uint64_t> DownloadedBlockByteCount = 0;
-
- std::atomic<uint64_t> DownloadedPartialBlockCount = 0;
- std::atomic<uint64_t> DownloadedPartialBlockByteCount = 0;
- };
-
- struct WriteChunkStatistics
- {
- uint64_t DownloadTimeUs = 0;
- uint64_t WriteTimeUs = 0;
- uint64_t WriteChunksElapsedWallTimeUs = 0;
- };
-
- struct RebuildFolderStateStatistics
- {
- uint64_t CleanFolderElapsedWallTimeUs = 0;
- std::atomic<uint32_t> FinalizeTreeFilesMovedCount = 0;
- std::atomic<uint32_t> FinalizeTreeFilesCopiedCount = 0;
- uint64_t FinalizeTreeElapsedWallTimeUs = 0;
- };
-
struct VerifyFolderStatistics
{
std::atomic<uint64_t> FilesVerified = 0;
@@ -1129,16 +975,6 @@ namespace {
uint64_t VerifyElapsedWallTimeUs = 0;
};
- struct StorageInstance
- {
- std::unique_ptr<HttpClient> BuildStorageHttp;
- std::unique_ptr<BuildStorage> BuildStorage;
- std::string StorageName;
- std::unique_ptr<HttpClient> CacheHttp;
- std::unique_ptr<BuildStorageCache> BuildCacheStorage;
- std::string CacheName;
- };
-
std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes,
const std::span<const uint32_t> LocalChunkOrder,
const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
@@ -1632,405 +1468,6 @@ namespace {
TemporaryFile::SafeWriteFile(WritePath, JsonPayload);
}
- struct ScavengeSource
- {
- std::filesystem::path StateFilePath;
- std::filesystem::path Path;
- };
-
- std::vector<ScavengeSource> GetDownloadedStatePaths(const std::filesystem::path& SystemRootDir)
- {
- std::vector<ScavengeSource> Result;
- DirectoryContent Content;
- GetDirectoryContent(SystemRootDir / "builds" / "downloads", DirectoryContentFlags::IncludeFiles, Content);
- for (const std::filesystem::path& EntryPath : Content.Files)
- {
- bool DeleteEntry = false;
- IoHash EntryPathHash;
- if (IoHash::TryParse(EntryPath.stem().string(), EntryPathHash))
- {
- // Read state and verify that it is valid
- IoBuffer MetaDataJson = ReadFile(EntryPath).Flatten();
- std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize());
- std::string JsonError;
- CbObject DownloadInfo = LoadCompactBinaryFromJson(Json, JsonError).AsObject();
- if (JsonError.empty())
- {
- std::filesystem::path StateFilePath = DownloadInfo["statePath"].AsU8String();
- if (IsFile(StateFilePath))
- {
- std::filesystem::path Path = DownloadInfo["path"].AsU8String();
- if (IsDir(Path))
- {
- Result.push_back({.StateFilePath = std::move(StateFilePath), .Path = std::move(Path)});
- }
- else
- {
- DeleteEntry = true;
- }
- }
- else
- {
- DeleteEntry = true;
- }
- }
- else
- {
- ZEN_CONSOLE_WARN("Invalid download state file at {}. '{}'", EntryPath, JsonError);
- DeleteEntry = true;
- }
- }
-
- if (DeleteEntry)
- {
- std::error_code DummyEc;
- std::filesystem::remove(EntryPath, DummyEc);
- }
- }
- return Result;
- }
-
- struct BlockRangeDescriptor
- {
- uint32_t BlockIndex = (uint32_t)-1;
- uint64_t RangeStart = 0;
- uint64_t RangeLength = 0;
- uint32_t ChunkBlockIndexStart = 0;
- uint32_t ChunkBlockIndexCount = 0;
- };
-
- BlockRangeDescriptor MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges)
- {
- ZEN_ASSERT(Ranges.size() > 1);
- const BlockRangeDescriptor& First = Ranges.front();
- const BlockRangeDescriptor& Last = Ranges.back();
-
- return BlockRangeDescriptor{
- .BlockIndex = First.BlockIndex,
- .RangeStart = First.RangeStart,
- .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart,
- .ChunkBlockIndexStart = First.ChunkBlockIndexStart,
- .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart};
- }
-
- std::optional<std::vector<BlockRangeDescriptor>> MakeOptionalBlockRangeVector(uint64_t TotalBlockSize,
- const BlockRangeDescriptor& Range)
- {
- if (Range.RangeLength == TotalBlockSize)
- {
- return {};
- }
- else
- {
- return std::vector<BlockRangeDescriptor>{Range};
- }
- };
-
- struct BlockRangeLimit
- {
- uint16_t SizePercent;
- uint16_t MaxRangeCount;
- };
-
- static const uint16_t FullBlockRangePercentLimit = 95;
-
- static const std::vector<BlockRangeLimit> ForceMergeLimits = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1},
- {.SizePercent = 90, .MaxRangeCount = 2},
- {.SizePercent = 85, .MaxRangeCount = 8},
- {.SizePercent = 80, .MaxRangeCount = 16},
- {.SizePercent = 70, .MaxRangeCount = 32},
- {.SizePercent = 60, .MaxRangeCount = 48},
- {.SizePercent = 2, .MaxRangeCount = 56},
- {.SizePercent = 0, .MaxRangeCount = 64}};
-
- const BlockRangeLimit* GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits,
- uint64_t TotalBlockSize,
- std::span<const BlockRangeDescriptor> Ranges)
- {
- if (Ranges.size() > 1)
- {
- const std::uint64_t WantedSize =
- std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
- return Current + Range.RangeLength;
- });
-
- const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize;
-
- for (const BlockRangeLimit& Limit : Limits)
- {
- if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount)
- {
- return &Limit;
- }
- }
- }
- return nullptr;
- };
-
- std::vector<BlockRangeDescriptor> CollapseBlockRanges(const uint64_t AlwaysAcceptableGap,
- std::span<const BlockRangeDescriptor> BlockRanges)
- {
- ZEN_ASSERT(BlockRanges.size() > 1);
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
-
- auto BlockRangesIt = BlockRanges.begin();
- CollapsedBlockRanges.push_back(*BlockRangesIt++);
- for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++)
- {
- BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
-
- const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength;
-
- const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
- if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap))
- {
- LastRange.ChunkBlockIndexCount =
- (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
- LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart;
- }
- else
- {
- CollapsedBlockRanges.push_back(*BlockRangesIt);
- }
- }
-
- return CollapsedBlockRanges;
- };
-
- uint64_t CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges)
- {
- ZEN_ASSERT(BlockRanges.size() > 1);
- uint64_t AcceptableGap = (uint64_t)-1;
- for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++)
- {
- const BlockRangeDescriptor& Range = BlockRanges[RangeIndex];
- const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1];
-
- const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength);
- AcceptableGap = Min(Gap, AcceptableGap);
- }
- AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u);
- return AcceptableGap;
- };
-
- std::optional<std::vector<BlockRangeDescriptor>> CalculateBlockRanges(uint32_t BlockIndex,
- const ChunkBlockDescription& BlockDescription,
- std::span<const uint32_t> BlockChunkIndexNeeded,
- bool LimitToSingleRange,
- const uint64_t ChunkStartOffsetInBlock,
- const uint64_t TotalBlockSize,
- uint64_t& OutTotalWantedChunksSize)
- {
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis");
-
- std::vector<BlockRangeDescriptor> BlockRanges;
- {
- uint64_t CurrentOffset = ChunkStartOffsetInBlock;
- uint32_t ChunkBlockIndex = 0;
- uint32_t NeedBlockChunkIndexOffset = 0;
- BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
- while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
- {
- const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
- if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
- {
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- NextRange = {.BlockIndex = BlockIndex};
- }
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- }
- else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
- {
- if (NextRange.RangeLength == 0)
- {
- NextRange.RangeStart = CurrentOffset;
- NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
- }
- NextRange.RangeLength += ChunkCompressedLength;
- NextRange.ChunkBlockIndexCount++;
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- NeedBlockChunkIndexOffset++;
- }
- else
- {
- ZEN_ASSERT(false);
- }
- }
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- }
- }
- ZEN_ASSERT(!BlockRanges.empty());
-
- OutTotalWantedChunksSize =
- std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
- return Current + Range.RangeLength;
- });
-
- double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize;
-
- if (BlockRanges.size() == 1)
- {
- ZEN_CONSOLE_VERBOSE("Range request of {} ({:.2f}%) using single range from block {} ({}) as is",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize));
- return BlockRanges;
- }
-
- if (LimitToSingleRange)
- {
- const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
- const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
- const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
- ZEN_CONSOLE_VERBOSE(
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting "
- "{:.2f}% ({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- NiceBytes(MergedRange.RangeLength),
- RangeRequestedPercent,
- WastedPercent,
- NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
- return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
- }
-
- if (RangeWantedPercent > FullBlockRangePercentLimit)
- {
- const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
- const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
- const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
- ZEN_CONSOLE_VERBOSE(
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} "
- "({:.2f}%) wasting {:.2f}% ({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- FullBlockRangePercentLimit,
- NiceBytes(MergedRange.RangeLength),
- RangeRequestedPercent,
- WastedPercent,
- NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
- return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
- }
-
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges);
- while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges))
- {
- CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges);
- }
-
- const std::uint64_t WantedCollapsedSize =
- std::accumulate(CollapsedBlockRanges.begin(),
- CollapsedBlockRanges.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
-
- const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize;
-
- {
- const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize;
-
- ZEN_CONSOLE_VERBOSE(
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% "
- "({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- NiceBytes(WantedCollapsedSize),
- CollapsedRangeRequestedPercent,
- CollapsedBlockRanges.size(),
- WastedPercent,
- NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize));
- return CollapsedBlockRanges;
- }
- };
-
- class BufferedOpenFile
- {
- public:
- BufferedOpenFile(const std::filesystem::path Path, DiskStatistics& DiskStats)
- : m_Source(Path, BasicFile::Mode::kRead)
- , m_SourceSize(m_Source.FileSize())
- , m_DiskStats(DiskStats)
- {
- m_DiskStats.OpenReadCount++;
- m_DiskStats.CurrentOpenFileCount++;
- }
- ~BufferedOpenFile() { m_DiskStats.CurrentOpenFileCount--; }
- BufferedOpenFile() = delete;
- BufferedOpenFile(const BufferedOpenFile&) = delete;
- BufferedOpenFile(BufferedOpenFile&&) = delete;
- BufferedOpenFile& operator=(BufferedOpenFile&&) = delete;
- BufferedOpenFile& operator=(const BufferedOpenFile&) = delete;
-
- const uint64_t BlockSize = 256u * 1024u;
- CompositeBuffer GetRange(uint64_t Offset, uint64_t Size)
- {
- ZEN_TRACE_CPU("BufferedOpenFile::GetRange");
-
- ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache);
- auto _ = MakeGuard([&]() { ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); });
-
- ZEN_ASSERT((Offset + Size) <= m_SourceSize);
- const uint64_t BlockIndexStart = Offset / BlockSize;
- const uint64_t BlockIndexEnd = (Offset + Size - 1) / BlockSize;
-
- std::vector<SharedBuffer> BufferRanges;
- BufferRanges.reserve(BlockIndexEnd - BlockIndexStart + 1);
-
- uint64_t ReadOffset = Offset;
- for (uint64_t BlockIndex = BlockIndexStart; BlockIndex <= BlockIndexEnd; BlockIndex++)
- {
- const uint64_t BlockStartOffset = BlockIndex * BlockSize;
- if (m_CacheBlockIndex != BlockIndex)
- {
- uint64_t CacheSize = Min(BlockSize, m_SourceSize - BlockStartOffset);
- ZEN_ASSERT(CacheSize > 0);
- m_Cache = IoBuffer(CacheSize);
- m_Source.Read(m_Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset);
- m_DiskStats.ReadCount++;
- m_DiskStats.ReadByteCount += CacheSize;
- m_CacheBlockIndex = BlockIndex;
- }
-
- const uint64_t BytesRead = ReadOffset - Offset;
- ZEN_ASSERT(BlockStartOffset <= ReadOffset);
- const uint64_t OffsetIntoBlock = ReadOffset - BlockStartOffset;
- ZEN_ASSERT(OffsetIntoBlock < m_Cache.GetSize());
- const uint64_t BlockBytes = Min(m_Cache.GetSize() - OffsetIntoBlock, Size - BytesRead);
- BufferRanges.emplace_back(SharedBuffer(IoBuffer(m_Cache, OffsetIntoBlock, BlockBytes)));
- ReadOffset += BlockBytes;
- }
- CompositeBuffer Result(std::move(BufferRanges));
- ZEN_ASSERT(Result.GetSize() == Size);
- return Result;
- }
-
- public:
- void* Handle() { return m_Source.Handle(); }
-
- private:
- BasicFile m_Source;
- const uint64_t m_SourceSize;
- DiskStatistics& m_DiskStats;
- uint64_t m_CacheBlockIndex = (uint64_t)-1;
- IoBuffer m_Cache;
- };
-
class ReadFileCache
{
public:
@@ -2079,7 +1516,12 @@ namespace {
m_OpenFiles.pop_back();
}
m_OpenFiles.insert(m_OpenFiles.begin(),
- std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath, m_DiskStats)));
+ std::make_pair(SequenceIndex,
+ std::make_unique<BufferedOpenFile>(LocalFilePath,
+ m_DiskStats.OpenReadCount,
+ m_DiskStats.CurrentOpenFileCount,
+ m_DiskStats.ReadCount,
+ m_DiskStats.ReadByteCount)));
CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
return Result;
}
@@ -2290,72 +1732,6 @@ namespace {
return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers)));
};
- void DownloadLargeBlob(BuildStorage& Storage,
- const std::filesystem::path& DownloadFolder,
- const Oid& BuildId,
- const IoHash& ChunkHash,
- const std::uint64_t PreferredMultipartChunkSize,
- ParallelWork& Work,
- WorkerThreadPool& NetworkPool,
- DownloadStatistics& DownloadStats,
- std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete)
- {
- ZEN_TRACE_CPU("DownloadLargeBlob");
-
- struct WorkloadData
- {
- TemporaryFile TempFile;
- };
- std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
-
- std::error_code Ec;
- Workload->TempFile.CreateTemporary(DownloadFolder, Ec);
- if (Ec)
- {
- throw std::runtime_error(
- fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value()));
- }
- std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob(
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- [Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) {
- DownloadStats.DownloadedChunkByteCount += Chunk.GetSize();
-
- if (!AbortFlag.load())
- {
- ZEN_TRACE_CPU("DownloadLargeBlob_Save");
- Workload->TempFile.Write(Chunk.GetView(), Offset);
- }
- },
- [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() {
- DownloadStats.DownloadedChunkCount++;
- if (!AbortFlag.load())
- {
- uint64_t PayloadSize = Workload->TempFile.FileSize();
- void* FileHandle = Workload->TempFile.Detach();
- ZEN_ASSERT(FileHandle != nullptr);
- IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true);
- Payload.SetDeleteOnClose(true);
- OnDownloadComplete(std::move(Payload));
- }
- });
- if (!WorkItems.empty())
- {
- DownloadStats.MultipartAttachmentCount++;
- }
- for (auto& WorkItem : WorkItems)
- {
- Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
- ZEN_TRACE_CPU("DownloadLargeBlob_Work");
- if (!AbortFlag)
- {
- WorkItem();
- }
- });
- }
- }
-
struct ValidateStatistics
{
uint64_t BuildBlobSize = 0;
@@ -3843,7 +3219,7 @@ namespace {
ChunkingStatistics ChunkingStats;
{
auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool {
- for (const std::string_view& ExcludeFolder : ExcludeFolders)
+ for (const std::string& ExcludeFolder : ExcludeFolders)
{
if (RelativePath.starts_with(ExcludeFolder))
{
@@ -3862,7 +3238,7 @@ namespace {
auto IsAcceptedFile = [ExcludeExtensions =
DefaultExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool {
- for (const std::string_view& ExcludeExtension : ExcludeExtensions)
+ for (const std::string& ExcludeExtension : ExcludeExtensions)
{
if (RelativePath.ends_with(ExcludeExtension))
{
@@ -3961,7 +3337,7 @@ namespace {
const std::filesystem::path AssetFilePath = (Path / AssetPath).make_preferred();
Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath));
#if ZEN_PLATFORM_WINDOWS
- Content.Attributes.push_back(GetFileAttributes(AssetFilePath));
+ Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath));
#endif // ZEN_PLATFORM_WINDOWS
#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
Content.Attributes.push_back(GetFileMode(AssetFilePath));
@@ -3975,7 +3351,7 @@ namespace {
const std::filesystem::path ManifestFilePath = (Path / ManifestPath).make_preferred();
Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath));
#if ZEN_PLATFORM_WINDOWS
- Content.Attributes.push_back(GetFileAttributes(ManifestFilePath));
+ Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath));
#endif // ZEN_PLATFORM_WINDOWS
#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
Content.Attributes.push_back(GetFileMode(ManifestFilePath));
@@ -4853,7 +4229,7 @@ namespace {
std::vector<std::string> Errors;
auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool {
- for (const std::string_view& ExcludeFolder : ExcludeFolders)
+ for (const std::string& ExcludeFolder : ExcludeFolders)
{
if (RelativePath.starts_with(ExcludeFolder))
{
@@ -5025,863 +4401,7 @@ namespace {
}
}
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets(
- std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const ChunkedContentLookup& Lookup,
- uint32_t ChunkIndex)
- {
- std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex);
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
- if (!ChunkSources.empty())
- {
- ChunkTargetPtrs.reserve(ChunkSources.size());
- for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
- {
- if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0)
- {
- ChunkTargetPtrs.push_back(&Source);
- }
- }
- }
- return ChunkTargetPtrs;
- };
-
- uint64_t GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const ChunkedContentLookup& Lookup,
- uint32_t ChunkIndex)
- {
- uint64_t WriteCount = 0;
- std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex);
- for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
- {
- if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0)
- {
- WriteCount++;
- }
- }
- return WriteCount;
- };
-
- void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash)
- {
- ZEN_TRACE_CPU("FinalizeChunkSequence");
- ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- std::error_code Ec;
- RenameFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash),
- GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash),
- Ec);
- if (Ec)
- {
- throw std::system_error(Ec);
- }
- }
-
- void FinalizeChunkSequences(const std::filesystem::path& TargetFolder,
- const ChunkedFolderContent& RemoteContent,
- std::span<const uint32_t> RemoteSequenceIndexes)
- {
- ZEN_TRACE_CPU("FinalizeChunkSequences");
- for (uint32_t SequenceIndex : RemoteSequenceIndexes)
- {
- FinalizeChunkSequence(TargetFolder, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- }
- }
-
- void VerifySequence(const std::filesystem::path& TargetFolder,
- const ChunkedFolderContent& RemoteContent,
- const ChunkedContentLookup& Lookup,
- uint32_t RemoteSequenceIndex)
- {
- ZEN_TRACE_CPU("VerifySequence");
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- {
- ZEN_TRACE_CPU("HashSequence");
- const std::uint32_t RemotePathIndex = Lookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
- const uint64_t ExpectedSize = RemoteContent.RawSizes[RemotePathIndex];
- IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash));
- const uint64_t VerifySize = VerifyBuffer.GetSize();
- if (VerifySize != ExpectedSize)
- {
- throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}",
- SequenceRawHash,
- VerifySize,
- ExpectedSize));
- }
-
- const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
- }
- }
- }
-
- void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder,
- const ChunkedFolderContent& RemoteContent,
- const ChunkedContentLookup& Lookup,
- std::span<const uint32_t> RemoteSequenceIndexes,
- ParallelWork& Work,
- WorkerThreadPool& VerifyPool)
- {
- if (RemoteSequenceIndexes.empty())
- {
- return;
- }
- ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence");
- for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
- {
- const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
- Work.ScheduleWork(VerifyPool, [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync");
- VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex);
- if (!AbortFlag)
- {
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- FinalizeChunkSequence(TargetFolder, SequenceRawHash);
- }
- }
- });
- }
- const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
-
- VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex);
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- FinalizeChunkSequence(TargetFolder, SequenceRawHash);
- }
-
- bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
- {
- uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1);
- ZEN_ASSERT(PreviousValue >= 1);
- ZEN_ASSERT(PreviousValue != (uint32_t)-1);
- return PreviousValue == 1;
- }
-
- std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
- {
- ZEN_TRACE_CPU("CompleteChunkTargets");
-
- std::vector<uint32_t> CompletedSequenceIndexes;
- for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
- {
- const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
- if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
- {
- CompletedSequenceIndexes.push_back(RemoteSequenceIndex);
- }
- }
- return CompletedSequenceIndexes;
- }
-
- void WriteSequenceChunk(const std::filesystem::path& TargetFolderPath,
- const ChunkedFolderContent& RemoteContent,
- BufferedWriteFileCache::Local& LocalWriter,
- const CompositeBuffer& Chunk,
- const uint32_t SequenceIndex,
- const uint64_t FileOffset,
- const uint32_t PathIndex,
- DiskStatistics& DiskStats)
- {
- ZEN_TRACE_CPU("WriteSequenceChunk");
-
- const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex];
-
- auto OpenFile = [&](BasicFile& File) {
- const std::filesystem::path FileName =
- GetTempChunkedSequenceFileName(TargetFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- File.Open(FileName, BasicFile::Mode::kWrite);
- if (UseSparseFiles)
- {
- PrepareFileForScatteredWrite(File.Handle(), SequenceSize);
- }
- };
-
- const uint64_t ChunkSize = Chunk.GetSize();
- ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize);
- if (ChunkSize == SequenceSize)
- {
- BasicFile SingleChunkFile;
- OpenFile(SingleChunkFile);
-
- DiskStats.CurrentOpenFileCount++;
- auto _ = MakeGuard([&DiskStats]() { DiskStats.CurrentOpenFileCount--; });
- SingleChunkFile.Write(Chunk, FileOffset);
-
- DiskStats.WriteCount++;
- DiskStats.WriteByteCount += ChunkSize;
- }
- else
- {
- const uint64_t MaxWriterBufferSize = 256u * 1025u;
-
- BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex);
- if (Writer)
- {
- if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize))
- {
- Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
- }
- Writer->Write(Chunk, FileOffset);
-
- DiskStats.WriteCount++;
- DiskStats.WriteByteCount += ChunkSize;
- }
- else
- {
- Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>());
-
- Writer->File = std::make_unique<BasicFile>();
- OpenFile(*Writer->File);
- if (ChunkSize < MaxWriterBufferSize)
- {
- Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
- }
- Writer->Write(Chunk, FileOffset);
-
- DiskStats.WriteCount++;
- DiskStats.WriteByteCount += ChunkSize;
- }
- }
- }
-
- struct BlockWriteOps
- {
- std::vector<CompositeBuffer> ChunkBuffers;
- struct WriteOpData
- {
- const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
- size_t ChunkBufferIndex = (size_t)-1;
- };
- std::vector<WriteOpData> WriteOps;
- };
-
- void WriteBlockChunkOps(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& RemoteContent,
- const ChunkedContentLookup& Lookup,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const BlockWriteOps& Ops,
- BufferedWriteFileCache& WriteCache,
- ParallelWork& Work,
- WorkerThreadPool& VerifyPool,
- DiskStatistics& DiskStats)
- {
- ZEN_TRACE_CPU("WriteBlockChunkOps");
-
- {
- BufferedWriteFileCache::Local LocalWriter(WriteCache);
- for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
- {
- if (AbortFlag)
- {
- break;
- }
- const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex];
- const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
- const uint64_t FileOffset = WriteOp.Target->Offset;
- const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
-
- WriteSequenceChunk(CacheFolderPath, RemoteContent, LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex, DiskStats);
- }
- }
- if (!AbortFlag)
- {
- // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
- std::vector<uint32_t> CompletedChunkSequences;
- for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
- {
- const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
- if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
- {
- CompletedChunkSequences.push_back(RemoteSequenceIndex);
- }
- }
- WriteCache.Close(CompletedChunkSequences);
- VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool);
- }
- }
-
- IoBuffer MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer)
- {
- ZEN_TRACE_CPU("MakeBufferMemoryBased");
- IoBuffer BlockMemoryBuffer;
- std::span<const SharedBuffer> Segments = PartialBlockBuffer.GetSegments();
- if (Segments.size() == 1)
- {
- IoBufferFileReference FileRef = {};
- if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef))
- {
- BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer();
- BasicFile Reader;
- Reader.Attach(FileRef.FileHandle);
- auto _ = MakeGuard([&Reader]() { Reader.Detach(); });
- MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView();
- Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
- return BlockMemoryBuffer;
- }
- else
- {
- return PartialBlockBuffer.GetSegments().front().AsIoBuffer();
- }
- }
- else
- {
- // Not a homogenous memory buffer, read all to memory
-
- BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer();
- MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView();
- for (const SharedBuffer& Segment : Segments)
- {
- IoBufferFileReference FileRef = {};
- if (Segment.AsIoBuffer().GetFileReference(FileRef))
- {
- BasicFile Reader;
- Reader.Attach(FileRef.FileHandle);
- Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
- Reader.Detach();
- ReadMem = ReadMem.Mid(FileRef.FileChunkSize);
- }
- else
- {
- ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView());
- }
- }
- return BlockMemoryBuffer;
- }
- }
-
- bool GetBlockWriteOps(const ChunkedFolderContent& RemoteContent,
- const ChunkedContentLookup& Lookup,
- std::span<const IoHash> ChunkRawHashes,
- std::span<const uint32_t> ChunkCompressedLengths,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- const MemoryView BlockView,
- uint32_t FirstIncludedBlockChunkIndex,
- uint32_t LastIncludedBlockChunkIndex,
- BlockWriteOps& OutOps)
- {
- ZEN_TRACE_CPU("GetBlockWriteOps");
- uint32_t OffsetInBlock = 0;
- for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++)
- {
- const uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex];
- const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex];
- if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t ChunkIndex = It->second;
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex);
-
- if (!ChunkTargetPtrs.empty())
- {
- bool NeedsWrite = true;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
- {
- MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize);
- IoHash VerifyChunkHash;
- uint64_t VerifyChunkSize;
- CompressedBuffer CompressedChunk =
- CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize);
- ZEN_ASSERT(CompressedChunk);
- ZEN_ASSERT(VerifyChunkHash == ChunkHash);
- ZEN_ASSERT(VerifyChunkSize == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
-
- OodleCompressor ChunkCompressor;
- OodleCompressionLevel ChunkCompressionLevel;
- uint64_t ChunkBlockSize;
-
- bool GetCompressParametersSuccess =
- CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize);
- ZEN_ASSERT(GetCompressParametersSuccess);
-
- IoBuffer Decompressed;
- if (ChunkCompressionLevel == OodleCompressionLevel::None)
- {
- MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder());
- Decompressed =
- IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize());
- }
- else
- {
- Decompressed = CompressedChunk.Decompress().AsIoBuffer();
- }
- ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
- ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
- {
- OutOps.WriteOps.push_back(
- BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
- }
- OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
- }
- }
- }
-
- OffsetInBlock += ChunkCompressedSize;
- }
- {
- ZEN_TRACE_CPU("GetBlockWriteOps_sort");
- std::sort(OutOps.WriteOps.begin(),
- OutOps.WriteOps.end(),
- [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- return Lhs.Target->Offset < Rhs.Target->Offset;
- });
- }
- return true;
- }
-
- bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& RemoteContent,
- const ChunkBlockDescription& BlockDescription,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- ParallelWork& Work,
- WorkerThreadPool& VerifyPool,
- CompositeBuffer&& BlockBuffer,
- const ChunkedContentLookup& Lookup,
- std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- BufferedWriteFileCache& WriteCache,
- DiskStatistics& DiskStats)
- {
- ZEN_TRACE_CPU("WriteBlockToDisk");
-
- IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer);
- const MemoryView BlockView = BlockMemoryBuffer.GetView();
-
- BlockWriteOps Ops;
- if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty())
- {
- ZEN_TRACE_CPU("WriteBlockToDisk_Legacy");
-
- uint64_t HeaderSize;
- const std::vector<uint32_t> ChunkCompressedLengths =
- ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize);
-
- if (GetBlockWriteOps(RemoteContent,
- Lookup,
- BlockDescription.ChunkRawHashes,
- ChunkCompressedLengths,
- SequenceIndexChunksLeftToWriteCounters,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize),
- 0,
- gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
- Ops))
- {
- WriteBlockChunkOps(CacheFolderPath,
- RemoteContent,
- Lookup,
- SequenceIndexChunksLeftToWriteCounters,
- Ops,
- WriteCache,
- Work,
- VerifyPool,
- DiskStats);
- return true;
- }
- return false;
- }
-
- if (GetBlockWriteOps(RemoteContent,
- Lookup,
- BlockDescription.ChunkRawHashes,
- BlockDescription.ChunkCompressedLengths,
- SequenceIndexChunksLeftToWriteCounters,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize),
- 0,
- gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
- Ops))
- {
- WriteBlockChunkOps(CacheFolderPath,
- RemoteContent,
- Lookup,
- SequenceIndexChunksLeftToWriteCounters,
- Ops,
- WriteCache,
- Work,
- VerifyPool,
- DiskStats);
- return true;
- }
- return false;
- }
-
- bool WritePartialBlockToDisk(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& RemoteContent,
- const ChunkBlockDescription& BlockDescription,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- ParallelWork& Work,
- WorkerThreadPool& VerifyPool,
- CompositeBuffer&& PartialBlockBuffer,
- uint32_t FirstIncludedBlockChunkIndex,
- uint32_t LastIncludedBlockChunkIndex,
- const ChunkedContentLookup& Lookup,
- std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- BufferedWriteFileCache& WriteCache,
- DiskStatistics& DiskStats)
- {
- ZEN_TRACE_CPU("WritePartialBlockToDisk");
-
- IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer);
- const MemoryView BlockView = BlockMemoryBuffer.GetView();
-
- BlockWriteOps Ops;
- if (GetBlockWriteOps(RemoteContent,
- Lookup,
- BlockDescription.ChunkRawHashes,
- BlockDescription.ChunkCompressedLengths,
- SequenceIndexChunksLeftToWriteCounters,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- BlockView,
- FirstIncludedBlockChunkIndex,
- LastIncludedBlockChunkIndex,
- Ops))
- {
- WriteBlockChunkOps(CacheFolderPath,
- RemoteContent,
- Lookup,
- SequenceIndexChunksLeftToWriteCounters,
- Ops,
- WriteCache,
- Work,
- VerifyPool,
- DiskStats);
- return true;
- }
- else
- {
- return false;
- }
- }
-
- bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent,
- const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
- {
- if (Locations.size() == 1)
- {
- const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex;
- if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1)
- {
- ZEN_ASSERT_SLOW(Locations[0]->Offset == 0);
- return true;
- }
- }
- return false;
- }
-
- void StreamDecompress(const std::filesystem::path& CacheFolderPath,
- const IoHash& SequenceRawHash,
- CompositeBuffer&& CompressedPart,
- DiskStatistics& DiskStats)
- {
- ZEN_TRACE_CPU("StreamDecompress");
- const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
- TemporaryFile DecompressedTemp;
- std::error_code Ec;
- DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec);
- if (Ec)
- {
- throw std::runtime_error(
- fmt::format("Failed creating temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
- }
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize);
- if (!Compressed)
- {
- throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash));
- }
- if (RawHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
- }
- PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize);
-
- IoHashStream Hash;
- bool CouldDecompress = Compressed.DecompressToStream(
- 0,
- (uint64_t)-1,
- [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_UNUSED(SourceOffset);
- ZEN_TRACE_CPU("StreamDecompress_Write");
- DiskStats.ReadByteCount += SourceSize;
- if (!AbortFlag)
- {
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
- {
- Hash.Append(Segment.GetView());
- DecompressedTemp.Write(Segment, Offset);
- Offset += Segment.GetSize();
- DiskStats.WriteByteCount += Segment.GetSize();
- DiskStats.WriteCount++;
- }
- return true;
- }
- return false;
- });
-
- if (AbortFlag)
- {
- return;
- }
-
- if (!CouldDecompress)
- {
- throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash));
- }
- const IoHash VerifyHash = Hash.GetHash();
- if (VerifyHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash));
- }
- DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec);
- if (Ec)
- {
- throw std::runtime_error(
- fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
- }
- // WriteChunkStats.ChunkCountWritten++;
- }
-
- bool WriteCompressedChunk(const std::filesystem::path& TargetFolder,
- const ChunkedFolderContent& RemoteContent,
- const ChunkedContentLookup& RemoteLookup,
- const IoHash& ChunkHash,
- const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
- BufferedWriteFileCache& WriteCache,
- IoBuffer&& CompressedPart,
- DiskStatistics& DiskStats)
- {
- ZEN_TRACE_CPU("WriteCompressedChunk");
- auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
- ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
- if (IsSingleFileChunk(RemoteContent, ChunkTargetPtrs))
- {
- const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
- StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats);
- return false;
- }
- else
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize);
- if (!Compressed)
- {
- throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash));
- }
- if (RawHash != ChunkHash)
- {
- throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash));
- }
-
- BufferedWriteFileCache::Local LocalWriter(WriteCache);
-
- IoHashStream Hash;
- bool CouldDecompress = Compressed.DecompressToStream(
- 0,
- (uint64_t)-1,
- [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_UNUSED(SourceOffset);
- ZEN_TRACE_CPU("StreamDecompress_Write");
- DiskStats.ReadByteCount += SourceSize;
- if (!AbortFlag)
- {
- for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs)
- {
- const auto& Target = *TargetPtr;
- const uint64_t FileOffset = Target.Offset + Offset;
- const uint32_t SequenceIndex = Target.SequenceIndex;
- const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
-
- WriteSequenceChunk(TargetFolder,
- RemoteContent,
- LocalWriter,
- RangeBuffer,
- SequenceIndex,
- FileOffset,
- PathIndex,
- DiskStats);
- }
-
- return true;
- }
- return false;
- });
-
- if (AbortFlag)
- {
- return false;
- }
-
- if (!CouldDecompress)
- {
- throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash));
- }
-
- return true;
- }
- }
-
- void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath,
- const ChunkedFolderContent& RemoteContent,
- const ChunkedContentLookup& RemoteLookup,
- uint32_t RemoteChunkIndex,
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
- BufferedWriteFileCache& WriteCache,
- ParallelWork& Work,
- WorkerThreadPool& WritePool,
- IoBuffer&& Payload,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- std::atomic<uint64_t>& WritePartsComplete,
- const uint64_t TotalPartWriteCount,
- FilteredRate& FilteredWrittenBytesPerSecond,
- DiskStatistics& DiskStats)
- {
- ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
-
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
-
- const uint64_t Size = Payload.GetSize();
-
- std::filesystem::path CompressedChunkPath;
-
- // Check if the dowloaded chunk is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size))
- {
- ZEN_TRACE_CPU("MoveTempChunk");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- Payload.SetDeleteOnClose(false);
- Payload = {};
- CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
- RenameFile(TempBlobPath, CompressedChunkPath, Ec);
- if (Ec)
- {
- CompressedChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true);
- Payload.SetDeleteOnClose(true);
- }
- }
- }
- }
-
- if (CompressedChunkPath.empty() && (Size > 512u * 1024u))
- {
- ZEN_TRACE_CPU("WriteTempChunk");
- // Could not be moved and rather large, lets store it on disk
- CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
- TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload);
- Payload = {};
- }
-
- Work.ScheduleWork(
- WritePool,
- [&ZenFolderPath,
- &RemoteContent,
- &RemoteLookup,
- SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &WritePool,
- CompressedChunkPath,
- RemoteChunkIndex,
- TotalPartWriteCount,
- &WriteCache,
- &DiskStats,
- &WritePartsComplete,
- &FilteredWrittenBytesPerSecond,
- ChunkTargetPtrs = std::move(ChunkTargetPtrs),
- CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable {
- ZEN_TRACE_CPU("UpdateFolder_WriteChunk");
-
- if (!AbortFlag)
- {
- FilteredWrittenBytesPerSecond.Start();
-
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- if (CompressedChunkPath.empty())
- {
- ZEN_ASSERT(CompressedPart);
- }
- else
- {
- ZEN_ASSERT(!CompressedPart);
- CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
- if (!CompressedPart)
- {
- throw std::runtime_error(
- fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath));
- }
- }
-
- std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath);
-
- bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkHash,
- ChunkTargetPtrs,
- WriteCache,
- std::move(CompressedPart),
- DiskStats);
- if (!AbortFlag)
- {
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
-
- if (!CompressedChunkPath.empty())
- {
- TryRemoveFile(CompressedChunkPath);
- }
-
- std::vector<uint32_t> CompletedSequences =
- CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
- WriteCache.Close(CompletedSequences);
- if (NeedHashVerify)
- {
- VerifyAndCompleteChunkSequencesAsync(TargetFolder,
- RemoteContent,
- RemoteLookup,
- CompletedSequences,
- Work,
- WritePool);
- }
- else
- {
- FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
- }
- }
- }
- });
- };
-
+ // TODO: Move out of this file and make it one with implementation in buildstorageoperations.cpp
bool ReadStateFile(const std::filesystem::path& StateFilePath,
FolderContent& OutLocalFolderState,
ChunkedFolderContent& OutLocalContent)
@@ -6008,2859 +4528,6 @@ namespace {
return Result;
}
- enum EPartialBlockRequestMode
- {
- Off,
- ZenCacheOnly,
- Mixed,
- All,
- Invalid
- };
- static EPartialBlockRequestMode PartialBlockRequestModeFromString(const std::string_view ModeString)
- {
- switch (HashStringAsLowerDjb2(ModeString))
- {
- case HashStringDjb2("false"):
- return EPartialBlockRequestMode::Off;
- case HashStringDjb2("zencacheonly"):
- return EPartialBlockRequestMode::ZenCacheOnly;
- case HashStringDjb2("mixed"):
- return EPartialBlockRequestMode::Mixed;
- case HashStringDjb2("true"):
- return EPartialBlockRequestMode::All;
- default:
- return EPartialBlockRequestMode::Invalid;
- }
- }
-
- struct UpdateOptions
- {
- std::filesystem::path SystemRootDir;
- std::filesystem::path ZenFolderPath;
- std::uint64_t LargeAttachmentSize = DefaultPreferredMultipartChunkSize * 4u;
- std::uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize;
- EPartialBlockRequestMode PartialBlockRequestMode = EPartialBlockRequestMode::Mixed;
- bool WipeTargetFolder = false;
- bool PrimeCacheOnly = false;
- bool EnableOtherDownloadsScavenging = true;
- bool EnableTargetFolderScavenging = true;
- };
-
- void UpdateFolder(StorageInstance& Storage,
- const Oid& BuildId,
- const std::filesystem::path& Path,
- const ChunkedFolderContent& LocalContent,
- const ChunkedFolderContent& RemoteContent,
- const std::vector<ChunkBlockDescription>& BlockDescriptions,
- const std::vector<IoHash>& LooseChunkHashes,
- const UpdateOptions& Options,
- FolderContent& OutLocalFolderState,
- DiskStatistics& DiskStats,
- CacheMappingStatistics& CacheMappingStats,
- DownloadStatistics& DownloadStats,
- WriteChunkStatistics& WriteChunkStats,
- RebuildFolderStateStatistics& RebuildFolderStateStats)
- {
- ZEN_TRACE_CPU("UpdateFolder");
-
- ZEN_ASSERT((!Options.PrimeCacheOnly) ||
- (Options.PrimeCacheOnly && (Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)));
-
- Stopwatch IndexTimer;
-
- const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent);
-
- const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent);
-
- if (!IsQuiet)
- {
- ZEN_CONSOLE("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs()));
- }
-
- const std::filesystem::path CacheFolderPath = ZenTempCacheFolderPath(Options.ZenFolderPath);
-
- Stopwatch CacheMappingTimer;
-
- std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size());
- std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
- std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size());
-
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
- if (!Options.PrimeCacheOnly)
- {
- ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache");
-
- Stopwatch CacheTimer;
-
- DirectoryContent CacheDirContent;
- GetDirectoryContent(CacheFolderPath,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes,
- CacheDirContent);
- for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++)
- {
- if (Options.EnableTargetFolderScavenging)
- {
- IoHash FileHash;
- if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash))
- {
- if (auto ChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(FileHash);
- ChunkIt != RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t ChunkIndex = ChunkIt->second;
- const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
- if (ChunkSize == CacheDirContent.FileSizes[Index])
- {
- CachedChunkHashesFound.insert({FileHash, ChunkIndex});
- CacheMappingStats.CacheChunkCount++;
- CacheMappingStats.CacheChunkByteCount += ChunkSize;
- continue;
- }
- }
- else if (auto SequenceIt = RemoteLookup.RawHashToSequenceIndex.find(FileHash);
- SequenceIt != RemoteLookup.RawHashToSequenceIndex.end())
- {
- const uint32_t SequenceIndex = SequenceIt->second;
- const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
- const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex];
- if (SequenceSize == CacheDirContent.FileSizes[Index])
- {
- CachedSequenceHashesFound.insert({FileHash, SequenceIndex});
- CacheMappingStats.CacheSequenceHashesCount++;
- CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize;
-
- const std::filesystem::path CacheFilePath =
- GetFinalChunkedSequenceFileName(CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
-
- continue;
- }
- }
- }
- }
- TryRemoveFile(CacheDirContent.Files[Index]);
- }
- CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs();
- }
-
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
- if (!Options.PrimeCacheOnly)
- {
- ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache");
-
- Stopwatch CacheTimer;
-
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllBlockSizes;
- AllBlockSizes.reserve(BlockDescriptions.size());
- for (uint32_t BlockIndex = 0; BlockIndex < BlockDescriptions.size(); BlockIndex++)
- {
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex});
- }
-
- DirectoryContent BlockDirContent;
- GetDirectoryContent(ZenTempBlockFolderPath(Options.ZenFolderPath),
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes,
- BlockDirContent);
- CachedBlocksFound.reserve(BlockDirContent.Files.size());
- for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++)
- {
- if (Options.EnableTargetFolderScavenging)
- {
- IoHash FileHash;
- if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash))
- {
- if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end())
- {
- const uint32_t BlockIndex = BlockIt->second;
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize;
- for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths)
- {
- BlockSize += ChunkSize;
- }
-
- if (BlockSize == BlockDirContent.FileSizes[Index])
- {
- CachedBlocksFound.insert({FileHash, BlockIndex});
- CacheMappingStats.CacheBlockCount++;
- CacheMappingStats.CacheBlocksByteCount += BlockSize;
- continue;
- }
- }
- }
- }
- TryRemoveFile(BlockDirContent.Files[Index]);
- }
-
- CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs();
- }
-
- std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes;
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex;
-
- if (!Options.PrimeCacheOnly && Options.EnableTargetFolderScavenging)
- {
- // Pick up all whole files we can use from current local state
- ZEN_TRACE_CPU("UpdateFolder_GetLocalSequences");
-
- Stopwatch LocalTimer;
-
- for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size();
- RemoteSequenceIndex++)
- {
- const IoHash& RemoteSequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex);
- const uint64_t RemoteRawSize = RemoteContent.RawSizes[RemotePathIndex];
- if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash);
- CacheSequenceIt != CachedSequenceHashesFound.end())
- {
- const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
- ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
- ZEN_CONSOLE_VERBOSE("Found sequence {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize));
- }
- else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash);
- CacheChunkIt != CachedChunkHashesFound.end())
- {
- const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
- ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
- ZEN_CONSOLE_VERBOSE("Found chunk {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize));
- }
- else if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash);
- It != LocalLookup.RawHashToSequenceIndex.end())
- {
- const uint32_t LocalSequenceIndex = It->second;
- const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex);
- const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- ZEN_ASSERT_SLOW(IsFile(LocalFilePath));
- LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex);
- CacheMappingStats.LocalPathsMatchingSequencesCount++;
- CacheMappingStats.LocalPathsMatchingSequencesByteCount += RemoteRawSize;
- ZEN_CONSOLE_VERBOSE("Found sequence {} at {} ({})", RemoteSequenceRawHash, LocalFilePath, NiceBytes(RemoteRawSize));
- }
- else
- {
- // We must write the sequence
- const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
- SequenceIndexesLeftToFindToRemoteIndex.insert({RemoteSequenceRawHash, RemoteSequenceIndex});
- }
- }
-
- CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs();
- }
- else
- {
- for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size();
- RemoteSequenceIndex++)
- {
- const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
- }
- }
-
- std::vector<ChunkedFolderContent> ScavengedContents;
- std::vector<ChunkedContentLookup> ScavengedLookups;
- std::vector<std::filesystem::path> ScavengedPaths;
-
- struct ScavengeCopyOperation
- {
- uint32_t ScavengedContentIndex = (uint32_t)-1;
- uint32_t ScavengedPathIndex = (uint32_t)-1;
- uint32_t RemoteSequenceIndex = (uint32_t)-1;
- uint64_t RawSize = (uint32_t)-1;
- };
-
- std::vector<ScavengeCopyOperation> ScavengeCopyOperations;
- uint64_t ScavengedPathsCount = 0;
-
- if (!Options.PrimeCacheOnly && Options.EnableOtherDownloadsScavenging)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetScavengedSequences");
-
- Stopwatch ScavengeTimer;
-
- if (!SequenceIndexesLeftToFindToRemoteIndex.empty())
- {
- std::vector<ScavengeSource> ScavengeSources = GetDownloadedStatePaths(Options.SystemRootDir);
- auto EraseIt = std::remove_if(ScavengeSources.begin(), ScavengeSources.end(), [&Path](const ScavengeSource& Source) {
- return Source.Path == Path;
- });
- ScavengeSources.erase(EraseIt, ScavengeSources.end());
-
- const size_t ScavengePathCount = ScavengeSources.size();
-
- ScavengedContents.resize(ScavengePathCount);
- ScavengedLookups.resize(ScavengePathCount);
- ScavengedPaths.resize(ScavengePathCount);
-
- ProgressBar ScavengeProgressBar(ProgressMode, "Scavenging");
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
-
- std::atomic<uint64_t> PathsFound(0);
- std::atomic<uint64_t> ChunksFound(0);
- std::atomic<uint64_t> PathsScavenged(0);
-
- for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++)
- {
- Work.ScheduleWork(
- GetIOWorkerPool(),
- [&RemoteLookup,
- &ScavengeSources,
- &ScavengedContents,
- &ScavengedPaths,
- &ScavengedLookups,
- &PathsFound,
- &ChunksFound,
- &PathsScavenged,
- ScavengeIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- const ScavengeSource& Source = ScavengeSources[ScavengeIndex];
-
- ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengeIndex];
- std::filesystem::path& ScavengePath = ScavengedPaths[ScavengeIndex];
-
- FolderContent LocalFolderState;
- if (ReadStateFile(Source.StateFilePath, LocalFolderState, ScavengedLocalContent))
- {
- if (IsDir(Source.Path))
- {
- ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengeIndex];
- ScavengedLookup = BuildChunkedContentLookup(ScavengedLocalContent);
-
- std::vector<uint32_t> PathIndexesToScavange;
- uint32_t ScavengedStatePathCount = gsl::narrow<uint32_t>(ScavengedLocalContent.Paths.size());
- PathIndexesToScavange.reserve(ScavengedStatePathCount);
- for (uint32_t ScavengedStatePathIndex = 0; ScavengedStatePathIndex < ScavengedStatePathCount;
- ScavengedStatePathIndex++)
- {
- const IoHash& SequenceHash = ScavengedLocalContent.RawHashes[ScavengedStatePathIndex];
- if (auto ScavengeSequenceIt = ScavengedLookup.RawHashToSequenceIndex.find(SequenceHash);
- ScavengeSequenceIt != ScavengedLookup.RawHashToSequenceIndex.end())
- {
- const uint32_t ScavengeSequenceIndex = ScavengeSequenceIt->second;
- if (RemoteLookup.RawHashToSequenceIndex.contains(SequenceHash))
- {
- PathIndexesToScavange.push_back(ScavengedStatePathIndex);
- }
- else
- {
- const uint32_t ScavengeChunkCount =
- ScavengedLocalContent.ChunkedContent.ChunkCounts[ScavengeSequenceIndex];
- for (uint32_t ScavengeChunkIndexOffset = 0;
- ScavengeChunkIndexOffset < ScavengeChunkCount;
- ScavengeChunkIndexOffset++)
- {
- const size_t ScavengeChunkOrderIndex =
- ScavengedLookup.ChunkSequenceLocationOffset[ScavengeSequenceIndex] +
- ScavengeChunkIndexOffset;
- const uint32_t ScavengeChunkIndex =
- ScavengedLocalContent.ChunkedContent.ChunkOrders[ScavengeChunkOrderIndex];
- const IoHash& ScavengeChunkHash =
- ScavengedLocalContent.ChunkedContent.ChunkHashes[ScavengeChunkIndex];
- if (RemoteLookup.ChunkHashToChunkIndex.contains(ScavengeChunkHash))
- {
- PathIndexesToScavange.push_back(ScavengedStatePathIndex);
- break;
- }
- }
- }
- }
- }
-
- if (!PathIndexesToScavange.empty())
- {
- std::vector<std::filesystem::path> PathsToScavenge;
- PathsToScavenge.reserve(PathIndexesToScavange.size());
- for (uint32_t ScavengedStatePathIndex : PathIndexesToScavange)
- {
- PathsToScavenge.push_back(ScavengedLocalContent.Paths[ScavengedStatePathIndex]);
- }
-
- GetFolderContentStatistics ScavengedFolderScanStats;
-
- FolderContent ValidFolderContent =
- GetValidFolderContent(ScavengedFolderScanStats, Source.Path, PathsToScavenge, {});
-
- if (!LocalFolderState.AreKnownFilesEqual(ValidFolderContent))
- {
- std::vector<std::filesystem::path> DeletedPaths;
- FolderContent UpdatedContent =
- GetUpdatedContent(LocalFolderState, ValidFolderContent, DeletedPaths);
-
- // If the files are modified since the state was saved we ignore the files since we don't
- // want to incur the cost of scanning/hashing scavenged files
- DeletedPaths.insert(DeletedPaths.end(),
- UpdatedContent.Paths.begin(),
- UpdatedContent.Paths.end());
- if (!DeletedPaths.empty())
- {
- ScavengedLocalContent =
- DeletePathsFromChunkedContent(ScavengedLocalContent, ScavengedLookup, DeletedPaths);
- ScavengedLookup = BuildChunkedContentLookup(ScavengedLocalContent);
- }
- }
-
- if (!ScavengedLocalContent.Paths.empty())
- {
- ScavengePath = Source.Path;
- PathsFound += ScavengedLocalContent.Paths.size();
- ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size();
- }
- }
-
- if (ScavengePath.empty())
- {
- ScavengedLocalContent = {};
- ScavengedLookups[ScavengeIndex] = {};
- ScavengedPaths[ScavengeIndex].clear();
- }
- }
- }
- PathsScavenged++;
- }
- });
- }
- {
- ZEN_TRACE_CPU("ScavengeScan_Wait");
-
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
- std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging",
- PathsScavenged.load(),
- ScavengePathCount,
- PathsFound.load(),
- ChunksFound.load());
- ScavengeProgressBar.UpdateState({.Task = "Scavenging ",
- .Details = Details,
- .TotalCount = ScavengePathCount,
- .RemainingCount = ScavengePathCount - PathsScavenged.load(),
- .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
- }
-
- ScavengeProgressBar.Finish();
- if (AbortFlag)
- {
- return;
- }
-
- for (uint32_t ScavengedContentIndex = 0;
- ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty());
- ScavengedContentIndex++)
- {
- const std::filesystem::path& ScavengePath = ScavengedPaths[ScavengedContentIndex];
- if (!ScavengePath.empty())
- {
- const ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengedContentIndex];
- const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
-
- for (uint32_t ScavengedSequenceIndex = 0;
- ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size();
- ScavengedSequenceIndex++)
- {
- const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex];
- if (auto It = SequenceIndexesLeftToFindToRemoteIndex.find(SequenceRawHash);
- It != SequenceIndexesLeftToFindToRemoteIndex.end())
- {
- const uint32_t RemoteSequenceIndex = It->second;
- const uint64_t RawSize =
- RemoteContent.RawSizes[RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]];
- ZEN_ASSERT(RawSize > 0);
-
- const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex];
- ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred()));
-
- ScavengeCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex,
- .ScavengedPathIndex = ScavengedPathIndex,
- .RemoteSequenceIndex = RemoteSequenceIndex,
- .RawSize = RawSize});
-
- SequenceIndexesLeftToFindToRemoteIndex.erase(SequenceRawHash);
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0;
-
- CacheMappingStats.ScavengedPathsMatchingSequencesCount++;
- CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize;
- }
- }
- ScavengedPathsCount++;
- }
- }
- }
- CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
- }
-
- uint32_t RemainingChunkCount = 0;
- for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
- {
- uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
- if (ChunkWriteCount > 0)
- {
- RemainingChunkCount++;
- }
- }
-
- // Pick up all chunks in current local state
- // TODO: Rename to LocalStateCopyData
- struct CacheCopyData
- {
- uint32_t ScavengeSourceIndex = (uint32_t)-1;
- uint32_t SourceSequenceIndex = (uint32_t)-1;
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> TargetChunkLocationPtrs;
- struct ChunkTarget
- {
- uint32_t TargetChunkLocationCount = (uint32_t)-1;
- uint32_t RemoteChunkIndex = (uint32_t)-1;
- uint64_t CacheFileOffset = (uint64_t)-1;
- };
- std::vector<ChunkTarget> ChunkTargets;
- };
-
- tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex;
- std::vector<CacheCopyData> CacheCopyDatas;
-
- if (!Options.PrimeCacheOnly && Options.EnableTargetFolderScavenging)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks");
-
- Stopwatch LocalTimer;
-
- for (uint32_t LocalSequenceIndex = 0;
- LocalSequenceIndex < LocalContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0);
- LocalSequenceIndex++)
- {
- const IoHash& LocalSequenceRawHash = LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex];
- const uint32_t LocalOrderOffset = LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex];
-
- {
- uint64_t SourceOffset = 0;
- const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex];
- for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++)
- {
- const uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex];
- const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
- const uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
-
- if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash);
- RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
- if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
-
- if (!ChunkTargetPtrs.empty())
- {
- CacheCopyData::ChunkTarget Target = {
- .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
- .RemoteChunkIndex = RemoteChunkIndex,
- .CacheFileOffset = SourceOffset};
- if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash);
- CopySourceIt != RawHashToCacheCopyDataIndex.end())
- {
- CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second];
- if (Data.TargetChunkLocationPtrs.size() > 1024)
- {
- RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size());
- CacheCopyDatas.push_back(
- CacheCopyData{.ScavengeSourceIndex = (uint32_t)-1,
- .SourceSequenceIndex = LocalSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
- }
- else
- {
- Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
- ChunkTargetPtrs.begin(),
- ChunkTargetPtrs.end());
- Data.ChunkTargets.push_back(Target);
- }
- }
- else
- {
- RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size());
- CacheCopyDatas.push_back(
- CacheCopyData{.ScavengeSourceIndex = (uint32_t)-1,
- .SourceSequenceIndex = LocalSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
- }
- CacheMappingStats.LocalChunkMatchingRemoteCount++;
- CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize;
- RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
- RemainingChunkCount--;
- }
- }
- }
- SourceOffset += LocalChunkRawSize;
- }
- }
- }
- CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs();
- }
-
- if (!Options.PrimeCacheOnly && Options.EnableOtherDownloadsScavenging)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetScavengeChunks");
-
- Stopwatch ScavengeTimer;
-
- for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0);
- ScavengedContentIndex++)
- {
- const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex];
- // const std::filesystem::path& ScavengedPath = ScavengedPaths[ScavengedContentIndex];
- const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
-
- for (uint32_t ScavengedSequenceIndex = 0;
- ScavengedSequenceIndex < ScavengedContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0);
- ScavengedSequenceIndex++)
- {
- const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex];
- const uint32_t ScavengedOrderOffset = ScavengedLookup.SequenceIndexChunkOrderOffset[ScavengedSequenceIndex];
-
- {
- uint64_t SourceOffset = 0;
- const uint32_t ScavengedChunkCount = ScavengedContent.ChunkedContent.ChunkCounts[ScavengedSequenceIndex];
- for (uint32_t ScavengedOrderIndex = 0; ScavengedOrderIndex < ScavengedChunkCount; ScavengedOrderIndex++)
- {
- const uint32_t ScavengedChunkIndex =
- ScavengedContent.ChunkedContent.ChunkOrders[ScavengedOrderOffset + ScavengedOrderIndex];
- const IoHash& ScavengedChunkHash = ScavengedContent.ChunkedContent.ChunkHashes[ScavengedChunkIndex];
- const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex];
-
- if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(ScavengedChunkHash);
- RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
- if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
-
- if (!ChunkTargetPtrs.empty())
- {
- CacheCopyData::ChunkTarget Target = {
- .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
- .RemoteChunkIndex = RemoteChunkIndex,
- .CacheFileOffset = SourceOffset};
- if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(ScavengedSequenceRawHash);
- CopySourceIt != RawHashToCacheCopyDataIndex.end())
- {
- CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second];
- if (Data.TargetChunkLocationPtrs.size() > 1024)
- {
- RawHashToCacheCopyDataIndex.insert_or_assign(ScavengedSequenceRawHash,
- CacheCopyDatas.size());
- CacheCopyDatas.push_back(
- CacheCopyData{.ScavengeSourceIndex = ScavengedContentIndex,
- .SourceSequenceIndex = ScavengedSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
- }
- else
- {
- Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
- ChunkTargetPtrs.begin(),
- ChunkTargetPtrs.end());
- Data.ChunkTargets.push_back(Target);
- }
- }
- else
- {
- RawHashToCacheCopyDataIndex.insert_or_assign(ScavengedSequenceRawHash, CacheCopyDatas.size());
- CacheCopyDatas.push_back(
- CacheCopyData{.ScavengeSourceIndex = ScavengedContentIndex,
- .SourceSequenceIndex = ScavengedSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}});
- }
- CacheMappingStats.ScavengedChunkMatchingRemoteCount++;
- CacheMappingStats.ScavengedChunkMatchingRemoteByteCount += ScavengedChunkRawSize;
- RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
- RemainingChunkCount--;
- }
- }
- }
- SourceOffset += ScavengedChunkRawSize;
- }
- }
- }
- }
- CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
- }
- if (!IsQuiet)
- {
- if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty())
- {
- ZEN_CONSOLE("Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}",
- CachedSequenceHashesFound.size(),
- NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount),
- CachedChunkHashesFound.size(),
- NiceBytes(CacheMappingStats.CacheChunkByteCount),
- CachedBlocksFound.size(),
- NiceBytes(CacheMappingStats.CacheBlocksByteCount),
- NiceTimeSpanMs(CacheMappingStats.CacheScanElapsedWallTimeUs / 1000));
- }
-
- if (!LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0)
- {
- ZEN_CONSOLE("Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}",
- LocalPathIndexesMatchingSequenceIndexes.size(),
- NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount),
- CacheMappingStats.LocalChunkMatchingRemoteCount,
- NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount),
- NiceTimeSpanMs(CacheMappingStats.LocalScanElapsedWallTimeUs / 1000));
- }
- if (CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0)
- {
- ZEN_CONSOLE("Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}",
- ScavengedPathsCount,
- CacheMappingStats.ScavengedPathsMatchingSequencesCount,
- NiceBytes(CacheMappingStats.ScavengedPathsMatchingSequencesByteCount),
- CacheMappingStats.ScavengedChunkMatchingRemoteCount,
- NiceBytes(CacheMappingStats.ScavengedChunkMatchingRemoteByteCount),
- NiceTimeSpanMs(CacheMappingStats.ScavengeElapsedWallTimeUs / 1000));
- }
- }
-
- uint64_t BytesToWrite = 0;
-
- for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
- {
- uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
- if (ChunkWriteCount > 0)
- {
- BytesToWrite += RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount;
- if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
- {
- RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true;
- }
- }
- }
-
- for (const ScavengeCopyOperation& ScavengeCopyOp : ScavengeCopyOperations)
- {
- BytesToWrite += ScavengeCopyOp.RawSize;
- }
-
- uint64_t TotalRequestCount = 0;
- uint64_t TotalPartWriteCount = 0;
- std::atomic<uint64_t> WritePartsComplete = 0;
-
- {
- ZEN_TRACE_CPU("WriteChunks");
-
- Stopwatch WriteTimer;
-
- FilteredRate FilteredDownloadedBytesPerSecond;
- FilteredRate FilteredWrittenBytesPerSecond;
-
- WorkerThreadPool& NetworkPool = GetNetworkPool();
- WorkerThreadPool& WritePool = GetIOWorkerPool();
-
- ProgressBar WriteProgressBar(ProgressMode, Options.PrimeCacheOnly ? "Downloading" : "Writing");
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
-
- struct LooseChunkHashWorkData
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
- uint32_t RemoteChunkIndex = (uint32_t)-1;
- };
-
- std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
- TotalPartWriteCount += CacheCopyDatas.size();
- TotalPartWriteCount += ScavengeCopyOperations.size();
-
- for (const IoHash ChunkHash : LooseChunkHashes)
- {
- auto RemoteChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
- ZEN_ASSERT(RemoteChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
- const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
- if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
- {
- ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash);
- continue;
- }
- bool NeedsCopy = true;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
-
- if (ChunkTargetPtrs.empty())
- {
- ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash);
- }
- else
- {
- TotalRequestCount++;
- TotalPartWriteCount++;
- LooseChunkHashWorks.push_back(
- LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex});
- }
- }
- }
-
- uint32_t BlockCount = gsl::narrow<uint32_t>(BlockDescriptions.size());
-
- std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false);
- auto GetNeededChunkBlockIndexes = [&RemoteContent,
- &RemoteLookup,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) {
- ZEN_TRACE_CPU("UpdateFolder_GetNeededChunkBlockIndexes");
- std::vector<uint32_t> NeededBlockChunkIndexes;
- for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++)
- {
- const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
- if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = It->second;
- if (!ChunkIsPickedUpByBlock[RemoteChunkIndex])
- {
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
- {
- ChunkIsPickedUpByBlock[RemoteChunkIndex] = true;
- NeededBlockChunkIndexes.push_back(ChunkBlockIndex);
- }
- }
- }
- }
- return NeededBlockChunkIndexes;
- };
-
- std::vector<uint32_t> CachedChunkBlockIndexes;
- std::vector<uint32_t> FetchBlockIndexes;
- std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded;
-
- for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
- {
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
- if (!BlockChunkIndexNeeded.empty())
- {
- if (Options.PrimeCacheOnly)
- {
- FetchBlockIndexes.push_back(BlockIndex);
- }
- else
- {
- bool UsingCachedBlock = false;
- if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
- {
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet");
-
- TotalPartWriteCount++;
-
- std::filesystem::path BlockPath =
- ZenTempBlockFolderPath(Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
- if (IsFile(BlockPath))
- {
- CachedChunkBlockIndexes.push_back(BlockIndex);
- UsingCachedBlock = true;
- }
- }
- if (!UsingCachedBlock)
- {
- FetchBlockIndexes.push_back(BlockIndex);
- }
- }
- }
- AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded));
- }
-
- struct BlobsExistsResult
- {
- tsl::robin_set<IoHash> ExistingBlobs;
- uint64_t ElapsedTimeMs = 0;
- };
-
- BlobsExistsResult ExistsResult;
-
- if (Storage.BuildCacheStorage)
- {
- ZEN_TRACE_CPU("BlobCacheExistCheck");
- Stopwatch Timer;
-
- tsl::robin_set<IoHash> BlobHashesSet;
-
- BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size());
- for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
- {
- BlobHashesSet.insert(RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]);
- }
- for (uint32_t BlockIndex : FetchBlockIndexes)
- {
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- BlobHashesSet.insert(BlockDescription.BlockHash);
- }
-
- if (!BlobHashesSet.empty())
- {
- const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end());
- const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
- Storage.BuildCacheStorage->BlobsExists(BuildId, BlobHashes);
-
- if (CacheExistsResult.size() == BlobHashes.size())
- {
- ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
- for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
- {
- if (CacheExistsResult[BlobIndex].HasBody)
- {
- ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
- }
- }
- }
- ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
- if (!ExistsResult.ExistingBlobs.empty() && !IsQuiet)
- {
- ZEN_CONSOLE("Remote cache : Found {} out of {} needed blobs in {}",
- ExistsResult.ExistingBlobs.size(),
- BlobHashes.size(),
- NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
- }
- }
- }
-
- std::vector<BlockRangeDescriptor> BlockRangeWorks;
- std::vector<uint32_t> FullBlockWorks;
- {
- Stopwatch Timer;
-
- std::vector<uint32_t> PartialBlockIndexes;
-
- for (uint32_t BlockIndex : FetchBlockIndexes)
- {
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]);
- if (!BlockChunkIndexNeeded.empty())
- {
- bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
- bool CanDoPartialBlockDownload =
- (BlockDescription.HeaderSize > 0) &&
- (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size());
-
- bool AllowedToDoPartialRequest = false;
- bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
- switch (Options.PartialBlockRequestMode)
- {
- case EPartialBlockRequestMode::Off:
- break;
- case EPartialBlockRequestMode::ZenCacheOnly:
- AllowedToDoPartialRequest = BlockExistInCache;
- break;
- case EPartialBlockRequestMode::Mixed:
- case EPartialBlockRequestMode::All:
- AllowedToDoPartialRequest = true;
- break;
- default:
- ZEN_ASSERT(false);
- break;
- }
-
- const uint32_t ChunkStartOffsetInBlock =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
-
- const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(ChunkStartOffsetInBlock));
-
- if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
- {
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis");
-
- bool LimitToSingleRange =
- BlockExistInCache ? false : Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed;
- uint64_t TotalWantedChunksSize = 0;
- std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges =
- CalculateBlockRanges(BlockIndex,
- BlockDescription,
- BlockChunkIndexNeeded,
- LimitToSingleRange,
- ChunkStartOffsetInBlock,
- TotalBlockSize,
- TotalWantedChunksSize);
- ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize);
-
- if (MaybeBlockRanges.has_value())
- {
- const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value();
- ZEN_ASSERT(!BlockRanges.empty());
- BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end());
- TotalRequestCount += BlockRanges.size();
- TotalPartWriteCount += BlockRanges.size();
-
- uint64_t RequestedSize = std::accumulate(
- BlockRanges.begin(),
- BlockRanges.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
- PartialBlockIndexes.push_back(BlockIndex);
-
- if (RequestedSize > TotalWantedChunksSize)
- {
- ZEN_CONSOLE_VERBOSE("Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})",
- BlockChunkIndexNeeded.size(),
- NiceBytes(RequestedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- BlockRanges.size(),
- NiceBytes(RequestedSize - TotalWantedChunksSize));
- }
- }
- else
- {
- FullBlockWorks.push_back(BlockIndex);
- TotalRequestCount++;
- TotalPartWriteCount++;
- }
- }
- else
- {
- FullBlockWorks.push_back(BlockIndex);
- TotalRequestCount++;
- TotalPartWriteCount++;
- }
- }
- }
-
- if (!PartialBlockIndexes.empty())
- {
- uint64_t TotalFullBlockRequestBytes = 0;
- for (uint32_t BlockIndex : FullBlockWorks)
- {
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
-
- TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(CurrentOffset));
- }
-
- uint64_t TotalPartialBlockBytes = 0;
- for (uint32_t BlockIndex : PartialBlockIndexes)
- {
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
-
- TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(CurrentOffset));
- }
-
- uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes;
-
- const uint64_t TotalPartialBlockRequestBytes =
- std::accumulate(BlockRangeWorks.begin(),
- BlockRangeWorks.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
- uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size();
-
- uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes;
- double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes;
-
- if (!IsQuiet)
- {
- ZEN_CONSOLE(
- "Analisys of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra "
- "requests. Completed in {}",
- NiceBytes(TotalSavedBlocksSize),
- NiceBytes(NonPartialTotalBlockBytes),
- SavedSizePercent,
- TotalExtraPartialBlocksRequests,
- NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
- }
- // exit(0);
- }
- }
-
- BufferedWriteFileCache WriteCache;
-
- for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengeCopyOperations.size(); ScavengeOpIndex++)
- {
- if (AbortFlag)
- {
- break;
- }
- if (!Options.PrimeCacheOnly)
- {
- Work.ScheduleWork(
- WritePool,
- [&RemoteContent,
- &CacheFolderPath,
- &ScavengedPaths,
- &ScavengeCopyOperations,
- &ScavengedContents,
- &FilteredWrittenBytesPerSecond,
- ScavengeOpIndex,
- &WritePartsComplete,
- TotalPartWriteCount,
- &DiskStats](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteScavenged");
-
- FilteredWrittenBytesPerSecond.Start();
-
- const ScavengeCopyOperation& ScavengeOp = ScavengeCopyOperations[ScavengeOpIndex];
- const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
- const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex];
-
- const std::filesystem::path ScavengedFilePath =
- (ScavengedPaths[ScavengeOp.ScavengedContentIndex] / ScavengedPath).make_preferred();
- ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize);
-
- const IoHash& RemoteSequenceRawHash =
- RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex];
- const std::filesystem::path TempFilePath =
- GetTempChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
-
- const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedPathIndex];
- CopyFile(ScavengedFilePath,
- TempFilePath,
- RawSize,
- DiskStats.WriteCount,
- DiskStats.WriteByteCount,
- DiskStats.CloneCount,
- DiskStats.CloneByteCount);
-
- const std::filesystem::path CacheFilePath =
- GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
- RenameFile(TempFilePath, CacheFilePath);
-
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- });
- }
- }
-
- for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
- {
- if (AbortFlag)
- {
- break;
- }
-
- LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
-
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- std::move(LooseChunkHashWork.ChunkTargetPtrs);
- const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
-
- if (Options.PrimeCacheOnly &&
- ExistsResult.ExistingBlobs.contains(RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]))
- {
- DownloadStats.RequestsCompleteCount++;
- continue;
- }
-
- Work.ScheduleWork(
- WritePool,
- [&Storage,
- &Path,
- &Options,
- &RemoteContent,
- &RemoteLookup,
- &CacheFolderPath,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &WritePool,
- &NetworkPool,
- &ExistsResult,
- &DiskStats,
- &DownloadStats,
- &WriteChunkStats,
- &WritePartsComplete,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- BuildId = Oid(BuildId),
- TotalRequestCount,
- TotalPartWriteCount,
- &WriteCache,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded");
- std::filesystem::path ExistingCompressedChunkPath;
- if (!Options.PrimeCacheOnly)
- {
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- std::filesystem::path CompressedChunkPath =
- ZenTempDownloadFolderPath(Options.ZenFolderPath) / ChunkHash.ToHexString();
- if (IsFile(CompressedChunkPath))
- {
- IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath);
- if (ExistingCompressedPart)
- {
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
- {
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- ExistingCompressedChunkPath = std::move(CompressedChunkPath);
- }
- else
- {
- std::error_code DummyEc;
- RemoveFile(CompressedChunkPath, DummyEc);
- }
- }
- }
- }
- if (!AbortFlag)
-
- {
- if (!ExistingCompressedChunkPath.empty())
- {
- Work.ScheduleWork(
- WritePool,
- [&Path,
- &Options,
- &RemoteContent,
- &RemoteLookup,
- &CacheFolderPath,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &WritePool,
- &DiskStats,
- &WriteChunkStats,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
-
- FilteredWrittenBytesPerSecond.Start();
-
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
-
- IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
- if (!CompressedPart)
- {
- throw std::runtime_error(
- fmt::format("Could not open dowloaded compressed chunk {} from {}",
- ChunkHash,
- CompressedChunkPath));
- }
-
- std::filesystem::path TargetFolder = ZenTempCacheFolderPath(Options.ZenFolderPath);
- bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkHash,
- ChunkTargetPtrs,
- WriteCache,
- std::move(CompressedPart),
- DiskStats);
- WritePartsComplete++;
-
- if (!AbortFlag)
- {
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
-
- TryRemoveFile(CompressedChunkPath);
-
- std::vector<uint32_t> CompletedSequences =
- CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
- WriteCache.Close(CompletedSequences);
- if (NeedHashVerify)
- {
- VerifyAndCompleteChunkSequencesAsync(TargetFolder,
- RemoteContent,
- RemoteLookup,
- CompletedSequences,
- Work,
- WritePool);
- }
- else
- {
- FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
- }
- }
- }
- });
- }
- else
- {
- Work.ScheduleWork(
- NetworkPool,
- [&Path,
- &Options,
- &Storage,
- BuildId = Oid(BuildId),
- &RemoteContent,
- &RemoteLookup,
- &ExistsResult,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &WritePool,
- &NetworkPool,
- &DiskStats,
- &WriteChunkStats,
- &WritePartsComplete,
- TotalPartWriteCount,
- TotalRequestCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- &DownloadStats](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BuildBlob;
- const bool ExistsInCache =
- Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
- if (ExistsInCache)
- {
- BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash);
- }
- if (BuildBlob)
- {
- uint64_t BlobSize = BuildBlob.GetSize();
- DownloadStats.DownloadedChunkCount++;
- DownloadStats.DownloadedChunkByteCount += BlobSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(Options.ZenFolderPath,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- WriteCache,
- Work,
- WritePool,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
- }
- else
- {
- if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >=
- Options.LargeAttachmentSize)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
- DownloadLargeBlob(*Storage.BuildStorage,
- ZenTempDownloadFolderPath(Options.ZenFolderPath),
- BuildId,
- ChunkHash,
- Options.PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&Storage,
- &Options,
- &RemoteContent,
- &RemoteLookup,
- BuildId,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &WritePool,
- ChunkHash,
- TotalPartWriteCount,
- TotalRequestCount,
- &WritePartsComplete,
- &FilteredWrittenBytesPerSecond,
- &FilteredDownloadedBytesPerSecond,
- &DownloadStats,
- &DiskStats,
- RemoteChunkIndex,
- ChunkTargetPtrs](IoBuffer&& Payload) mutable {
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (Payload && Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(
- BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
- }
- if (!Options.PrimeCacheOnly)
- {
- if (!AbortFlag)
- {
- AsyncWriteDownloadedChunk(
- Options.ZenFolderPath,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- WriteCache,
- Work,
- WritePool,
- std::move(Payload),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
- }
- }
- });
- }
- else
- {
- ZEN_TRACE_CPU("UpdateFolder_GetChunk");
- BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash);
- if (BuildBlob && Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(
- BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BuildBlob)));
- }
- if (!BuildBlob)
- {
- throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
- }
- if (!Options.PrimeCacheOnly)
- {
- if (!AbortFlag)
- {
- uint64_t BlobSize = BuildBlob.GetSize();
- DownloadStats.DownloadedChunkCount++;
- DownloadStats.DownloadedChunkByteCount += BlobSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(Options.ZenFolderPath,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- WriteCache,
- Work,
- WritePool,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
- }
- }
- }
- }
- }
- });
- }
- }
- }
- });
- }
-
- std::unique_ptr<CloneQueryInterface> CloneQuery;
- if (AllowFileClone)
- {
- CloneQuery = GetCloneQueryInterface(CacheFolderPath);
- }
-
- for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
- {
- ZEN_ASSERT(!Options.PrimeCacheOnly);
- if (AbortFlag)
- {
- break;
- }
-
- Work.ScheduleWork(
- WritePool,
- [&Path,
- &LocalContent,
- &RemoteContent,
- &RemoteLookup,
- &CacheFolderPath,
- &CloneQuery,
- &LocalLookup,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &WritePool,
- &FilteredWrittenBytesPerSecond,
- &CacheCopyDatas,
- &ScavengedContents,
- &ScavengedLookups,
- &ScavengedPaths,
- &WritePartsComplete,
- TotalPartWriteCount,
- &DiskStats,
- CopyDataIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_CopyLocal");
-
- FilteredWrittenBytesPerSecond.Start();
- const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
-
- std::filesystem::path SourceFilePath;
-
- if (CopyData.ScavengeSourceIndex == (uint32_t)-1)
- {
- const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
- SourceFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- }
- else
- {
- const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex];
- const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex];
- const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex];
- const uint32_t ScavengedPathIndex =
- ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
- SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred();
- }
- ZEN_ASSERT_SLOW(IsFile(SourceFilePath));
- ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
-
- uint64_t CacheLocalFileBytesRead = 0;
-
- size_t TargetStart = 0;
- const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
- CopyData.TargetChunkLocationPtrs);
-
- struct WriteOp
- {
- const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
- uint64_t CacheFileOffset = (uint64_t)-1;
- uint32_t ChunkIndex = (uint32_t)-1;
- };
-
- std::vector<WriteOp> WriteOps;
-
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Sort");
- WriteOps.reserve(AllTargets.size());
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
- {
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
- {
- WriteOps.push_back(WriteOp{.Target = Target,
- .CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkIndex = ChunkTarget.RemoteChunkIndex});
- }
- TargetStart += ChunkTarget.TargetChunkLocationCount;
- }
-
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- if (Lhs.Target->Offset < Rhs.Target->Offset)
- {
- return true;
- }
- return false;
- });
- }
-
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Write");
-
- tsl::robin_set<uint32_t> ChunkIndexesWritten;
-
- BufferedOpenFile SourceFile(SourceFilePath, DiskStats);
-
- bool CanCloneSource = CloneQuery && CloneQuery->CanClone(SourceFile.Handle());
-
- BufferedWriteFileCache::Local LocalWriter(WriteCache);
-
- for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
- {
- if (AbortFlag)
- {
- break;
- }
- const WriteOp& Op = WriteOps[WriteOpIndex];
-
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
- const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
- const uint64_t TargetSize = RemoteContent.RawSizes[RemotePathIndex];
- const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];
-
- uint64_t ReadLength = ChunkSize;
- size_t WriteCount = 1;
- uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize;
- uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize;
- while ((WriteOpIndex + WriteCount) < WriteOps.size())
- {
- const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount];
- if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex)
- {
- break;
- }
- if (NextOp.Target->Offset != OpTargetEnd)
- {
- break;
- }
- if (NextOp.CacheFileOffset != OpSourceEnd)
- {
- break;
- }
- const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
- if (ReadLength + NextChunkLength > 512u * 1024u)
- {
- break;
- }
- ReadLength += NextChunkLength;
- OpSourceEnd += NextChunkLength;
- OpTargetEnd += NextChunkLength;
- WriteCount++;
- }
-
- {
- bool DidClone = false;
-
- if (CanCloneSource)
- {
- uint64_t PreBytes = 0;
- uint64_t PostBytes = 0;
- uint64_t ClonableBytes = CloneQuery->GetClonableRange(Op.CacheFileOffset,
- Op.Target->Offset,
- ReadLength,
- PreBytes,
- PostBytes);
- if (ClonableBytes > 0)
- {
- // We need to open the file...
- BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(RemoteSequenceIndex);
- if (!Writer)
- {
- Writer =
- LocalWriter.PutWriter(RemoteSequenceIndex,
- std::make_unique<BufferedWriteFileCache::Local::Writer>());
-
- Writer->File = std::make_unique<BasicFile>();
-
- const std::filesystem::path FileName = GetTempChunkedSequenceFileName(
- CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]);
- Writer->File->Open(FileName, BasicFile::Mode::kWrite);
- if (UseSparseFiles)
- {
- PrepareFileForScatteredWrite(Writer->File->Handle(), TargetSize);
- }
- }
- DidClone = CloneQuery->TryClone(SourceFile.Handle(),
- Writer->File->Handle(),
- Op.CacheFileOffset + PreBytes,
- Op.Target->Offset + PreBytes,
- ClonableBytes,
- TargetSize);
- if (DidClone)
- {
- DiskStats.WriteCount++;
- DiskStats.WriteByteCount += ClonableBytes;
-
- DiskStats.CloneCount++;
- DiskStats.CloneByteCount += ClonableBytes;
-
- if (PreBytes > 0)
- {
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes);
- const uint64_t FileOffset = Op.Target->Offset;
-
- WriteSequenceChunk(CacheFolderPath,
- RemoteContent,
- LocalWriter,
- ChunkSource,
- RemoteSequenceIndex,
- FileOffset,
- RemotePathIndex,
- DiskStats);
- }
- if (PostBytes > 0)
- {
- CompositeBuffer ChunkSource =
- SourceFile.GetRange(Op.CacheFileOffset + ReadLength - PostBytes, PostBytes);
- const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes;
-
- WriteSequenceChunk(CacheFolderPath,
- RemoteContent,
- LocalWriter,
- ChunkSource,
- RemoteSequenceIndex,
- FileOffset,
- RemotePathIndex,
- DiskStats);
- }
- }
- }
- }
-
- if (!DidClone)
- {
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
-
- const uint64_t FileOffset = Op.Target->Offset;
-
- WriteSequenceChunk(CacheFolderPath,
- RemoteContent,
- LocalWriter,
- ChunkSource,
- RemoteSequenceIndex,
- FileOffset,
- RemotePathIndex,
- DiskStats);
- }
- }
-
- CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
-
- WriteOpIndex += WriteCount;
- }
- }
- if (!AbortFlag)
- {
- // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
- std::vector<uint32_t> CompletedChunkSequences;
- for (const WriteOp& Op : WriteOps)
- {
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
- {
- CompletedChunkSequences.push_back(RemoteSequenceIndex);
- }
- }
- WriteCache.Close(CompletedChunkSequences);
- VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- CompletedChunkSequences,
- Work,
- WritePool);
- ZEN_CONSOLE_VERBOSE("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath);
- }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- });
- }
-
- for (uint32_t BlockIndex : CachedChunkBlockIndexes)
- {
- ZEN_ASSERT(!Options.PrimeCacheOnly);
- if (AbortFlag)
- {
- break;
- }
-
- Work.ScheduleWork(
- WritePool,
- [&Options,
- &CacheFolderPath,
- &RemoteContent,
- &RemoteLookup,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &WritePool,
- &BlockDescriptions,
- &FilteredWrittenBytesPerSecond,
- &DiskStats,
- &WritePartsComplete,
- TotalPartWriteCount,
- BlockIndex](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock");
-
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- FilteredWrittenBytesPerSecond.Start();
-
- std::filesystem::path BlockChunkPath =
- ZenTempBlockFolderPath(Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
- IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
- {
- throw std::runtime_error(
- fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
- }
-
- if (!AbortFlag)
- {
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache,
- DiskStats))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
-
- TryRemoveFile(BlockChunkPath);
-
- WritePartsComplete++;
-
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- }
- });
- }
-
- for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
- {
- ZEN_ASSERT(!Options.PrimeCacheOnly);
- if (AbortFlag)
- {
- break;
- }
- const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex];
- ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1);
- const uint32_t BlockIndex = BlockRange.BlockIndex;
- Work.ScheduleWork(
- NetworkPool,
- [&Storage,
- &Options,
- BuildId,
- &RemoteLookup,
- &BlockDescriptions,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &CacheFolderPath,
- &RemoteContent,
- &SequenceIndexChunksLeftToWriteCounters,
- &ExistsResult,
- &WriteCache,
- &FilteredDownloadedBytesPerSecond,
- TotalRequestCount,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- &DiskStats,
- &DownloadStats,
- &Work,
- &WritePool,
- BlockIndex,
- BlockRange](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock");
-
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer;
- if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
- {
- BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId,
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- }
- if (!BlockBuffer)
- {
- BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId,
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- }
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeStart + BlockRange.RangeLength));
- }
- if (!AbortFlag)
- {
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
-
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
- {
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
-
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath =
- ZenTempBlockFolderPath(Options.ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
- }
- }
- }
- }
-
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = ZenTempBlockFolderPath(Options.ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
-
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool,
- [&CacheFolderPath,
- &RemoteContent,
- &RemoteLookup,
- &BlockDescriptions,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &WritePartsComplete,
- &WriteCache,
- &Work,
- TotalPartWriteCount,
- &WritePool,
- &DiskStats,
- &FilteredWrittenBytesPerSecond,
- BlockIndex,
- BlockRange,
- BlockChunkPath,
- BlockPartialBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
-
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockPartialBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockPartialBuffer);
- BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockPartialBuffer)
- {
- throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
- }
- }
-
- FilteredWrittenBytesPerSecond.Start();
-
- if (!WritePartialBlockToDisk(
- CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache,
- DiskStats))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
- }
-
- if (!BlockChunkPath.empty())
- {
- TryRemoveFile(BlockChunkPath);
- }
-
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- });
- }
- }
- }
- });
- }
-
- for (uint32_t BlockIndex : FullBlockWorks)
- {
- if (AbortFlag)
- {
- break;
- }
-
- if (Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash))
- {
- DownloadStats.RequestsCompleteCount++;
- continue;
- }
-
- Work.ScheduleWork(
- NetworkPool,
- [&Storage,
- &Options,
- BuildId,
- &BlockDescriptions,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- &ExistsResult,
- &Work,
- &WritePool,
- &RemoteContent,
- &RemoteLookup,
- &WriteCache,
- &CacheFolderPath,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &FilteredDownloadedBytesPerSecond,
- &WriteChunkStats,
- &DiskStats,
- &DownloadStats,
- TotalRequestCount,
- BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetFullBlock");
-
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- FilteredDownloadedBytesPerSecond.Start();
-
- IoBuffer BlockBuffer;
- const bool ExistsInCache =
- Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
- if (ExistsInCache)
- {
- BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
- }
- if (!BlockBuffer)
- {
- BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
- if (BlockBuffer && Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(BuildId,
- BlockDescription.BlockHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BlockBuffer)));
- }
- }
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
- }
- if (!AbortFlag)
- {
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
-
- if (!Options.PrimeCacheOnly)
- {
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
- {
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = ZenTempBlockFolderPath(Options.ZenFolderPath) /
- BlockDescription.BlockHash.ToHexString();
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
- }
- }
- }
- }
-
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath =
- ZenTempBlockFolderPath(Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
-
- if (!AbortFlag)
- {
- Work.ScheduleWork(WritePool,
- [&Work,
- &WritePool,
- &RemoteContent,
- &RemoteLookup,
- CacheFolderPath,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- BlockIndex,
- &BlockDescriptions,
- &WriteCache,
- &WriteChunkStats,
- &DiskStats,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- BlockChunkPath,
- BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
-
- const ChunkBlockDescription& BlockDescription =
- BlockDescriptions[BlockIndex];
-
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockBuffer);
- BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
- {
- throw std::runtime_error(
- fmt::format("Could not open dowloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
- }
- }
-
- FilteredWrittenBytesPerSecond.Start();
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache,
- DiskStats))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
-
- if (!BlockChunkPath.empty())
- {
- TryRemoveFile(BlockChunkPath);
- }
-
- WritePartsComplete++;
-
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- });
- }
- }
- }
- }
- });
- }
-
- {
- ZEN_TRACE_CPU("WriteChunks_Wait");
-
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
- uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() +
- DownloadStats.DownloadedBlockByteCount.load() +
- +DownloadStats.DownloadedPartialBlockByteCount.load();
- FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load());
- FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
- std::string DownloadRateString =
- (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- ? ""
- : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
- std::string CloneDetails;
- if (DiskStats.CloneCount.load() > 0)
- {
- CloneDetails = fmt::format(" ({} cloned)", NiceBytes(DiskStats.CloneByteCount.load()));
- }
- std::string WriteDetails = Options.PrimeCacheOnly ? ""
- : fmt::format(" {}/{} ({}B/s) written{}.",
- NiceBytes(DiskStats.WriteByteCount.load()),
- NiceBytes(BytesToWrite),
- NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()),
- CloneDetails);
- std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
- DownloadStats.RequestsCompleteCount.load(),
- TotalRequestCount,
- NiceBytes(DownloadedBytes),
- DownloadRateString,
- WriteDetails);
- WriteProgressBar.UpdateState(
- {.Task = Options.PrimeCacheOnly ? "Downloading " : "Writing chunks ",
- .Details = Details,
- .TotalCount = Options.PrimeCacheOnly ? TotalRequestCount : BytesToWrite,
- .RemainingCount = Options.PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load())
- : (BytesToWrite - DiskStats.WriteByteCount.load()),
- .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
- }
-
- CloneQuery.reset();
-
- FilteredWrittenBytesPerSecond.Stop();
- FilteredDownloadedBytesPerSecond.Stop();
-
- WriteProgressBar.Finish();
- if (AbortFlag)
- {
- return;
- }
-
- if (!Options.PrimeCacheOnly)
- {
- uint32_t RawSequencesMissingWriteCount = 0;
- for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
- {
- const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
- if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
- {
- RawSequencesMissingWriteCount++;
- const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
- const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex];
- ZEN_ASSERT(!IncompletePath.empty());
- const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
- if (!IsQuiet)
- {
- ZEN_CONSOLE("{}: Max count {}, Current count {}",
- IncompletePath,
- ExpectedSequenceCount,
- SequenceIndexChunksLeftToWriteCounter.load());
- }
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
- }
- }
- ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
- }
-
- const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() +
- +DownloadStats.DownloadedPartialBlockByteCount.load();
- if (!IsQuiet)
- {
- std::string CloneDetails;
- if (DiskStats.CloneCount.load() > 0)
- {
- CloneDetails = fmt::format(" ({} cloned)", NiceBytes(DiskStats.CloneByteCount.load()));
- }
- ZEN_CONSOLE("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}",
- NiceBytes(DownloadedBytes),
- NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)),
- NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
- NiceBytes(DiskStats.WriteByteCount.load()),
- NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), DiskStats.WriteByteCount.load())),
- CloneDetails,
- NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000),
- NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
- }
-
- WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs();
- WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS();
- WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
- }
-
- if (Options.PrimeCacheOnly)
- {
- return;
- }
-
- tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex;
- RemotePathIndexToLocalPathIndex.reserve(RemoteContent.Paths.size());
-
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex;
- std::vector<uint32_t> RemoveLocalPathIndexes;
-
- if (AbortFlag)
- {
- return;
- }
-
- {
- ZEN_TRACE_CPU("UpdateFolder_PrepareTarget");
-
- tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences;
- tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
- RemotePathToRemoteIndex.reserve(RemoteContent.Paths.size());
- for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
- {
- RemotePathToRemoteIndex.insert({RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex});
- }
-
- std::vector<uint32_t> FilesToCache;
-
- uint64_t MatchCount = 0;
- uint64_t PathMismatchCount = 0;
- uint64_t HashMismatchCount = 0;
- std::atomic<uint64_t> CachedCount = 0;
- std::atomic<uint64_t> CachedByteCount = 0;
- uint64_t SkippedCount = 0;
- uint64_t DeleteCount = 0;
- for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
- {
- if (AbortFlag)
- {
- break;
- }
- const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
- const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex];
-
- ZEN_ASSERT_SLOW(IsFile((Path / LocalContent.Paths[LocalPathIndex]).make_preferred()));
-
- if (!Options.WipeTargetFolder)
- {
- if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string());
- RemotePathIt != RemotePathToRemoteIndex.end())
- {
- const uint32_t RemotePathIndex = RemotePathIt->second;
- if (RemoteContent.RawHashes[RemotePathIndex] == RawHash)
- {
- // It is already in it's desired place
- RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex;
- SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex});
- MatchCount++;
- continue;
- }
- else
- {
- HashMismatchCount++;
- }
- }
- else
- {
- PathMismatchCount++;
- }
- }
- if (RemoteLookup.RawHashToSequenceIndex.contains(RawHash))
- {
- if (!CachedRemoteSequences.contains(RawHash))
- {
- ZEN_TRACE_CPU("MoveToCache");
- // We need it
- FilesToCache.push_back(LocalPathIndex);
- CachedRemoteSequences.insert(RawHash);
- }
- else
- {
- // We already have it
- SkippedCount++;
- }
- }
- else if (!Options.WipeTargetFolder)
- {
- // We don't need it
- RemoveLocalPathIndexes.push_back(LocalPathIndex);
- DeleteCount++;
- }
- }
-
- if (AbortFlag)
- {
- return;
- }
-
- {
- ZEN_TRACE_CPU("UpdateFolder_CopyToCache");
-
- Stopwatch Timer;
-
- WorkerThreadPool& WritePool = GetIOWorkerPool();
-
- ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data");
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
-
- for (uint32_t LocalPathIndex : FilesToCache)
- {
- if (AbortFlag)
- {
- break;
- }
- Work.ScheduleWork(
- WritePool,
- [&Path, &LocalContent, &CacheFolderPath, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) {
- ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache");
- if (!AbortFlag)
- {
- const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
- const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex];
- const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
- ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath));
- const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred();
- RenameFileWithRetry(LocalFilePath, CacheFilePath);
- CachedCount++;
- CachedByteCount += LocalContent.RawSizes[LocalPathIndex];
- }
- });
- }
-
- {
- ZEN_TRACE_CPU("CacheLocal_Wait");
-
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
- const uint64_t WorkTotal = FilesToCache.size();
- const uint64_t WorkComplete = CachedCount.load();
- std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount));
- CacheLocalProgressBar.UpdateState({.Task = "Caching local ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
- .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
- .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
- }
-
- CacheLocalProgressBar.Finish();
- if (AbortFlag)
- {
- return;
- }
-
- ZEN_DEBUG(
- "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, "
- "Delete: {}",
- MatchCount,
- PathMismatchCount,
- HashMismatchCount,
- CachedCount.load(),
- NiceBytes(CachedByteCount.load()),
- SkippedCount,
- DeleteCount);
- }
- }
-
- if (Options.WipeTargetFolder)
- {
- ZEN_TRACE_CPU("UpdateFolder_WipeTarget");
- Stopwatch Timer;
-
- // Clean target folder
- if (!CleanDirectory(Path, DefaultExcludeFolders))
- {
- ZEN_CONSOLE_WARN("Some files in {} could not be removed", Path);
- }
- RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
- }
-
- if (AbortFlag)
- {
- return;
- }
-
- {
- ZEN_TRACE_CPU("UpdateFolder_FinalizeTree");
- Stopwatch Timer;
-
- WorkerThreadPool& WritePool = GetIOWorkerPool();
-
- ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State");
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
-
- OutLocalFolderState.Paths.resize(RemoteContent.Paths.size());
- OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size());
- OutLocalFolderState.Attributes.resize(RemoteContent.Paths.size());
- OutLocalFolderState.ModificationTicks.resize(RemoteContent.Paths.size());
-
- std::atomic<uint64_t> DeletedCount = 0;
-
- for (uint32_t LocalPathIndex : RemoveLocalPathIndexes)
- {
- if (AbortFlag)
- {
- break;
- }
- Work.ScheduleWork(WritePool, [&Path, &LocalContent, &DeletedCount, LocalPathIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("FinalizeTree_Remove");
- const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- SetFileReadOnlyWithRetry(LocalFilePath, false);
- RemoveFileWithRetry(LocalFilePath);
- DeletedCount++;
- }
- });
- }
-
- std::atomic<uint64_t> TargetsComplete = 0;
-
- struct FinalizeTarget
- {
- IoHash RawHash;
- uint32_t RemotePathIndex;
- };
-
- std::vector<FinalizeTarget> Targets;
- Targets.reserve(RemoteContent.Paths.size());
- for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
- {
- Targets.push_back(FinalizeTarget{.RawHash = RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex});
- }
- std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) {
- if (Lhs.RawHash < Rhs.RawHash)
- {
- return true;
- }
- else if (Lhs.RawHash > Rhs.RawHash)
- {
- return false;
- }
- return Lhs.RemotePathIndex < Rhs.RemotePathIndex;
- });
-
- size_t TargetOffset = 0;
- while (TargetOffset < Targets.size())
- {
- if (AbortFlag)
- {
- break;
- }
-
- size_t TargetCount = 1;
- while ((TargetOffset + TargetCount) < Targets.size() &&
- (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash))
- {
- TargetCount++;
- }
-
- Work.ScheduleWork(
- WritePool,
- [&Path,
- &LocalContent,
- &SequenceHashToLocalPathIndex,
- &RemoteContent,
- &RemoteLookup,
- &CacheFolderPath,
- &Targets,
- &RemotePathIndexToLocalPathIndex,
- &RebuildFolderStateStats,
- &OutLocalFolderState,
- BaseTargetOffset = TargetOffset,
- TargetCount,
- &TargetsComplete](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("FinalizeTree_Work");
-
- size_t TargetOffset = BaseTargetOffset;
- const IoHash& RawHash = Targets[TargetOffset].RawHash;
-
- if (RawHash == IoHash::Zero)
- {
- ZEN_TRACE_CPU("ZeroSize");
- while (TargetOffset < (BaseTargetOffset + TargetCount))
- {
- const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
- const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
- std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
- if (!RemotePathIndexToLocalPathIndex[RemotePathIndex])
- {
- if (IsFileWithRetry(TargetFilePath))
- {
- SetFileReadOnlyWithRetry(TargetFilePath, false);
- }
- else
- {
- CreateDirectories(TargetFilePath.parent_path());
- }
- BasicFile OutputFile;
- OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate);
- }
- OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
- OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
-
- OutLocalFolderState.Attributes[RemotePathIndex] =
- RemoteContent.Attributes.empty()
- ? GetNativeFileAttributes(TargetFilePath)
- : SetNativeFileAttributes(TargetFilePath,
- RemoteContent.Platform,
- RemoteContent.Attributes[RemotePathIndex]);
- OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
-
- TargetOffset++;
- TargetsComplete++;
- }
- }
- else
- {
- ZEN_TRACE_CPU("Files");
- ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash));
- const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex];
- std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred();
-
- if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex);
- InPlaceIt != RemotePathIndexToLocalPathIndex.end())
- {
- ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
- }
- else
- {
- if (IsFileWithRetry(FirstTargetFilePath))
- {
- SetFileReadOnlyWithRetry(FirstTargetFilePath, false);
- }
- else
- {
- CreateDirectories(FirstTargetFilePath.parent_path());
- }
-
- if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash);
- InplaceIt != SequenceHashToLocalPathIndex.end())
- {
- ZEN_TRACE_CPU("Copy");
- const uint32_t LocalPathIndex = InplaceIt->second;
- const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex];
- std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred();
- ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath));
-
- ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath);
- const uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex];
- std::atomic<uint64_t> WriteCount;
- std::atomic<uint64_t> WriteByteCount;
- std::atomic<uint64_t> CloneCount;
- std::atomic<uint64_t> CloneByteCount;
- CopyFile(SourceFilePath,
- FirstTargetFilePath,
- RawSize,
- WriteCount,
- WriteByteCount,
- CloneCount,
- CloneByteCount);
- RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
- }
- else
- {
- ZEN_TRACE_CPU("Rename");
- const std::filesystem::path CacheFilePath =
- GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
- ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath));
-
- RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
-
- RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
- }
- }
-
- OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath;
- OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex];
-
- OutLocalFolderState.Attributes[FirstRemotePathIndex] =
- RemoteContent.Attributes.empty()
- ? GetNativeFileAttributes(FirstTargetFilePath)
- : SetNativeFileAttributes(FirstTargetFilePath,
- RemoteContent.Platform,
- RemoteContent.Attributes[FirstRemotePathIndex]);
- OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] =
- GetModificationTickFromPath(FirstTargetFilePath);
-
- TargetOffset++;
- TargetsComplete++;
-
- while (TargetOffset < (BaseTargetOffset + TargetCount))
- {
- const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
- const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
- std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
-
- if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex);
- InPlaceIt != RemotePathIndexToLocalPathIndex.end())
- {
- ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath));
- }
- else
- {
- ZEN_TRACE_CPU("Copy");
- if (IsFileWithRetry(TargetFilePath))
- {
- SetFileReadOnlyWithRetry(TargetFilePath, false);
- }
- else
- {
- CreateDirectories(TargetFilePath.parent_path());
- }
-
- ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
- ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath);
- const uint64_t RawSize = RemoteContent.RawSizes[RemotePathIndex];
- std::atomic<uint64_t> WriteCount;
- std::atomic<uint64_t> WriteByteCount;
- std::atomic<uint64_t> CloneCount;
- std::atomic<uint64_t> CloneByteCount;
- CopyFile(FirstTargetFilePath,
- TargetFilePath,
- RawSize,
- WriteCount,
- WriteByteCount,
- CloneCount,
- CloneByteCount);
- RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
- }
-
- OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
- OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
-
- OutLocalFolderState.Attributes[RemotePathIndex] =
- RemoteContent.Attributes.empty()
- ? GetNativeFileAttributes(TargetFilePath)
- : SetNativeFileAttributes(TargetFilePath,
- RemoteContent.Platform,
- RemoteContent.Attributes[RemotePathIndex]);
- OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
-
- TargetOffset++;
- TargetsComplete++;
- }
- }
- }
- });
-
- TargetOffset += TargetCount;
- }
-
- {
- ZEN_TRACE_CPU("FinalizeTree_Wait");
-
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
- const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size();
- const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load();
- std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal);
- RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
- .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
- .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
- }
-
- RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
- RebuildProgressBar.Finish();
- }
- }
-
std::string GetCbObjectAsNiceString(CbObjectView Object, std::string_view Prefix, std::string_view Suffix)
{
ExtendableStringBuilder<512> SB;
@@ -9974,30 +5641,46 @@ namespace {
RebuildFolderStateStatistics RebuildFolderStateStats;
VerifyFolderStatistics VerifyFolderStats;
+ const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent);
+ const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent);
+
ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Download, TaskSteps::StepCount);
- UpdateFolder(Storage,
- BuildId,
- Path,
- LocalContent,
- RemoteContent,
- BlockDescriptions,
- LooseChunkHashes,
- UpdateOptions{.SystemRootDir = Options.SystemRootDir,
- .ZenFolderPath = Options.ZenFolderPath,
- .LargeAttachmentSize = LargeAttachmentSize,
- .PreferredMultipartChunkSize = PreferredMultipartChunkSize,
- .PartialBlockRequestMode = Options.PartialBlockRequestMode,
- .WipeTargetFolder = Options.CleanTargetFolder,
- .PrimeCacheOnly = Options.PrimeCacheOnly,
- .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging,
- .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging},
- LocalFolderState,
- DiskStats,
- CacheMappingStats,
- DownloadStats,
- WriteChunkStats,
- RebuildFolderStateStats);
+ {
+ ConsoleOpLogOutput Output(ProgressMode);
+ BuildsOperationUpdateFolder Updater(
+ Output,
+ Storage,
+ AbortFlag,
+ PauseFlag,
+ GetIOWorkerPool(),
+ GetNetworkPool(),
+ BuildId,
+ Path,
+ LocalContent,
+ LocalLookup,
+ RemoteContent,
+ RemoteLookup,
+ BlockDescriptions,
+ LooseChunkHashes,
+ BuildsOperationUpdateFolder::Options{.IsQuiet = IsQuiet,
+ .IsVerbose = IsVerbose,
+ .AllowFileClone = AllowFileClone,
+ .UseSparseFiles = UseSparseFiles,
+ .SystemRootDir = Options.SystemRootDir,
+ .ZenFolderPath = Options.ZenFolderPath,
+ .LargeAttachmentSize = LargeAttachmentSize,
+ .PreferredMultipartChunkSize = PreferredMultipartChunkSize,
+ .PartialBlockRequestMode = Options.PartialBlockRequestMode,
+ .WipeTargetFolder = Options.CleanTargetFolder,
+ .PrimeCacheOnly = Options.PrimeCacheOnly,
+ .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging,
+ .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging,
+ .DefaultExcludeFolders = DefaultExcludeFolders,
+ .DefaultExcludeExtensions = DefaultExcludeExtensions},
+ DiskStats);
+ Updater.Execute(LocalFolderState);
+ }
if (!AbortFlag)
{
@@ -10195,7 +5878,7 @@ namespace {
{
std::unique_ptr<ChunkingController> ChunkController =
CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{});
- std::vector<std::string_view> ExcludeExtensions = DefaultExcludeExtensions;
+ std::vector<std::string> ExcludeExtensions = DefaultExcludeExtensions;
if (OnlyChunked)
{
ExcludeExtensions.insert(ExcludeExtensions.end(),
@@ -10204,7 +5887,7 @@ namespace {
}
auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool {
- for (const std::string_view& ExcludeFolder : ExcludeFolders)
+ for (const std::string& ExcludeFolder : ExcludeFolders)
{
if (RelativePath.starts_with(ExcludeFolder))
{
@@ -10222,7 +5905,7 @@ namespace {
};
auto IsAcceptedFile = [ExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool {
- for (const std::string_view& ExcludeExtension : ExcludeExtensions)
+ for (const std::string& ExcludeExtension : ExcludeExtensions)
{
if (RelativePath.ends_with(ExcludeExtension))
{
@@ -12114,12 +7797,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_test");
const std::filesystem::path DownloadPath2 = m_Path.parent_path() / (m_BuildPartName + "_test2");
+ const std::filesystem::path DownloadPath3 = m_Path.parent_path() / (m_BuildPartName + "_test3");
- auto ___ = MakeGuard([DownloadPath, DownloadPath2]() {
+ auto ___ = MakeGuard([DownloadPath, DownloadPath2, DownloadPath3]() {
CleanDirectory(DownloadPath, true);
DeleteDirectories(DownloadPath);
CleanDirectory(DownloadPath2, true);
DeleteDirectories(DownloadPath2);
+ CleanDirectory(DownloadPath3, true);
+ DeleteDirectories(DownloadPath3);
});
if (m_ZenFolderPath.empty())
@@ -12237,7 +7923,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
DownloadContent);
auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders, Path](const std::filesystem::path& AbsolutePath) -> bool {
std::string RelativePath = std::filesystem::relative(AbsolutePath, Path).generic_string();
- for (const std::string_view& ExcludeFolder : ExcludeFolders)
+ for (const std::string& ExcludeFolder : ExcludeFolders)
{
if (RelativePath.starts_with(ExcludeFolder))
{
@@ -12451,6 +8137,26 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw std::runtime_error("Test aborted. (Download original)");
}
+
+ ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath3);
+ DownloadFolder(Storage,
+ BuildId,
+ {BuildPartId},
+ {},
+ DownloadPath3,
+ DownloadOptions{.SystemRootDir = m_SystemRootDir,
+ .ZenFolderPath = DownloadPath3 / ZenFolderName,
+ .AllowMultiparts = m_AllowMultiparts,
+ .PartialBlockRequestMode = PartialBlockRequestMode,
+ .CleanTargetFolder = false,
+ .PostDownloadVerify = true,
+ .PrimeCacheOnly = false,
+ .EnableOtherDownloadsScavenging = m_EnableScavenging,
+ .EnableTargetFolderScavenging = true});
+ if (AbortFlag)
+ {
+ throw std::runtime_error("Test aborted. (Download original)");
+ }
}
}