aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/builds_cmd.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
-rw-r--r--src/zen/cmds/builds_cmd.cpp4215
1 files changed, 2843 insertions, 1372 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 889ccef0b..8e8fd480a 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -6,7 +6,9 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryfile.h>
#include <zencore/compactbinaryfmt.h>
+#include <zencore/compactbinaryvalue.h>
#include <zencore/compress.h>
+#include <zencore/config.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -20,6 +22,7 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/httpclientauth.h>
#include <zenhttp/httpcommon.h>
+#include <zenutil/buildstoragecache.h>
#include <zenutil/chunkblock.h>
#include <zenutil/chunkedcontent.h>
#include <zenutil/chunkedfile.h>
@@ -51,6 +54,8 @@ ZEN_THIRD_PARTY_INCLUDES_END
#define EXTRA_VERIFY 0
+#define ZEN_CLOUD_STORAGE "Cloud Storage"
+
namespace zen {
namespace {
static std::atomic<bool> AbortFlag = false;
@@ -79,30 +84,56 @@ namespace {
size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize;
};
- const ChunksBlockParameters DefaultChunksBlockParams{.MaxBlockSize = 32u * 1024u * 1024u,
- .MaxChunkEmbedSize = DefaultChunkedParams.MaxSize};
-
+ const ChunksBlockParameters DefaultChunksBlockParams{
+ .MaxBlockSize = 32u * 1024u * 1024u,
+ .MaxChunkEmbedSize = 2u * 1024u * 1024u // DefaultChunkedParams.MaxSize
+ };
const uint64_t DefaultPreferredMultipartChunkSize = 32u * 1024u * 1024u;
const double DefaultLatency = 0; // .0010;
const double DefaultDelayPerKBSec = 0; // 0.00005;
- const std::string ZenFolderName = ".zen";
- const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName);
- const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName);
- const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName);
+ const bool SingleThreaded = false;
+ bool BoostWorkerThreads = false;
- const std::string ZenTempCacheFolderName =
- fmt::format("{}/cache", ZenTempFolderName); // Decompressed and verified data - chunks & sequences
- const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); // Temp storage for whole and partial blocks
- const std::string ZenTempChunkFolderName =
- fmt::format("{}/chunks", ZenTempFolderName); // Temp storage for decompressed and validated chunks
+ WorkerThreadPool& GetIOWorkerPool()
+ {
+ return SingleThreaded ? GetSyncWorkerPool()
+ : BoostWorkerThreads ? GetLargeWorkerPool(EWorkloadType::Burst)
+ : GetMediumWorkerPool(EWorkloadType::Burst);
+ }
+ WorkerThreadPool& GetNetworkPool() { return SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); }
- const std::string ZenTempDownloadFolderName =
- fmt::format("{}/download", ZenTempFolderName); // Temp storage for unverfied downloaded blobs
+ const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u;
- const std::string ZenTempStorageFolderName =
- fmt::format("{}/storage", ZenTempFolderName); // Temp storage folder for BuildStorage implementations
+ const std::string ZenFolderName = ".zen";
+ std::filesystem::path ZenStateFilePath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.cbo"; }
+ // std::filesystem::path ZenStateFileJsonPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.json";
+ // }
+ std::filesystem::path ZenTempFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "tmp"; }
+
+ std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath)
+ {
+ return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences
+ }
+ std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath)
+ {
+ return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks
+ }
+ std::filesystem::path ZenTempChunkFolderPath(const std::filesystem::path& ZenFolderPath)
+ {
+ return ZenTempFolderPath(ZenFolderPath) / "chunks"; // Temp storage for decompressed and validated chunks
+ }
+
+ std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath)
+ {
+ return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks
+ }
+
+ // std::filesystem::path ZenTempStorageFolderPath(const std::filesystem::path& ZenFolderPath)
+ // {
+ // return ZenTempFolderPath(ZenFolderPath) / "storage"; // Temp storage folder for BuildStorage implementations
+ // }
const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt";
@@ -133,6 +164,98 @@ namespace {
);
+ std::filesystem::path MakeSafeAbsolutePath(const std::string Path)
+ {
+ std::filesystem::path AbsolutePath = std::filesystem::absolute(StringToPath(Path)).make_preferred();
+#if ZEN_PLATFORM_WINDOWS && 1
+ const std::string_view Prefix = "\\\\?\\";
+ const std::u8string PrefixU8(Prefix.begin(), Prefix.end());
+ std::u8string PathString = AbsolutePath.u8string();
+ if (!PathString.empty() && !PathString.starts_with(PrefixU8))
+ {
+ PathString.insert(0, PrefixU8);
+ return std::filesystem::path(PathString);
+ }
+#endif
+ return AbsolutePath;
+ }
+
+ void RenameFileWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath)
+ {
+ std::error_code Ec;
+ RenameFile(SourcePath, TargetPath, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ Sleep(100 + int(Retries * 50));
+ RenameFile(SourcePath, TargetPath, Ec);
+ }
+ if (Ec)
+ {
+ zen::ThrowSystemError(Ec.value(), Ec.message());
+ }
+ }
+
+ bool SetFileReadOnlyWithRetry(const std::filesystem::path& Path, bool ReadOnly)
+ {
+ std::error_code Ec;
+ bool Result = SetFileReadOnly(Path, ReadOnly, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ Sleep(100 + int(Retries * 50));
+ if (!IsFile(Path))
+ {
+ return false;
+ }
+ Ec.clear();
+ Result = SetFileReadOnly(Path, ReadOnly, Ec);
+ }
+ if (Ec)
+ {
+ zen::ThrowSystemError(Ec.value(), Ec.message());
+ }
+ return Result;
+ }
+
+ void RemoveFileWithRetry(const std::filesystem::path& Path)
+ {
+ std::error_code Ec;
+ RemoveFile(Path, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ Sleep(100 + int(Retries * 50));
+ if (!IsFile(Path))
+ {
+ return;
+ }
+ Ec.clear();
+ RemoveFile(Path, Ec);
+ }
+ if (Ec)
+ {
+ zen::ThrowSystemError(Ec.value(), Ec.message());
+ }
+ }
+
+ void RemoveDirWithRetry(const std::filesystem::path& Path)
+ {
+ std::error_code Ec;
+ RemoveDir(Path, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ Sleep(100 + int(Retries * 50));
+ if (!IsDir(Path))
+ {
+ return;
+ }
+ Ec.clear();
+ RemoveDir(Path, Ec);
+ }
+ if (Ec)
+ {
+ zen::ThrowSystemError(Ec.value(), Ec.message());
+ }
+ }
+
uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes)
{
#if ZEN_PLATFORM_WINDOWS
@@ -195,34 +318,109 @@ namespace {
bool CleanDirectory(const std::filesystem::path& Path, std::span<const std::string_view> ExcludeDirectories)
{
ZEN_TRACE_CPU("CleanDirectory");
+ Stopwatch Timer;
- bool CleanWipe = true;
+ ProgressBar Progress(UsePlainProgress);
- DirectoryContent LocalDirectoryContent;
- GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent);
- for (const std::filesystem::path& LocalFilePath : LocalDirectoryContent.Files)
+ std::atomic<bool> CleanWipe = true;
+ std::atomic<uint64_t> DiscoveredItemCount = 0;
+ std::atomic<uint64_t> DeletedItemCount = 0;
+ std::atomic<uint64_t> DeletedByteCount = 0;
+ ParallellWork Work(AbortFlag);
+
+ struct AsyncVisitor : public GetDirectoryContentVisitor
{
- try
+ AsyncVisitor(const std::filesystem::path& InPath,
+ std::atomic<bool>& InCleanWipe,
+ std::atomic<uint64_t>& InDiscoveredItemCount,
+ std::atomic<uint64_t>& InDeletedItemCount,
+ std::atomic<uint64_t>& InDeletedByteCount,
+ std::span<const std::string_view> InExcludeDirectories)
+ : Path(InPath)
+ , CleanWipe(InCleanWipe)
+ , DiscoveredItemCount(InDiscoveredItemCount)
+ , DeletedItemCount(InDeletedItemCount)
+ , DeletedByteCount(InDeletedByteCount)
+ , ExcludeDirectories(InExcludeDirectories)
{
- std::filesystem::remove(LocalFilePath);
}
- catch (const std::exception&)
+ virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override
{
- // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time
- Sleep(200);
- try
- {
- std::filesystem::remove(LocalFilePath);
- }
- catch (const std::exception& Ex)
+ ZEN_TRACE_CPU("CleanDirectory_AsyncVisitDirectory");
+ if (!AbortFlag)
{
- ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what());
- CleanWipe = false;
+ if (!Content.FileNames.empty())
+ {
+ DiscoveredItemCount += Content.FileNames.size();
+
+ const std::string RelativeRootString = RelativeRoot.generic_string();
+ bool RemoveContent = true;
+ for (const std::string_view ExcludeDirectory : ExcludeDirectories)
+ {
+ if (RelativeRootString.starts_with(ExcludeDirectory))
+ {
+ if (RelativeRootString.length() > ExcludeDirectory.length())
+ {
+ const char MaybePathDelimiter = RelativeRootString[ExcludeDirectory.length()];
+ if (MaybePathDelimiter == '/' || MaybePathDelimiter == '\\' ||
+ MaybePathDelimiter == std::filesystem::path::preferred_separator)
+ {
+ RemoveContent = false;
+ break;
+ }
+ }
+ else
+ {
+ RemoveContent = false;
+ break;
+ }
+ }
+ }
+ if (RemoveContent)
+ {
+ ZEN_TRACE_CPU("DeleteFiles");
+ for (size_t FileIndex = 0; FileIndex < Content.FileNames.size(); FileIndex++)
+ {
+ const std::filesystem::path& FileName = Content.FileNames[FileIndex];
+ const std::filesystem::path FilePath = (Path / RelativeRoot / FileName).make_preferred();
+ try
+ {
+ SetFileReadOnlyWithRetry(FilePath, false);
+ RemoveFileWithRetry(FilePath);
+ DeletedItemCount++;
+ DeletedByteCount += Content.FileSizes[FileIndex];
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed removing file {}. Reason: {}", FilePath, Ex.what());
+ CleanWipe = false;
+ }
+ }
+ }
+ }
}
}
- }
+ const std::filesystem::path& Path;
+ std::atomic<bool>& CleanWipe;
+ std::atomic<uint64_t>& DiscoveredItemCount;
+ std::atomic<uint64_t>& DeletedItemCount;
+ std::atomic<uint64_t>& DeletedByteCount;
+ std::span<const std::string_view> ExcludeDirectories;
+ } Visitor(Path, CleanWipe, DiscoveredItemCount, DeletedItemCount, DeletedByteCount, ExcludeDirectories);
- for (const std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories)
+ GetDirectoryContent(
+ Path,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes,
+ Visitor,
+ GetIOWorkerPool(),
+ Work.PendingWork());
+
+ DirectoryContent LocalDirectoryContent;
+ GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent);
+ DiscoveredItemCount += LocalDirectoryContent.Directories.size();
+ std::vector<std::filesystem::path> DirectoriesToDelete;
+ DirectoriesToDelete.reserve(LocalDirectoryContent.Directories.size());
+ for (std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories)
{
bool Leave = false;
for (const std::string_view ExcludeDirectory : ExcludeDirectories)
@@ -235,33 +433,84 @@ namespace {
}
if (!Leave)
{
- try
- {
- zen::CleanDirectory(LocalDirPath);
- std::filesystem::remove(LocalDirPath);
- }
- catch (const std::exception&)
+ DirectoriesToDelete.emplace_back(std::move(LocalDirPath));
+ DiscoveredItemCount++;
+ }
+ }
+
+ uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs();
+
+ Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, PendingWork);
+ LastUpdateTimeMs = Timer.GetElapsedTimeMs();
+
+ uint64_t Deleted = DeletedItemCount.load();
+ uint64_t DeletedBytes = DeletedByteCount.load();
+ uint64_t Discovered = DiscoveredItemCount.load();
+ Progress.UpdateState({.Task = "Cleaning folder ",
+ .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
+ .TotalCount = Discovered,
+ .RemainingCount = Discovered - Deleted},
+ false);
+ });
+
+ for (const std::filesystem::path& DirectoryToDelete : DirectoriesToDelete)
+ {
+ ZEN_TRACE_CPU("DeleteDirs");
+ try
+ {
+ std::error_code Ec;
+ zen::CleanDirectory(DirectoryToDelete, true, Ec);
+ if (Ec)
{
Sleep(200);
- try
- {
- zen::CleanDirectory(LocalDirPath);
- std::filesystem::remove(LocalDirPath);
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed removing directory {}. Reason: {}", LocalDirPath, Ex.what());
- CleanWipe = false;
- }
+ zen::CleanDirectory(DirectoryToDelete, true);
+ Ec.clear();
}
+
+ RemoveDirWithRetry(DirectoryToDelete);
+
+ DeletedItemCount++;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed removing directory {}. Reason: {}", DirectoryToDelete, Ex.what());
+ CleanWipe = false;
+ }
+
+ uint64_t NowMs = Timer.GetElapsedTimeMs();
+ if ((NowMs - LastUpdateTimeMs) >= (UsePlainProgress ? 5000 : 200))
+ {
+ LastUpdateTimeMs = NowMs;
+
+ uint64_t Deleted = DeletedItemCount.load();
+ uint64_t DeletedBytes = DeletedByteCount.load();
+ uint64_t Discovered = DiscoveredItemCount.load();
+ Progress.UpdateState({.Task = "Cleaning folder ",
+ .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
+ .TotalCount = Discovered,
+ .RemainingCount = Discovered - Deleted},
+ false);
}
}
+
+ Progress.Finish();
+
+ uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ if (ElapsedTimeMs >= 200)
+ {
+ ZEN_CONSOLE("Wiped folder '{}' {} ({}) in {}",
+ Path,
+ DiscoveredItemCount.load(),
+ NiceBytes(DeletedByteCount.load()),
+ NiceTimeSpanMs(ElapsedTimeMs));
+ }
return CleanWipe;
}
std::string ReadAccessTokenFromFile(const std::filesystem::path& Path)
{
- if (!std::filesystem::is_regular_file(Path))
+ if (!IsFile(Path))
{
throw std::runtime_error(fmt::format("the file '{}' does not exist", Path));
}
@@ -304,7 +553,7 @@ namespace {
const IoHash& Hash,
const std::string& Suffix = {})
{
- std::filesystem::path TempFilePath = (TempFolderPath / (Hash.ToHexString() + Suffix)).make_preferred();
+ std::filesystem::path TempFilePath = TempFolderPath / (Hash.ToHexString() + Suffix);
return WriteToTempFile(std::move(Buffer), TempFilePath);
}
@@ -402,12 +651,12 @@ namespace {
std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash)
{
- return (CacheFolderPath / (RawHash.ToHexString() + ".tmp")).make_preferred();
+ return CacheFolderPath / (RawHash.ToHexString() + ".tmp");
}
std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash)
{
- return (CacheFolderPath / RawHash.ToHexString()).make_preferred();
+ return CacheFolderPath / RawHash.ToHexString();
}
ChunkedFolderContent ScanAndChunkFolder(
@@ -425,7 +674,7 @@ namespace {
Path,
std::move(IsAcceptedFolder),
std::move(IsAcceptedFile),
- GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
UsePlainProgress ? 5000 : 200,
[](bool, std::ptrdiff_t) {},
AbortFlag);
@@ -439,7 +688,7 @@ namespace {
FilteredBytesHashed.Start();
ChunkedFolderContent FolderContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
Path,
Content,
ChunkController,
@@ -501,6 +750,7 @@ namespace {
uint64_t AcceptedBlockCount = 0;
uint64_t AcceptedChunkCount = 0;
uint64_t AcceptedByteCount = 0;
+ uint64_t AcceptedRawByteCount = 0;
uint64_t RejectedBlockCount = 0;
uint64_t RejectedChunkCount = 0;
uint64_t RejectedByteCount = 0;
@@ -539,6 +789,7 @@ namespace {
uint64_t ChunkCount = 0;
uint64_t ChunkByteCount = 0;
std::atomic<uint64_t> CompressedChunkCount = 0;
+ std::atomic<uint64_t> CompressedChunkRawBytes = 0;
std::atomic<uint64_t> CompressedChunkBytes = 0;
uint64_t CompressChunksElapsedWallTimeUS = 0;
@@ -547,6 +798,7 @@ namespace {
ChunkCount += Rhs.ChunkCount;
ChunkByteCount += Rhs.ChunkByteCount;
CompressedChunkCount += Rhs.CompressedChunkCount;
+ CompressedChunkRawBytes += Rhs.CompressedChunkRawBytes;
CompressedChunkBytes += Rhs.CompressedChunkBytes;
CompressChunksElapsedWallTimeUS += Rhs.CompressChunksElapsedWallTimeUS;
return *this;
@@ -603,11 +855,9 @@ namespace {
struct WriteChunkStatistics
{
- std::atomic<uint32_t> ChunkCountWritten = 0;
- std::atomic<uint64_t> ChunkBytesWritten = 0;
- uint64_t DownloadTimeUs = 0;
- uint64_t WriteTimeUs = 0;
- uint64_t WriteChunksElapsedWallTimeUs = 0;
+ uint64_t DownloadTimeUs = 0;
+ uint64_t WriteTimeUs = 0;
+ uint64_t WriteChunksElapsedWallTimeUs = 0;
};
struct RebuildFolderStateStatistics
@@ -626,6 +876,16 @@ namespace {
uint64_t VerifyElapsedWallTimeUs = 0;
};
+ struct StorageInstance
+ {
+ std::unique_ptr<HttpClient> BuildStorageHttp;
+ std::unique_ptr<BuildStorage> BuildStorage;
+ std::string StorageName;
+ std::unique_ptr<HttpClient> CacheHttp;
+ std::unique_ptr<BuildStorageCache> BuildCacheStorage;
+ std::string CacheName;
+ };
+
std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes,
const std::span<const uint32_t> LocalChunkOrder,
const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
@@ -772,7 +1032,7 @@ namespace {
std::span<const uint32_t> ChunkCounts,
std::span<const IoHash> LocalChunkHashes,
std::span<const uint64_t> LocalChunkRawSizes,
- std::vector<uint32_t> AbsoluteChunkOrders,
+ const std::vector<uint32_t>& AbsoluteChunkOrders,
const std::span<const uint32_t> LooseLocalChunkIndexes,
const std::span<IoHash> BlockHashes)
{
@@ -1352,7 +1612,15 @@ namespace {
ZEN_ASSERT(false);
}
uint64_t RawSize = Chunk.GetSize();
- return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)};
+ if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock)
+ {
+ // Standalone chunk, not part of a sequence
+ return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid)};
+ }
+ else
+ {
+ return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)};
+ }
}));
}
@@ -1376,13 +1644,24 @@ namespace {
{
std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
ZEN_ASSERT(!ChunkLocations.empty());
- CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex,
+ const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
+ CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex,
ChunkLocations[0].Offset,
Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
- ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == Content.ChunkedContent.ChunkHashes[ChunkIndex]);
- CompositeBuffer CompressedChunk =
- CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed();
- ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
+ ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
+
+ const uint64_t RawSize = Chunk.GetSize();
+ if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock)
+ {
+ CompositeBuffer CompressedChunk = CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid).GetCompressed();
+ ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
+ }
+ else
+ {
+ CompositeBuffer CompressedChunk =
+ CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed();
+ ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
+ }
}
return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers)));
};
@@ -1444,7 +1723,7 @@ namespace {
for (auto& WorkItem : WorkItems)
{
Work.ScheduleWork(
- NetworkPool, // GetSyncWorkerPool(),//
+ NetworkPool,
[WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
ZEN_TRACE_CPU("DownloadLargeBlob_Work");
if (!AbortFlag)
@@ -1513,15 +1792,16 @@ namespace {
}
ValidateStats.BlockAttachmentCount = BlockAttachments.size();
- std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments);
+ std::vector<ChunkBlockDescription> VerifyBlockDescriptions =
+ ParseChunkBlockDescriptionList(Storage.GetBlockMetadatas(BuildId, BlockAttachments));
if (VerifyBlockDescriptions.size() != BlockAttachments.size())
{
throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing",
BlockAttachments.size() - VerifyBlockDescriptions.size()));
}
- WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& NetworkPool = GetNetworkPool();
+ WorkerThreadPool& VerifyPool = GetIOWorkerPool();
ParallellWork Work(AbortFlag);
const std::filesystem::path TempFolder = ".zen-tmp";
@@ -1530,7 +1810,8 @@ namespace {
auto __ = MakeGuard([&TempFolder]() {
if (CleanDirectory(TempFolder, {}))
{
- std::filesystem::remove(TempFolder);
+ std::error_code DummyEc;
+ RemoveDir(TempFolder, DummyEc);
}
});
@@ -1787,7 +2068,6 @@ namespace {
const ChunkedContentLookup& Lookup,
uint32_t ChunkIndex,
const std::filesystem::path& TempFolderPath,
- std::atomic<uint64_t>& ReadRawBytes,
LooseChunksStatistics& LooseChunksStats)
{
ZEN_TRACE_CPU("CompressChunk");
@@ -1808,11 +2088,11 @@ namespace {
}
ZEN_ASSERT_SLOW(IoHash::HashBuffer(RawSource) == ChunkHash);
{
- std::filesystem::path TempFilePath = (TempFolderPath / ChunkHash.ToHexString()).make_preferred();
+ std::filesystem::path TempFilePath = TempFolderPath / ChunkHash.ToHexString();
BasicFile CompressedFile;
std::error_code Ec;
- CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncate, Ec);
+ CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec);
if (Ec)
{
throw std::runtime_error(
@@ -1823,7 +2103,7 @@ namespace {
CompositeBuffer(SharedBuffer(RawSource)),
[&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
ZEN_UNUSED(SourceOffset);
- ReadRawBytes += SourceSize;
+ LooseChunksStats.CompressedChunkRawBytes += SourceSize;
CompressedFile.Write(RangeBuffer, Offset);
LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
});
@@ -1850,7 +2130,7 @@ namespace {
return Compressed.GetCompressed();
}
CompressedFile.Close();
- std::filesystem::remove(TempFilePath, Ec);
+ RemoveFile(TempFilePath, Ec);
ZEN_UNUSED(Ec);
}
@@ -1881,7 +2161,7 @@ namespace {
void GenerateBuildBlocks(const std::filesystem::path& Path,
const ChunkedFolderContent& Content,
const ChunkedContentLookup& Lookup,
- BuildStorage& Storage,
+ StorageInstance& Storage,
const Oid& BuildId,
const std::vector<std::vector<uint32_t>>& NewBlockChunks,
GeneratedBlocks& OutBlocks,
@@ -1904,9 +2184,8 @@ namespace {
RwLock Lock;
- WorkerThreadPool& GenerateBlobsPool =
- GetMediumWorkerPool(EWorkloadType::Burst); // GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
- WorkerThreadPool& UploadBlocksPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
+ WorkerThreadPool& GenerateBlobsPool = GetIOWorkerPool();
+ WorkerThreadPool& UploadBlocksPool = GetNetworkPool();
FilteredRate FilteredGeneratedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
@@ -2005,21 +2284,35 @@ namespace {
const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
- Storage.PutBuildBlob(BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- std::move(Payload).GetCompressed());
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ Payload.GetCompressed());
+ }
+
+ Storage.BuildStorage->PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ std::move(Payload).GetCompressed());
UploadStats.BlocksBytes += CompressedBlockSize;
+
ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockHash,
NiceBytes(CompressedBlockSize),
OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
- Storage.PutBlockMetadata(BuildId,
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- BlockMetaData);
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockHash,
NiceBytes(BlockMetaData.GetSize()));
OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
@@ -2074,9 +2367,10 @@ namespace {
}
}
- void UploadPartBlobs(BuildStorage& Storage,
+ void UploadPartBlobs(StorageInstance& Storage,
const Oid& BuildId,
const std::filesystem::path& Path,
+ const std::filesystem::path& ZenFolderPath,
const ChunkedFolderContent& Content,
const ChunkedContentLookup& Lookup,
std::span<IoHash> RawHashes,
@@ -2092,8 +2386,8 @@ namespace {
{
ProgressBar ProgressBar(UsePlainProgress);
- WorkerThreadPool& ReadChunkPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& UploadChunkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& ReadChunkPool = GetIOWorkerPool();
+ WorkerThreadPool& UploadChunkPool = GetNetworkPool();
FilteredRate FilteredGenerateBlockBytesPerSecond;
FilteredRate FilteredCompressedBytesPerSecond;
@@ -2150,7 +2444,7 @@ namespace {
if (QueuedPendingInMemoryBlocksForUpload.load() > 16)
{
ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock");
- Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), Path / ZenTempBlockFolderName, BlockHash));
+ Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), ZenTempBlockFolderPath(ZenFolderPath), BlockHash));
IsInMemoryBlock = false;
}
else
@@ -2177,18 +2471,26 @@ namespace {
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ }
+ Storage.BuildStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockHash,
NiceBytes(PayloadSize),
NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
UploadedBlockSize += PayloadSize;
UploadStats.BlocksBytes += PayloadSize;
- Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
- ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
- NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(BlockMetaData.GetSize()));
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
+ ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize()));
NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
@@ -2214,12 +2516,17 @@ namespace {
ZEN_TRACE_CPU("AsyncUploadLooseChunk");
const uint64_t PayloadSize = Payload.GetSize();
- ;
+
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ }
+
if (PayloadSize >= LargeAttachmentSize)
{
ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart");
UploadStats.MultipartAttachmentCount++;
- std::vector<std::function<void()>> MultipartWork = Storage.PutLargeBuildBlob(
+ std::vector<std::function<void()>> MultipartWork = Storage.BuildStorage->PutLargeBuildBlob(
BuildId,
RawHash,
ZenContentType::kCompressedBinary,
@@ -2264,7 +2571,7 @@ namespace {
else
{
ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart");
- Storage.PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ Storage.BuildStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize));
UploadStats.ChunksBytes += Payload.GetSize();
UploadStats.ChunkCount++;
@@ -2286,16 +2593,8 @@ namespace {
std::atomic<uint64_t> GeneratedBlockCount = 0;
std::atomic<uint64_t> GeneratedBlockByteCount = 0;
- std::vector<uint32_t> CompressLooseChunkOrderIndexes;
-
std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0;
- // Start upload of any pre-compressed loose chunks
- for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes)
- {
- CompressLooseChunkOrderIndexes.push_back(LooseChunkOrderIndex);
- }
-
// Start generation of any non-prebuilt blocks and schedule upload
for (const size_t BlockIndex : BlockIndexes)
{
@@ -2303,7 +2602,7 @@ namespace {
if (!AbortFlag)
{
Work.ScheduleWork(
- ReadChunkPool, // GetSyncWorkerPool()
+ ReadChunkPool,
[&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -2355,34 +2654,27 @@ namespace {
}
}
- std::atomic<uint64_t> RawLooseChunkByteCount = 0;
-
// Start compression of any non-precompressed loose chunks and schedule upload
- for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes)
+ for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes)
{
- const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex];
+ const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex];
Work.ScheduleWork(
- ReadChunkPool, // GetSyncWorkerPool(),// ReadChunkPool,
+ ReadChunkPool,
[&, ChunkIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
FilteredCompressedBytesPerSecond.Start();
- CompositeBuffer Payload = CompressChunk(Path,
- Content,
- Lookup,
- ChunkIndex,
- Path / ZenTempChunkFolderName,
- RawLooseChunkByteCount,
- LooseChunksStats);
+ CompositeBuffer Payload =
+ CompressChunk(Path, Content, Lookup, ChunkIndex, ZenTempChunkFolderPath(ZenFolderPath), LooseChunksStats);
ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})",
Content.ChunkedContent.ChunkHashes[ChunkIndex],
NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
NiceBytes(Payload.GetSize()));
const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
UploadStats.ReadFromDiskBytes += ChunkRawSize;
- if (LooseChunksStats.CompressedChunkCount == CompressLooseChunkOrderIndexes.size())
+ if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size())
{
FilteredCompressedBytesPerSecond.Stop();
}
@@ -2397,7 +2689,7 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkBytes.load());
+ FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkRawBytes.load());
FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load();
@@ -2408,8 +2700,8 @@ namespace {
"Uploaded {}/{} ({}/{}) blobs "
"({} {}bits/s)",
LooseChunksStats.CompressedChunkCount.load(),
- CompressLooseChunkOrderIndexes.size(),
- NiceBytes(RawLooseChunkByteCount),
+ LooseChunkOrderIndexes.size(),
+ NiceBytes(LooseChunksStats.CompressedChunkRawBytes),
NiceBytes(TotalLooseChunksSize),
NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()),
@@ -2538,9 +2830,10 @@ namespace {
for (size_t KnownBlockIndex : ReuseBlockIndexes)
{
std::vector<uint32_t> FoundChunkIndexes;
- size_t BlockSize = 0;
- size_t AdjustedReuseSize = 0;
- const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
+ size_t BlockSize = 0;
+ size_t AdjustedReuseSize = 0;
+ size_t AdjustedRawReuseSize = 0;
+ const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++)
{
const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex];
@@ -2553,6 +2846,7 @@ namespace {
{
FoundChunkIndexes.push_back(ChunkIndex);
AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
+ AdjustedRawReuseSize += KnownBlock.ChunkRawLengths[BlockChunkIndex];
}
}
}
@@ -2573,6 +2867,7 @@ namespace {
}
FindBlocksStats.AcceptedChunkCount += FoundChunkIndexes.size();
FindBlocksStats.AcceptedByteCount += AdjustedReuseSize;
+ FindBlocksStats.AcceptedRawByteCount += AdjustedRawReuseSize;
FindBlocksStats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size();
FindBlocksStats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize;
}
@@ -2598,12 +2893,14 @@ namespace {
return FilteredReuseBlockIndexes;
};
- void UploadFolder(BuildStorage& Storage,
+ void UploadFolder(StorageInstance& Storage,
const Oid& BuildId,
const Oid& BuildPartId,
const std::string_view BuildPartName,
const std::filesystem::path& Path,
+ const std::filesystem::path& ZenFolderPath,
const std::filesystem::path& ManifestPath,
+ const uint64_t FindBlockMaxCount,
const uint8_t BlockReuseMinPercentLimit,
bool AllowMultiparts,
const CbObject& MetaData,
@@ -2613,17 +2910,18 @@ namespace {
{
Stopwatch ProcessTimer;
- const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName;
+ const std::filesystem::path ZenTempFolder = ZenTempFolderPath(ZenFolderPath);
CreateDirectories(ZenTempFolder);
CleanDirectory(ZenTempFolder, {});
auto _ = MakeGuard([&]() {
if (CleanDirectory(ZenTempFolder, {}))
{
- std::filesystem::remove(ZenTempFolder);
+ std::error_code DummyEc;
+ RemoveDir(ZenTempFolder, DummyEc);
}
});
- CreateDirectories(Path / ZenTempBlockFolderName);
- CreateDirectories(Path / ZenTempChunkFolderName);
+ CreateDirectories(ZenTempBlockFolderPath(ZenFolderPath));
+ CreateDirectories(ZenTempChunkFolderPath(ZenFolderPath));
std::uint64_t TotalRawSize = 0;
@@ -2641,54 +2939,52 @@ namespace {
FindBlocksStatistics FindBlocksStats;
- std::future<PrepareBuildResult> PrepBuildResultFuture =
- GetSmallWorkerPool(EWorkloadType::Burst)
- .EnqueueTask(std::packaged_task<PrepareBuildResult()>{
- [&Storage, BuildId, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] {
- ZEN_TRACE_CPU("PrepareBuild");
+ std::future<PrepareBuildResult> PrepBuildResultFuture = GetNetworkPool().EnqueueTask(std::packaged_task<PrepareBuildResult()>{
+ [&Storage, BuildId, FindBlockMaxCount, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] {
+ ZEN_TRACE_CPU("PrepareBuild");
- PrepareBuildResult Result;
- Stopwatch Timer;
- if (CreateBuild)
- {
- ZEN_TRACE_CPU("CreateBuild");
+ PrepareBuildResult Result;
+ Stopwatch Timer;
+ if (CreateBuild)
+ {
+ ZEN_TRACE_CPU("CreateBuild");
- Stopwatch PutBuildTimer;
- CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData);
- Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
- Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
- Result.PayloadSize = MetaData.GetSize();
- }
- else
- {
- ZEN_TRACE_CPU("PutBuild");
- Stopwatch GetBuildTimer;
- CbObject Build = Storage.GetBuild(BuildId);
- Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
- Result.PayloadSize = Build.GetSize();
- if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
- {
- Result.PreferredMultipartChunkSize = ChunkSize;
- }
- else if (AllowMultiparts)
- {
- ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
- NiceBytes(Result.PreferredMultipartChunkSize));
- }
- }
+ Stopwatch PutBuildTimer;
+ CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData);
+ Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
+ Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
+ Result.PayloadSize = MetaData.GetSize();
+ }
+ else
+ {
+ ZEN_TRACE_CPU("PutBuild");
+ Stopwatch GetBuildTimer;
+ CbObject Build = Storage.BuildStorage->GetBuild(BuildId);
+ Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
+ Result.PayloadSize = Build.GetSize();
+ if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ {
+ Result.PreferredMultipartChunkSize = ChunkSize;
+ }
+ else if (AllowMultiparts)
+ {
+ ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
+ NiceBytes(Result.PreferredMultipartChunkSize));
+ }
+ }
- if (!IgnoreExistingBlocks)
- {
- ZEN_TRACE_CPU("FindBlocks");
- Stopwatch KnownBlocksTimer;
- Result.KnownBlocks = Storage.FindBlocks(BuildId);
- FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
- FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
- Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
- }
- Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
- return Result;
- }});
+ if (!IgnoreExistingBlocks)
+ {
+ ZEN_TRACE_CPU("FindBlocks");
+ Stopwatch KnownBlocksTimer;
+ Result.KnownBlocks = ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId, FindBlockMaxCount));
+ FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
+ FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
+ Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
+ }
+ Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ return Result;
+ }});
ChunkedFolderContent LocalContent;
@@ -2767,7 +3063,7 @@ namespace {
{
std::filesystem::path ExcludeManifestPath = Path / ZenExcludeManifestName;
tsl::robin_set<std::string> ExcludeAssetPaths;
- if (std::filesystem::is_regular_file(ExcludeManifestPath))
+ if (IsFile(ExcludeManifestPath))
{
std::vector<std::filesystem::path> AssetPaths = ParseManifest(Path, ExcludeManifestPath);
ExcludeAssetPaths.reserve(AssetPaths.size());
@@ -2796,10 +3092,10 @@ namespace {
}
return true;
},
- GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
UsePlainProgress ? 5000 : 200,
[&](bool, std::ptrdiff_t) {
- ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
+ ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
},
AbortFlag);
}
@@ -2810,12 +3106,13 @@ namespace {
for (const std::filesystem::path& AssetPath : AssetPaths)
{
Content.Paths.push_back(AssetPath);
- Content.RawSizes.push_back(std::filesystem::file_size(Path / AssetPath));
+ const std::filesystem::path AssetFilePath = (Path / AssetPath).make_preferred();
+ Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath));
#if ZEN_PLATFORM_WINDOWS
- Content.Attributes.push_back(GetFileAttributes(Path / AssetPath));
+ Content.Attributes.push_back(GetFileAttributes(AssetFilePath));
#endif // ZEN_PLATFORM_WINDOWS
#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
- Content.Attributes.push_back(GetFileMode(Path / AssetPath));
+ Content.Attributes.push_back(GetFileMode(AssetFilePath));
#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
LocalFolderScanStats.AcceptedFileCount++;
@@ -2823,12 +3120,13 @@ namespace {
if (ManifestPath.is_relative())
{
Content.Paths.push_back(ManifestPath);
- Content.RawSizes.push_back(std::filesystem::file_size(ManifestPath));
+ const std::filesystem::path ManifestFilePath = (Path / ManifestPath).make_preferred();
+ Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath));
#if ZEN_PLATFORM_WINDOWS
- Content.Attributes.push_back(GetFileAttributes(ManifestPath));
+ Content.Attributes.push_back(GetFileAttributes(ManifestFilePath));
#endif // ZEN_PLATFORM_WINDOWS
#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
- Content.Attributes.push_back(GetFileMode(ManifestPath));
+ Content.Attributes.push_back(GetFileMode(ManifestFilePath));
#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
@@ -2855,7 +3153,7 @@ namespace {
FilteredBytesHashed.Start();
LocalContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
Path,
Content,
*ChunkController,
@@ -2976,16 +3274,17 @@ namespace {
}
FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size();
- const double AcceptedByteCountPercent = FindBlocksStats.PotentialChunkByteCount > 0
- ? (100.0 * FindBlocksStats.AcceptedByteCount / FindBlocksStats.PotentialChunkByteCount)
- : 0.0;
+ const double AcceptedByteCountPercent =
+ FindBlocksStats.PotentialChunkByteCount > 0
+ ? (100.0 * FindBlocksStats.AcceptedRawByteCount / FindBlocksStats.PotentialChunkByteCount)
+ : 0.0;
const double AcceptedReduntantByteCountPercent =
FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * FindBlocksStats.AcceptedReduntantByteCount) /
(FindBlocksStats.AcceptedByteCount + FindBlocksStats.AcceptedReduntantByteCount)
: 0.0;
ZEN_CONSOLE(
- "Found {} chunks in {} ({}) blocks eligeble for reuse in {}\n"
+ "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n"
" Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n"
" Accepting {} ({}) redundant chunks ({:.1f}%)\n"
" Rejected {} ({}) chunks in {} blocks\n"
@@ -2998,7 +3297,7 @@ namespace {
NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS),
FindBlocksStats.AcceptedChunkCount,
- NiceBytes(FindBlocksStats.AcceptedByteCount),
+ NiceBytes(FindBlocksStats.AcceptedRawByteCount),
FindBlocksStats.AcceptedBlockCount,
AcceptedByteCountPercent,
@@ -3204,7 +3503,8 @@ namespace {
}
Stopwatch PutBuildPartResultTimer;
- std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest);
+ std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
+ Storage.BuildStorage->PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest);
ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.",
NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
NiceBytes(PartManifest.GetSize()),
@@ -3231,8 +3531,8 @@ namespace {
TempLooseChunksStats.CompressedChunkCount.load(),
NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()),
- NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS,
- TempLooseChunksStats.CompressedChunkBytes)),
+ NiceNum(
+ GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, TempLooseChunksStats.ChunkByteCount)),
TempUploadStats.ChunkCount.load(),
NiceBytes(TempUploadStats.ChunksBytes),
@@ -3243,6 +3543,7 @@ namespace {
UploadPartBlobs(Storage,
BuildId,
Path,
+ ZenFolderPath,
LocalContent,
LocalLookup,
RawHashes,
@@ -3289,7 +3590,7 @@ namespace {
while (!AbortFlag)
{
Stopwatch FinalizeBuildPartTimer;
- std::vector<IoHash> Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash);
+ std::vector<IoHash> Needs = Storage.BuildStorage->FinalizeBuildPart(BuildId, BuildPartId, PartHash);
ZEN_CONSOLE("FinalizeBuildPart took {}. {} attachments are missing.",
NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()),
Needs.size());
@@ -3304,7 +3605,7 @@ namespace {
if (CreateBuild && !AbortFlag)
{
Stopwatch FinalizeBuildTimer;
- Storage.FinalizeBuild(BuildId);
+ Storage.BuildStorage->FinalizeBuild(BuildId);
ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
}
@@ -3321,7 +3622,13 @@ namespace {
{
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
UploadStats.BlocksBytes += BlockMetaData.GetSize();
NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
UploadBlockMetadataCount++;
@@ -3340,7 +3647,7 @@ namespace {
DownloadStatistics ValidateDownloadStats;
if (PostUploadVerify && !AbortFlag)
{
- ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats);
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats);
}
ZEN_CONSOLE_VERBOSE(
@@ -3384,6 +3691,7 @@ namespace {
"\n AcceptedBlockCount: {}"
"\n AcceptedChunkCount: {}"
"\n AcceptedByteCount: {}"
+ "\n AcceptedRawByteCount: {}"
"\n RejectedBlockCount: {}"
"\n RejectedChunkCount: {}"
"\n RejectedByteCount: {}"
@@ -3401,6 +3709,7 @@ namespace {
FindBlocksStats.AcceptedBlockCount,
FindBlocksStats.AcceptedChunkCount,
NiceBytes(FindBlocksStats.AcceptedByteCount),
+ NiceBytes(FindBlocksStats.AcceptedRawByteCount),
FindBlocksStats.RejectedBlockCount,
FindBlocksStats.RejectedChunkCount,
NiceBytes(FindBlocksStats.RejectedByteCount),
@@ -3570,13 +3879,13 @@ namespace {
ValidateInfo);
- Storage.PutBuildPartStats(
+ Storage.BuildStorage->PutBuildPartStats(
BuildId,
BuildPartId,
{{"totalSize", double(LocalFolderScanStats.FoundFileByteCount.load())},
{"reusedRatio", AcceptedByteCountPercent / 100.0},
{"reusedBlockCount", double(FindBlocksStats.AcceptedBlockCount)},
- {"reusedBlockByteCount", double(FindBlocksStats.AcceptedByteCount)},
+ {"reusedBlockByteCount", double(FindBlocksStats.AcceptedRawByteCount)},
{"newBlockCount", double(FindBlocksStats.NewBlocksCount)},
{"newBlockByteCount", double(FindBlocksStats.NewBlocksChunkByteCount)},
{"uploadedCount", double(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load())},
@@ -3597,7 +3906,7 @@ namespace {
ProgressBar ProgressBar(UsePlainProgress);
- WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& VerifyPool = GetIOWorkerPool();
ParallellWork Work(AbortFlag);
@@ -3646,7 +3955,7 @@ namespace {
if (IsAcceptedFolder(TargetPath.parent_path().generic_string()))
{
const uint64_t ExpectedSize = Content.RawSizes[PathIndex];
- if (!std::filesystem::exists(TargetPath))
+ if (!IsFile(TargetPath))
{
ErrorLock.WithExclusiveLock([&]() {
Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize));
@@ -3656,7 +3965,7 @@ namespace {
else
{
std::error_code Ec;
- uint64_t SizeOnDisk = gsl::narrow<uint64_t>(std::filesystem::file_size(TargetPath, Ec));
+ uint64_t SizeOnDisk = gsl::narrow<uint64_t>(FileSizeFromPath(TargetPath, Ec));
if (Ec)
{
ErrorLock.WithExclusiveLock([&]() {
@@ -3873,12 +4182,34 @@ namespace {
return ChunkTargetPtrs;
};
+ uint64_t GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const ChunkedContentLookup& Lookup,
+ uint32_t ChunkIndex)
+ {
+ uint64_t WriteCount = 0;
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex);
+ for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
+ {
+ if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0)
+ {
+ WriteCount++;
+ }
+ }
+ return WriteCount;
+ };
+
void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash)
{
ZEN_TRACE_CPU("FinalizeChunkSequence");
- ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash),
- GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash));
+ ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ std::error_code Ec;
+ RenameFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash),
+ Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec);
+ }
}
void FinalizeChunkSequences(const std::filesystem::path& TargetFolder,
@@ -3892,8 +4223,39 @@ namespace {
}
}
+ void VerifySequence(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
+ uint32_t RemoteSequenceIndex)
+ {
+ ZEN_TRACE_CPU("VerifySequence");
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const std::uint32_t RemotePathIndex = Lookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ExpectedSize = RemoteContent.RawSizes[RemotePathIndex];
+ IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash));
+ const uint64_t VerifySize = VerifyBuffer.GetSize();
+ if (VerifySize != ExpectedSize)
+ {
+ throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}",
+ SequenceRawHash,
+ VerifySize,
+ ExpectedSize));
+ }
+ ZEN_TRACE_CPU("HashSequence");
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+ }
+ }
+ }
+
void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder,
const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
std::span<const uint32_t> RemoteSequenceIndexes,
ParallellWork& Work,
WorkerThreadPool& VerifyPool)
@@ -3908,46 +4270,33 @@ namespace {
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
Work.ScheduleWork(
VerifyPool,
- [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
+ [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync");
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex);
+ if (!AbortFlag)
{
- ZEN_TRACE_CPU("HashSequence");
- const IoHash VerifyChunkHash = IoHash::HashBuffer(
- IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(TargetFolder, SequenceRawHash);
}
- FinalizeChunkSequence(TargetFolder, SequenceRawHash);
}
},
Work.DefaultErrorFunction());
}
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
+ VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex);
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- {
- ZEN_TRACE_CPU("HashSequence");
- const IoHash VerifyChunkHash =
- IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
- }
- }
FinalizeChunkSequence(TargetFolder, SequenceRawHash);
}
bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
{
- return SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1;
+ uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1);
+ ZEN_ASSERT(PreviousValue >= 1);
+ ZEN_ASSERT(PreviousValue != (uint32_t)-1);
+ return PreviousValue == 1;
}
std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
@@ -3985,8 +4334,7 @@ namespace {
const BlockWriteOps& Ops,
ParallellWork& Work,
WorkerThreadPool& VerifyPool,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockChunkOps");
{
@@ -4017,12 +4365,6 @@ namespace {
FileOffset,
RemoteContent.RawSizes[PathIndex]);
}
- WriteChunkStats.ChunkCountWritten += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
- WriteChunkStats.ChunkBytesWritten +=
- std::accumulate(Ops.ChunkBuffers.begin(),
- Ops.ChunkBuffers.end(),
- uint64_t(0),
- [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); });
}
if (!AbortFlag)
{
@@ -4036,7 +4378,7 @@ namespace {
CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}
- VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool);
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool);
}
}
@@ -4117,11 +4459,36 @@ namespace {
bool NeedsWrite = true;
if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
{
- MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock + CompressedBuffer::GetHeaderSizeForNoneEncoder(),
- ChunkCompressedSize - CompressedBuffer::GetHeaderSizeForNoneEncoder());
- IoBuffer Decompressed(IoBuffer::Wrap, ChunkMemoryView.GetData(), ChunkMemoryView.GetSize());
- ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
+ MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize);
+ IoHash VerifyChunkHash;
+ uint64_t VerifyChunkSize;
+ CompressedBuffer CompressedChunk =
+ CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize);
+ ZEN_ASSERT(CompressedChunk);
+ ZEN_ASSERT(VerifyChunkHash == ChunkHash);
+ ZEN_ASSERT(VerifyChunkSize == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+
+ OodleCompressor ChunkCompressor;
+ OodleCompressionLevel ChunkCompressionLevel;
+ uint64_t ChunkBlockSize;
+
+ bool GetCompressParametersSuccess =
+ CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize);
+ ZEN_ASSERT(GetCompressParametersSuccess);
+
+ IoBuffer Decompressed;
+ if (ChunkCompressionLevel == OodleCompressionLevel::None)
+ {
+ MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder());
+ Decompressed =
+ IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize());
+ }
+ else
+ {
+ Decompressed = CompressedChunk.Decompress().AsIoBuffer();
+ }
ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
{
OutOps.WriteOps.push_back(
@@ -4162,8 +4529,7 @@ namespace {
CompositeBuffer&& BlockBuffer,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockToDisk");
@@ -4197,8 +4563,7 @@ namespace {
Ops,
Work,
VerifyPool,
- DiskStats,
- WriteChunkStats);
+ DiskStats);
return true;
}
return false;
@@ -4222,8 +4587,7 @@ namespace {
Ops,
Work,
VerifyPool,
- DiskStats,
- WriteChunkStats);
+ DiskStats);
return true;
}
return false;
@@ -4240,8 +4604,7 @@ namespace {
uint32_t LastIncludedBlockChunkIndex,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WritePartialBlockToDisk");
@@ -4267,8 +4630,7 @@ namespace {
Ops,
Work,
VerifyPool,
- DiskStats,
- WriteChunkStats);
+ DiskStats);
return true;
}
else
@@ -4355,8 +4717,7 @@ namespace {
void StreamDecompress(const std::filesystem::path& CacheFolderPath,
const IoHash& SequenceRawHash,
CompositeBuffer&& CompressedPart,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("StreamDecompress");
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
@@ -4390,7 +4751,6 @@ namespace {
DiskStats.ReadByteCount += SourceSize;
if (!AbortFlag)
{
- WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize();
DecompressedTemp.Write(RangeBuffer, Offset);
for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
{
@@ -4424,7 +4784,7 @@ namespace {
throw std::runtime_error(
fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
}
- WriteChunkStats.ChunkCountWritten++;
+ // WriteChunkStats.ChunkCountWritten++;
}
bool WriteCompressedChunk(const std::filesystem::path& TargetFolder,
@@ -4433,8 +4793,7 @@ namespace {
const IoHash& ChunkHash,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
IoBuffer&& CompressedPart,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
@@ -4444,7 +4803,7 @@ namespace {
{
const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
- StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats);
+ StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats);
}
else
{
@@ -4459,15 +4818,13 @@ namespace {
ChunkTargetPtrs,
CompositeBuffer(std::move(Chunk)),
OpenFileCache);
- WriteChunkStats.ChunkCountWritten++;
- WriteChunkStats.ChunkBytesWritten += ChunkRawSize;
return true;
}
}
return false;
}
- void AsyncWriteDownloadedChunk(const std::filesystem::path& Path,
+ void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath,
const ChunkedFolderContent& RemoteContent,
const ChunkedContentLookup& RemoteLookup,
uint32_t RemoteChunkIndex,
@@ -4479,8 +4836,7 @@ namespace {
std::atomic<uint64_t>& WritePartsComplete,
const uint64_t TotalPartWriteCount,
FilteredRate& FilteredWrittenBytesPerSecond,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
@@ -4502,8 +4858,8 @@ namespace {
{
Payload.SetDeleteOnClose(false);
Payload = {};
- CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
- std::filesystem::rename(TempBlobPath, CompressedChunkPath, Ec);
+ CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
+ RenameFile(TempBlobPath, CompressedChunkPath, Ec);
if (Ec)
{
CompressedChunkPath = std::filesystem::path{};
@@ -4521,13 +4877,13 @@ namespace {
{
ZEN_TRACE_CPU("WriteTempChunk");
// Could not be moved and rather large, lets store it on disk
- CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
+ CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload);
Payload = {};
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
+ WritePool,
[&,
SequenceIndexChunksLeftToWriteCounters,
CompressedChunkPath,
@@ -4557,7 +4913,7 @@ namespace {
}
}
- std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
+ std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath);
bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
RemoteContent,
@@ -4565,8 +4921,7 @@ namespace {
ChunkHash,
ChunkTargetPtrs,
std::move(CompressedPart),
- DiskStats,
- WriteChunkStats);
+ DiskStats);
if (!AbortFlag)
{
WritePartsComplete++;
@@ -4575,13 +4930,18 @@ namespace {
FilteredWrittenBytesPerSecond.Stop();
}
- std::filesystem::remove(CompressedChunkPath);
+ RemoveFile(CompressedChunkPath);
std::vector<uint32_t> CompletedSequences =
CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
if (NeedHashVerify)
{
- VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool);
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ CompletedSequences,
+ Work,
+ WritePool);
}
else
{
@@ -4593,9 +4953,10 @@ namespace {
Work.DefaultErrorFunction());
};
- void UpdateFolder(BuildStorage& Storage,
+ void UpdateFolder(StorageInstance& Storage,
const Oid& BuildId,
const std::filesystem::path& Path,
+ const std::filesystem::path& ZenFolderPath,
const std::uint64_t LargeAttachmentSize,
const std::uint64_t PreferredMultipartChunkSize,
const ChunkedFolderContent& LocalContent,
@@ -4604,6 +4965,7 @@ namespace {
const std::vector<IoHash>& LooseChunkHashes,
bool AllowPartialBlockRequests,
bool WipeTargetFolder,
+ bool PrimeCacheOnly,
FolderContent& OutLocalFolderState,
DiskStatistics& DiskStats,
CacheMappingStatistics& CacheMappingStats,
@@ -4613,7 +4975,7 @@ namespace {
{
ZEN_TRACE_CPU("UpdateFolder");
- ZEN_UNUSED(WipeTargetFolder);
+ ZEN_ASSERT((!PrimeCacheOnly) || (PrimeCacheOnly && (!AllowPartialBlockRequests)));
Stopwatch IndexTimer;
@@ -4623,7 +4985,7 @@ namespace {
ZEN_CONSOLE("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs()));
- const std::filesystem::path CacheFolderPath = Path / ZenTempCacheFolderName;
+ const std::filesystem::path CacheFolderPath = ZenTempCacheFolderPath(ZenFolderPath);
Stopwatch CacheMappingTimer;
@@ -4633,6 +4995,7 @@ namespace {
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
+ if (!PrimeCacheOnly)
{
ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache");
@@ -4667,17 +5030,24 @@ namespace {
if (SequenceSize == CacheDirContent.FileSizes[Index])
{
CachedSequenceHashesFound.insert({FileHash, SequenceIndex});
- CacheMappingStats.CacheSequenceHashesCount += SequenceSize;
- CacheMappingStats.CacheSequenceHashesByteCount++;
+ CacheMappingStats.CacheSequenceHashesCount++;
+ CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize;
+
+ const std::filesystem::path CacheFilePath =
+ GetFinalChunkedSequenceFileName(CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
+
continue;
}
}
}
- std::filesystem::remove(CacheDirContent.Files[Index]);
+ RemoveFileWithRetry(CacheDirContent.Files[Index]);
}
}
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
+ if (!PrimeCacheOnly)
{
ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache");
@@ -4690,7 +5060,7 @@ namespace {
}
DirectoryContent BlockDirContent;
- GetDirectoryContent(Path / ZenTempBlockFolderName,
+ GetDirectoryContent(ZenTempBlockFolderPath(ZenFolderPath),
DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes,
BlockDirContent);
CachedBlocksFound.reserve(BlockDirContent.Files.size());
@@ -4718,13 +5088,15 @@ namespace {
}
}
}
- std::filesystem::remove(BlockDirContent.Files[Index]);
+ RemoveFileWithRetry(BlockDirContent.Files[Index]);
}
}
std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes;
- // Pick up all whole files we can use from current local state
+
+ if (!PrimeCacheOnly)
{
+ // Pick up all whole files we can use from current local state
ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks");
for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size();
RemoteSequenceIndex++)
@@ -4736,6 +5108,8 @@ namespace {
// const uint32_t RemoteSequenceIndex = CacheSequenceIt->second;
// const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex);
// RemoteSequenceByteCountFoundInCache += RemoteContent.RawSizes[RemotePathIndex];
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
+ ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
}
else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash);
CacheChunkIt != CachedChunkHashesFound.end())
@@ -4743,13 +5117,16 @@ namespace {
// const uint32_t RemoteChunkIndex = CacheChunkIt->second;
// const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex);
// RemoteSequenceByteCountFoundInCache += RemoteContent.RawSizes[RemotePathIndex];
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
+ ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
}
else if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash);
It != LocalLookup.RawHashToSequenceIndex.end())
{
const uint32_t LocalSequenceIndex = It->second;
const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex);
- uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex];
+ ZEN_ASSERT_SLOW(IsFile((Path / LocalContent.Paths[LocalPathIndex]).make_preferred()));
+ uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex];
LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex);
CacheMappingStats.LocalPathsMatchingSequencesCount++;
CacheMappingStats.LocalPathsMatchingSequencesByteCount += RawSize;
@@ -4762,6 +5139,15 @@ namespace {
}
}
}
+ else
+ {
+ for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size();
+ RemoteSequenceIndex++)
+ {
+ const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
+ }
+ }
// Pick up all chunks in current local state
struct CacheCopyData
{
@@ -4779,6 +5165,7 @@ namespace {
tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex;
std::vector<CacheCopyData> CacheCopyDatas;
+ if (!PrimeCacheOnly)
{
ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks");
@@ -4852,38 +5239,37 @@ namespace {
}
}
- if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty() ||
- !LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0)
+ if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty())
{
- ZEN_CONSOLE(
- "Cache: {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks. Local state: {} ({}) chunk sequences, {} ({}) chunks",
- CachedSequenceHashesFound.size(),
- NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount),
- CachedChunkHashesFound.size(),
- NiceBytes(CacheMappingStats.CacheChunkByteCount),
- CachedBlocksFound.size(),
- NiceBytes(CacheMappingStats.CacheBlocksByteCount),
- LocalPathIndexesMatchingSequenceIndexes.size(),
- NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount),
- CacheMappingStats.LocalChunkMatchingRemoteCount,
- NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount));
+ ZEN_CONSOLE("Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks.",
+ CachedSequenceHashesFound.size(),
+ NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount),
+ CachedChunkHashesFound.size(),
+ NiceBytes(CacheMappingStats.CacheChunkByteCount),
+ CachedBlocksFound.size(),
+ NiceBytes(CacheMappingStats.CacheBlocksByteCount));
}
- uint32_t ChunkCountToWrite = 0;
+ if (!LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0)
+ {
+ ZEN_CONSOLE("Local state : Found {} ({}) chunk sequences, {} ({}) chunks",
+ LocalPathIndexesMatchingSequenceIndexes.size(),
+ NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount),
+ CacheMappingStats.LocalChunkMatchingRemoteCount,
+ NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount));
+ }
+
+ uint64_t BytesToWrite = 0;
+
for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
{
- if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
- {
- ChunkCountToWrite++;
- }
- else
+ uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
+ if (ChunkWriteCount > 0)
{
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
- if (!ChunkTargetPtrs.empty())
+ BytesToWrite += RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount;
+ if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true;
- ChunkCountToWrite++;
}
}
}
@@ -4900,8 +5286,8 @@ namespace {
FilteredRate FilteredDownloadedBytesPerSecond;
FilteredRate FilteredWrittenBytesPerSecond;
- WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& NetworkPool = GetNetworkPool();
+ WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar WriteProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
@@ -4922,7 +5308,7 @@ namespace {
const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
- ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
+ ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash);
continue;
}
bool NeedsCopy = true;
@@ -4933,7 +5319,7 @@ namespace {
if (ChunkTargetPtrs.empty())
{
- ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
+ ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash);
}
else
{
@@ -4993,118 +5379,274 @@ namespace {
const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
if (!BlockChunkIndexNeeded.empty())
{
- bool UsingCachedBlock = false;
- if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
+ if (PrimeCacheOnly)
{
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet");
-
+ TotalRequestCount++;
TotalPartWriteCount++;
- std::filesystem::path BlockPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- if (std::filesystem::exists(BlockPath))
- {
- CachedChunkBlockIndexes.push_back(BlockIndex);
- UsingCachedBlock = true;
- }
+ FullBlockWorks.push_back(BlockIndex);
}
-
- if (!UsingCachedBlock)
+ else
{
- bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
- bool CanDoPartialBlockDownload =
- (BlockDescription.HeaderSize > 0) &&
- (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size());
- if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
+ bool UsingCachedBlock = false;
+ if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
{
- std::vector<BlockRangeDescriptor> BlockRanges;
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet");
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis");
+ TotalPartWriteCount++;
- uint32_t NeedBlockChunkIndexOffset = 0;
- uint32_t ChunkBlockIndex = 0;
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ std::filesystem::path BlockPath =
+ ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ if (IsFile(BlockPath))
+ {
+ CachedChunkBlockIndexes.push_back(BlockIndex);
+ UsingCachedBlock = true;
+ }
+ }
- BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
- while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
- ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
+ if (!UsingCachedBlock)
+ {
+ bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
+ bool CanDoPartialBlockDownload =
+ (BlockDescription.HeaderSize > 0) &&
+ (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size());
+ if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
{
- const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
- if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ std::vector<BlockRangeDescriptor> BlockRanges;
+
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis");
+
+ uint32_t NeedBlockChunkIndexOffset = 0;
+ uint32_t ChunkBlockIndex = 0;
+ uint32_t CurrentOffset =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+
+ const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
+ BlockDescription.ChunkCompressedLengths.end(),
+ std::uint64_t(CurrentOffset));
+
+ BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
+ while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
+ ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
{
- if (NextRange.RangeLength > 0)
+ const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
{
- BlockRanges.push_back(NextRange);
- NextRange = {.BlockIndex = BlockIndex};
+ if (NextRange.RangeLength > 0)
+ {
+ BlockRanges.push_back(NextRange);
+ NextRange = {.BlockIndex = BlockIndex};
+ }
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ }
+ else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ {
+ if (NextRange.RangeLength == 0)
+ {
+ NextRange.RangeStart = CurrentOffset;
+ NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ }
+ NextRange.RangeLength += ChunkCompressedLength;
+ NextRange.ChunkBlockIndexCount++;
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ NeedBlockChunkIndexOffset++;
+ }
+ else
+ {
+ ZEN_ASSERT(false);
}
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
}
- else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ if (NextRange.RangeLength > 0)
{
- if (NextRange.RangeLength == 0)
+ BlockRanges.push_back(NextRange);
+ }
+
+ ZEN_ASSERT(!BlockRanges.empty());
+
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
+ auto It = BlockRanges.begin();
+ CollapsedBlockRanges.push_back(*It++);
+ while (It != BlockRanges.end())
+ {
+ BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
+ uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
+ uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength;
+ if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out
+ {
+ LastRange.ChunkBlockIndexCount =
+ (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
+ LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
+ }
+ else
{
- NextRange.RangeStart = CurrentOffset;
- NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ CollapsedBlockRanges.push_back(*It);
}
- NextRange.RangeLength += ChunkCompressedLength;
- NextRange.ChunkBlockIndexCount++;
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- NeedBlockChunkIndexOffset++;
+ ++It;
}
- else
+
+ const std::uint64_t WantedSize = std::accumulate(
+ CollapsedBlockRanges.begin(),
+ CollapsedBlockRanges.end(),
+ uint64_t(0),
+ [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
+ ZEN_ASSERT(WantedSize <= TotalBlockSize);
+ if (WantedSize > ((TotalBlockSize * 95) / 100))
{
- ZEN_ASSERT(false);
+ ZEN_CONSOLE_VERBOSE("Using more than 95% ({}) of block {} ({}), requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize));
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
}
- }
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- }
-
- ZEN_ASSERT(!BlockRanges.empty());
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
- auto It = BlockRanges.begin();
- CollapsedBlockRanges.push_back(*It++);
- while (It != BlockRanges.end())
- {
- BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
- uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
- uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength;
- if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out
+ else if ((WantedSize > ((TotalBlockSize * 9) / 10)) && CollapsedBlockRanges.size() > 1)
+ {
+ ZEN_CONSOLE_VERBOSE(
+ "Using more than 90% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 8) / 10)) && (CollapsedBlockRanges.size() > 16))
+ {
+ ZEN_CONSOLE_VERBOSE(
+ "Using more than 80% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 7) / 10)) && (CollapsedBlockRanges.size() > 48))
+ {
+ ZEN_CONSOLE_VERBOSE(
+ "Using more than 70% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 6) / 10)) && (CollapsedBlockRanges.size() > 64))
{
- LastRange.ChunkBlockIndexCount =
- (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
- LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
+ ZEN_CONSOLE_VERBOSE(
+ "Using more than 60% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
}
else
{
- CollapsedBlockRanges.push_back(*It);
+ if (WantedSize > ((TotalBlockSize * 5) / 10))
+ {
+ ZEN_CONSOLE_VERBOSE("Using {}% ({}) of block {} ({}) using {} requests, requesting partial block",
+ (WantedSize * 100) / TotalBlockSize,
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ }
+ TotalRequestCount += CollapsedBlockRanges.size();
+ TotalPartWriteCount += CollapsedBlockRanges.size();
+
+ BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end());
}
- ++It;
}
+ else
+ {
+ TotalRequestCount++;
+ TotalPartWriteCount++;
- TotalRequestCount += CollapsedBlockRanges.size();
- TotalPartWriteCount += CollapsedBlockRanges.size();
-
- BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end());
- }
- else
- {
- TotalRequestCount++;
- TotalPartWriteCount++;
-
- FullBlockWorks.push_back(BlockIndex);
+ FullBlockWorks.push_back(BlockIndex);
+ }
}
}
}
else
{
- ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
+ ZEN_CONSOLE_VERBOSE("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
}
}
+ struct BlobsExistsResult
+ {
+ tsl::robin_set<IoHash> ExistingBlobs;
+ uint64_t ElapsedTimeMs = 0;
+ };
+
+ BlobsExistsResult ExistsResult;
+
+ if (Storage.BuildCacheStorage)
+ {
+ ZEN_TRACE_CPU("BlobCacheExistCheck");
+ Stopwatch Timer;
+
+ tsl::robin_set<IoHash> BlobHashesSet;
+
+ BlobHashesSet.reserve(LooseChunkHashWorks.size() + FullBlockWorks.size());
+ for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
+ {
+ BlobHashesSet.insert(RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]);
+ }
+ for (const BlockRangeDescriptor& BlockRange : BlockRangeWorks)
+ {
+ const uint32_t BlockIndex = BlockRange.BlockIndex;
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ BlobHashesSet.insert(BlockDescription.BlockHash);
+ }
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ BlobHashesSet.insert(BlockDescription.BlockHash);
+ }
+
+ if (!BlobHashesSet.empty())
+ {
+ const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end());
+ const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ Storage.BuildCacheStorage->BlobsExists(BuildId, BlobHashes);
+
+ if (CacheExistsResult.size() == BlobHashes.size())
+ {
+ ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
+ for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
+ {
+ if (CacheExistsResult[BlobIndex].HasBody)
+ {
+ ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
+ }
+ }
+ }
+ ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ if (!ExistsResult.ExistingBlobs.empty())
+ {
+ ZEN_CONSOLE("Remote cache : Found {} out of {} needed blobs in {}",
+ ExistsResult.ExistingBlobs.size(),
+ BlobHashes.size(),
+ NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
+ }
+ }
+ }
for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
{
if (AbortFlag)
@@ -5118,17 +5660,25 @@ namespace {
std::move(LooseChunkHashWork.ChunkTargetPtrs);
const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
+ if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]))
+ {
+ DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
+
Work.ScheduleWork(
- WritePool, // NetworkPool, // GetSyncWorkerPool(),//
+ WritePool,
[&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded");
std::filesystem::path ExistingCompressedChunkPath;
+ if (!PrimeCacheOnly)
{
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- std::filesystem::path CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
- if (std::filesystem::exists(CompressedChunkPath))
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ std::filesystem::path CompressedChunkPath =
+ ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
+ if (IsFile(CompressedChunkPath))
{
IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath);
if (ExistingCompressedPart)
@@ -5147,156 +5697,250 @@ namespace {
else
{
std::error_code DummyEc;
- std::filesystem::remove(CompressedChunkPath, DummyEc);
+ RemoveFile(CompressedChunkPath, DummyEc);
}
}
}
}
- if (!ExistingCompressedChunkPath.empty())
+ if (!AbortFlag)
+
{
- Work.ScheduleWork(
- WritePool, // WritePool, GetSyncWorkerPool()
- [&Path,
- &RemoteContent,
- &RemoteLookup,
- &CacheFolderPath,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &WritePool,
- &DiskStats,
- &WriteChunkStats,
- &WritePartsComplete,
- &TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
+ if (!ExistingCompressedChunkPath.empty())
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&Path,
+ &ZenFolderPath,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &DiskStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
- FilteredWrittenBytesPerSecond.Start();
+ FilteredWrittenBytesPerSecond.Start();
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
- if (!CompressedPart)
- {
- throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}",
- ChunkHash,
- CompressedChunkPath));
- }
+ IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded compressed chunk {} from {}",
+ ChunkHash,
+ CompressedChunkPath));
+ }
+
+ std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath);
+ bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkHash,
+ ChunkTargetPtrs,
+ std::move(CompressedPart),
+ DiskStats);
+ WritePartsComplete++;
+
+ if (!AbortFlag)
+ {
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
- std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
- bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkHash,
- ChunkTargetPtrs,
- std::move(CompressedPart),
- DiskStats,
- WriteChunkStats);
- WriteChunkStats.ChunkCountWritten++;
- WriteChunkStats.ChunkBytesWritten +=
- RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex];
- WritePartsComplete++;
+ RemoveFile(CompressedChunkPath);
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ CompletedSequences,
+ Work,
+ WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ }
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ else
+ {
+ Work.ScheduleWork(
+ NetworkPool,
+ [&Path,
+ &ZenFolderPath,
+ &Storage,
+ BuildId,
+ &PrimeCacheOnly,
+ &RemoteContent,
+ &RemoteLookup,
+ &ExistsResult,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &NetworkPool,
+ &DiskStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ LargeAttachmentSize,
+ PreferredMultipartChunkSize,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ &DownloadStats](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
- if (WritePartsComplete == TotalPartWriteCount)
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BuildBlob;
+ const bool ExistsInCache =
+ Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+ if (ExistsInCache)
{
- FilteredWrittenBytesPerSecond.Stop();
+ BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash);
}
-
- std::filesystem::remove(CompressedChunkPath);
-
- std::vector<uint32_t> CompletedSequences =
- CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
- if (NeedHashVerify)
+ if (BuildBlob)
{
- VerifyAndCompleteChunkSequencesAsync(TargetFolder,
- RemoteContent,
- CompletedSequences,
- Work,
- WritePool);
+ uint64_t BlobSize = BuildBlob.GetSize();
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(ZenFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
}
else
{
- FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
+ DownloadLargeBlob(
+ *Storage.BuildStorage,
+ ZenTempDownloadFolderPath(ZenFolderPath),
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ DownloadStats,
+ [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (Payload && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(
+ BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
+ if (!PrimeCacheOnly)
+ {
+ if (!AbortFlag)
+ {
+ AsyncWriteDownloadedChunk(ZenFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
+ }
+ });
+ }
+ else
+ {
+ ZEN_TRACE_CPU("UpdateFolder_GetChunk");
+ BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash);
+ if (BuildBlob && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(
+ BuildId,
+ ChunkHash,
+ BuildBlob.GetContentType(),
+ CompositeBuffer(SharedBuffer(BuildBlob)));
+ }
+ if (!BuildBlob)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ }
+ if (!PrimeCacheOnly)
+ {
+ if (!AbortFlag)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(ZenFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
+ }
+ }
}
}
- }
- },
- Work.DefaultErrorFunction());
- }
- else
- {
- FilteredDownloadedBytesPerSecond.Start();
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
- DownloadLargeBlob(Storage,
- Path / ZenTempDownloadFolderName,
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(Path,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(Payload),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats,
- WriteChunkStats);
- });
- }
- else
- {
- ZEN_TRACE_CPU("UpdateFolder_GetChunk");
-
- IoBuffer BuildBlob = Storage.GetBuildBlob(BuildId, ChunkHash);
- if (!BuildBlob)
- {
- throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
- }
- uint64_t BlobSize = BuildBlob.GetSize();
- DownloadStats.DownloadedChunkCount++;
- DownloadStats.DownloadedChunkByteCount += BlobSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(Path,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats,
- WriteChunkStats);
+ },
+ Work.DefaultErrorFunction());
}
}
}
@@ -5306,13 +5950,14 @@ namespace {
for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
{
+ ZEN_ASSERT(!PrimeCacheOnly);
if (AbortFlag)
{
break;
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
+ WritePool,
[&, CopyDataIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -5439,16 +6084,6 @@ namespace {
ChunkSource,
Op.Target->Offset,
RemoteContent.RawSizes[RemotePathIndex]);
- for (size_t WrittenOpIndex = WriteOpIndex; WrittenOpIndex < WriteOpIndex + WriteCount; WrittenOpIndex++)
- {
- const WriteOp& WrittenOp = WriteOps[WrittenOpIndex];
- if (ChunkIndexesWritten.insert(WrittenOp.ChunkIndex).second)
- {
- WriteChunkStats.ChunkCountWritten++;
- WriteChunkStats.ChunkBytesWritten +=
- RemoteContent.ChunkedContent.ChunkRawSizes[WrittenOp.ChunkIndex];
- }
- }
CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
@@ -5469,10 +6104,13 @@ namespace {
}
VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
RemoteContent,
+ RemoteLookup,
CompletedChunkSequences,
Work,
WritePool);
- ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ ZEN_CONSOLE_VERBOSE("Copied {} from {}",
+ NiceBytes(CacheLocalFileBytesRead),
+ LocalContent.Paths[LocalPathIndex]);
}
WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
@@ -5486,13 +6124,14 @@ namespace {
for (uint32_t BlockIndex : CachedChunkBlockIndexes)
{
+ ZEN_ASSERT(!PrimeCacheOnly);
if (AbortFlag)
{
break;
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(), // WritePool,
+ WritePool,
[&, BlockIndex](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
@@ -5501,35 +6140,38 @@ namespace {
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
FilteredWrittenBytesPerSecond.Start();
- std::filesystem::path BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ std::filesystem::path BlockChunkPath =
+ ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
if (!BlockBuffer)
{
throw std::runtime_error(
fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
}
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats,
- WriteChunkStats))
- {
- std::error_code DummyEc;
- std::filesystem::remove(BlockChunkPath, DummyEc);
- throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
- WritePartsComplete++;
- std::filesystem::remove(BlockChunkPath);
- if (WritePartsComplete == TotalPartWriteCount)
+ if (!AbortFlag)
{
- FilteredWrittenBytesPerSecond.Stop();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ RemoveFile(BlockChunkPath);
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
}
},
@@ -5538,6 +6180,7 @@ namespace {
for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
{
+ ZEN_ASSERT(!PrimeCacheOnly);
if (AbortFlag)
{
break;
@@ -5546,7 +6189,7 @@ namespace {
ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1);
const uint32_t BlockIndex = BlockRange.BlockIndex;
Work.ScheduleWork(
- NetworkPool, // NetworkPool, // GetSyncWorkerPool()
+ NetworkPool,
[&, BlockIndex, BlockRange](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -5555,131 +6198,146 @@ namespace {
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer =
- Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ IoBuffer BlockBuffer;
+ if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ {
+ BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ }
if (!BlockBuffer)
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
}
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ if (!BlockBuffer)
{
- FilteredDownloadedBytesPerSecond.Stop();
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
-
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ if (!AbortFlag)
{
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
{
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = Path / ZenTempBlockFolderName /
- fmt::format("{}_{:x}_{:x}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
{
- BlockChunkPath = std::filesystem::path{};
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ RenameFile(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
}
}
}
- }
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath =
- Path / ZenTempBlockFolderName /
- fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool, // WritePool, // GetSyncWorkerPool(),
- [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)](
- std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)](
+ std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockPartialBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockPartialBuffer);
- BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockPartialBuffer)
+ if (BlockChunkPath.empty())
{
- throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
}
- }
- FilteredWrittenBytesPerSecond.Start();
-
- if (!WritePartialBlockToDisk(
- CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats,
- WriteChunkStats))
- {
- std::error_code DummyEc;
- std::filesystem::remove(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
- }
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockToDisk(
+ CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockPartialBuffer)),
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
- if (!BlockChunkPath.empty())
- {
- std::filesystem::remove(BlockChunkPath);
- }
+ if (!BlockChunkPath.empty())
+ {
+ RemoveFile(BlockChunkPath);
+ }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- }
- },
- Work.DefaultErrorFunction());
+ },
+ Work.DefaultErrorFunction());
+ }
}
}
},
@@ -5692,8 +6350,15 @@ namespace {
{
break;
}
+
+ if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash))
+ {
+ DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
+
Work.ScheduleWork(
- NetworkPool, // GetSyncWorkerPool(), // NetworkPool,
+ NetworkPool,
[&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -5702,133 +6367,159 @@ namespace {
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash);
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
- }
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+
+ IoBuffer BlockBuffer;
+ const bool ExistsInCache =
+ Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ if (ExistsInCache)
{
- FilteredDownloadedBytesPerSecond.Stop();
+ BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
}
-
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ if (!BlockBuffer)
{
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
+ BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ if (BlockBuffer && Storage.BuildCacheStorage)
{
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
- }
- }
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockBuffer.GetContentType(),
+ CompositeBuffer(SharedBuffer(BlockBuffer)));
}
}
-
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ if (!BlockBuffer)
{
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
-
if (!AbortFlag)
{
- Work.ScheduleWork(
- WritePool, // WritePool, GetSyncWorkerPool()
- [&Work,
- &WritePool,
- &RemoteContent,
- &RemoteLookup,
- CacheFolderPath,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- BlockIndex,
- &BlockDescriptions,
- &WriteChunkStats,
- &DiskStats,
- &WritePartsComplete,
- &TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- BlockChunkPath,
- BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ if (!PrimeCacheOnly)
+ {
+ std::filesystem::path BlockChunkPath;
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockBuffer);
- }
- else
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
{
- ZEN_ASSERT(!BlockBuffer);
- BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath =
+ ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ RenameFile(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
{
- throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
+ BlockChunkPath = std::filesystem::path{};
+
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
}
}
+ }
+ }
- FilteredWrittenBytesPerSecond.Start();
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats,
- WriteChunkStats))
- {
- std::error_code DummyEc;
- std::filesystem::remove(BlockChunkPath, DummyEc);
- throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
- if (!BlockChunkPath.empty())
- {
- std::filesystem::remove(BlockChunkPath);
- }
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&Work,
+ &WritePool,
+ &RemoteContent,
+ &RemoteLookup,
+ CacheFolderPath,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ BlockIndex,
+ &BlockDescriptions,
+ &WriteChunkStats,
+ &DiskStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ BlockChunkPath,
+ BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
- WritePartsComplete++;
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- },
- Work.DefaultErrorFunction());
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockBuffer);
+ BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
+ }
+
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+
+ if (!BlockChunkPath.empty())
+ {
+ RemoveFile(BlockChunkPath);
+ }
+
+ WritePartsComplete++;
+
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
}
}
},
@@ -5840,26 +6531,32 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load());
uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() +
DownloadStats.DownloadedBlockByteCount.load() +
+DownloadStats.DownloadedPartialBlockByteCount.load();
FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load());
FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
- std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
- DownloadStats.RequestsCompleteCount.load(),
- TotalRequestCount,
- NiceBytes(DownloadedBytes),
- NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- WriteChunkStats.ChunkCountWritten.load(),
- ChunkCountToWrite,
- NiceBytes(DiskStats.WriteByteCount.load()),
- NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
+ std::string DownloadRateString =
+ (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ ? ""
+ : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
+ std::string WriteDetails = PrimeCacheOnly ? ""
+ : fmt::format(" {}/{} ({}B/s) written.",
+ NiceBytes(DiskStats.WriteByteCount.load()),
+ NiceBytes(BytesToWrite),
+ NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
+ std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
+ DownloadStats.RequestsCompleteCount.load(),
+ TotalRequestCount,
+ NiceBytes(DownloadedBytes),
+ DownloadRateString,
+ WriteDetails);
WriteProgressBar.UpdateState(
- {.Task = "Writing chunks ",
+ {.Task = PrimeCacheOnly ? "Downloading " : "Writing chunks ",
.Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
- .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())},
+ .TotalCount = PrimeCacheOnly ? TotalRequestCount : BytesToWrite,
+ .RemainingCount = PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load())
+ : (BytesToWrite - DiskStats.WriteByteCount.load())},
false);
});
}
@@ -5874,21 +6571,28 @@ namespace {
WriteProgressBar.Finish();
- uint32_t RawSequencesMissingWriteCount = 0;
- for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
+ if (!PrimeCacheOnly)
{
- const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
- if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
+ uint32_t RawSequencesMissingWriteCount = 0;
+ for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
{
- RawSequencesMissingWriteCount++;
- const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
- const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex];
- ZEN_ASSERT(!IncompletePath.empty());
- const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
+ const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
+ if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
+ {
+ RawSequencesMissingWriteCount++;
+ const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex];
+ ZEN_ASSERT(!IncompletePath.empty());
+ const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
+ ZEN_CONSOLE("{}: Max count {}, Current count {}",
+ IncompletePath,
+ ExpectedSequenceCount,
+ SequenceIndexChunksLeftToWriteCounter.load());
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
+ }
}
+ ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
}
- ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() +
+DownloadStats.DownloadedPartialBlockByteCount.load();
@@ -5906,82 +6610,199 @@ namespace {
WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
}
- // Move all files we will reuse to cache folder
- // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place
- if (!LocalPathIndexesMatchingSequenceIndexes.empty())
+ if (PrimeCacheOnly)
{
- ZEN_TRACE_CPU("UpdateFolder_CacheReused");
- uint64_t TotalFullFileSizeCached = 0;
- for (uint32_t LocalPathIndex : LocalPathIndexesMatchingSequenceIndexes)
- {
- const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
- const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
- ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath));
- SetFileReadOnly(LocalFilePath, false);
- ZEN_ASSERT_SLOW(!std::filesystem::exists(CacheFilePath));
- std::filesystem::rename(LocalFilePath, CacheFilePath);
- TotalFullFileSizeCached += std::filesystem::file_size(CacheFilePath);
- }
- ZEN_CONSOLE("Saved {} ({}) unchanged files in cache",
- LocalPathIndexesMatchingSequenceIndexes.size(),
- NiceBytes(TotalFullFileSizeCached));
+ return;
}
- if (WipeTargetFolder)
- {
- ZEN_TRACE_CPU("UpdateFolder_WipeTarget");
- Stopwatch Timer;
+ tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex;
+ RemotePathIndexToLocalPathIndex.reserve(RemoteContent.Paths.size());
- // Clean target folder
- ZEN_CONSOLE("Wiping {}", Path);
- if (!CleanDirectory(Path, DefaultExcludeFolders))
- {
- ZEN_WARN("Some files in {} could not be removed", Path);
- }
- RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex;
+ std::vector<uint32_t> RemoveLocalPathIndexes;
+
+ if (AbortFlag)
+ {
+ return;
}
- else
+
{
- ZEN_TRACE_CPU("UpdateFolder_RemoveUnused");
- Stopwatch Timer;
+ ZEN_TRACE_CPU("UpdateFolder_PrepareTarget");
- // Remove unused tracked files
- tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
+ tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences;
+ tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
RemotePathToRemoteIndex.reserve(RemoteContent.Paths.size());
for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
{
RemotePathToRemoteIndex.insert({RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex});
}
- std::vector<std::filesystem::path> LocalFilesToRemove;
+
+ std::vector<uint32_t> FilesToCache;
+
+ uint64_t MatchCount = 0;
+ uint64_t PathMismatchCount = 0;
+ uint64_t HashMismatchCount = 0;
+ std::atomic<uint64_t> CachedCount = 0;
+ std::atomic<uint64_t> CachedByteCount = 0;
+ uint64_t SkippedCount = 0;
+ uint64_t DeleteCount = 0;
for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
{
- if (!RemotePathToRemoteIndex.contains(LocalContent.Paths[LocalPathIndex].generic_string()))
+ if (AbortFlag)
+ {
+ break;
+ }
+ const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex];
+
+ ZEN_ASSERT_SLOW(IsFile((Path / LocalContent.Paths[LocalPathIndex]).make_preferred()));
+
+ if (!WipeTargetFolder)
{
- const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- if (std::filesystem::exists(LocalFilePath))
+ if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string());
+ RemotePathIt != RemotePathToRemoteIndex.end())
{
- LocalFilesToRemove.emplace_back(std::move(LocalFilePath));
+ const uint32_t RemotePathIndex = RemotePathIt->second;
+ if (RemoteContent.RawHashes[RemotePathIndex] == RawHash)
+ {
+ // It is already in it's desired place
+ RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex;
+ SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex});
+ MatchCount++;
+ continue;
+ }
+ else
+ {
+ HashMismatchCount++;
+ }
+ }
+ else
+ {
+ PathMismatchCount++;
}
}
+ if (RemoteLookup.RawHashToSequenceIndex.contains(RawHash))
+ {
+ if (!CachedRemoteSequences.contains(RawHash))
+ {
+ ZEN_TRACE_CPU("MoveToCache");
+ // We need it
+ FilesToCache.push_back(LocalPathIndex);
+ CachedRemoteSequences.insert(RawHash);
+ }
+ else
+ {
+ // We already have it
+ SkippedCount++;
+ }
+ }
+ else if (!WipeTargetFolder)
+ {
+ // We don't need it
+ RemoveLocalPathIndexes.push_back(LocalPathIndex);
+ DeleteCount++;
+ }
+ }
+
+ if (AbortFlag)
+ {
+ return;
}
- if (!LocalFilesToRemove.empty())
+
{
- ZEN_CONSOLE("Cleaning {} removed files from {}", LocalFilesToRemove.size(), Path);
- for (const std::filesystem::path& LocalFilePath : LocalFilesToRemove)
+ ZEN_TRACE_CPU("UpdateFolder_CopyToCache");
+
+ Stopwatch Timer;
+
+ WorkerThreadPool& WritePool = GetIOWorkerPool();
+
+ ProgressBar CacheLocalProgressBar(UsePlainProgress);
+ ParallellWork Work(AbortFlag);
+
+ for (uint32_t LocalPathIndex : FilesToCache)
{
- SetFileReadOnly(LocalFilePath, false);
- std::filesystem::remove(LocalFilePath);
+ if (AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(
+ WritePool,
+ [&, LocalPathIndex](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache");
+ if (!AbortFlag)
+ {
+ const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex];
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(!IsFile(CacheFilePath));
+ const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred();
+ RenameFileWithRetry(LocalFilePath, CacheFilePath);
+ CachedCount++;
+ CachedByteCount += LocalContent.RawSizes[LocalPathIndex];
+ }
+ },
+ Work.DefaultErrorFunction());
}
+
+ {
+ ZEN_TRACE_CPU("CacheLocal_Wait");
+
+ Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, PendingWork);
+ const uint64_t WorkTotal = FilesToCache.size();
+ const uint64_t WorkComplete = CachedCount.load();
+ std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount));
+ CacheLocalProgressBar.UpdateState({.Task = "Caching local ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)},
+ false);
+ });
+ }
+
+ if (AbortFlag)
+ {
+ return;
+ }
+
+ CacheLocalProgressBar.Finish();
+
+ ZEN_DEBUG(
+ "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, "
+ "Delete: {}",
+ MatchCount,
+ PathMismatchCount,
+ HashMismatchCount,
+ CachedCount.load(),
+ NiceBytes(CachedByteCount.load()),
+ SkippedCount,
+ DeleteCount);
+ }
+ }
+
+ if (WipeTargetFolder)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WipeTarget");
+ Stopwatch Timer;
+
+ // Clean target folder
+ if (!CleanDirectory(Path, DefaultExcludeFolders))
+ {
+ ZEN_WARN("Some files in {} could not be removed", Path);
}
RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
}
+ if (AbortFlag)
+ {
+ return;
+ }
+
{
ZEN_TRACE_CPU("UpdateFolder_FinalizeTree");
Stopwatch Timer;
- WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar RebuildProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
@@ -5991,16 +6812,52 @@ namespace {
OutLocalFolderState.Attributes.resize(RemoteContent.Paths.size());
OutLocalFolderState.ModificationTicks.resize(RemoteContent.Paths.size());
+ std::atomic<uint64_t> DeletedCount = 0;
+
+ for (uint32_t LocalPathIndex : RemoveLocalPathIndexes)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(
+ WritePool,
+ [&, LocalPathIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
+ SetFileReadOnlyWithRetry(LocalFilePath, false);
+ RemoveFileWithRetry(LocalFilePath);
+ DeletedCount++;
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+
std::atomic<uint64_t> TargetsComplete = 0;
- std::vector<std::pair<IoHash, uint32_t>> Targets;
+ struct FinalizeTarget
+ {
+ IoHash RawHash;
+ uint32_t RemotePathIndex;
+ };
+
+ std::vector<FinalizeTarget> Targets;
Targets.reserve(RemoteContent.Paths.size());
for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++)
{
- Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex));
+ Targets.push_back(FinalizeTarget{.RawHash = RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex});
}
- std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) {
- return Lhs.first < Rhs.first;
+ std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) {
+ if (Lhs.RawHash < Rhs.RawHash)
+ {
+ return true;
+ }
+ else if (Lhs.RawHash > Rhs.RawHash)
+ {
+ return false;
+ }
+ return Lhs.RemotePathIndex < Rhs.RemotePathIndex;
});
size_t TargetOffset = 0;
@@ -6011,93 +6868,168 @@ namespace {
break;
}
- size_t TargetCount = 1;
- const IoHash& RawHash = Targets[TargetOffset].first;
- while (Targets[TargetOffset + TargetCount].first == RawHash)
+ size_t TargetCount = 1;
+ while (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash)
{
TargetCount++;
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
+ WritePool,
[&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) {
if (!AbortFlag)
{
ZEN_TRACE_CPU("FinalizeTree_Work");
- size_t TargetOffset = BaseTargetOffset;
- const IoHash& RawHash = Targets[TargetOffset].first;
- const uint32_t FirstTargetPathIndex = Targets[TargetOffset].second;
- const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstTargetPathIndex];
- OutLocalFolderState.Paths[FirstTargetPathIndex] = FirstTargetPath;
- OutLocalFolderState.RawSizes[FirstTargetPathIndex] = RemoteContent.RawSizes[FirstTargetPathIndex];
- const std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred();
+ size_t TargetOffset = BaseTargetOffset;
+ const IoHash& RawHash = Targets[TargetOffset].RawHash;
+
if (RawHash == IoHash::Zero)
{
- if (std::filesystem::exists(FirstTargetFilePath))
+ ZEN_TRACE_CPU("ZeroSize");
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
{
- SetFileReadOnly(FirstTargetFilePath, false);
- }
- CreateDirectories(FirstTargetFilePath.parent_path());
- {
- BasicFile OutputFile;
- OutputFile.Open(FirstTargetFilePath, BasicFile::Mode::kTruncate);
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
+ if (!RemotePathIndexToLocalPathIndex[RemotePathIndex])
+ {
+ if (IsFile(TargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
+ }
+ BasicFile OutputFile;
+ OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate);
+ }
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
+
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath,
+ RemoteContent.Platform,
+ RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
}
}
else
{
- ZEN_TRACE_CPU("FinalizeTree_MoveIntoPlace");
-
- const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
- ZEN_ASSERT_SLOW(std::filesystem::exists(CacheFilePath));
- CreateDirectories(FirstTargetFilePath.parent_path());
- if (std::filesystem::exists(FirstTargetFilePath))
+ ZEN_TRACE_CPU("Files");
+ ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash));
+ const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex];
+ std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred();
+
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex);
+ InPlaceIt != RemotePathIndexToLocalPathIndex.end())
{
- SetFileReadOnly(FirstTargetFilePath, false);
+ ZEN_ASSERT_SLOW(IsFile(FirstTargetFilePath));
}
- std::filesystem::rename(CacheFilePath, FirstTargetFilePath);
- RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
- }
+ else
+ {
+ if (IsFile(FirstTargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(FirstTargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(FirstTargetFilePath.parent_path());
+ }
- OutLocalFolderState.Attributes[FirstTargetPathIndex] =
- RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath)
- : SetNativeFileAttributes(FirstTargetFilePath,
- RemoteContent.Platform,
- RemoteContent.Attributes[FirstTargetPathIndex]);
- OutLocalFolderState.ModificationTicks[FirstTargetPathIndex] = GetModificationTickFromPath(FirstTargetFilePath);
+ if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash);
+ InplaceIt != SequenceHashToLocalPathIndex.end())
+ {
+ ZEN_TRACE_CPU("Copy");
+ const uint32_t LocalPathIndex = InplaceIt->second;
+ const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex];
+ std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred();
+ ZEN_ASSERT_SLOW(IsFile(SourceFilePath));
+
+ ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath);
+ CopyFile(SourceFilePath, FirstTargetFilePath, {.EnableClone = false});
+ RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
+ else
+ {
+ ZEN_TRACE_CPU("Rename");
+ const std::filesystem::path CacheFilePath =
+ GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
- TargetOffset++;
- TargetsComplete++;
- while (TargetOffset < (BaseTargetOffset + TargetCount))
- {
- ZEN_TRACE_CPU("FinalizeTree_Copy");
-
- ZEN_ASSERT(Targets[TargetOffset].first == RawHash);
- ZEN_ASSERT_SLOW(std::filesystem::exists(FirstTargetFilePath));
- const uint32_t ExtraTargetPathIndex = Targets[TargetOffset].second;
- const std::filesystem::path& ExtraTargetPath = RemoteContent.Paths[ExtraTargetPathIndex];
- const std::filesystem::path ExtraTargetFilePath = (Path / ExtraTargetPath).make_preferred();
- OutLocalFolderState.Paths[ExtraTargetPathIndex] = ExtraTargetPath;
- OutLocalFolderState.RawSizes[ExtraTargetPathIndex] = RemoteContent.RawSizes[ExtraTargetPathIndex];
- CreateDirectories(ExtraTargetFilePath.parent_path());
- if (std::filesystem::exists(ExtraTargetFilePath))
- {
- SetFileReadOnly(ExtraTargetFilePath, false);
+ RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
+
+ RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
+ }
}
- CopyFile(FirstTargetFilePath, ExtraTargetFilePath, {.EnableClone = false});
- RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
- OutLocalFolderState.Attributes[ExtraTargetPathIndex] =
+ OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath;
+ OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex];
+
+ OutLocalFolderState.Attributes[FirstRemotePathIndex] =
RemoteContent.Attributes.empty()
- ? GetNativeFileAttributes(ExtraTargetFilePath)
- : SetNativeFileAttributes(ExtraTargetFilePath,
+ ? GetNativeFileAttributes(FirstTargetFilePath)
+ : SetNativeFileAttributes(FirstTargetFilePath,
RemoteContent.Platform,
- RemoteContent.Attributes[ExtraTargetPathIndex]);
- OutLocalFolderState.ModificationTicks[ExtraTargetPathIndex] =
- GetModificationTickFromPath(ExtraTargetFilePath);
+ RemoteContent.Attributes[FirstRemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] =
+ GetModificationTickFromPath(FirstTargetFilePath);
TargetOffset++;
TargetsComplete++;
+
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
+ {
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
+
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex);
+ InPlaceIt != RemotePathIndexToLocalPathIndex.end())
+ {
+ ZEN_ASSERT_SLOW(IsFile(TargetFilePath));
+ }
+ else
+ {
+ ZEN_TRACE_CPU("Copy");
+ if (IsFile(TargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
+ }
+
+ ZEN_ASSERT_SLOW(IsFile(FirstTargetFilePath));
+ ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath);
+ CopyFile(FirstTargetFilePath, TargetFilePath, {.EnableClone = false});
+ RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
+
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
+
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath,
+ RemoteContent.Platform,
+ RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
+ }
}
}
},
@@ -6111,11 +7043,13 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- std::string Details = fmt::format("{}/{} files", TargetsComplete.load(), Targets.size());
+ const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size();
+ const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load();
+ std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal);
RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
.Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(Targets.size()),
- .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())},
+ .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)},
false);
});
}
@@ -6131,6 +7065,107 @@ namespace {
}
}
+ std::string GetCbObjectAsNiceString(CbObjectView Object, std::string_view Prefix, std::string_view Suffix)
+ {
+ ExtendableStringBuilder<512> SB;
+ std::vector<std::pair<std::string, std::string>> NameStringValuePairs;
+ for (CbFieldView Field : Object)
+ {
+ std::string_view Name = Field.GetName();
+ switch (CbValue Accessor = Field.GetValue(); Accessor.GetType())
+ {
+ case CbFieldType::String:
+ NameStringValuePairs.push_back({std::string(Name), std::string(Accessor.AsString())});
+ break;
+ case CbFieldType::IntegerPositive:
+ NameStringValuePairs.push_back({std::string(Name), fmt::format("{}", Accessor.AsIntegerPositive())});
+ break;
+ case CbFieldType::IntegerNegative:
+ NameStringValuePairs.push_back({std::string(Name), fmt::format("{}", Accessor.AsIntegerNegative())});
+ break;
+ case CbFieldType::Float32:
+ {
+ const float Value = Accessor.AsFloat32();
+ if (std::isfinite(Value))
+ {
+ NameStringValuePairs.push_back({std::string(Name), fmt::format("{:.9g}", Value)});
+ }
+ else
+ {
+ NameStringValuePairs.push_back({std::string(Name), "null"});
+ }
+ }
+ break;
+ case CbFieldType::Float64:
+ {
+ const double Value = Accessor.AsFloat64();
+ if (std::isfinite(Value))
+ {
+ NameStringValuePairs.push_back({std::string(Name), fmt::format("{:.17g}", Value)});
+ }
+ else
+ {
+ NameStringValuePairs.push_back({std::string(Name), "null"});
+ }
+ }
+ break;
+ case CbFieldType::BoolFalse:
+ NameStringValuePairs.push_back({std::string(Name), "false"});
+ break;
+ case CbFieldType::BoolTrue:
+ NameStringValuePairs.push_back({std::string(Name), "true"});
+ break;
+ case CbFieldType::Hash:
+ {
+ NameStringValuePairs.push_back({std::string(Name), Accessor.AsHash().ToHexString()});
+ }
+ break;
+ case CbFieldType::Uuid:
+ {
+ StringBuilder<Oid::StringLength + 1> Builder;
+ Accessor.AsUuid().ToString(Builder);
+ NameStringValuePairs.push_back({std::string(Name), Builder.ToString()});
+ }
+ break;
+ case CbFieldType::DateTime:
+ {
+ ExtendableStringBuilder<64> Builder;
+ Builder << DateTime(Accessor.AsDateTimeTicks()).ToIso8601();
+ NameStringValuePairs.push_back({std::string(Name), Builder.ToString()});
+ }
+ break;
+ case CbFieldType::TimeSpan:
+ {
+ ExtendableStringBuilder<64> Builder;
+ const TimeSpan Span(Accessor.AsTimeSpanTicks());
+ if (Span.GetDays() == 0)
+ {
+ Builder << Span.ToString("%h:%m:%s.%n");
+ }
+ else
+ {
+ Builder << Span.ToString("%d.%h:%m:%s.%n");
+ }
+ NameStringValuePairs.push_back({std::string(Name), Builder.ToString()});
+ break;
+ }
+ case CbFieldType::ObjectId:
+ NameStringValuePairs.push_back({std::string(Name), Accessor.AsObjectId().ToString()});
+ break;
+ }
+ }
+ std::string::size_type LongestKey = 0;
+ for (const std::pair<std::string, std::string>& KeyValue : NameStringValuePairs)
+ {
+ LongestKey = Max(KeyValue.first.length(), LongestKey);
+ }
+ for (const std::pair<std::string, std::string>& KeyValue : NameStringValuePairs)
+ {
+ SB.Append(fmt::format("{}{:<{}}: {}{}", Prefix, KeyValue.first, LongestKey, KeyValue.second, Suffix));
+ }
+ return SB.ToString();
+ }
+
std::vector<std::pair<Oid, std::string>> ResolveBuildPartNames(BuildStorage& Storage,
const Oid& BuildId,
const std::vector<Oid>& BuildPartIds,
@@ -6140,17 +7175,13 @@ namespace {
std::vector<std::pair<Oid, std::string>> Result;
{
Stopwatch GetBuildTimer;
-
- std::vector<std::pair<Oid, std::string>> AvailableParts;
-
- CbObject BuildObject = Storage.GetBuild(BuildId);
-
+ CbObject BuildObject = Storage.GetBuild(BuildId);
ZEN_CONSOLE("GetBuild took {}. Name: '{}', Payload size: {}",
NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()),
- BuildObject["BuildName"sv].AsString(),
+ BuildObject["name"sv].AsString(),
NiceBytes(BuildObject.GetSize()));
- ZEN_DEBUG("Build object: {}", BuildObject);
+ ZEN_CONSOLE("{}", GetCbObjectAsNiceString(BuildObject, " "sv, "\n"sv));
CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
if (!PartsObject)
@@ -6160,6 +7191,8 @@ namespace {
OutPreferredMultipartChunkSize = BuildObject["chunkSize"sv].AsUInt64(OutPreferredMultipartChunkSize);
+ std::vector<std::pair<Oid, std::string>> AvailableParts;
+
for (CbFieldView PartView : PartsObject)
{
const std::string BuildPartName = std::string(PartView.GetName());
@@ -6221,7 +7254,7 @@ namespace {
return Result;
}
- ChunkedFolderContent GetRemoteContent(BuildStorage& Storage,
+ ChunkedFolderContent GetRemoteContent(StorageInstance& Storage,
const Oid& BuildId,
const std::vector<std::pair<Oid, std::string>>& BuildParts,
std::unique_ptr<ChunkingController>& OutChunkController,
@@ -6234,12 +7267,13 @@ namespace {
Stopwatch GetBuildPartTimer;
const Oid BuildPartId = BuildParts[0].first;
const std::string_view BuildPartName = BuildParts[0].second;
- CbObject BuildPartManifest = Storage.GetBuildPart(BuildId, BuildPartId);
+ CbObject BuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId);
ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}",
BuildPartId,
BuildPartName,
NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()),
NiceBytes(BuildPartManifest.GetSize()));
+ ZEN_CONSOLE("{}", GetCbObjectAsNiceString(BuildPartManifest, " "sv, "\n"sv));
{
CbObjectView Chunker = BuildPartManifest["chunker"sv].AsObjectView();
@@ -6248,7 +7282,7 @@ namespace {
OutChunkController = CreateChunkingController(ChunkerName, Parameters);
}
- auto ParseBuildPartManifest = [](BuildStorage& Storage,
+ auto ParseBuildPartManifest = [](StorageInstance& Storage,
const Oid& BuildId,
const Oid& BuildPartId,
CbObject BuildPartManifest,
@@ -6274,12 +7308,103 @@ namespace {
// TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them
- Stopwatch GetBlockMetadataTimer;
- OutBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockRawHashes);
- ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks",
- BuildPartId,
- NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
- OutBlockDescriptions.size());
+ {
+ Stopwatch GetBlockMetadataTimer;
+
+ std::vector<ChunkBlockDescription> UnorderedList;
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup;
+ if (Storage.BuildCacheStorage)
+ {
+ std::vector<CbObject> CacheBlockMetadatas = Storage.BuildCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes);
+ UnorderedList.reserve(CacheBlockMetadatas.size());
+ for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size();
+ CacheBlockMetadataIndex++)
+ {
+ const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex];
+ ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata);
+ if (Description.BlockHash == IoHash::Zero)
+ {
+ ZEN_WARN("Unexpected/invalid block metadata received from remote cache, skipping block");
+ }
+ else
+ {
+ UnorderedList.emplace_back(std::move(Description));
+ }
+ }
+ for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++)
+ {
+ const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex];
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex);
+ }
+ }
+
+ if (UnorderedList.size() < BlockRawHashes.size())
+ {
+ std::vector<IoHash> RemainingBlockHashes;
+ RemainingBlockHashes.reserve(BlockRawHashes.size() - UnorderedList.size());
+ for (const IoHash& BlockRawHash : BlockRawHashes)
+ {
+ if (!BlockDescriptionLookup.contains(BlockRawHash))
+ {
+ RemainingBlockHashes.push_back(BlockRawHash);
+ }
+ }
+ CbObject BlockMetadatas = Storage.BuildStorage->GetBlockMetadatas(BuildId, RemainingBlockHashes);
+ std::vector<ChunkBlockDescription> RemainingList;
+ {
+ CbArrayView BlocksArray = BlockMetadatas["blocks"sv].AsArrayView();
+ std::vector<IoHash> FoundBlockHashes;
+ std::vector<CbObject> FoundBlockMetadatas;
+ for (CbFieldView Block : BlocksArray)
+ {
+ ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView());
+
+ if (Description.BlockHash == IoHash::Zero)
+ {
+ ZEN_WARN("Unexpected/invalid block metadata received from remote store, skipping block");
+ }
+ else
+ {
+ if (Storage.BuildCacheStorage)
+ {
+ UniqueBuffer MetaBuffer = UniqueBuffer::Alloc(Block.GetSize());
+ Block.CopyTo(MetaBuffer.GetMutableView());
+ CbObject BlockMetadata(MetaBuffer.MoveToShared());
+
+ FoundBlockHashes.push_back(Description.BlockHash);
+ FoundBlockMetadatas.push_back(BlockMetadata);
+ }
+ RemainingList.emplace_back(std::move(Description));
+ }
+ }
+ if (Storage.BuildCacheStorage && !FoundBlockHashes.empty())
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, FoundBlockHashes, FoundBlockMetadatas);
+ }
+ }
+
+ for (size_t DescriptionIndex = 0; DescriptionIndex < RemainingList.size(); DescriptionIndex++)
+ {
+ const ChunkBlockDescription& Description = RemainingList[DescriptionIndex];
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size() + DescriptionIndex);
+ }
+ UnorderedList.insert(UnorderedList.end(), RemainingList.begin(), RemainingList.end());
+ }
+
+ OutBlockDescriptions.reserve(BlockDescriptionLookup.size());
+ for (const IoHash& BlockHash : BlockRawHashes)
+ {
+ if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end())
+ {
+ OutBlockDescriptions.push_back(std::move(UnorderedList[It->second]));
+ }
+ }
+
+ ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks",
+ BuildPartId,
+ NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
+ OutBlockDescriptions.size());
+ }
if (OutBlockDescriptions.size() != BlockRawHashes.size())
{
@@ -6292,7 +7417,8 @@ namespace {
ZEN_CONSOLE("{} Attemping fallback options.", ErrorDescription);
std::vector<ChunkBlockDescription> AugmentedBlockDescriptions;
AugmentedBlockDescriptions.reserve(BlockRawHashes.size());
- std::vector<ChunkBlockDescription> FoundBlocks = Storage.FindBlocks(BuildId);
+ std::vector<ChunkBlockDescription> FoundBlocks =
+ ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId, (uint64_t)-1));
for (const IoHash& BlockHash : BlockRawHashes)
{
@@ -6315,7 +7441,7 @@ namespace {
}
else
{
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash);
+ IoBuffer BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockHash);
if (!BlockBuffer)
{
throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash));
@@ -6381,7 +7507,7 @@ namespace {
const Oid& OverlayBuildPartId = BuildParts[PartIndex].first;
const std::string& OverlayBuildPartName = BuildParts[PartIndex].second;
Stopwatch GetOverlayBuildPartTimer;
- CbObject OverlayBuildPartManifest = Storage.GetBuildPart(BuildId, OverlayBuildPartId);
+ CbObject OverlayBuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, OverlayBuildPartId);
ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}",
OverlayBuildPartId,
OverlayBuildPartName,
@@ -6447,64 +7573,21 @@ namespace {
ChunkedFolderContent GetLocalContent(GetFolderContentStatistics& LocalFolderScanStats,
ChunkingStatistics& ChunkingStats,
const std::filesystem::path& Path,
- ChunkingController& ChunkController)
+ const std::filesystem::path& ZenFolderPath,
+ ChunkingController& ChunkController,
+ const ChunkedFolderContent& ReferenceContent,
+ FolderContent& OutLocalFolderContent)
{
+ FolderContent LocalFolderState;
ChunkedFolderContent LocalContent;
- auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool {
- for (const std::string_view& ExcludeFolder : ExcludeFolders)
- {
- if (RelativePath.starts_with(ExcludeFolder))
- {
- if (RelativePath.length() == ExcludeFolder.length())
- {
- return false;
- }
- else if (RelativePath[ExcludeFolder.length()] == '/')
- {
- return false;
- }
- }
- }
- return true;
- };
-
- auto IsAcceptedFile = [ExcludeExtensions =
- DefaultExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool {
- for (const std::string_view& ExcludeExtension : ExcludeExtensions)
- {
- if (RelativePath.ends_with(ExcludeExtension))
- {
- return false;
- }
- }
- return true;
- };
-
- FolderContent CurrentLocalFolderContent = GetFolderContent(
- LocalFolderScanStats,
- Path,
- std::move(IsAcceptedFolder),
- std::move(IsAcceptedFile),
- GetMediumWorkerPool(EWorkloadType::Burst),
- UsePlainProgress ? 5000 : 200,
- [&](bool, std::ptrdiff_t) { ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); },
- AbortFlag);
- if (AbortFlag)
- {
- return {};
- }
-
- FolderContent LocalFolderState;
-
- bool ScanContent = true;
- std::vector<uint32_t> PathIndexesOufOfDate;
- if (std::filesystem::is_regular_file(Path / ZenStateFilePath))
+ bool HasLocalState = false;
+ if (IsFile(ZenStateFilePath(ZenFolderPath)))
{
try
{
Stopwatch ReadStateTimer;
- CbObject CurrentStateObject = LoadCompactBinaryObject(Path / ZenStateFilePath).Object;
+ CbObject CurrentStateObject = LoadCompactBinaryObject(ZenStateFilePath(ZenFolderPath)).Object;
if (CurrentStateObject)
{
Oid CurrentBuildId;
@@ -6530,124 +7613,240 @@ namespace {
MergeChunkedFolderContents(SavedPartContents[0],
std::span<const ChunkedFolderContent>(SavedPartContents).subspan(1));
}
+ HasLocalState = true;
+ }
+ }
+ }
+ ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs()));
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_CONSOLE("Failed reading state file, falling back to scannning. Reason: {}", Ex.what());
+ }
+ }
- if (!LocalFolderState.AreKnownFilesEqual(CurrentLocalFolderContent))
- {
- const size_t LocaStatePathCount = LocalFolderState.Paths.size();
- std::vector<std::filesystem::path> DeletedPaths;
- FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, CurrentLocalFolderContent, DeletedPaths);
- if (!DeletedPaths.empty())
- {
- LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths);
- }
+ {
+ const uint32_t LocalPathCount = gsl::narrow<uint32_t>(ReferenceContent.Paths.size());
+ const uint32_t RemotePathCount = gsl::narrow<uint32_t>(LocalFolderState.Paths.size());
- ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}",
- DeletedPaths.size(),
- UpdatedContent.Paths.size(),
- LocaStatePathCount);
- if (UpdatedContent.Paths.size() > 0)
- {
- uint64_t ByteCountToScan = 0;
- for (const uint64_t RawSize : UpdatedContent.RawSizes)
- {
- ByteCountToScan += RawSize;
- }
- ProgressBar ProgressBar(false);
- FilteredRate FilteredBytesHashed;
- FilteredBytesHashed.Start();
- ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
- ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
- Path,
- UpdatedContent,
- ChunkController,
- UsePlainProgress ? 5000 : 200,
- [&](bool, std::ptrdiff_t) {
- FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
- std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
- ChunkingStats.FilesProcessed.load(),
- UpdatedContent.Paths.size(),
- NiceBytes(ChunkingStats.BytesHashed.load()),
- NiceBytes(ByteCountToScan),
- NiceNum(FilteredBytesHashed.GetCurrent()),
- ChunkingStats.UniqueChunksFound.load(),
- NiceBytes(ChunkingStats.UniqueBytesFound.load()));
- ProgressBar.UpdateState({.Task = "Scanning files ",
- .Details = Details,
- .TotalCount = ByteCountToScan,
- .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()},
- false);
- },
- AbortFlag);
- if (AbortFlag)
- {
- return {};
- }
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
- LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}});
- }
- }
- else
+ std::vector<std::filesystem::path> PathsToCheck;
+ PathsToCheck.reserve(LocalPathCount + RemotePathCount);
+
+ tsl::robin_set<std::string> FileSet;
+ FileSet.reserve(LocalPathCount + RemotePathCount);
+
+ for (const std::filesystem::path& LocalPath : LocalFolderState.Paths)
+ {
+ FileSet.insert(LocalPath.generic_string());
+ PathsToCheck.push_back(LocalPath);
+ }
+
+ for (const std::filesystem::path& RemotePath : ReferenceContent.Paths)
+ {
+ if (FileSet.insert(RemotePath.generic_string()).second)
+ {
+ PathsToCheck.push_back(RemotePath);
+ }
+ }
+
+ const uint32_t PathCount = gsl::narrow<uint32_t>(PathsToCheck.size());
+
+ OutLocalFolderContent.Paths.resize(PathCount);
+ OutLocalFolderContent.RawSizes.resize(PathCount);
+ OutLocalFolderContent.Attributes.resize(PathCount);
+ OutLocalFolderContent.ModificationTicks.resize(PathCount);
+
+ {
+ Stopwatch Timer;
+ auto _ =
+ MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); });
+
+ ProgressBar ProgressBar(UsePlainProgress);
+
+ ParallellWork Work(AbortFlag);
+ std::atomic<uint64_t> CompletedPathCount = 0;
+ uint32_t PathIndex = 0;
+
+ while (PathIndex < PathCount)
+ {
+ uint32_t PathRangeCount = Min(128u, PathCount - PathIndex);
+ Work.ScheduleWork(
+ GetIOWorkerPool(),
+ [PathIndex,
+ PathRangeCount,
+ &PathsToCheck,
+ &Path,
+ &OutLocalFolderContent,
+ &CompletedPathCount,
+ &LocalFolderScanStats](std::atomic<bool>&) {
+ for (uint32_t PathRangeIndex = PathIndex; PathRangeIndex < PathIndex + PathRangeCount; PathRangeIndex++)
{
- // Remove files from LocalContent no longer in LocalFolderState
- tsl::robin_set<std::string> LocalFolderPaths;
- LocalFolderPaths.reserve(LocalFolderState.Paths.size());
- for (const std::filesystem::path& LocalFolderPath : LocalFolderState.Paths)
- {
- LocalFolderPaths.insert(LocalFolderPath.generic_string());
- }
- std::vector<std::filesystem::path> DeletedPaths;
- for (const std::filesystem::path& LocalContentPath : LocalContent.Paths)
+ const std::filesystem::path& FilePath = PathsToCheck[PathRangeIndex];
+ std::filesystem::path LocalFilePath = (Path / FilePath).make_preferred();
+ if (TryGetFileProperties(LocalFilePath,
+ OutLocalFolderContent.RawSizes[PathRangeIndex],
+ OutLocalFolderContent.ModificationTicks[PathRangeIndex],
+ OutLocalFolderContent.Attributes[PathRangeIndex]))
{
- if (!LocalFolderPaths.contains(LocalContentPath.generic_string()))
- {
- DeletedPaths.push_back(LocalContentPath);
- }
+ OutLocalFolderContent.Paths[PathRangeIndex] = std::move(FilePath);
+ LocalFolderScanStats.FoundFileCount++;
+ LocalFolderScanStats.FoundFileByteCount += OutLocalFolderContent.RawSizes[PathRangeIndex];
+ LocalFolderScanStats.AcceptedFileCount++;
+ LocalFolderScanStats.AcceptedFileByteCount += OutLocalFolderContent.RawSizes[PathRangeIndex];
}
- if (!DeletedPaths.empty())
- {
- LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths);
- }
-
- ZEN_CONSOLE("Using cached local state");
+ CompletedPathCount++;
}
- ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs()));
- ScanContent = false;
- }
+ },
+ Work.DefaultErrorFunction());
+ PathIndex += PathRangeCount;
+ }
+ Work.Wait(200, [&](bool, ptrdiff_t) {
+ // FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
+ std::string Details = fmt::format("{}/{} checked, {} found",
+ CompletedPathCount.load(),
+ PathCount,
+ LocalFolderScanStats.FoundFileCount.load());
+ ProgressBar.UpdateState({.Task = "Checking files ",
+ .Details = Details,
+ .TotalCount = PathCount,
+ .RemainingCount = PathCount - CompletedPathCount.load()},
+ false);
+ });
+ ProgressBar.Finish();
+ }
+
+ uint32_t WritePathIndex = 0;
+ for (uint32_t ReadPathIndex = 0; ReadPathIndex < PathCount; ReadPathIndex++)
+ {
+ if (!OutLocalFolderContent.Paths[ReadPathIndex].empty())
+ {
+ if (WritePathIndex < ReadPathIndex)
+ {
+ OutLocalFolderContent.Paths[WritePathIndex] = std::move(OutLocalFolderContent.Paths[ReadPathIndex]);
+ OutLocalFolderContent.RawSizes[WritePathIndex] = OutLocalFolderContent.RawSizes[ReadPathIndex];
+ OutLocalFolderContent.Attributes[WritePathIndex] = OutLocalFolderContent.Attributes[ReadPathIndex];
+ OutLocalFolderContent.ModificationTicks[WritePathIndex] = OutLocalFolderContent.ModificationTicks[ReadPathIndex];
}
+ WritePathIndex++;
}
}
- catch (const std::exception& Ex)
+
+ OutLocalFolderContent.Paths.resize(WritePathIndex);
+ OutLocalFolderContent.RawSizes.resize(WritePathIndex);
+ OutLocalFolderContent.Attributes.resize(WritePathIndex);
+ OutLocalFolderContent.ModificationTicks.resize(WritePathIndex);
+ }
+
+ bool ScanContent = true;
+ std::vector<uint32_t> PathIndexesOufOfDate;
+ if (HasLocalState)
+ {
+ if (!LocalFolderState.AreKnownFilesEqual(OutLocalFolderContent))
{
- ZEN_CONSOLE("Failed reading state file, falling back to scannning. Reason: {}", Ex.what());
+ const size_t LocaStatePathCount = LocalFolderState.Paths.size();
+ std::vector<std::filesystem::path> DeletedPaths;
+ FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, OutLocalFolderContent, DeletedPaths);
+ if (!DeletedPaths.empty())
+ {
+ LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths);
+ }
+
+ ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}",
+ DeletedPaths.size(),
+ UpdatedContent.Paths.size(),
+ LocaStatePathCount);
+ if (UpdatedContent.Paths.size() > 0)
+ {
+ uint64_t ByteCountToScan = 0;
+ for (const uint64_t RawSize : UpdatedContent.RawSizes)
+ {
+ ByteCountToScan += RawSize;
+ }
+ ProgressBar ProgressBar(false);
+ FilteredRate FilteredBytesHashed;
+ FilteredBytesHashed.Start();
+ ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
+ ChunkingStats,
+ GetIOWorkerPool(),
+ Path,
+ UpdatedContent,
+ ChunkController,
+ UsePlainProgress ? 5000 : 200,
+ [&](bool, std::ptrdiff_t) {
+ FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
+ std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
+ ChunkingStats.FilesProcessed.load(),
+ UpdatedContent.Paths.size(),
+ NiceBytes(ChunkingStats.BytesHashed.load()),
+ NiceBytes(ByteCountToScan),
+ NiceNum(FilteredBytesHashed.GetCurrent()),
+ ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(ChunkingStats.UniqueBytesFound.load()));
+ ProgressBar.UpdateState({.Task = "Scanning files ",
+ .Details = Details,
+ .TotalCount = ByteCountToScan,
+ .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()},
+ false);
+ },
+ AbortFlag);
+ if (AbortFlag)
+ {
+ return {};
+ }
+ FilteredBytesHashed.Stop();
+ ProgressBar.Finish();
+ LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}});
+ }
+ }
+ else
+ {
+ // Remove files from LocalContent no longer in LocalFolderState
+ tsl::robin_set<std::string> LocalFolderPaths;
+ LocalFolderPaths.reserve(LocalFolderState.Paths.size());
+ for (const std::filesystem::path& LocalFolderPath : LocalFolderState.Paths)
+ {
+ LocalFolderPaths.insert(LocalFolderPath.generic_string());
+ }
+ std::vector<std::filesystem::path> DeletedPaths;
+ for (const std::filesystem::path& LocalContentPath : LocalContent.Paths)
+ {
+ if (!LocalFolderPaths.contains(LocalContentPath.generic_string()))
+ {
+ DeletedPaths.push_back(LocalContentPath);
+ }
+ }
+ if (!DeletedPaths.empty())
+ {
+ LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths);
+ }
}
+ ScanContent = false;
}
if (ScanContent)
{
uint64_t ByteCountToScan = 0;
- for (const uint64_t RawSize : CurrentLocalFolderContent.RawSizes)
+ for (const uint64_t RawSize : OutLocalFolderContent.RawSizes)
{
ByteCountToScan += RawSize;
}
ProgressBar ProgressBar(false);
FilteredRate FilteredBytesHashed;
FilteredBytesHashed.Start();
- ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
+ LocalContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ GetIOWorkerPool(),
Path,
- CurrentLocalFolderContent,
+ OutLocalFolderContent,
ChunkController,
UsePlainProgress ? 5000 : 200,
[&](bool, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
ChunkingStats.FilesProcessed.load(),
- CurrentLocalFolderContent.Paths.size(),
+ OutLocalFolderContent.Paths.size(),
NiceBytes(ChunkingStats.BytesHashed.load()),
- ByteCountToScan,
+ NiceBytes(ByteCountToScan),
NiceNum(FilteredBytesHashed.GetCurrent()),
ChunkingStats.UniqueChunksFound.load(),
NiceBytes(ChunkingStats.UniqueBytesFound.load()));
@@ -6670,31 +7869,35 @@ namespace {
return LocalContent;
}
- void DownloadFolder(BuildStorage& Storage,
+ void DownloadFolder(StorageInstance& Storage,
const Oid& BuildId,
const std::vector<Oid>& BuildPartIds,
std::span<const std::string> BuildPartNames,
const std::filesystem::path& Path,
+ const std::filesystem::path& ZenFolderPath,
bool AllowMultiparts,
bool AllowPartialBlockRequests,
bool WipeTargetFolder,
- bool PostDownloadVerify)
+ bool PostDownloadVerify,
+ bool PrimeCacheOnly)
{
ZEN_TRACE_CPU("DownloadFolder");
+ ZEN_ASSERT((!PrimeCacheOnly) || (PrimeCacheOnly && (!AllowPartialBlockRequests)));
+
Stopwatch DownloadTimer;
- const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName;
+ const std::filesystem::path ZenTempFolder = ZenTempFolderPath(ZenFolderPath);
CreateDirectories(ZenTempFolder);
- CreateDirectories(Path / ZenTempBlockFolderName);
- CreateDirectories(Path / ZenTempCacheFolderName);
- CreateDirectories(Path / ZenTempDownloadFolderName);
+ CreateDirectories(ZenTempBlockFolderPath(ZenFolderPath));
+ CreateDirectories(ZenTempCacheFolderPath(ZenFolderPath));
+ CreateDirectories(ZenTempDownloadFolderPath(ZenFolderPath));
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
std::vector<std::pair<Oid, std::string>> AllBuildParts =
- ResolveBuildPartNames(Storage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize);
+ ResolveBuildPartNames(*Storage.BuildStorage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize);
std::vector<ChunkedFolderContent> PartContents;
@@ -6706,26 +7909,36 @@ namespace {
ChunkedFolderContent RemoteContent =
GetRemoteContent(Storage, BuildId, AllBuildParts, ChunkController, PartContents, BlockDescriptions, LooseChunkHashes);
- const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
- if (!ChunkController)
- {
- ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default");
- ChunkController = CreateBasicChunkingController();
- }
-
+ const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
GetFolderContentStatistics LocalFolderScanStats;
ChunkingStatistics ChunkingStats;
ChunkedFolderContent LocalContent;
- if (std::filesystem::is_directory(Path))
+ FolderContent LocalFolderContent;
+ if (!PrimeCacheOnly)
{
- if (!WipeTargetFolder)
+ if (IsDir(Path))
{
- LocalContent = GetLocalContent(LocalFolderScanStats, ChunkingStats, Path, *ChunkController);
+ if (!WipeTargetFolder)
+ {
+ if (!ChunkController)
+ {
+ ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default");
+ ChunkController = CreateBasicChunkingController();
+ }
+
+ LocalContent = GetLocalContent(LocalFolderScanStats,
+ ChunkingStats,
+ Path,
+ ZenFolderPath,
+ *ChunkController,
+ RemoteContent,
+ LocalFolderContent);
+ }
+ }
+ else
+ {
+ CreateDirectories(Path);
}
- }
- else
- {
- CreateDirectories(Path);
}
if (AbortFlag)
{
@@ -6778,6 +7991,12 @@ namespace {
{
ZEN_CONSOLE("Local state is identical to build to download. All done. Completed in {}.",
NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
+
+ Stopwatch WriteStateTimer;
+ CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderContent);
+ CreateDirectories(ZenStateFilePath(ZenFolderPath).parent_path());
+ TemporaryFile::SafeWriteFile(ZenStateFilePath(ZenFolderPath), StateObject.GetView());
+ ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs()));
}
else
{
@@ -6786,7 +8005,7 @@ namespace {
{
BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
}
- ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView());
+ ZEN_CONSOLE("Downloading build {}, parts:{} to '{}'", BuildId, BuildPartString.ToView(), Path);
FolderContent LocalFolderState;
DiskStatistics DiskStats;
@@ -6799,6 +8018,7 @@ namespace {
UpdateFolder(Storage,
BuildId,
Path,
+ ZenFolderPath,
LargeAttachmentSize,
PreferredMultipartChunkSize,
LocalContent,
@@ -6807,6 +8027,7 @@ namespace {
LooseChunkHashes,
AllowPartialBlockRequests,
WipeTargetFolder,
+ PrimeCacheOnly,
LocalFolderState,
DiskStats,
CacheMappingStats,
@@ -6816,20 +8037,23 @@ namespace {
if (!AbortFlag)
{
- VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats);
+ if (!PrimeCacheOnly)
+ {
+ VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats);
- Stopwatch WriteStateTimer;
- CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState);
+ Stopwatch WriteStateTimer;
+ CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState);
- CreateDirectories((Path / ZenStateFilePath).parent_path());
- TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView());
- ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs()));
+ CreateDirectories(ZenStateFilePath(ZenFolderPath).parent_path());
+ TemporaryFile::SafeWriteFile(ZenStateFilePath(ZenFolderPath), StateObject.GetView());
+ ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs()));
#if 0
- ExtendableStringBuilder<1024> SB;
- CompactBinaryToJson(StateObject, SB);
- WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
+ ExtendableStringBuilder<1024> SB;
+ CompactBinaryToJson(StateObject, SB);
+ WriteFile(ZenStateFileJsonPath(ZenFolderPath), IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
#endif // 0
+ }
const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() +
DownloadStats.DownloadedPartialBlockCount.load();
const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() +
@@ -6863,9 +8087,26 @@ namespace {
NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000));
}
}
+ if (PrimeCacheOnly)
+ {
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->Flush(5000, [](intptr_t Remaining) {
+ if (Remaining == 0)
+ {
+ ZEN_CONSOLE("Build cache upload complete");
+ }
+ else
+ {
+ ZEN_CONSOLE("Waiting for build cache to complete uploading. {} blobs remaining", Remaining);
+ }
+ return !AbortFlag;
+ });
+ }
+ }
if (CleanDirectory(ZenTempFolder, {}))
{
- std::filesystem::remove(ZenTempFolder);
+ RemoveDirWithRetry(ZenTempFolder);
}
}
@@ -7109,7 +8350,14 @@ BuildsCommand::BuildsCommand()
auto AddCloudOptions = [this, &AddAuthOptions](cxxopts::Options& Ops) {
AddAuthOptions(Ops);
- Ops.add_option("cloud build", "", "url", "Cloud Builds URL", cxxopts::value(m_BuildsUrl), "<url>");
+ Ops.add_option("cloud build", "", "override-host", "Cloud Builds URL", cxxopts::value(m_OverrideHost), "<override-host>");
+ Ops.add_option("cloud build",
+ "",
+ "url",
+ "Cloud Builds host url (legacy - use --override-host)",
+ cxxopts::value(m_OverrideHost),
+ "<url>");
+ Ops.add_option("cloud build", "", "host", "Cloud Builds host", cxxopts::value(m_Host), "<host>");
Ops.add_option("cloud build",
"",
"assume-http2",
@@ -7131,11 +8379,32 @@ BuildsCommand::BuildsCommand()
"<jsonmetadata>");
};
+ auto AddCacheOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("cache", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>");
+ };
+
auto AddOutputOptions = [this](cxxopts::Options& Ops) {
Ops.add_option("output", "", "plain-progress", "Show progress using plain output", cxxopts::value(m_PlainProgress), "<progress>");
Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>");
};
+ auto AddWorkerOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("",
+ "",
+ "boost-workers",
+ "Increase the number of worker threads - may cause computer to less responsive",
+ cxxopts::value(m_BoostWorkerThreads),
+ "<boostworkers>");
+ };
+
+ auto AddZenFolderOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("",
+ "",
+ "zen-folder-path",
+ fmt::format("Path to zen state and temp folders. Defaults to [--local-path/]{}", ZenFolderName),
+ cxxopts::value(m_ZenFolderPath),
+ "<boostworkers>");
+ };
m_Options.add_option("",
"v",
"verb",
@@ -7149,6 +8418,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_ListOptions);
AddFileOptions(m_ListOptions);
AddOutputOptions(m_ListOptions);
+ AddZenFolderOptions(m_ListOptions);
m_ListOptions.add_options()("h,help", "Print help");
m_ListOptions.add_option("",
"",
@@ -7169,6 +8439,9 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_UploadOptions);
AddFileOptions(m_UploadOptions);
AddOutputOptions(m_UploadOptions);
+ AddCacheOptions(m_UploadOptions);
+ AddWorkerOptions(m_UploadOptions);
+ AddZenFolderOptions(m_UploadOptions);
m_UploadOptions.add_options()("h,help", "Print help");
m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>");
m_UploadOptions.add_option("",
@@ -7224,6 +8497,12 @@ BuildsCommand::BuildsCommand()
"<manifestpath>");
m_UploadOptions
.add_option("", "", "verify", "Enable post upload verify of all uploaded data", cxxopts::value(m_PostUploadVerify), "<verify>");
+ m_UploadOptions.add_option("",
+ "",
+ "find-max-block-count",
+ "The maximum number of blocks we search for in the build context",
+ cxxopts::value(m_FindBlockMaxCount),
+ "<maxblockcount>");
m_UploadOptions.parse_positional({"local-path", "build-id"});
m_UploadOptions.positional_help("local-path build-id");
@@ -7232,6 +8511,16 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_DownloadOptions);
AddFileOptions(m_DownloadOptions);
AddOutputOptions(m_DownloadOptions);
+ AddCacheOptions(m_DownloadOptions);
+ AddZenFolderOptions(m_DownloadOptions);
+ AddWorkerOptions(m_DownloadOptions);
+ m_DownloadOptions.add_option("cache",
+ "",
+ "cache-prime-only",
+ "Only download blobs missing in cache and upload to cache",
+ cxxopts::value(m_PrimeCacheOnly),
+ "<cacheprimeonly>");
+
m_DownloadOptions.add_options()("h,help", "Print help");
m_DownloadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>");
m_DownloadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
@@ -7263,12 +8552,14 @@ BuildsCommand::BuildsCommand()
"Allow request for partial chunk blocks. Defaults to true.",
cxxopts::value(m_AllowPartialBlockRequests),
"<allowpartialblockrequests>");
+
m_DownloadOptions
.add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), "<verify>");
m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"});
m_DownloadOptions.positional_help("local-path build-id build-part-name");
AddOutputOptions(m_DiffOptions);
+ AddWorkerOptions(m_DiffOptions);
m_DiffOptions.add_options()("h,help", "Print help");
m_DiffOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_DiffOptions.add_option("", "c", "compare-path", "Root file system folder used as diff", cxxopts::value(m_DiffPath), "<diff-path>");
@@ -7284,6 +8575,8 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_TestOptions);
AddFileOptions(m_TestOptions);
AddOutputOptions(m_TestOptions);
+ AddCacheOptions(m_TestOptions);
+ AddWorkerOptions(m_TestOptions);
m_TestOptions.add_options()("h,help", "Print help");
m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_TestOptions.add_option("",
@@ -7304,6 +8597,8 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_FetchBlobOptions);
AddFileOptions(m_FetchBlobOptions);
AddOutputOptions(m_FetchBlobOptions);
+ AddCacheOptions(m_FetchBlobOptions);
+ AddZenFolderOptions(m_FetchBlobOptions);
m_FetchBlobOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
m_FetchBlobOptions
.add_option("", "", "blob-hash", "IoHash in hex form identifying the blob to download", cxxopts::value(m_BlobHash), "<blob-hash>");
@@ -7313,6 +8608,8 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_ValidateBuildPartOptions);
AddFileOptions(m_ValidateBuildPartOptions);
AddOutputOptions(m_ValidateBuildPartOptions);
+ AddWorkerOptions(m_ValidateBuildPartOptions);
+ AddZenFolderOptions(m_ValidateBuildPartOptions);
m_ValidateBuildPartOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
m_ValidateBuildPartOptions.add_option("",
"",
@@ -7333,6 +8630,8 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_MultiTestDownloadOptions);
AddFileOptions(m_MultiTestDownloadOptions);
AddOutputOptions(m_MultiTestDownloadOptions);
+ AddCacheOptions(m_MultiTestDownloadOptions);
+ AddWorkerOptions(m_MultiTestDownloadOptions);
m_MultiTestDownloadOptions
.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), "<ids>");
@@ -7373,7 +8672,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
auto ParseStorageOptions = [&]() {
- if (!m_BuildsUrl.empty())
+ if (!m_OverrideHost.empty() || !m_Host.empty())
{
if (!m_StoragePath.empty())
{
@@ -7385,16 +8684,23 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
fmt::format("namespace and bucket options are required for url option\n{}", m_Options.help()));
}
}
+ else if (m_StoragePath.empty())
+ {
+ throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help()));
+ }
};
std::unique_ptr<AuthMgr> Auth;
- HttpClientSettings ClientSettings{.AssumeHttp2 = m_AssumeHttp2, .AllowResume = true, .RetryCount = 2};
+ HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient",
+ .AssumeHttp2 = m_AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 2};
auto CreateAuthMgr = [&]() {
if (!Auth)
{
- std::filesystem::path DataRoot = m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : StringToPath(m_SystemRootDir);
-
+ std::filesystem::path DataRoot =
+ m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : MakeSafeAbsolutePath(m_SystemRootDir);
if (m_EncryptionKey.empty())
{
m_EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456";
@@ -7448,7 +8754,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else if (!m_AccessTokenPath.empty())
{
- std::string ResolvedAccessToken = ReadAccessTokenFromFile(StringToPath(m_AccessTokenPath));
+ std::string ResolvedAccessToken = ReadAccessTokenFromFile(MakeSafeAbsolutePath(m_AccessTokenPath));
if (!ResolvedAccessToken.empty())
{
ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(ResolvedAccessToken);
@@ -7474,7 +8780,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ClientSettings.AccessTokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(*Auth);
}
- if (!m_BuildsUrl.empty() && !ClientSettings.AccessTokenProvider)
+ if (!ClientSettings.AccessTokenProvider)
{
ZEN_CONSOLE("Warning: No auth provider given, attempting operation without credentials.");
}
@@ -7486,15 +8792,243 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
};
ParseOutputOptions();
- try
- {
- if (SubOption == &m_ListOptions)
+ auto CreateBuildStorage = [&](BuildStorage::Statistics& StorageStats,
+ BuildStorageCache::Statistics& StorageCacheStats,
+ const std::filesystem::path& TempPath) -> StorageInstance {
+ ParseStorageOptions();
+
+ StorageInstance Result;
+
+ std::string BuildStorageName = ZEN_CLOUD_STORAGE;
+ std::string BuildCacheName;
+ bool CacheAssumeHttp2 = false;
+ std::string StorageDescription;
+ std::string CacheDescription;
+
+ if (!m_Host.empty() || !m_OverrideHost.empty())
{
- ParseStorageOptions();
ParseAuthOptions();
+ }
+
+ std::string CloudHost;
+
+ if (!m_Host.empty())
+ {
+ if (m_OverrideHost.empty() || m_ZenCacheHost.empty())
+ {
+ HttpClient DiscoveryHttpClient(m_Host, ClientSettings);
+ HttpClient::Response ServerInfoResponse =
+ DiscoveryHttpClient.Get("/api/v1/status/servers", HttpClient::Accept(HttpContentType::kJSON));
+ if (!ServerInfoResponse.IsSuccess())
+ {
+ throw std::runtime_error(fmt::format("Failed to get list of servers from discovery url '{}'. Reason: '{}'",
+ m_Host,
+ ServerInfoResponse.ErrorMessage("")));
+ }
+
+ std::string_view JsonResponse = ServerInfoResponse.AsText();
+ CbObject ResponseObjectView = LoadCompactBinaryFromJson(JsonResponse).AsObject();
- HttpClient Http(m_BuildsUrl, ClientSettings);
+ if (m_OverrideHost.empty())
+ {
+ CbArrayView ServerEndpointsArray = ResponseObjectView["serverEndpoints"sv].AsArrayView();
+ std::uint64_t ServerCount = ServerEndpointsArray.Num();
+ if (ServerCount == 0)
+ {
+ throw std::runtime_error(fmt::format("Failed to find any builds hosts at {}", m_Host));
+ }
+ for (CbFieldView ServerEndpointView : ServerEndpointsArray)
+ {
+ CbObjectView ServerEndpointObject = ServerEndpointView.AsObjectView();
+ std::string_view BaseUrl = ServerEndpointObject["baseUrl"sv].AsString();
+ if (!BaseUrl.empty())
+ {
+ const bool AssumeHttp2 = ServerEndpointObject["assumeHttp2"sv].AsBool(false);
+ std::string_view Name = ServerEndpointObject["name"sv].AsString();
+
+ HttpClientSettings TestClientSettings{.LogCategory = "httpbuildsclient",
+ .ConnectTimeout = std::chrono::milliseconds{1000},
+ .Timeout = std::chrono::milliseconds{2000},
+ .AssumeHttp2 = AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 0};
+
+ HttpClient TestHttpClient(BaseUrl, TestClientSettings);
+ HttpClient::Response TestResponse = TestHttpClient.Get("/health/live");
+ if (TestResponse.IsSuccess())
+ {
+ CloudHost = BaseUrl;
+ m_AssumeHttp2 = AssumeHttp2;
+ BuildStorageName = Name;
+ break;
+ }
+ }
+ }
+ if (CloudHost.empty())
+ {
+ throw std::runtime_error(
+ fmt::format("Failed to find any usable builds hosts out of {} using {}", ServerCount, m_Host));
+ }
+ }
+
+ auto TestCacheEndpoint = [](std::string_view BaseUrl, const bool AssumeHttp2) -> bool {
+ HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient",
+ .ConnectTimeout = std::chrono::milliseconds{1000},
+ .Timeout = std::chrono::milliseconds{2000},
+ .AssumeHttp2 = AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 0};
+ HttpClient TestHttpClient(BaseUrl, TestClientSettings);
+ HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds");
+ if (TestResponse.IsSuccess())
+ {
+ return true;
+ }
+ return false;
+ };
+
+ if (m_ZenCacheHost.empty())
+ {
+ CbArrayView CacheEndpointsArray = ResponseObjectView["cacheEndpoints"sv].AsArrayView();
+ std::uint64_t CacheCount = CacheEndpointsArray.Num();
+ for (CbFieldView CacheEndpointView : CacheEndpointsArray)
+ {
+ CbObjectView CacheEndpointObject = CacheEndpointView.AsObjectView();
+
+ std::string_view BaseUrl = CacheEndpointObject["baseUrl"sv].AsString();
+ if (!BaseUrl.empty())
+ {
+ const bool AssumeHttp2 = CacheEndpointObject["assumeHttp2"sv].AsBool(false);
+ std::string_view Name = CacheEndpointObject["name"sv].AsString();
+
+ if (TestCacheEndpoint(BaseUrl, AssumeHttp2))
+ {
+ m_ZenCacheHost = BaseUrl;
+ CacheAssumeHttp2 = AssumeHttp2;
+ BuildCacheName = Name;
+ break;
+ }
+ }
+ }
+ if (m_ZenCacheHost.empty())
+ {
+ ZenServerState State;
+ if (State.InitializeReadOnly())
+ {
+ State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) {
+ if (m_ZenCacheHost.empty())
+ {
+ std::string ZenServerLocalHostUrl =
+ fmt::format("http://127.0.0.1:{}", Entry.EffectiveListenPort.load());
+ if (TestCacheEndpoint(ZenServerLocalHostUrl, false))
+ {
+ m_ZenCacheHost = ZenServerLocalHostUrl;
+ CacheAssumeHttp2 = false;
+ BuildCacheName = "localhost";
+ }
+ }
+ });
+ }
+ if (m_ZenCacheHost.empty())
+ {
+ ZEN_CONSOLE("Warning: Failed to find any usable cache hosts out of {} using {}", CacheCount, m_Host);
+ }
+ }
+ }
+ else if (TestCacheEndpoint(m_ZenCacheHost, false))
+ {
+ std::string::size_type HostnameStart = 0;
+ std::string::size_type HostnameLength = std::string::npos;
+ if (auto StartPos = m_ZenCacheHost.find("//"); StartPos != std::string::npos)
+ {
+ HostnameStart = StartPos + 2;
+ }
+ if (auto EndPos = m_ZenCacheHost.find("/", HostnameStart); EndPos != std::string::npos)
+ {
+ HostnameLength = EndPos - HostnameStart;
+ }
+ BuildCacheName = m_ZenCacheHost.substr(HostnameStart, HostnameLength);
+ }
+ }
+ }
+ else
+ {
+ CloudHost = m_OverrideHost;
+ }
+
+ if (!CloudHost.empty())
+ {
+ Result.BuildStorageHttp = std::make_unique<HttpClient>(CloudHost, ClientSettings);
+ StorageDescription = fmt::format("Cloud {}{}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
+ BuildStorageName.empty() ? "" : fmt::format("{}, ", BuildStorageName),
+ CloudHost,
+ Result.BuildStorageHttp->GetSessionId(),
+ m_Namespace,
+ m_Bucket);
+ Result.BuildStorage =
+ CreateJupiterBuildStorage(Log(), *Result.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, TempPath / "storage");
+ Result.StorageName = BuildStorageName;
+ }
+ else if (!m_StoragePath.empty())
+ {
+ std::filesystem::path StoragePath = MakeSafeAbsolutePath(m_StoragePath);
+ StorageDescription = fmt::format("folder {}", StoragePath);
+ Result.BuildStorage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ Result.StorageName = fmt::format("Disk {}", StoragePath.stem());
+ }
+ else
+ {
+ throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
+ }
+ if (!m_ZenCacheHost.empty())
+ {
+ Result.CacheHttp = std::make_unique<HttpClient>(m_ZenCacheHost,
+ HttpClientSettings{.LogCategory = "httpcacheclient",
+ .ConnectTimeout = std::chrono::milliseconds{3000},
+ .Timeout = std::chrono::milliseconds{30000},
+ .AssumeHttp2 = CacheAssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 0});
+ Result.BuildCacheStorage = CreateZenBuildStorageCache(*Result.CacheHttp,
+ StorageCacheStats,
+ m_Namespace,
+ m_Bucket,
+ TempPath / "zencache",
+ m_PrimeCacheOnly);
+ CacheDescription = fmt::format("Zen {}{}. SessionId: '{}'",
+ BuildCacheName.empty() ? "" : fmt::format("{}, ", BuildCacheName),
+ m_ZenCacheHost,
+ Result.CacheHttp->GetSessionId());
+
+ if (!m_Namespace.empty())
+ {
+ CacheDescription += fmt::format(". Namespace '{}'", m_Namespace);
+ }
+ if (!m_Bucket.empty())
+ {
+ CacheDescription += fmt::format(" Bucket '{}'", m_Bucket);
+ }
+ Result.CacheName = BuildCacheName;
+ }
+ ZEN_CONSOLE("Remote: {}", StorageDescription);
+ if (!Result.CacheName.empty())
+ {
+ ZEN_CONSOLE("Cache : {}", CacheDescription);
+ }
+ return Result;
+ };
+
+ BoostWorkerThreads = m_BoostWorkerThreads;
+
+ try
+ {
+ if (SubOption == &m_ListOptions)
+ {
+ if (!m_ListResultPath.empty())
+ {
+ ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ }
CbObject QueryObject;
if (m_ListQueryPath.empty())
{
@@ -7505,7 +9039,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else
{
- std::filesystem::path ListQueryPath = StringToPath(m_ListQueryPath);
+ std::filesystem::path ListQueryPath = MakeSafeAbsolutePath(m_ListQueryPath);
if (ToLower(ListQueryPath.extension().string()) == ".cbo")
{
QueryObject = LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(ListQueryPath));
@@ -7525,28 +9059,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE_VERBOSE("Querying builds in cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}'",
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, std::filesystem::path{});
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE_VERBOSE("Querying builds in folder '{}'.", StoragePath);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
- CbObject Response = Storage->ListBuilds(QueryObject);
+ const std::filesystem::path ZenFolderPath =
+ m_ZenFolderPath.empty() ? MakeSafeAbsolutePath(".") / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath);
+ CreateDirectories(ZenFolderPath);
+ auto _ = MakeGuard([ZenFolderPath]() {
+ if (CleanDirectory(ZenFolderPath, {}))
+ {
+ std::error_code DummyEc;
+ RemoveDir(ZenFolderPath, DummyEc);
+ }
+ });
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath));
+
+ CbObject Response = Storage.BuildStorage->ListBuilds(QueryObject);
ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::All) == CbValidateError::None);
if (m_ListResultPath.empty())
{
@@ -7556,7 +9084,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else
{
- std::filesystem::path ListResultPath = StringToPath(m_ListResultPath);
+ std::filesystem::path ListResultPath = MakeSafeAbsolutePath(m_ListResultPath);
if (ToLower(ListResultPath.extension().string()) == ".cbo")
{
MemoryView ResponseView = Response.GetView();
@@ -7575,10 +9103,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_UploadOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
+ ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
if (m_Path.empty())
{
@@ -7610,7 +9135,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = MakeSafeAbsolutePath(m_Path);
if (m_BuildPartName.empty())
{
@@ -7645,48 +9170,31 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help()));
}
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
+ const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
+
BuildStorage::Statistics StorageStats;
- const Oid BuildId = Oid::FromHexString(m_BuildId);
- const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'",
- m_BuildPartName,
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- GeneratedBuildId ? "Generated " : "",
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'",
- m_BuildPartName,
- Path,
- StoragePath,
- GeneratedBuildId ? "Generated " : "",
- BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ const std::filesystem::path ZenFolderPath =
+ m_ZenFolderPath.empty() ? MakeSafeAbsolutePath(".") / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath);
+ CreateDirectories(ZenFolderPath);
+ auto _ = MakeGuard([ZenFolderPath]() {
+ if (CleanDirectory(ZenFolderPath, {}))
+ {
+ std::error_code DummyEc;
+ RemoveDir(ZenFolderPath, DummyEc);
+ }
+ });
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath));
CbObject MetaData;
if (m_CreateBuild)
{
if (!m_BuildMetadataPath.empty())
{
- std::filesystem::path MetadataPath = StringToPath(m_BuildMetadataPath);
+ std::filesystem::path MetadataPath = MakeSafeAbsolutePath(m_BuildMetadataPath);
IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten();
std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize());
std::string JsonError;
@@ -7713,12 +9221,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- UploadFolder(*Storage,
+ UploadFolder(Storage,
BuildId,
BuildPartId,
m_BuildPartName,
Path,
- StringToPath(m_ManifestPath),
+ ZenFolderPath,
+ MakeSafeAbsolutePath(m_ManifestPath),
+ m_FindBlockMaxCount,
m_BlockReuseMinPercentLimit,
m_AllowMultiparts,
MetaData,
@@ -7735,7 +9245,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
"Requests: {}\n"
"Avg Request Time: {}\n"
"Avg I/O Time: {}",
- StorageName,
+ Storage.StorageName,
NiceBytes(StorageStats.TotalBytesRead.load()),
NiceBytes(StorageStats.TotalBytesWritten.load()),
StorageStats.TotalRequestCount.load(),
@@ -7751,10 +9261,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_DownloadOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
+ ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
if (m_Path.empty())
{
@@ -7775,6 +9282,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help()));
}
+ if (m_PostDownloadVerify && m_PrimeCacheOnly)
+ {
+ throw zen::OptionParseException(
+ fmt::format("'cache-prime-only' option is not compatible with 'verify' option\n{}", m_DownloadOptions.help()));
+ }
+
+ if (m_Clean && m_PrimeCacheOnly)
+ {
+ ZEN_WARN("ignoring 'clean' option when 'cache-prime-only' is enabled");
+ }
+
+ if (m_AllowPartialBlockRequests && m_PrimeCacheOnly)
+ {
+ ZEN_WARN("ignoring 'allow-partial-block-requests' option when 'cache-prime-only' is enabled");
+ }
+
std::vector<Oid> BuildPartIds;
for (const std::string& BuildPartId : m_BuildPartIds)
{
@@ -7786,45 +9309,27 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = MakeSafeAbsolutePath(m_Path);
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Downloading '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- BuildId,
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, Path, StoragePath, BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ const std::filesystem::path ZenFolderPath =
+ m_ZenFolderPath.empty() ? Path / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath);
- DownloadFolder(*Storage,
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath));
+
+ DownloadFolder(Storage,
BuildId,
BuildPartIds,
m_BuildPartNames,
Path,
+ ZenFolderPath,
m_AllowMultiparts,
- m_AllowPartialBlockRequests,
+ m_AllowPartialBlockRequests && !m_PrimeCacheOnly,
m_Clean,
- m_PostDownloadVerify);
+ m_PostDownloadVerify,
+ m_PrimeCacheOnly);
if (false)
{
@@ -7835,7 +9340,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
"Requests: {}\n"
"Avg Request Time: {}\n"
"Avg I/O Time: {}",
- StorageName,
+ Storage.StorageName,
NiceBytes(StorageStats.TotalBytesRead.load()),
NiceBytes(StorageStats.TotalBytesWritten.load()),
StorageStats.TotalRequestCount.load(),
@@ -7859,8 +9364,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help()));
}
- std::filesystem::path Path = StringToPath(m_Path);
- std::filesystem::path DiffPath = StringToPath(m_DiffPath);
+ std::filesystem::path Path = MakeSafeAbsolutePath(m_Path);
+ std::filesystem::path DiffPath = MakeSafeAbsolutePath(m_DiffPath);
DiffFolders(Path, DiffPath, m_OnlyChunked);
return AbortFlag ? 11 : 0;
}
@@ -7872,10 +9377,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help()));
}
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
// m_StoragePath = "D:\\buildstorage";
// m_Path = "F:\\Saved\\DownloadedBuilds\\++Fortnite+Main-CL-XXXXXXXX\\WindowsClient";
// std::vector<std::string> BuildIdStrings{"07d3942f0e7f4ca1b13b0587",
@@ -7886,34 +9387,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
// "07d3964f919d577a321a1fdd",
// "07d396a6ce875004e16b9528"};
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = MakeSafeAbsolutePath(m_Path);
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Downloading {} to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
- FormatArray<std::string>(m_BuildIds, " "sv),
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Downloading {}'to '{}' from folder {}", FormatArray<std::string>(m_BuildIds, " "sv), Path, StoragePath);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ const std::filesystem::path ZenFolderPath =
+ m_ZenFolderPath.empty() ? Path / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath);
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath));
Stopwatch Timer;
for (const std::string& BuildIdString : m_BuildIds)
@@ -7923,15 +9405,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_DownloadOptions.help()));
}
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId,
{},
{},
Path,
+ ZenFolderPath,
m_AllowMultiparts,
m_AllowPartialBlockRequests,
BuildIdString == m_BuildIds.front(),
- true);
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Download cancelled");
@@ -7945,71 +9429,45 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_TestOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
if (m_Path.empty())
{
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help()));
}
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = MakeSafeAbsolutePath(m_Path);
m_BuildId = Oid::NewOid().ToString();
m_BuildPartName = Path.filename().string();
m_BuildPartId = Oid::NewOid().ToString();
m_CreateBuild = true;
- BuildStorage::Statistics StorageStats;
- const Oid BuildId = Oid::FromHexString(m_BuildId);
- const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
+ const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ std::filesystem::path StoragePath = MakeSafeAbsolutePath(m_StoragePath);
- if (m_BuildsUrl.empty() && StoragePath.empty())
+ if (m_OverrideHost.empty() && StoragePath.empty())
{
StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").make_preferred();
CreateDirectories(StoragePath);
CleanDirectory(StoragePath, {});
+ m_StoragePath = StoragePath.generic_string();
}
auto _ = MakeGuard([&]() {
- if (m_BuildsUrl.empty() && StoragePath.empty())
+ if (m_OverrideHost.empty() && StoragePath.empty())
{
DeleteDirectories(StoragePath);
}
});
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!StoragePath.empty())
- {
- ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'",
- m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
- Path,
- StoragePath,
- BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorage::Statistics StorageStats;
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_test");
+ const std::filesystem::path ZenFolderPath =
+ m_ZenFolderPath.empty() ? DownloadPath / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath);
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath));
auto MakeMetaData = [](const Oid& BuildId) -> CbObject {
CbObjectWriter BuildMetaDataWriter;
@@ -8032,12 +9490,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView());
}
- UploadFolder(*Storage,
+ UploadFolder(Storage,
BuildId,
BuildPartId,
m_BuildPartName,
Path,
+ ZenFolderPath,
{},
+ m_FindBlockMaxCount,
m_BlockReuseMinPercentLimit,
m_AllowMultiparts,
MetaData,
@@ -8050,9 +9510,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 11;
}
- const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download");
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
+ DownloadFolder(Storage,
+ BuildId,
+ {BuildPartId},
+ {},
+ DownloadPath,
+ ZenFolderPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ true,
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Download failed.");
@@ -8064,7 +9533,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage,
+ BuildId,
+ {BuildPartId},
+ {},
+ DownloadPath,
+ ZenFolderPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (identical target)");
@@ -8115,11 +9594,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SourceSize > 256)
{
Work.ScheduleWork(
- GetMediumWorkerPool(EWorkloadType::Burst),
- [SourceSize, FilePath](std::atomic<bool>&) {
+ GetIOWorkerPool(),
+ [SourceSize, FilePath = std::filesystem::path(FilePath)](std::atomic<bool>&) {
if (!AbortFlag)
{
- bool IsReadOnly = SetFileReadOnly(FilePath, false);
+ bool IsReadOnly = SetFileReadOnlyWithRetry(FilePath, false);
{
BasicFile Source(FilePath, BasicFile::Mode::kWrite);
uint64_t RangeSize = Min(SourceSize / 3, 512u * 1024u);
@@ -8146,7 +9625,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
break;
case 1:
- std::filesystem::remove(FilePath);
+ {
+ (void)SetFileReadOnlyWithRetry(FilePath, false);
+ RemoveFileWithRetry(FilePath);
+ }
break;
default:
break;
@@ -8168,7 +9650,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage,
+ BuildId,
+ {BuildPartId},
+ {},
+ DownloadPath,
+ ZenFolderPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (scrambled target)");
@@ -8187,12 +9679,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView());
}
- UploadFolder(*Storage,
+ UploadFolder(Storage,
BuildId2,
BuildPartId2,
m_BuildPartName,
DownloadPath,
+ ZenFolderPath,
{},
+ m_FindBlockMaxCount,
m_BlockReuseMinPercentLimit,
m_AllowMultiparts,
MetaData2,
@@ -8206,7 +9700,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage,
+ BuildId,
+ {BuildPartId},
+ {},
+ DownloadPath,
+ ZenFolderPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -8214,15 +9718,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId2,
{BuildPartId2},
{},
DownloadPath,
+ ZenFolderPath,
m_AllowMultiparts,
m_AllowPartialBlockRequests,
false,
- true);
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -8230,15 +9736,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId2,
{BuildPartId2},
{},
DownloadPath,
+ ZenFolderPath,
m_AllowMultiparts,
m_AllowPartialBlockRequests,
false,
- true);
+ true,
+ false);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -8250,11 +9758,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_FetchBlobOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
+ ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
if (m_BlobHash.empty())
{
throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help()));
@@ -8266,44 +9770,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help()));
}
- if (m_BuildsUrl.empty() && m_StoragePath.empty())
- {
- throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help()));
- }
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
+
+ std::filesystem::path Path = MakeSafeAbsolutePath(m_Path);
BuildStorage::Statistics StorageStats;
- const Oid BuildId = Oid::FromHexString(m_BuildId);
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
+ BuildStorageCache::Statistics StorageCacheStats;
- std::filesystem::path Path = StringToPath(m_Path);
+ const std::filesystem::path ZenFolderPath =
+ m_ZenFolderPath.empty() ? MakeSafeAbsolutePath(".") / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath);
+ CreateDirectories(ZenFolderPath);
+ auto _ = MakeGuard([ZenFolderPath]() {
+ if (CleanDirectory(ZenFolderPath, {}))
+ {
+ std::error_code DummyEc;
+ RemoveDir(ZenFolderPath, DummyEc);
+ }
+ });
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath));
uint64_t CompressedSize;
uint64_t DecompressedSize;
- ValidateBlob(*Storage, BuildId, BlobHash, CompressedSize, DecompressedSize);
+ ValidateBlob(*Storage.BuildStorage, BuildId, BlobHash, CompressedSize, DecompressedSize);
if (AbortFlag)
{
return 11;
@@ -8317,15 +9806,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_ValidateBuildPartOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
- if (m_BuildsUrl.empty() && m_StoragePath.empty())
- {
- throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help()));
- }
+ ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
if (m_BuildId.empty())
{
@@ -8342,39 +9823,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help()));
}
+ std::filesystem::path Path = MakeSafeAbsolutePath(m_Path);
+
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ const std::filesystem::path ZenFolderPath =
+ m_ZenFolderPath.empty() ? MakeSafeAbsolutePath(".") / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath);
+ CreateDirectories(ZenFolderPath);
+ auto _ = MakeGuard([ZenFolderPath]() {
+ if (CleanDirectory(ZenFolderPath, {}))
+ {
+ std::error_code DummyEc;
+ RemoveDir(ZenFolderPath, DummyEc);
+ }
+ });
- std::filesystem::path Path = StringToPath(m_Path);
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath));
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId);
ValidateStatistics ValidateStats;
DownloadStatistics DownloadStats;
- ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats);
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats);
return AbortFlag ? 13 : 0;
}