diff options
| author | Florent Devillechabrol <[email protected]> | 2025-04-02 10:38:02 -0700 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-04-02 10:38:02 -0700 |
| commit | 486a22ad2c61bc1616d8745e0077eb320089bfec (patch) | |
| tree | 665d5c9002cd97c04ddffeaf211fcf8e55d01dce /src/zen/cmds/builds_cmd.cpp | |
| parent | Fixed missing trailing quote when converting binary data from compact binary ... (diff) | |
| parent | added --find-max-block-count option to builds upload (#337) (diff) | |
| download | archived-zen-486a22ad2c61bc1616d8745e0077eb320089bfec.tar.xz archived-zen-486a22ad2c61bc1616d8745e0077eb320089bfec.zip | |
Merge branch 'main' into fd-fix-binary-json
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 4215 |
1 files changed, 2843 insertions, 1372 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 889ccef0b..8e8fd480a 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -6,7 +6,9 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryfile.h> #include <zencore/compactbinaryfmt.h> +#include <zencore/compactbinaryvalue.h> #include <zencore/compress.h> +#include <zencore/config.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -20,6 +22,7 @@ #include <zenhttp/httpclient.h> #include <zenhttp/httpclientauth.h> #include <zenhttp/httpcommon.h> +#include <zenutil/buildstoragecache.h> #include <zenutil/chunkblock.h> #include <zenutil/chunkedcontent.h> #include <zenutil/chunkedfile.h> @@ -51,6 +54,8 @@ ZEN_THIRD_PARTY_INCLUDES_END #define EXTRA_VERIFY 0 +#define ZEN_CLOUD_STORAGE "Cloud Storage" + namespace zen { namespace { static std::atomic<bool> AbortFlag = false; @@ -79,30 +84,56 @@ namespace { size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize; }; - const ChunksBlockParameters DefaultChunksBlockParams{.MaxBlockSize = 32u * 1024u * 1024u, - .MaxChunkEmbedSize = DefaultChunkedParams.MaxSize}; - + const ChunksBlockParameters DefaultChunksBlockParams{ + .MaxBlockSize = 32u * 1024u * 1024u, + .MaxChunkEmbedSize = 2u * 1024u * 1024u // DefaultChunkedParams.MaxSize + }; const uint64_t DefaultPreferredMultipartChunkSize = 32u * 1024u * 1024u; const double DefaultLatency = 0; // .0010; const double DefaultDelayPerKBSec = 0; // 0.00005; - const std::string ZenFolderName = ".zen"; - const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); - const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); - const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); + const bool SingleThreaded = false; + bool BoostWorkerThreads = false; - const std::string ZenTempCacheFolderName = - fmt::format("{}/cache", ZenTempFolderName); // Decompressed and verified data - chunks & sequences - const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); // Temp storage for whole and partial blocks - const std::string ZenTempChunkFolderName = - fmt::format("{}/chunks", ZenTempFolderName); // Temp storage for decompressed and validated chunks + WorkerThreadPool& GetIOWorkerPool() + { + return SingleThreaded ? GetSyncWorkerPool() + : BoostWorkerThreads ? GetLargeWorkerPool(EWorkloadType::Burst) + : GetMediumWorkerPool(EWorkloadType::Burst); + } + WorkerThreadPool& GetNetworkPool() { return SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); } - const std::string ZenTempDownloadFolderName = - fmt::format("{}/download", ZenTempFolderName); // Temp storage for unverfied downloaded blobs + const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u; - const std::string ZenTempStorageFolderName = - fmt::format("{}/storage", ZenTempFolderName); // Temp storage folder for BuildStorage implementations + const std::string ZenFolderName = ".zen"; + std::filesystem::path ZenStateFilePath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.cbo"; } + // std::filesystem::path ZenStateFileJsonPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.json"; + // } + std::filesystem::path ZenTempFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "tmp"; } + + std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath) + { + return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences + } + std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath) + { + return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks + } + std::filesystem::path ZenTempChunkFolderPath(const std::filesystem::path& ZenFolderPath) + { + return ZenTempFolderPath(ZenFolderPath) / "chunks"; // Temp storage for decompressed and validated chunks + } + + std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath) + { + return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks + } + + // std::filesystem::path ZenTempStorageFolderPath(const std::filesystem::path& ZenFolderPath) + // { + // return ZenTempFolderPath(ZenFolderPath) / "storage"; // Temp storage folder for BuildStorage implementations + // } const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; @@ -133,6 +164,98 @@ namespace { ); + std::filesystem::path MakeSafeAbsolutePath(const std::string Path) + { + std::filesystem::path AbsolutePath = std::filesystem::absolute(StringToPath(Path)).make_preferred(); +#if ZEN_PLATFORM_WINDOWS && 1 + const std::string_view Prefix = "\\\\?\\"; + const std::u8string PrefixU8(Prefix.begin(), Prefix.end()); + std::u8string PathString = AbsolutePath.u8string(); + if (!PathString.empty() && !PathString.starts_with(PrefixU8)) + { + PathString.insert(0, PrefixU8); + return std::filesystem::path(PathString); + } +#endif + return AbsolutePath; + } + + void RenameFileWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath) + { + std::error_code Ec; + RenameFile(SourcePath, TargetPath, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + Sleep(100 + int(Retries * 50)); + RenameFile(SourcePath, TargetPath, Ec); + } + if (Ec) + { + zen::ThrowSystemError(Ec.value(), Ec.message()); + } + } + + bool SetFileReadOnlyWithRetry(const std::filesystem::path& Path, bool ReadOnly) + { + std::error_code Ec; + bool Result = SetFileReadOnly(Path, ReadOnly, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + Sleep(100 + int(Retries * 50)); + if (!IsFile(Path)) + { + return false; + } + Ec.clear(); + Result = SetFileReadOnly(Path, ReadOnly, Ec); + } + if (Ec) + { + zen::ThrowSystemError(Ec.value(), Ec.message()); + } + return Result; + } + + void RemoveFileWithRetry(const std::filesystem::path& Path) + { + std::error_code Ec; + RemoveFile(Path, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + Sleep(100 + int(Retries * 50)); + if (!IsFile(Path)) + { + return; + } + Ec.clear(); + RemoveFile(Path, Ec); + } + if (Ec) + { + zen::ThrowSystemError(Ec.value(), Ec.message()); + } + } + + void RemoveDirWithRetry(const std::filesystem::path& Path) + { + std::error_code Ec; + RemoveDir(Path, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + Sleep(100 + int(Retries * 50)); + if (!IsDir(Path)) + { + return; + } + Ec.clear(); + RemoveDir(Path, Ec); + } + if (Ec) + { + zen::ThrowSystemError(Ec.value(), Ec.message()); + } + } + uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes) { #if ZEN_PLATFORM_WINDOWS @@ -195,34 +318,109 @@ namespace { bool CleanDirectory(const std::filesystem::path& Path, std::span<const std::string_view> ExcludeDirectories) { ZEN_TRACE_CPU("CleanDirectory"); + Stopwatch Timer; - bool CleanWipe = true; + ProgressBar Progress(UsePlainProgress); - DirectoryContent LocalDirectoryContent; - GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent); - for (const std::filesystem::path& LocalFilePath : LocalDirectoryContent.Files) + std::atomic<bool> CleanWipe = true; + std::atomic<uint64_t> DiscoveredItemCount = 0; + std::atomic<uint64_t> DeletedItemCount = 0; + std::atomic<uint64_t> DeletedByteCount = 0; + ParallellWork Work(AbortFlag); + + struct AsyncVisitor : public GetDirectoryContentVisitor { - try + AsyncVisitor(const std::filesystem::path& InPath, + std::atomic<bool>& InCleanWipe, + std::atomic<uint64_t>& InDiscoveredItemCount, + std::atomic<uint64_t>& InDeletedItemCount, + std::atomic<uint64_t>& InDeletedByteCount, + std::span<const std::string_view> InExcludeDirectories) + : Path(InPath) + , CleanWipe(InCleanWipe) + , DiscoveredItemCount(InDiscoveredItemCount) + , DeletedItemCount(InDeletedItemCount) + , DeletedByteCount(InDeletedByteCount) + , ExcludeDirectories(InExcludeDirectories) { - std::filesystem::remove(LocalFilePath); } - catch (const std::exception&) + virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override { - // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time - Sleep(200); - try - { - std::filesystem::remove(LocalFilePath); - } - catch (const std::exception& Ex) + ZEN_TRACE_CPU("CleanDirectory_AsyncVisitDirectory"); + if (!AbortFlag) { - ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what()); - CleanWipe = false; + if (!Content.FileNames.empty()) + { + DiscoveredItemCount += Content.FileNames.size(); + + const std::string RelativeRootString = RelativeRoot.generic_string(); + bool RemoveContent = true; + for (const std::string_view ExcludeDirectory : ExcludeDirectories) + { + if (RelativeRootString.starts_with(ExcludeDirectory)) + { + if (RelativeRootString.length() > ExcludeDirectory.length()) + { + const char MaybePathDelimiter = RelativeRootString[ExcludeDirectory.length()]; + if (MaybePathDelimiter == '/' || MaybePathDelimiter == '\\' || + MaybePathDelimiter == std::filesystem::path::preferred_separator) + { + RemoveContent = false; + break; + } + } + else + { + RemoveContent = false; + break; + } + } + } + if (RemoveContent) + { + ZEN_TRACE_CPU("DeleteFiles"); + for (size_t FileIndex = 0; FileIndex < Content.FileNames.size(); FileIndex++) + { + const std::filesystem::path& FileName = Content.FileNames[FileIndex]; + const std::filesystem::path FilePath = (Path / RelativeRoot / FileName).make_preferred(); + try + { + SetFileReadOnlyWithRetry(FilePath, false); + RemoveFileWithRetry(FilePath); + DeletedItemCount++; + DeletedByteCount += Content.FileSizes[FileIndex]; + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed removing file {}. Reason: {}", FilePath, Ex.what()); + CleanWipe = false; + } + } + } + } } } - } + const std::filesystem::path& Path; + std::atomic<bool>& CleanWipe; + std::atomic<uint64_t>& DiscoveredItemCount; + std::atomic<uint64_t>& DeletedItemCount; + std::atomic<uint64_t>& DeletedByteCount; + std::span<const std::string_view> ExcludeDirectories; + } Visitor(Path, CleanWipe, DiscoveredItemCount, DeletedItemCount, DeletedByteCount, ExcludeDirectories); - for (const std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories) + GetDirectoryContent( + Path, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes, + Visitor, + GetIOWorkerPool(), + Work.PendingWork()); + + DirectoryContent LocalDirectoryContent; + GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent); + DiscoveredItemCount += LocalDirectoryContent.Directories.size(); + std::vector<std::filesystem::path> DirectoriesToDelete; + DirectoriesToDelete.reserve(LocalDirectoryContent.Directories.size()); + for (std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories) { bool Leave = false; for (const std::string_view ExcludeDirectory : ExcludeDirectories) @@ -235,33 +433,84 @@ namespace { } if (!Leave) { - try - { - zen::CleanDirectory(LocalDirPath); - std::filesystem::remove(LocalDirPath); - } - catch (const std::exception&) + DirectoriesToDelete.emplace_back(std::move(LocalDirPath)); + DiscoveredItemCount++; + } + } + + uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs(); + + Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, PendingWork); + LastUpdateTimeMs = Timer.GetElapsedTimeMs(); + + uint64_t Deleted = DeletedItemCount.load(); + uint64_t DeletedBytes = DeletedByteCount.load(); + uint64_t Discovered = DiscoveredItemCount.load(); + Progress.UpdateState({.Task = "Cleaning folder ", + .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)), + .TotalCount = Discovered, + .RemainingCount = Discovered - Deleted}, + false); + }); + + for (const std::filesystem::path& DirectoryToDelete : DirectoriesToDelete) + { + ZEN_TRACE_CPU("DeleteDirs"); + try + { + std::error_code Ec; + zen::CleanDirectory(DirectoryToDelete, true, Ec); + if (Ec) { Sleep(200); - try - { - zen::CleanDirectory(LocalDirPath); - std::filesystem::remove(LocalDirPath); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed removing directory {}. Reason: {}", LocalDirPath, Ex.what()); - CleanWipe = false; - } + zen::CleanDirectory(DirectoryToDelete, true); + Ec.clear(); } + + RemoveDirWithRetry(DirectoryToDelete); + + DeletedItemCount++; + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed removing directory {}. Reason: {}", DirectoryToDelete, Ex.what()); + CleanWipe = false; + } + + uint64_t NowMs = Timer.GetElapsedTimeMs(); + if ((NowMs - LastUpdateTimeMs) >= (UsePlainProgress ? 5000 : 200)) + { + LastUpdateTimeMs = NowMs; + + uint64_t Deleted = DeletedItemCount.load(); + uint64_t DeletedBytes = DeletedByteCount.load(); + uint64_t Discovered = DiscoveredItemCount.load(); + Progress.UpdateState({.Task = "Cleaning folder ", + .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)), + .TotalCount = Discovered, + .RemainingCount = Discovered - Deleted}, + false); } } + + Progress.Finish(); + + uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); + if (ElapsedTimeMs >= 200) + { + ZEN_CONSOLE("Wiped folder '{}' {} ({}) in {}", + Path, + DiscoveredItemCount.load(), + NiceBytes(DeletedByteCount.load()), + NiceTimeSpanMs(ElapsedTimeMs)); + } return CleanWipe; } std::string ReadAccessTokenFromFile(const std::filesystem::path& Path) { - if (!std::filesystem::is_regular_file(Path)) + if (!IsFile(Path)) { throw std::runtime_error(fmt::format("the file '{}' does not exist", Path)); } @@ -304,7 +553,7 @@ namespace { const IoHash& Hash, const std::string& Suffix = {}) { - std::filesystem::path TempFilePath = (TempFolderPath / (Hash.ToHexString() + Suffix)).make_preferred(); + std::filesystem::path TempFilePath = TempFolderPath / (Hash.ToHexString() + Suffix); return WriteToTempFile(std::move(Buffer), TempFilePath); } @@ -402,12 +651,12 @@ namespace { std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) { - return (CacheFolderPath / (RawHash.ToHexString() + ".tmp")).make_preferred(); + return CacheFolderPath / (RawHash.ToHexString() + ".tmp"); } std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) { - return (CacheFolderPath / RawHash.ToHexString()).make_preferred(); + return CacheFolderPath / RawHash.ToHexString(); } ChunkedFolderContent ScanAndChunkFolder( @@ -425,7 +674,7 @@ namespace { Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), - GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), UsePlainProgress ? 5000 : 200, [](bool, std::ptrdiff_t) {}, AbortFlag); @@ -439,7 +688,7 @@ namespace { FilteredBytesHashed.Start(); ChunkedFolderContent FolderContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), Path, Content, ChunkController, @@ -501,6 +750,7 @@ namespace { uint64_t AcceptedBlockCount = 0; uint64_t AcceptedChunkCount = 0; uint64_t AcceptedByteCount = 0; + uint64_t AcceptedRawByteCount = 0; uint64_t RejectedBlockCount = 0; uint64_t RejectedChunkCount = 0; uint64_t RejectedByteCount = 0; @@ -539,6 +789,7 @@ namespace { uint64_t ChunkCount = 0; uint64_t ChunkByteCount = 0; std::atomic<uint64_t> CompressedChunkCount = 0; + std::atomic<uint64_t> CompressedChunkRawBytes = 0; std::atomic<uint64_t> CompressedChunkBytes = 0; uint64_t CompressChunksElapsedWallTimeUS = 0; @@ -547,6 +798,7 @@ namespace { ChunkCount += Rhs.ChunkCount; ChunkByteCount += Rhs.ChunkByteCount; CompressedChunkCount += Rhs.CompressedChunkCount; + CompressedChunkRawBytes += Rhs.CompressedChunkRawBytes; CompressedChunkBytes += Rhs.CompressedChunkBytes; CompressChunksElapsedWallTimeUS += Rhs.CompressChunksElapsedWallTimeUS; return *this; @@ -603,11 +855,9 @@ namespace { struct WriteChunkStatistics { - std::atomic<uint32_t> ChunkCountWritten = 0; - std::atomic<uint64_t> ChunkBytesWritten = 0; - uint64_t DownloadTimeUs = 0; - uint64_t WriteTimeUs = 0; - uint64_t WriteChunksElapsedWallTimeUs = 0; + uint64_t DownloadTimeUs = 0; + uint64_t WriteTimeUs = 0; + uint64_t WriteChunksElapsedWallTimeUs = 0; }; struct RebuildFolderStateStatistics @@ -626,6 +876,16 @@ namespace { uint64_t VerifyElapsedWallTimeUs = 0; }; + struct StorageInstance + { + std::unique_ptr<HttpClient> BuildStorageHttp; + std::unique_ptr<BuildStorage> BuildStorage; + std::string StorageName; + std::unique_ptr<HttpClient> CacheHttp; + std::unique_ptr<BuildStorageCache> BuildCacheStorage; + std::string CacheName; + }; + std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes, const std::span<const uint32_t> LocalChunkOrder, const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex, @@ -772,7 +1032,7 @@ namespace { std::span<const uint32_t> ChunkCounts, std::span<const IoHash> LocalChunkHashes, std::span<const uint64_t> LocalChunkRawSizes, - std::vector<uint32_t> AbsoluteChunkOrders, + const std::vector<uint32_t>& AbsoluteChunkOrders, const std::span<const uint32_t> LooseLocalChunkIndexes, const std::span<IoHash> BlockHashes) { @@ -1352,7 +1612,15 @@ namespace { ZEN_ASSERT(false); } uint64_t RawSize = Chunk.GetSize(); - return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)}; + if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock) + { + // Standalone chunk, not part of a sequence + return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid)}; + } + else + { + return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)}; + } })); } @@ -1376,13 +1644,24 @@ namespace { { std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); ZEN_ASSERT(!ChunkLocations.empty()); - CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, + const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; + CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); - ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == Content.ChunkedContent.ChunkHashes[ChunkIndex]); - CompositeBuffer CompressedChunk = - CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed(); - ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); + ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); + + const uint64_t RawSize = Chunk.GetSize(); + if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock) + { + CompositeBuffer CompressedChunk = CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid).GetCompressed(); + ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); + } + else + { + CompositeBuffer CompressedChunk = + CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed(); + ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); + } } return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); }; @@ -1444,7 +1723,7 @@ namespace { for (auto& WorkItem : WorkItems) { Work.ScheduleWork( - NetworkPool, // GetSyncWorkerPool(),// + NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { ZEN_TRACE_CPU("DownloadLargeBlob_Work"); if (!AbortFlag) @@ -1513,15 +1792,16 @@ namespace { } ValidateStats.BlockAttachmentCount = BlockAttachments.size(); - std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments); + std::vector<ChunkBlockDescription> VerifyBlockDescriptions = + ParseChunkBlockDescriptionList(Storage.GetBlockMetadatas(BuildId, BlockAttachments)); if (VerifyBlockDescriptions.size() != BlockAttachments.size()) { throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", BlockAttachments.size() - VerifyBlockDescriptions.size())); } - WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& NetworkPool = GetNetworkPool(); + WorkerThreadPool& VerifyPool = GetIOWorkerPool(); ParallellWork Work(AbortFlag); const std::filesystem::path TempFolder = ".zen-tmp"; @@ -1530,7 +1810,8 @@ namespace { auto __ = MakeGuard([&TempFolder]() { if (CleanDirectory(TempFolder, {})) { - std::filesystem::remove(TempFolder); + std::error_code DummyEc; + RemoveDir(TempFolder, DummyEc); } }); @@ -1787,7 +2068,6 @@ namespace { const ChunkedContentLookup& Lookup, uint32_t ChunkIndex, const std::filesystem::path& TempFolderPath, - std::atomic<uint64_t>& ReadRawBytes, LooseChunksStatistics& LooseChunksStats) { ZEN_TRACE_CPU("CompressChunk"); @@ -1808,11 +2088,11 @@ namespace { } ZEN_ASSERT_SLOW(IoHash::HashBuffer(RawSource) == ChunkHash); { - std::filesystem::path TempFilePath = (TempFolderPath / ChunkHash.ToHexString()).make_preferred(); + std::filesystem::path TempFilePath = TempFolderPath / ChunkHash.ToHexString(); BasicFile CompressedFile; std::error_code Ec; - CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncate, Ec); + CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec); if (Ec) { throw std::runtime_error( @@ -1823,7 +2103,7 @@ namespace { CompositeBuffer(SharedBuffer(RawSource)), [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset); - ReadRawBytes += SourceSize; + LooseChunksStats.CompressedChunkRawBytes += SourceSize; CompressedFile.Write(RangeBuffer, Offset); LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize(); }); @@ -1850,7 +2130,7 @@ namespace { return Compressed.GetCompressed(); } CompressedFile.Close(); - std::filesystem::remove(TempFilePath, Ec); + RemoveFile(TempFilePath, Ec); ZEN_UNUSED(Ec); } @@ -1881,7 +2161,7 @@ namespace { void GenerateBuildBlocks(const std::filesystem::path& Path, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, - BuildStorage& Storage, + StorageInstance& Storage, const Oid& BuildId, const std::vector<std::vector<uint32_t>>& NewBlockChunks, GeneratedBlocks& OutBlocks, @@ -1904,9 +2184,8 @@ namespace { RwLock Lock; - WorkerThreadPool& GenerateBlobsPool = - GetMediumWorkerPool(EWorkloadType::Burst); // GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();// - WorkerThreadPool& UploadBlocksPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();// + WorkerThreadPool& GenerateBlobsPool = GetIOWorkerPool(); + WorkerThreadPool& UploadBlocksPool = GetNetworkPool(); FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; @@ -2005,21 +2284,35 @@ namespace { const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); - Storage.PutBuildBlob(BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - std::move(Payload).GetCompressed()); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + Payload.GetCompressed()); + } + + Storage.BuildStorage->PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + std::move(Payload).GetCompressed()); UploadStats.BlocksBytes += CompressedBlockSize; + ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockHash, NiceBytes(CompressedBlockSize), OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - Storage.PutBlockMetadata(BuildId, - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - BlockMetaData); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockHash, NiceBytes(BlockMetaData.GetSize())); OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; @@ -2074,9 +2367,10 @@ namespace { } } - void UploadPartBlobs(BuildStorage& Storage, + void UploadPartBlobs(StorageInstance& Storage, const Oid& BuildId, const std::filesystem::path& Path, + const std::filesystem::path& ZenFolderPath, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, std::span<IoHash> RawHashes, @@ -2092,8 +2386,8 @@ namespace { { ProgressBar ProgressBar(UsePlainProgress); - WorkerThreadPool& ReadChunkPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - WorkerThreadPool& UploadChunkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& ReadChunkPool = GetIOWorkerPool(); + WorkerThreadPool& UploadChunkPool = GetNetworkPool(); FilteredRate FilteredGenerateBlockBytesPerSecond; FilteredRate FilteredCompressedBytesPerSecond; @@ -2150,7 +2444,7 @@ namespace { if (QueuedPendingInMemoryBlocksForUpload.load() > 16) { ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock"); - Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), Path / ZenTempBlockFolderName, BlockHash)); + Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), ZenTempBlockFolderPath(ZenFolderPath), BlockHash)); IsInMemoryBlock = false; } else @@ -2177,18 +2471,26 @@ namespace { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); + } + Storage.BuildStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - NewBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockHash, NiceBytes(PayloadSize), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); UploadedBlockSize += PayloadSize; UploadStats.BlocksBytes += PayloadSize; - Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); - ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", - NewBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(BlockMetaData.GetSize())); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); + ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize())); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; @@ -2214,12 +2516,17 @@ namespace { ZEN_TRACE_CPU("AsyncUploadLooseChunk"); const uint64_t PayloadSize = Payload.GetSize(); - ; + + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); + } + if (PayloadSize >= LargeAttachmentSize) { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); UploadStats.MultipartAttachmentCount++; - std::vector<std::function<void()>> MultipartWork = Storage.PutLargeBuildBlob( + std::vector<std::function<void()>> MultipartWork = Storage.BuildStorage->PutLargeBuildBlob( BuildId, RawHash, ZenContentType::kCompressedBinary, @@ -2264,7 +2571,7 @@ namespace { else { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); - Storage.PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); + Storage.BuildStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); UploadStats.ChunksBytes += Payload.GetSize(); UploadStats.ChunkCount++; @@ -2286,16 +2593,8 @@ namespace { std::atomic<uint64_t> GeneratedBlockCount = 0; std::atomic<uint64_t> GeneratedBlockByteCount = 0; - std::vector<uint32_t> CompressLooseChunkOrderIndexes; - std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0; - // Start upload of any pre-compressed loose chunks - for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) - { - CompressLooseChunkOrderIndexes.push_back(LooseChunkOrderIndex); - } - // Start generation of any non-prebuilt blocks and schedule upload for (const size_t BlockIndex : BlockIndexes) { @@ -2303,7 +2602,7 @@ namespace { if (!AbortFlag) { Work.ScheduleWork( - ReadChunkPool, // GetSyncWorkerPool() + ReadChunkPool, [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -2355,34 +2654,27 @@ namespace { } } - std::atomic<uint64_t> RawLooseChunkByteCount = 0; - // Start compression of any non-precompressed loose chunks and schedule upload - for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes) + for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) { - const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex]; + const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex]; Work.ScheduleWork( - ReadChunkPool, // GetSyncWorkerPool(),// ReadChunkPool, + ReadChunkPool, [&, ChunkIndex](std::atomic<bool>&) { if (!AbortFlag) { ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); FilteredCompressedBytesPerSecond.Start(); - CompositeBuffer Payload = CompressChunk(Path, - Content, - Lookup, - ChunkIndex, - Path / ZenTempChunkFolderName, - RawLooseChunkByteCount, - LooseChunksStats); + CompositeBuffer Payload = + CompressChunk(Path, Content, Lookup, ChunkIndex, ZenTempChunkFolderPath(ZenFolderPath), LooseChunksStats); ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})", Content.ChunkedContent.ChunkHashes[ChunkIndex], NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), NiceBytes(Payload.GetSize())); const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; UploadStats.ReadFromDiskBytes += ChunkRawSize; - if (LooseChunksStats.CompressedChunkCount == CompressLooseChunkOrderIndexes.size()) + if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) { FilteredCompressedBytesPerSecond.Stop(); } @@ -2397,7 +2689,7 @@ namespace { Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); - FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkBytes.load()); + FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkRawBytes.load()); FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); @@ -2408,8 +2700,8 @@ namespace { "Uploaded {}/{} ({}/{}) blobs " "({} {}bits/s)", LooseChunksStats.CompressedChunkCount.load(), - CompressLooseChunkOrderIndexes.size(), - NiceBytes(RawLooseChunkByteCount), + LooseChunkOrderIndexes.size(), + NiceBytes(LooseChunksStats.CompressedChunkRawBytes), NiceBytes(TotalLooseChunksSize), NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()), @@ -2538,9 +2830,10 @@ namespace { for (size_t KnownBlockIndex : ReuseBlockIndexes) { std::vector<uint32_t> FoundChunkIndexes; - size_t BlockSize = 0; - size_t AdjustedReuseSize = 0; - const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + size_t BlockSize = 0; + size_t AdjustedReuseSize = 0; + size_t AdjustedRawReuseSize = 0; + const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++) { const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; @@ -2553,6 +2846,7 @@ namespace { { FoundChunkIndexes.push_back(ChunkIndex); AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; + AdjustedRawReuseSize += KnownBlock.ChunkRawLengths[BlockChunkIndex]; } } } @@ -2573,6 +2867,7 @@ namespace { } FindBlocksStats.AcceptedChunkCount += FoundChunkIndexes.size(); FindBlocksStats.AcceptedByteCount += AdjustedReuseSize; + FindBlocksStats.AcceptedRawByteCount += AdjustedRawReuseSize; FindBlocksStats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size(); FindBlocksStats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize; } @@ -2598,12 +2893,14 @@ namespace { return FilteredReuseBlockIndexes; }; - void UploadFolder(BuildStorage& Storage, + void UploadFolder(StorageInstance& Storage, const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, const std::filesystem::path& Path, + const std::filesystem::path& ZenFolderPath, const std::filesystem::path& ManifestPath, + const uint64_t FindBlockMaxCount, const uint8_t BlockReuseMinPercentLimit, bool AllowMultiparts, const CbObject& MetaData, @@ -2613,17 +2910,18 @@ namespace { { Stopwatch ProcessTimer; - const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName; + const std::filesystem::path ZenTempFolder = ZenTempFolderPath(ZenFolderPath); CreateDirectories(ZenTempFolder); CleanDirectory(ZenTempFolder, {}); auto _ = MakeGuard([&]() { if (CleanDirectory(ZenTempFolder, {})) { - std::filesystem::remove(ZenTempFolder); + std::error_code DummyEc; + RemoveDir(ZenTempFolder, DummyEc); } }); - CreateDirectories(Path / ZenTempBlockFolderName); - CreateDirectories(Path / ZenTempChunkFolderName); + CreateDirectories(ZenTempBlockFolderPath(ZenFolderPath)); + CreateDirectories(ZenTempChunkFolderPath(ZenFolderPath)); std::uint64_t TotalRawSize = 0; @@ -2641,54 +2939,52 @@ namespace { FindBlocksStatistics FindBlocksStats; - std::future<PrepareBuildResult> PrepBuildResultFuture = - GetSmallWorkerPool(EWorkloadType::Burst) - .EnqueueTask(std::packaged_task<PrepareBuildResult()>{ - [&Storage, BuildId, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] { - ZEN_TRACE_CPU("PrepareBuild"); + std::future<PrepareBuildResult> PrepBuildResultFuture = GetNetworkPool().EnqueueTask(std::packaged_task<PrepareBuildResult()>{ + [&Storage, BuildId, FindBlockMaxCount, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] { + ZEN_TRACE_CPU("PrepareBuild"); - PrepareBuildResult Result; - Stopwatch Timer; - if (CreateBuild) - { - ZEN_TRACE_CPU("CreateBuild"); + PrepareBuildResult Result; + Stopwatch Timer; + if (CreateBuild) + { + ZEN_TRACE_CPU("CreateBuild"); - Stopwatch PutBuildTimer; - CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData); - Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); - Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); - Result.PayloadSize = MetaData.GetSize(); - } - else - { - ZEN_TRACE_CPU("PutBuild"); - Stopwatch GetBuildTimer; - CbObject Build = Storage.GetBuild(BuildId); - Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); - Result.PayloadSize = Build.GetSize(); - if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) - { - Result.PreferredMultipartChunkSize = ChunkSize; - } - else if (AllowMultiparts) - { - ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", - NiceBytes(Result.PreferredMultipartChunkSize)); - } - } + Stopwatch PutBuildTimer; + CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData); + Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); + Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); + Result.PayloadSize = MetaData.GetSize(); + } + else + { + ZEN_TRACE_CPU("PutBuild"); + Stopwatch GetBuildTimer; + CbObject Build = Storage.BuildStorage->GetBuild(BuildId); + Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); + Result.PayloadSize = Build.GetSize(); + if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + { + Result.PreferredMultipartChunkSize = ChunkSize; + } + else if (AllowMultiparts) + { + ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", + NiceBytes(Result.PreferredMultipartChunkSize)); + } + } - if (!IgnoreExistingBlocks) - { - ZEN_TRACE_CPU("FindBlocks"); - Stopwatch KnownBlocksTimer; - Result.KnownBlocks = Storage.FindBlocks(BuildId); - FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); - FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); - Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); - } - Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - return Result; - }}); + if (!IgnoreExistingBlocks) + { + ZEN_TRACE_CPU("FindBlocks"); + Stopwatch KnownBlocksTimer; + Result.KnownBlocks = ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId, FindBlockMaxCount)); + FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); + FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); + Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); + } + Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + return Result; + }}); ChunkedFolderContent LocalContent; @@ -2767,7 +3063,7 @@ namespace { { std::filesystem::path ExcludeManifestPath = Path / ZenExcludeManifestName; tsl::robin_set<std::string> ExcludeAssetPaths; - if (std::filesystem::is_regular_file(ExcludeManifestPath)) + if (IsFile(ExcludeManifestPath)) { std::vector<std::filesystem::path> AssetPaths = ParseManifest(Path, ExcludeManifestPath); ExcludeAssetPaths.reserve(AssetPaths.size()); @@ -2796,10 +3092,10 @@ namespace { } return true; }, - GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { - ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); + ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); }, AbortFlag); } @@ -2810,12 +3106,13 @@ namespace { for (const std::filesystem::path& AssetPath : AssetPaths) { Content.Paths.push_back(AssetPath); - Content.RawSizes.push_back(std::filesystem::file_size(Path / AssetPath)); + const std::filesystem::path AssetFilePath = (Path / AssetPath).make_preferred(); + Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath)); #if ZEN_PLATFORM_WINDOWS - Content.Attributes.push_back(GetFileAttributes(Path / AssetPath)); + Content.Attributes.push_back(GetFileAttributes(AssetFilePath)); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX - Content.Attributes.push_back(GetFileMode(Path / AssetPath)); + Content.Attributes.push_back(GetFileMode(AssetFilePath)); #endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); LocalFolderScanStats.AcceptedFileCount++; @@ -2823,12 +3120,13 @@ namespace { if (ManifestPath.is_relative()) { Content.Paths.push_back(ManifestPath); - Content.RawSizes.push_back(std::filesystem::file_size(ManifestPath)); + const std::filesystem::path ManifestFilePath = (Path / ManifestPath).make_preferred(); + Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath)); #if ZEN_PLATFORM_WINDOWS - Content.Attributes.push_back(GetFileAttributes(ManifestPath)); + Content.Attributes.push_back(GetFileAttributes(ManifestFilePath)); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX - Content.Attributes.push_back(GetFileMode(ManifestPath)); + Content.Attributes.push_back(GetFileMode(ManifestFilePath)); #endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); @@ -2855,7 +3153,7 @@ namespace { FilteredBytesHashed.Start(); LocalContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), Path, Content, *ChunkController, @@ -2976,16 +3274,17 @@ namespace { } FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size(); - const double AcceptedByteCountPercent = FindBlocksStats.PotentialChunkByteCount > 0 - ? (100.0 * FindBlocksStats.AcceptedByteCount / FindBlocksStats.PotentialChunkByteCount) - : 0.0; + const double AcceptedByteCountPercent = + FindBlocksStats.PotentialChunkByteCount > 0 + ? (100.0 * FindBlocksStats.AcceptedRawByteCount / FindBlocksStats.PotentialChunkByteCount) + : 0.0; const double AcceptedReduntantByteCountPercent = FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * FindBlocksStats.AcceptedReduntantByteCount) / (FindBlocksStats.AcceptedByteCount + FindBlocksStats.AcceptedReduntantByteCount) : 0.0; ZEN_CONSOLE( - "Found {} chunks in {} ({}) blocks eligeble for reuse in {}\n" + "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" " Accepting {} ({}) redundant chunks ({:.1f}%)\n" " Rejected {} ({}) chunks in {} blocks\n" @@ -2998,7 +3297,7 @@ namespace { NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), FindBlocksStats.AcceptedChunkCount, - NiceBytes(FindBlocksStats.AcceptedByteCount), + NiceBytes(FindBlocksStats.AcceptedRawByteCount), FindBlocksStats.AcceptedBlockCount, AcceptedByteCountPercent, @@ -3204,7 +3503,8 @@ namespace { } Stopwatch PutBuildPartResultTimer; - std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); + std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = + Storage.BuildStorage->PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.", NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), NiceBytes(PartManifest.GetSize()), @@ -3231,8 +3531,8 @@ namespace { TempLooseChunksStats.CompressedChunkCount.load(), NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), - NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, - TempLooseChunksStats.CompressedChunkBytes)), + NiceNum( + GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, TempLooseChunksStats.ChunkByteCount)), TempUploadStats.ChunkCount.load(), NiceBytes(TempUploadStats.ChunksBytes), @@ -3243,6 +3543,7 @@ namespace { UploadPartBlobs(Storage, BuildId, Path, + ZenFolderPath, LocalContent, LocalLookup, RawHashes, @@ -3289,7 +3590,7 @@ namespace { while (!AbortFlag) { Stopwatch FinalizeBuildPartTimer; - std::vector<IoHash> Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash); + std::vector<IoHash> Needs = Storage.BuildStorage->FinalizeBuildPart(BuildId, BuildPartId, PartHash); ZEN_CONSOLE("FinalizeBuildPart took {}. {} attachments are missing.", NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), Needs.size()); @@ -3304,7 +3605,7 @@ namespace { if (CreateBuild && !AbortFlag) { Stopwatch FinalizeBuildTimer; - Storage.FinalizeBuild(BuildId); + Storage.BuildStorage->FinalizeBuild(BuildId); ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); } @@ -3321,7 +3622,13 @@ namespace { { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); UploadStats.BlocksBytes += BlockMetaData.GetSize(); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; UploadBlockMetadataCount++; @@ -3340,7 +3647,7 @@ namespace { DownloadStatistics ValidateDownloadStats; if (PostUploadVerify && !AbortFlag) { - ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats); + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats); } ZEN_CONSOLE_VERBOSE( @@ -3384,6 +3691,7 @@ namespace { "\n AcceptedBlockCount: {}" "\n AcceptedChunkCount: {}" "\n AcceptedByteCount: {}" + "\n AcceptedRawByteCount: {}" "\n RejectedBlockCount: {}" "\n RejectedChunkCount: {}" "\n RejectedByteCount: {}" @@ -3401,6 +3709,7 @@ namespace { FindBlocksStats.AcceptedBlockCount, FindBlocksStats.AcceptedChunkCount, NiceBytes(FindBlocksStats.AcceptedByteCount), + NiceBytes(FindBlocksStats.AcceptedRawByteCount), FindBlocksStats.RejectedBlockCount, FindBlocksStats.RejectedChunkCount, NiceBytes(FindBlocksStats.RejectedByteCount), @@ -3570,13 +3879,13 @@ namespace { ValidateInfo); - Storage.PutBuildPartStats( + Storage.BuildStorage->PutBuildPartStats( BuildId, BuildPartId, {{"totalSize", double(LocalFolderScanStats.FoundFileByteCount.load())}, {"reusedRatio", AcceptedByteCountPercent / 100.0}, {"reusedBlockCount", double(FindBlocksStats.AcceptedBlockCount)}, - {"reusedBlockByteCount", double(FindBlocksStats.AcceptedByteCount)}, + {"reusedBlockByteCount", double(FindBlocksStats.AcceptedRawByteCount)}, {"newBlockCount", double(FindBlocksStats.NewBlocksCount)}, {"newBlockByteCount", double(FindBlocksStats.NewBlocksChunkByteCount)}, {"uploadedCount", double(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load())}, @@ -3597,7 +3906,7 @@ namespace { ProgressBar ProgressBar(UsePlainProgress); - WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& VerifyPool = GetIOWorkerPool(); ParallellWork Work(AbortFlag); @@ -3646,7 +3955,7 @@ namespace { if (IsAcceptedFolder(TargetPath.parent_path().generic_string())) { const uint64_t ExpectedSize = Content.RawSizes[PathIndex]; - if (!std::filesystem::exists(TargetPath)) + if (!IsFile(TargetPath)) { ErrorLock.WithExclusiveLock([&]() { Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize)); @@ -3656,7 +3965,7 @@ namespace { else { std::error_code Ec; - uint64_t SizeOnDisk = gsl::narrow<uint64_t>(std::filesystem::file_size(TargetPath, Ec)); + uint64_t SizeOnDisk = gsl::narrow<uint64_t>(FileSizeFromPath(TargetPath, Ec)); if (Ec) { ErrorLock.WithExclusiveLock([&]() { @@ -3873,12 +4182,34 @@ namespace { return ChunkTargetPtrs; }; + uint64_t GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const ChunkedContentLookup& Lookup, + uint32_t ChunkIndex) + { + uint64_t WriteCount = 0; + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex); + for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) + { + if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) + { + WriteCount++; + } + } + return WriteCount; + }; + void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash) { ZEN_TRACE_CPU("FinalizeChunkSequence"); - ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash))); - std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash), - GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)); + ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash))); + std::error_code Ec; + RenameFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash), + GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash), + Ec); + if (Ec) + { + throw std::system_error(Ec); + } } void FinalizeChunkSequences(const std::filesystem::path& TargetFolder, @@ -3892,8 +4223,39 @@ namespace { } } + void VerifySequence(const std::filesystem::path& TargetFolder, + const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, + uint32_t RemoteSequenceIndex) + { + ZEN_TRACE_CPU("VerifySequence"); + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + { + ZEN_TRACE_CPU("HashSequence"); + const std::uint32_t RemotePathIndex = Lookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t ExpectedSize = RemoteContent.RawSizes[RemotePathIndex]; + IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)); + const uint64_t VerifySize = VerifyBuffer.GetSize(); + if (VerifySize != ExpectedSize) + { + throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}", + SequenceRawHash, + VerifySize, + ExpectedSize)); + } + ZEN_TRACE_CPU("HashSequence"); + const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer)); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); + } + } + } + void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder, const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, std::span<const uint32_t> RemoteSequenceIndexes, ParallellWork& Work, WorkerThreadPool& VerifyPool) @@ -3908,46 +4270,33 @@ namespace { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; Work.ScheduleWork( VerifyPool, - [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) { + [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) { if (!AbortFlag) { ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync"); - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex); + if (!AbortFlag) { - ZEN_TRACE_CPU("HashSequence"); - const IoHash VerifyChunkHash = IoHash::HashBuffer( - IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); - } + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(TargetFolder, SequenceRawHash); } - FinalizeChunkSequence(TargetFolder, SequenceRawHash); } }, Work.DefaultErrorFunction()); } const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; + VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex); const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - { - ZEN_TRACE_CPU("HashSequence"); - const IoHash VerifyChunkHash = - IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); - } - } FinalizeChunkSequence(TargetFolder, SequenceRawHash); } bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) { - return SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1; + uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1); + ZEN_ASSERT(PreviousValue >= 1); + ZEN_ASSERT(PreviousValue != (uint32_t)-1); + return PreviousValue == 1; } std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, @@ -3985,8 +4334,7 @@ namespace { const BlockWriteOps& Ops, ParallellWork& Work, WorkerThreadPool& VerifyPool, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockChunkOps"); { @@ -4017,12 +4365,6 @@ namespace { FileOffset, RemoteContent.RawSizes[PathIndex]); } - WriteChunkStats.ChunkCountWritten += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size()); - WriteChunkStats.ChunkBytesWritten += - std::accumulate(Ops.ChunkBuffers.begin(), - Ops.ChunkBuffers.end(), - uint64_t(0), - [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); }); } if (!AbortFlag) { @@ -4036,7 +4378,7 @@ namespace { CompletedChunkSequences.push_back(RemoteSequenceIndex); } } - VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool); + VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool); } } @@ -4117,11 +4459,36 @@ namespace { bool NeedsWrite = true; if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) { - MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock + CompressedBuffer::GetHeaderSizeForNoneEncoder(), - ChunkCompressedSize - CompressedBuffer::GetHeaderSizeForNoneEncoder()); - IoBuffer Decompressed(IoBuffer::Wrap, ChunkMemoryView.GetData(), ChunkMemoryView.GetSize()); - ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); + MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize); + IoHash VerifyChunkHash; + uint64_t VerifyChunkSize; + CompressedBuffer CompressedChunk = + CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize); + ZEN_ASSERT(CompressedChunk); + ZEN_ASSERT(VerifyChunkHash == ChunkHash); + ZEN_ASSERT(VerifyChunkSize == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + + OodleCompressor ChunkCompressor; + OodleCompressionLevel ChunkCompressionLevel; + uint64_t ChunkBlockSize; + + bool GetCompressParametersSuccess = + CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize); + ZEN_ASSERT(GetCompressParametersSuccess); + + IoBuffer Decompressed; + if (ChunkCompressionLevel == OodleCompressionLevel::None) + { + MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()); + Decompressed = + IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize()); + } + else + { + Decompressed = CompressedChunk.Decompress().AsIoBuffer(); + } ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) { OutOps.WriteOps.push_back( @@ -4162,8 +4529,7 @@ namespace { CompositeBuffer&& BlockBuffer, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockToDisk"); @@ -4197,8 +4563,7 @@ namespace { Ops, Work, VerifyPool, - DiskStats, - WriteChunkStats); + DiskStats); return true; } return false; @@ -4222,8 +4587,7 @@ namespace { Ops, Work, VerifyPool, - DiskStats, - WriteChunkStats); + DiskStats); return true; } return false; @@ -4240,8 +4604,7 @@ namespace { uint32_t LastIncludedBlockChunkIndex, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WritePartialBlockToDisk"); @@ -4267,8 +4630,7 @@ namespace { Ops, Work, VerifyPool, - DiskStats, - WriteChunkStats); + DiskStats); return true; } else @@ -4355,8 +4717,7 @@ namespace { void StreamDecompress(const std::filesystem::path& CacheFolderPath, const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("StreamDecompress"); const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); @@ -4390,7 +4751,6 @@ namespace { DiskStats.ReadByteCount += SourceSize; if (!AbortFlag) { - WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize(); DecompressedTemp.Write(RangeBuffer, Offset); for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { @@ -4424,7 +4784,7 @@ namespace { throw std::runtime_error( fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); } - WriteChunkStats.ChunkCountWritten++; + // WriteChunkStats.ChunkCountWritten++; } bool WriteCompressedChunk(const std::filesystem::path& TargetFolder, @@ -4433,8 +4793,7 @@ namespace { const IoHash& ChunkHash, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, IoBuffer&& CompressedPart, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); @@ -4444,7 +4803,7 @@ namespace { { const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; - StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats); + StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats); } else { @@ -4459,15 +4818,13 @@ namespace { ChunkTargetPtrs, CompositeBuffer(std::move(Chunk)), OpenFileCache); - WriteChunkStats.ChunkCountWritten++; - WriteChunkStats.ChunkBytesWritten += ChunkRawSize; return true; } } return false; } - void AsyncWriteDownloadedChunk(const std::filesystem::path& Path, + void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& RemoteLookup, uint32_t RemoteChunkIndex, @@ -4479,8 +4836,7 @@ namespace { std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, FilteredRate& FilteredWrittenBytesPerSecond, - DiskStatistics& DiskStats, - WriteChunkStatistics& WriteChunkStats) + DiskStatistics& DiskStats) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); @@ -4502,8 +4858,8 @@ namespace { { Payload.SetDeleteOnClose(false); Payload = {}; - CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); - std::filesystem::rename(TempBlobPath, CompressedChunkPath, Ec); + CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + RenameFile(TempBlobPath, CompressedChunkPath, Ec); if (Ec) { CompressedChunkPath = std::filesystem::path{}; @@ -4521,13 +4877,13 @@ namespace { { ZEN_TRACE_CPU("WriteTempChunk"); // Could not be moved and rather large, lets store it on disk - CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); + CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); Payload = {}; } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// + WritePool, [&, SequenceIndexChunksLeftToWriteCounters, CompressedChunkPath, @@ -4557,7 +4913,7 @@ namespace { } } - std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; + std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); bool NeedHashVerify = WriteCompressedChunk(TargetFolder, RemoteContent, @@ -4565,8 +4921,7 @@ namespace { ChunkHash, ChunkTargetPtrs, std::move(CompressedPart), - DiskStats, - WriteChunkStats); + DiskStats); if (!AbortFlag) { WritePartsComplete++; @@ -4575,13 +4930,18 @@ namespace { FilteredWrittenBytesPerSecond.Stop(); } - std::filesystem::remove(CompressedChunkPath); + RemoveFile(CompressedChunkPath); std::vector<uint32_t> CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); if (NeedHashVerify) { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool); + VerifyAndCompleteChunkSequencesAsync(TargetFolder, + RemoteContent, + RemoteLookup, + CompletedSequences, + Work, + WritePool); } else { @@ -4593,9 +4953,10 @@ namespace { Work.DefaultErrorFunction()); }; - void UpdateFolder(BuildStorage& Storage, + void UpdateFolder(StorageInstance& Storage, const Oid& BuildId, const std::filesystem::path& Path, + const std::filesystem::path& ZenFolderPath, const std::uint64_t LargeAttachmentSize, const std::uint64_t PreferredMultipartChunkSize, const ChunkedFolderContent& LocalContent, @@ -4604,6 +4965,7 @@ namespace { const std::vector<IoHash>& LooseChunkHashes, bool AllowPartialBlockRequests, bool WipeTargetFolder, + bool PrimeCacheOnly, FolderContent& OutLocalFolderState, DiskStatistics& DiskStats, CacheMappingStatistics& CacheMappingStats, @@ -4613,7 +4975,7 @@ namespace { { ZEN_TRACE_CPU("UpdateFolder"); - ZEN_UNUSED(WipeTargetFolder); + ZEN_ASSERT((!PrimeCacheOnly) || (PrimeCacheOnly && (!AllowPartialBlockRequests))); Stopwatch IndexTimer; @@ -4623,7 +4985,7 @@ namespace { ZEN_CONSOLE("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); - const std::filesystem::path CacheFolderPath = Path / ZenTempCacheFolderName; + const std::filesystem::path CacheFolderPath = ZenTempCacheFolderPath(ZenFolderPath); Stopwatch CacheMappingTimer; @@ -4633,6 +4995,7 @@ namespace { tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; + if (!PrimeCacheOnly) { ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache"); @@ -4667,17 +5030,24 @@ namespace { if (SequenceSize == CacheDirContent.FileSizes[Index]) { CachedSequenceHashesFound.insert({FileHash, SequenceIndex}); - CacheMappingStats.CacheSequenceHashesCount += SequenceSize; - CacheMappingStats.CacheSequenceHashesByteCount++; + CacheMappingStats.CacheSequenceHashesCount++; + CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize; + + const std::filesystem::path CacheFilePath = + GetFinalChunkedSequenceFileName(CacheFolderPath, + RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); + continue; } } } - std::filesystem::remove(CacheDirContent.Files[Index]); + RemoveFileWithRetry(CacheDirContent.Files[Index]); } } tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; + if (!PrimeCacheOnly) { ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache"); @@ -4690,7 +5060,7 @@ namespace { } DirectoryContent BlockDirContent; - GetDirectoryContent(Path / ZenTempBlockFolderName, + GetDirectoryContent(ZenTempBlockFolderPath(ZenFolderPath), DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, BlockDirContent); CachedBlocksFound.reserve(BlockDirContent.Files.size()); @@ -4718,13 +5088,15 @@ namespace { } } } - std::filesystem::remove(BlockDirContent.Files[Index]); + RemoveFileWithRetry(BlockDirContent.Files[Index]); } } std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes; - // Pick up all whole files we can use from current local state + + if (!PrimeCacheOnly) { + // Pick up all whole files we can use from current local state ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks"); for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); RemoteSequenceIndex++) @@ -4736,6 +5108,8 @@ namespace { // const uint32_t RemoteSequenceIndex = CacheSequenceIt->second; // const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex); // RemoteSequenceByteCountFoundInCache += RemoteContent.RawSizes[RemotePathIndex]; + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); + ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); } else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); CacheChunkIt != CachedChunkHashesFound.end()) @@ -4743,13 +5117,16 @@ namespace { // const uint32_t RemoteChunkIndex = CacheChunkIt->second; // const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex); // RemoteSequenceByteCountFoundInCache += RemoteContent.RawSizes[RemotePathIndex]; + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); + ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); } else if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); It != LocalLookup.RawHashToSequenceIndex.end()) { const uint32_t LocalSequenceIndex = It->second; const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex); - uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex]; + ZEN_ASSERT_SLOW(IsFile((Path / LocalContent.Paths[LocalPathIndex]).make_preferred())); + uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex]; LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex); CacheMappingStats.LocalPathsMatchingSequencesCount++; CacheMappingStats.LocalPathsMatchingSequencesByteCount += RawSize; @@ -4762,6 +5139,15 @@ namespace { } } } + else + { + for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); + RemoteSequenceIndex++) + { + const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; + } + } // Pick up all chunks in current local state struct CacheCopyData { @@ -4779,6 +5165,7 @@ namespace { tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex; std::vector<CacheCopyData> CacheCopyDatas; + if (!PrimeCacheOnly) { ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks"); @@ -4852,38 +5239,37 @@ namespace { } } - if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty() || - !LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0) + if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty()) { - ZEN_CONSOLE( - "Cache: {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks. Local state: {} ({}) chunk sequences, {} ({}) chunks", - CachedSequenceHashesFound.size(), - NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount), - CachedChunkHashesFound.size(), - NiceBytes(CacheMappingStats.CacheChunkByteCount), - CachedBlocksFound.size(), - NiceBytes(CacheMappingStats.CacheBlocksByteCount), - LocalPathIndexesMatchingSequenceIndexes.size(), - NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount), - CacheMappingStats.LocalChunkMatchingRemoteCount, - NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount)); + ZEN_CONSOLE("Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks.", + CachedSequenceHashesFound.size(), + NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount), + CachedChunkHashesFound.size(), + NiceBytes(CacheMappingStats.CacheChunkByteCount), + CachedBlocksFound.size(), + NiceBytes(CacheMappingStats.CacheBlocksByteCount)); } - uint32_t ChunkCountToWrite = 0; + if (!LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0) + { + ZEN_CONSOLE("Local state : Found {} ({}) chunk sequences, {} ({}) chunks", + LocalPathIndexesMatchingSequenceIndexes.size(), + NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount), + CacheMappingStats.LocalChunkMatchingRemoteCount, + NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount)); + } + + uint64_t BytesToWrite = 0; + for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { - if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - ChunkCountToWrite++; - } - else + uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); + if (ChunkWriteCount > 0) { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); - if (!ChunkTargetPtrs.empty()) + BytesToWrite += RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; + if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; - ChunkCountToWrite++; } } } @@ -4900,8 +5286,8 @@ namespace { FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; - WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // - WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& NetworkPool = GetNetworkPool(); + WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar WriteProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); @@ -4922,7 +5308,7 @@ namespace { const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { - ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); + ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash); continue; } bool NeedsCopy = true; @@ -4933,7 +5319,7 @@ namespace { if (ChunkTargetPtrs.empty()) { - ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); + ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash); } else { @@ -4993,118 +5379,274 @@ namespace { const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); if (!BlockChunkIndexNeeded.empty()) { - bool UsingCachedBlock = false; - if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) + if (PrimeCacheOnly) { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet"); - + TotalRequestCount++; TotalPartWriteCount++; - std::filesystem::path BlockPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - if (std::filesystem::exists(BlockPath)) - { - CachedChunkBlockIndexes.push_back(BlockIndex); - UsingCachedBlock = true; - } + FullBlockWorks.push_back(BlockIndex); } - - if (!UsingCachedBlock) + else { - bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); - bool CanDoPartialBlockDownload = - (BlockDescription.HeaderSize > 0) && - (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); - if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) + bool UsingCachedBlock = false; + if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) { - std::vector<BlockRangeDescriptor> BlockRanges; + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet"); - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis"); + TotalPartWriteCount++; - uint32_t NeedBlockChunkIndexOffset = 0; - uint32_t ChunkBlockIndex = 0; - uint32_t CurrentOffset = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + std::filesystem::path BlockPath = + ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + if (IsFile(BlockPath)) + { + CachedChunkBlockIndexes.push_back(BlockIndex); + UsingCachedBlock = true; + } + } - BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; - while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && - ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) + if (!UsingCachedBlock) + { + bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); + bool CanDoPartialBlockDownload = + (BlockDescription.HeaderSize > 0) && + (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); + if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) { - const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; - if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + std::vector<BlockRangeDescriptor> BlockRanges; + + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis"); + + uint32_t NeedBlockChunkIndexOffset = 0; + uint32_t ChunkBlockIndex = 0; + uint32_t CurrentOffset = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(CurrentOffset)); + + BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; + while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && + ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) { - if (NextRange.RangeLength > 0) + const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) { - BlockRanges.push_back(NextRange); - NextRange = {.BlockIndex = BlockIndex}; + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + NextRange = {.BlockIndex = BlockIndex}; + } + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + } + else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + if (NextRange.RangeLength == 0) + { + NextRange.RangeStart = CurrentOffset; + NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + } + NextRange.RangeLength += ChunkCompressedLength; + NextRange.ChunkBlockIndexCount++; + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + NeedBlockChunkIndexOffset++; + } + else + { + ZEN_ASSERT(false); } - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; } - else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + if (NextRange.RangeLength > 0) { - if (NextRange.RangeLength == 0) + BlockRanges.push_back(NextRange); + } + + ZEN_ASSERT(!BlockRanges.empty()); + + std::vector<BlockRangeDescriptor> CollapsedBlockRanges; + auto It = BlockRanges.begin(); + CollapsedBlockRanges.push_back(*It++); + while (It != BlockRanges.end()) + { + BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); + uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); + uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength; + if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out + { + LastRange.ChunkBlockIndexCount = + (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; + LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart; + } + else { - NextRange.RangeStart = CurrentOffset; - NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + CollapsedBlockRanges.push_back(*It); } - NextRange.RangeLength += ChunkCompressedLength; - NextRange.ChunkBlockIndexCount++; - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - NeedBlockChunkIndexOffset++; + ++It; } - else + + const std::uint64_t WantedSize = std::accumulate( + CollapsedBlockRanges.begin(), + CollapsedBlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + ZEN_ASSERT(WantedSize <= TotalBlockSize); + if (WantedSize > ((TotalBlockSize * 95) / 100)) { - ZEN_ASSERT(false); + ZEN_CONSOLE_VERBOSE("Using more than 95% ({}) of block {} ({}), requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize)); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); } - } - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - } - - ZEN_ASSERT(!BlockRanges.empty()); - std::vector<BlockRangeDescriptor> CollapsedBlockRanges; - auto It = BlockRanges.begin(); - CollapsedBlockRanges.push_back(*It++); - while (It != BlockRanges.end()) - { - BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); - uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); - uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength; - if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out + else if ((WantedSize > ((TotalBlockSize * 9) / 10)) && CollapsedBlockRanges.size() > 1) + { + ZEN_CONSOLE_VERBOSE( + "Using more than 90% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 8) / 10)) && (CollapsedBlockRanges.size() > 16)) + { + ZEN_CONSOLE_VERBOSE( + "Using more than 80% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 7) / 10)) && (CollapsedBlockRanges.size() > 48)) + { + ZEN_CONSOLE_VERBOSE( + "Using more than 70% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 6) / 10)) && (CollapsedBlockRanges.size() > 64)) { - LastRange.ChunkBlockIndexCount = - (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; - LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart; + ZEN_CONSOLE_VERBOSE( + "Using more than 60% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); } else { - CollapsedBlockRanges.push_back(*It); + if (WantedSize > ((TotalBlockSize * 5) / 10)) + { + ZEN_CONSOLE_VERBOSE("Using {}% ({}) of block {} ({}) using {} requests, requesting partial block", + (WantedSize * 100) / TotalBlockSize, + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + } + TotalRequestCount += CollapsedBlockRanges.size(); + TotalPartWriteCount += CollapsedBlockRanges.size(); + + BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end()); } - ++It; } + else + { + TotalRequestCount++; + TotalPartWriteCount++; - TotalRequestCount += CollapsedBlockRanges.size(); - TotalPartWriteCount += CollapsedBlockRanges.size(); - - BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end()); - } - else - { - TotalRequestCount++; - TotalPartWriteCount++; - - FullBlockWorks.push_back(BlockIndex); + FullBlockWorks.push_back(BlockIndex); + } } } } else { - ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); + ZEN_CONSOLE_VERBOSE("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); } } + struct BlobsExistsResult + { + tsl::robin_set<IoHash> ExistingBlobs; + uint64_t ElapsedTimeMs = 0; + }; + + BlobsExistsResult ExistsResult; + + if (Storage.BuildCacheStorage) + { + ZEN_TRACE_CPU("BlobCacheExistCheck"); + Stopwatch Timer; + + tsl::robin_set<IoHash> BlobHashesSet; + + BlobHashesSet.reserve(LooseChunkHashWorks.size() + FullBlockWorks.size()); + for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) + { + BlobHashesSet.insert(RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]); + } + for (const BlockRangeDescriptor& BlockRange : BlockRangeWorks) + { + const uint32_t BlockIndex = BlockRange.BlockIndex; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + BlobHashesSet.insert(BlockDescription.BlockHash); + } + for (uint32_t BlockIndex : FullBlockWorks) + { + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + BlobHashesSet.insert(BlockDescription.BlockHash); + } + + if (!BlobHashesSet.empty()) + { + const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end()); + const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = + Storage.BuildCacheStorage->BlobsExists(BuildId, BlobHashes); + + if (CacheExistsResult.size() == BlobHashes.size()) + { + ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); + for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) + { + if (CacheExistsResult[BlobIndex].HasBody) + { + ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); + } + } + } + ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + if (!ExistsResult.ExistingBlobs.empty()) + { + ZEN_CONSOLE("Remote cache : Found {} out of {} needed blobs in {}", + ExistsResult.ExistingBlobs.size(), + BlobHashes.size(), + NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); + } + } + } for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) { if (AbortFlag) @@ -5118,17 +5660,25 @@ namespace { std::move(LooseChunkHashWork.ChunkTargetPtrs); const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex; + if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) + { + DownloadStats.RequestsCompleteCount++; + continue; + } + Work.ScheduleWork( - WritePool, // NetworkPool, // GetSyncWorkerPool(),// + WritePool, [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); std::filesystem::path ExistingCompressedChunkPath; + if (!PrimeCacheOnly) { - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - std::filesystem::path CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); - if (std::filesystem::exists(CompressedChunkPath)) + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + std::filesystem::path CompressedChunkPath = + ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + if (IsFile(CompressedChunkPath)) { IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath); if (ExistingCompressedPart) @@ -5147,156 +5697,250 @@ namespace { else { std::error_code DummyEc; - std::filesystem::remove(CompressedChunkPath, DummyEc); + RemoveFile(CompressedChunkPath, DummyEc); } } } } - if (!ExistingCompressedChunkPath.empty()) + if (!AbortFlag) + { - Work.ScheduleWork( - WritePool, // WritePool, GetSyncWorkerPool() - [&Path, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - &TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs, - CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); + if (!ExistingCompressedChunkPath.empty()) + { + Work.ScheduleWork( + WritePool, + [&Path, + &ZenFolderPath, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs, + CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); - FilteredWrittenBytesPerSecond.Start(); + FilteredWrittenBytesPerSecond.Start(); - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", - ChunkHash, - CompressedChunkPath)); - } + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", + ChunkHash, + CompressedChunkPath)); + } + + std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); + bool NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkHash, + ChunkTargetPtrs, + std::move(CompressedPart), + DiskStats); + WritePartsComplete++; + + if (!AbortFlag) + { + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } - std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - std::move(CompressedPart), - DiskStats, - WriteChunkStats); - WriteChunkStats.ChunkCountWritten++; - WriteChunkStats.ChunkBytesWritten += - RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]; - WritePartsComplete++; + RemoveFile(CompressedChunkPath); + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(TargetFolder, + RemoteContent, + RemoteLookup, + CompletedSequences, + Work, + WritePool); + } + else + { + FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + } + } + } + }, + Work.DefaultErrorFunction()); + } + else + { + Work.ScheduleWork( + NetworkPool, + [&Path, + &ZenFolderPath, + &Storage, + BuildId, + &PrimeCacheOnly, + &RemoteContent, + &RemoteLookup, + &ExistsResult, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &NetworkPool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + TotalRequestCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + LargeAttachmentSize, + PreferredMultipartChunkSize, + RemoteChunkIndex, + ChunkTargetPtrs, + &DownloadStats](std::atomic<bool>&) mutable { if (!AbortFlag) { - if (WritePartsComplete == TotalPartWriteCount) + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BuildBlob; + const bool ExistsInCache = + Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + if (ExistsInCache) { - FilteredWrittenBytesPerSecond.Stop(); + BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); } - - std::filesystem::remove(CompressedChunkPath); - - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - if (NeedHashVerify) + if (BuildBlob) { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, - RemoteContent, - CompletedSequences, - Work, - WritePool); + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); } else { - FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) + { + ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); + DownloadLargeBlob( + *Storage.BuildStorage, + ZenTempDownloadFolderPath(ZenFolderPath), + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadStats, + [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (Payload && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) + { + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } + } + }); + } + else + { + ZEN_TRACE_CPU("UpdateFolder_GetChunk"); + BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); + if (BuildBlob && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + BuildBlob.GetContentType(), + CompositeBuffer(SharedBuffer(BuildBlob))); + } + if (!BuildBlob) + { + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) + { + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } + } + } } } - } - }, - Work.DefaultErrorFunction()); - } - else - { - FilteredDownloadedBytesPerSecond.Start(); - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) - { - ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); - DownloadLargeBlob(Storage, - Path / ZenTempDownloadFolderName, - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(Path, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats, - WriteChunkStats); - }); - } - else - { - ZEN_TRACE_CPU("UpdateFolder_GetChunk"); - - IoBuffer BuildBlob = Storage.GetBuildBlob(BuildId, ChunkHash); - if (!BuildBlob) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); - } - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(Path, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats, - WriteChunkStats); + }, + Work.DefaultErrorFunction()); } } } @@ -5306,13 +5950,14 @@ namespace { for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) { + ZEN_ASSERT(!PrimeCacheOnly); if (AbortFlag) { break; } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// + WritePool, [&, CopyDataIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -5439,16 +6084,6 @@ namespace { ChunkSource, Op.Target->Offset, RemoteContent.RawSizes[RemotePathIndex]); - for (size_t WrittenOpIndex = WriteOpIndex; WrittenOpIndex < WriteOpIndex + WriteCount; WrittenOpIndex++) - { - const WriteOp& WrittenOp = WriteOps[WrittenOpIndex]; - if (ChunkIndexesWritten.insert(WrittenOp.ChunkIndex).second) - { - WriteChunkStats.ChunkCountWritten++; - WriteChunkStats.ChunkBytesWritten += - RemoteContent.ChunkedContent.ChunkRawSizes[WrittenOp.ChunkIndex]; - } - } CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? @@ -5469,10 +6104,13 @@ namespace { } VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, + RemoteLookup, CompletedChunkSequences, Work, WritePool); - ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); + ZEN_CONSOLE_VERBOSE("Copied {} from {}", + NiceBytes(CacheLocalFileBytesRead), + LocalContent.Paths[LocalPathIndex]); } WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) @@ -5486,13 +6124,14 @@ namespace { for (uint32_t BlockIndex : CachedChunkBlockIndexes) { + ZEN_ASSERT(!PrimeCacheOnly); if (AbortFlag) { break; } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(), // WritePool, + WritePool, [&, BlockIndex](std::atomic<bool>&) mutable { if (!AbortFlag) { @@ -5501,35 +6140,38 @@ namespace { const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; FilteredWrittenBytesPerSecond.Start(); - std::filesystem::path BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + std::filesystem::path BlockChunkPath = + ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockBuffer) { throw std::runtime_error( fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); } - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats, - WriteChunkStats)) - { - std::error_code DummyEc; - std::filesystem::remove(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } - WritePartsComplete++; - std::filesystem::remove(BlockChunkPath); - if (WritePartsComplete == TotalPartWriteCount) + if (!AbortFlag) { - FilteredWrittenBytesPerSecond.Stop(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + WritePartsComplete++; + RemoveFile(BlockChunkPath); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } } }, @@ -5538,6 +6180,7 @@ namespace { for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) { + ZEN_ASSERT(!PrimeCacheOnly); if (AbortFlag) { break; @@ -5546,7 +6189,7 @@ namespace { ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1); const uint32_t BlockIndex = BlockRange.BlockIndex; Work.ScheduleWork( - NetworkPool, // NetworkPool, // GetSyncWorkerPool() + NetworkPool, [&, BlockIndex, BlockRange](std::atomic<bool>&) { if (!AbortFlag) { @@ -5555,131 +6198,146 @@ namespace { const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer = - Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); + IoBuffer BlockBuffer; + if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) + { + BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + } if (!BlockBuffer) { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); } - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + if (!BlockBuffer) { - FilteredDownloadedBytesPerSecond.Stop(); + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } - - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it + if (!AbortFlag) { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) + uint64_t BlockSize = BlockBuffer.GetSize(); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + FilteredDownloadedBytesPerSecond.Stop(); + } - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) + std::filesystem::path BlockChunkPath; + + // Check if the dowloaded block is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = Path / ZenTempBlockFolderName / - fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); - if (Ec) + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) { - BlockChunkPath = std::filesystem::path{}; + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + RenameFile(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } } } } - } - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = - Path / ZenTempBlockFolderName / - fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, // WritePool, // GetSyncWorkerPool(), - [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)]( - std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)]( + std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else - { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) + if (BlockChunkPath.empty()) { - throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } } - } - FilteredWrittenBytesPerSecond.Start(); - - if (!WritePartialBlockToDisk( - CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats, - WriteChunkStats)) - { - std::error_code DummyEc; - std::filesystem::remove(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockToDisk( + CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } - if (!BlockChunkPath.empty()) - { - std::filesystem::remove(BlockChunkPath); - } + if (!BlockChunkPath.empty()) + { + RemoveFile(BlockChunkPath); + } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }, - Work.DefaultErrorFunction()); + }, + Work.DefaultErrorFunction()); + } } } }, @@ -5692,8 +6350,15 @@ namespace { { break; } + + if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash)) + { + DownloadStats.RequestsCompleteCount++; + continue; + } + Work.ScheduleWork( - NetworkPool, // GetSyncWorkerPool(), // NetworkPool, + NetworkPool, [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -5702,133 +6367,159 @@ namespace { const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash); - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); - } - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + + IoBuffer BlockBuffer; + const bool ExistsInCache = + Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + if (ExistsInCache) { - FilteredDownloadedBytesPerSecond.Stop(); + BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); } - - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it + if (!BlockBuffer) { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) + BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); + if (BlockBuffer && Storage.BuildCacheStorage) { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockBuffer.GetContentType(), + CompositeBuffer(SharedBuffer(BlockBuffer))); } } - - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + if (!BlockBuffer) { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } - if (!AbortFlag) { - Work.ScheduleWork( - WritePool, // WritePool, GetSyncWorkerPool() - [&Work, - &WritePool, - &RemoteContent, - &RemoteLookup, - CacheFolderPath, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - BlockIndex, - &BlockDescriptions, - &WriteChunkStats, - &DiskStats, - &WritePartsComplete, - &TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - BlockChunkPath, - BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); + uint64_t BlockSize = BlockBuffer.GetSize(); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + if (!PrimeCacheOnly) + { + std::filesystem::path BlockChunkPath; - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockBuffer); - } - else + // Check if the dowloaded block is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) { - ZEN_ASSERT(!BlockBuffer); - BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = + ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + RenameFile(TempBlobPath, BlockChunkPath, Ec); + if (Ec) { - throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + BlockChunkPath = std::filesystem::path{}; + + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); } } + } + } - FilteredWrittenBytesPerSecond.Start(); - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats, - WriteChunkStats)) - { - std::error_code DummyEc; - std::filesystem::remove(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } - if (!BlockChunkPath.empty()) - { - std::filesystem::remove(BlockChunkPath); - } + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&Work, + &WritePool, + &RemoteContent, + &RemoteLookup, + CacheFolderPath, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + BlockIndex, + &BlockDescriptions, + &WriteChunkStats, + &DiskStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + BlockChunkPath, + BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); - WritePartsComplete++; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - }, - Work.DefaultErrorFunction()); + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockBuffer); + } + else + { + ZEN_ASSERT(!BlockBuffer); + BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } + } + + FilteredWrittenBytesPerSecond.Start(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + + if (!BlockChunkPath.empty()) + { + RemoveFile(BlockChunkPath); + } + + WritePartsComplete++; + + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }, + Work.DefaultErrorFunction()); + } + } } } }, @@ -5840,26 +6531,32 @@ namespace { Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); - ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load()); uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + +DownloadStats.DownloadedPartialBlockByteCount.load(); FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load()); FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); - std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.", - DownloadStats.RequestsCompleteCount.load(), - TotalRequestCount, - NiceBytes(DownloadedBytes), - NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), - WriteChunkStats.ChunkCountWritten.load(), - ChunkCountToWrite, - NiceBytes(DiskStats.WriteByteCount.load()), - NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); + std::string DownloadRateString = + (DownloadStats.RequestsCompleteCount == TotalRequestCount) + ? "" + : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); + std::string WriteDetails = PrimeCacheOnly ? "" + : fmt::format(" {}/{} ({}B/s) written.", + NiceBytes(DiskStats.WriteByteCount.load()), + NiceBytes(BytesToWrite), + NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); + std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", + DownloadStats.RequestsCompleteCount.load(), + TotalRequestCount, + NiceBytes(DownloadedBytes), + DownloadRateString, + WriteDetails); WriteProgressBar.UpdateState( - {.Task = "Writing chunks ", + {.Task = PrimeCacheOnly ? "Downloading " : "Writing chunks ", .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), - .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())}, + .TotalCount = PrimeCacheOnly ? TotalRequestCount : BytesToWrite, + .RemainingCount = PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load()) + : (BytesToWrite - DiskStats.WriteByteCount.load())}, false); }); } @@ -5874,21 +6571,28 @@ namespace { WriteProgressBar.Finish(); - uint32_t RawSequencesMissingWriteCount = 0; - for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) + if (!PrimeCacheOnly) { - const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; - if (SequenceIndexChunksLeftToWriteCounter.load() != 0) + uint32_t RawSequencesMissingWriteCount = 0; + for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) { - RawSequencesMissingWriteCount++; - const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex]; - ZEN_ASSERT(!IncompletePath.empty()); - const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); + const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; + if (SequenceIndexChunksLeftToWriteCounter.load() != 0) + { + RawSequencesMissingWriteCount++; + const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex]; + ZEN_ASSERT(!IncompletePath.empty()); + const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; + ZEN_CONSOLE("{}: Max count {}, Current count {}", + IncompletePath, + ExpectedSequenceCount, + SequenceIndexChunksLeftToWriteCounter.load()); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); + } } + ZEN_ASSERT(RawSequencesMissingWriteCount == 0); } - ZEN_ASSERT(RawSequencesMissingWriteCount == 0); const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + +DownloadStats.DownloadedPartialBlockByteCount.load(); @@ -5906,82 +6610,199 @@ namespace { WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); } - // Move all files we will reuse to cache folder - // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place - if (!LocalPathIndexesMatchingSequenceIndexes.empty()) + if (PrimeCacheOnly) { - ZEN_TRACE_CPU("UpdateFolder_CacheReused"); - uint64_t TotalFullFileSizeCached = 0; - for (uint32_t LocalPathIndex : LocalPathIndexesMatchingSequenceIndexes) - { - const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; - const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath)); - SetFileReadOnly(LocalFilePath, false); - ZEN_ASSERT_SLOW(!std::filesystem::exists(CacheFilePath)); - std::filesystem::rename(LocalFilePath, CacheFilePath); - TotalFullFileSizeCached += std::filesystem::file_size(CacheFilePath); - } - ZEN_CONSOLE("Saved {} ({}) unchanged files in cache", - LocalPathIndexesMatchingSequenceIndexes.size(), - NiceBytes(TotalFullFileSizeCached)); + return; } - if (WipeTargetFolder) - { - ZEN_TRACE_CPU("UpdateFolder_WipeTarget"); - Stopwatch Timer; + tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex; + RemotePathIndexToLocalPathIndex.reserve(RemoteContent.Paths.size()); - // Clean target folder - ZEN_CONSOLE("Wiping {}", Path); - if (!CleanDirectory(Path, DefaultExcludeFolders)) - { - ZEN_WARN("Some files in {} could not be removed", Path); - } - RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex; + std::vector<uint32_t> RemoveLocalPathIndexes; + + if (AbortFlag) + { + return; } - else + { - ZEN_TRACE_CPU("UpdateFolder_RemoveUnused"); - Stopwatch Timer; + ZEN_TRACE_CPU("UpdateFolder_PrepareTarget"); - // Remove unused tracked files - tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex; + tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences; + tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex; RemotePathToRemoteIndex.reserve(RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) { RemotePathToRemoteIndex.insert({RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); } - std::vector<std::filesystem::path> LocalFilesToRemove; + + std::vector<uint32_t> FilesToCache; + + uint64_t MatchCount = 0; + uint64_t PathMismatchCount = 0; + uint64_t HashMismatchCount = 0; + std::atomic<uint64_t> CachedCount = 0; + std::atomic<uint64_t> CachedByteCount = 0; + uint64_t SkippedCount = 0; + uint64_t DeleteCount = 0; for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) { - if (!RemotePathToRemoteIndex.contains(LocalContent.Paths[LocalPathIndex].generic_string())) + if (AbortFlag) + { + break; + } + const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex]; + + ZEN_ASSERT_SLOW(IsFile((Path / LocalContent.Paths[LocalPathIndex]).make_preferred())); + + if (!WipeTargetFolder) { - const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - if (std::filesystem::exists(LocalFilePath)) + if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); + RemotePathIt != RemotePathToRemoteIndex.end()) { - LocalFilesToRemove.emplace_back(std::move(LocalFilePath)); + const uint32_t RemotePathIndex = RemotePathIt->second; + if (RemoteContent.RawHashes[RemotePathIndex] == RawHash) + { + // It is already in it's desired place + RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex; + SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex}); + MatchCount++; + continue; + } + else + { + HashMismatchCount++; + } + } + else + { + PathMismatchCount++; } } + if (RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) + { + if (!CachedRemoteSequences.contains(RawHash)) + { + ZEN_TRACE_CPU("MoveToCache"); + // We need it + FilesToCache.push_back(LocalPathIndex); + CachedRemoteSequences.insert(RawHash); + } + else + { + // We already have it + SkippedCount++; + } + } + else if (!WipeTargetFolder) + { + // We don't need it + RemoveLocalPathIndexes.push_back(LocalPathIndex); + DeleteCount++; + } + } + + if (AbortFlag) + { + return; } - if (!LocalFilesToRemove.empty()) + { - ZEN_CONSOLE("Cleaning {} removed files from {}", LocalFilesToRemove.size(), Path); - for (const std::filesystem::path& LocalFilePath : LocalFilesToRemove) + ZEN_TRACE_CPU("UpdateFolder_CopyToCache"); + + Stopwatch Timer; + + WorkerThreadPool& WritePool = GetIOWorkerPool(); + + ProgressBar CacheLocalProgressBar(UsePlainProgress); + ParallellWork Work(AbortFlag); + + for (uint32_t LocalPathIndex : FilesToCache) { - SetFileReadOnly(LocalFilePath, false); - std::filesystem::remove(LocalFilePath); + if (AbortFlag) + { + break; + } + Work.ScheduleWork( + WritePool, + [&, LocalPathIndex](std::atomic<bool>&) { + ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache"); + if (!AbortFlag) + { + const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex]; + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(!IsFile(CacheFilePath)); + const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred(); + RenameFileWithRetry(LocalFilePath, CacheFilePath); + CachedCount++; + CachedByteCount += LocalContent.RawSizes[LocalPathIndex]; + } + }, + Work.DefaultErrorFunction()); } + + { + ZEN_TRACE_CPU("CacheLocal_Wait"); + + Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, PendingWork); + const uint64_t WorkTotal = FilesToCache.size(); + const uint64_t WorkComplete = CachedCount.load(); + std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount)); + CacheLocalProgressBar.UpdateState({.Task = "Caching local ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(WorkTotal), + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)}, + false); + }); + } + + if (AbortFlag) + { + return; + } + + CacheLocalProgressBar.Finish(); + + ZEN_DEBUG( + "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " + "Delete: {}", + MatchCount, + PathMismatchCount, + HashMismatchCount, + CachedCount.load(), + NiceBytes(CachedByteCount.load()), + SkippedCount, + DeleteCount); + } + } + + if (WipeTargetFolder) + { + ZEN_TRACE_CPU("UpdateFolder_WipeTarget"); + Stopwatch Timer; + + // Clean target folder + if (!CleanDirectory(Path, DefaultExcludeFolders)) + { + ZEN_WARN("Some files in {} could not be removed", Path); } RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); } + if (AbortFlag) + { + return; + } + { ZEN_TRACE_CPU("UpdateFolder_FinalizeTree"); Stopwatch Timer; - WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar RebuildProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); @@ -5991,16 +6812,52 @@ namespace { OutLocalFolderState.Attributes.resize(RemoteContent.Paths.size()); OutLocalFolderState.ModificationTicks.resize(RemoteContent.Paths.size()); + std::atomic<uint64_t> DeletedCount = 0; + + for (uint32_t LocalPathIndex : RemoveLocalPathIndexes) + { + if (AbortFlag) + { + break; + } + Work.ScheduleWork( + WritePool, + [&, LocalPathIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); + SetFileReadOnlyWithRetry(LocalFilePath, false); + RemoveFileWithRetry(LocalFilePath); + DeletedCount++; + } + }, + Work.DefaultErrorFunction()); + } + std::atomic<uint64_t> TargetsComplete = 0; - std::vector<std::pair<IoHash, uint32_t>> Targets; + struct FinalizeTarget + { + IoHash RawHash; + uint32_t RemotePathIndex; + }; + + std::vector<FinalizeTarget> Targets; Targets.reserve(RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) { - Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex)); + Targets.push_back(FinalizeTarget{.RawHash = RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex}); } - std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) { - return Lhs.first < Rhs.first; + std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) { + if (Lhs.RawHash < Rhs.RawHash) + { + return true; + } + else if (Lhs.RawHash > Rhs.RawHash) + { + return false; + } + return Lhs.RemotePathIndex < Rhs.RemotePathIndex; }); size_t TargetOffset = 0; @@ -6011,93 +6868,168 @@ namespace { break; } - size_t TargetCount = 1; - const IoHash& RawHash = Targets[TargetOffset].first; - while (Targets[TargetOffset + TargetCount].first == RawHash) + size_t TargetCount = 1; + while (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash) { TargetCount++; } Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// + WritePool, [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) { if (!AbortFlag) { ZEN_TRACE_CPU("FinalizeTree_Work"); - size_t TargetOffset = BaseTargetOffset; - const IoHash& RawHash = Targets[TargetOffset].first; - const uint32_t FirstTargetPathIndex = Targets[TargetOffset].second; - const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstTargetPathIndex]; - OutLocalFolderState.Paths[FirstTargetPathIndex] = FirstTargetPath; - OutLocalFolderState.RawSizes[FirstTargetPathIndex] = RemoteContent.RawSizes[FirstTargetPathIndex]; - const std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred(); + size_t TargetOffset = BaseTargetOffset; + const IoHash& RawHash = Targets[TargetOffset].RawHash; + if (RawHash == IoHash::Zero) { - if (std::filesystem::exists(FirstTargetFilePath)) + ZEN_TRACE_CPU("ZeroSize"); + while (TargetOffset < (BaseTargetOffset + TargetCount)) { - SetFileReadOnly(FirstTargetFilePath, false); - } - CreateDirectories(FirstTargetFilePath.parent_path()); - { - BasicFile OutputFile; - OutputFile.Open(FirstTargetFilePath, BasicFile::Mode::kTruncate); + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); + if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) + { + if (IsFile(TargetFilePath)) + { + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); + } + BasicFile OutputFile; + OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); + } + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; + + OutLocalFolderState.Attributes[RemotePathIndex] = + RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, + RemoteContent.Platform, + RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + + TargetOffset++; + TargetsComplete++; } } else { - ZEN_TRACE_CPU("FinalizeTree_MoveIntoPlace"); - - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(std::filesystem::exists(CacheFilePath)); - CreateDirectories(FirstTargetFilePath.parent_path()); - if (std::filesystem::exists(FirstTargetFilePath)) + ZEN_TRACE_CPU("Files"); + ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); + const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; + const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex]; + std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred(); + + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); + InPlaceIt != RemotePathIndexToLocalPathIndex.end()) { - SetFileReadOnly(FirstTargetFilePath, false); + ZEN_ASSERT_SLOW(IsFile(FirstTargetFilePath)); } - std::filesystem::rename(CacheFilePath, FirstTargetFilePath); - RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; - } + else + { + if (IsFile(FirstTargetFilePath)) + { + SetFileReadOnlyWithRetry(FirstTargetFilePath, false); + } + else + { + CreateDirectories(FirstTargetFilePath.parent_path()); + } - OutLocalFolderState.Attributes[FirstTargetPathIndex] = - RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath) - : SetNativeFileAttributes(FirstTargetFilePath, - RemoteContent.Platform, - RemoteContent.Attributes[FirstTargetPathIndex]); - OutLocalFolderState.ModificationTicks[FirstTargetPathIndex] = GetModificationTickFromPath(FirstTargetFilePath); + if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); + InplaceIt != SequenceHashToLocalPathIndex.end()) + { + ZEN_TRACE_CPU("Copy"); + const uint32_t LocalPathIndex = InplaceIt->second; + const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex]; + std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred(); + ZEN_ASSERT_SLOW(IsFile(SourceFilePath)); + + ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); + CopyFile(SourceFilePath, FirstTargetFilePath, {.EnableClone = false}); + RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } + else + { + ZEN_TRACE_CPU("Rename"); + const std::filesystem::path CacheFilePath = + GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); - TargetOffset++; - TargetsComplete++; - while (TargetOffset < (BaseTargetOffset + TargetCount)) - { - ZEN_TRACE_CPU("FinalizeTree_Copy"); - - ZEN_ASSERT(Targets[TargetOffset].first == RawHash); - ZEN_ASSERT_SLOW(std::filesystem::exists(FirstTargetFilePath)); - const uint32_t ExtraTargetPathIndex = Targets[TargetOffset].second; - const std::filesystem::path& ExtraTargetPath = RemoteContent.Paths[ExtraTargetPathIndex]; - const std::filesystem::path ExtraTargetFilePath = (Path / ExtraTargetPath).make_preferred(); - OutLocalFolderState.Paths[ExtraTargetPathIndex] = ExtraTargetPath; - OutLocalFolderState.RawSizes[ExtraTargetPathIndex] = RemoteContent.RawSizes[ExtraTargetPathIndex]; - CreateDirectories(ExtraTargetFilePath.parent_path()); - if (std::filesystem::exists(ExtraTargetFilePath)) - { - SetFileReadOnly(ExtraTargetFilePath, false); + RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); + + RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; + } } - CopyFile(FirstTargetFilePath, ExtraTargetFilePath, {.EnableClone = false}); - RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; - OutLocalFolderState.Attributes[ExtraTargetPathIndex] = + OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; + OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex]; + + OutLocalFolderState.Attributes[FirstRemotePathIndex] = RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(ExtraTargetFilePath) - : SetNativeFileAttributes(ExtraTargetFilePath, + ? GetNativeFileAttributes(FirstTargetFilePath) + : SetNativeFileAttributes(FirstTargetFilePath, RemoteContent.Platform, - RemoteContent.Attributes[ExtraTargetPathIndex]); - OutLocalFolderState.ModificationTicks[ExtraTargetPathIndex] = - GetModificationTickFromPath(ExtraTargetFilePath); + RemoteContent.Attributes[FirstRemotePathIndex]); + OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = + GetModificationTickFromPath(FirstTargetFilePath); TargetOffset++; TargetsComplete++; + + while (TargetOffset < (BaseTargetOffset + TargetCount)) + { + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); + + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); + InPlaceIt != RemotePathIndexToLocalPathIndex.end()) + { + ZEN_ASSERT_SLOW(IsFile(TargetFilePath)); + } + else + { + ZEN_TRACE_CPU("Copy"); + if (IsFile(TargetFilePath)) + { + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); + } + + ZEN_ASSERT_SLOW(IsFile(FirstTargetFilePath)); + ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); + CopyFile(FirstTargetFilePath, TargetFilePath, {.EnableClone = false}); + RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } + + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; + + OutLocalFolderState.Attributes[RemotePathIndex] = + RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, + RemoteContent.Platform, + RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + + TargetOffset++; + TargetsComplete++; + } } } }, @@ -6111,11 +7043,13 @@ namespace { Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); - std::string Details = fmt::format("{}/{} files", TargetsComplete.load(), Targets.size()); + const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size(); + const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load(); + std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(Targets.size()), - .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())}, + .TotalCount = gsl::narrow<uint64_t>(WorkTotal), + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)}, false); }); } @@ -6131,6 +7065,107 @@ namespace { } } + std::string GetCbObjectAsNiceString(CbObjectView Object, std::string_view Prefix, std::string_view Suffix) + { + ExtendableStringBuilder<512> SB; + std::vector<std::pair<std::string, std::string>> NameStringValuePairs; + for (CbFieldView Field : Object) + { + std::string_view Name = Field.GetName(); + switch (CbValue Accessor = Field.GetValue(); Accessor.GetType()) + { + case CbFieldType::String: + NameStringValuePairs.push_back({std::string(Name), std::string(Accessor.AsString())}); + break; + case CbFieldType::IntegerPositive: + NameStringValuePairs.push_back({std::string(Name), fmt::format("{}", Accessor.AsIntegerPositive())}); + break; + case CbFieldType::IntegerNegative: + NameStringValuePairs.push_back({std::string(Name), fmt::format("{}", Accessor.AsIntegerNegative())}); + break; + case CbFieldType::Float32: + { + const float Value = Accessor.AsFloat32(); + if (std::isfinite(Value)) + { + NameStringValuePairs.push_back({std::string(Name), fmt::format("{:.9g}", Value)}); + } + else + { + NameStringValuePairs.push_back({std::string(Name), "null"}); + } + } + break; + case CbFieldType::Float64: + { + const double Value = Accessor.AsFloat64(); + if (std::isfinite(Value)) + { + NameStringValuePairs.push_back({std::string(Name), fmt::format("{:.17g}", Value)}); + } + else + { + NameStringValuePairs.push_back({std::string(Name), "null"}); + } + } + break; + case CbFieldType::BoolFalse: + NameStringValuePairs.push_back({std::string(Name), "false"}); + break; + case CbFieldType::BoolTrue: + NameStringValuePairs.push_back({std::string(Name), "true"}); + break; + case CbFieldType::Hash: + { + NameStringValuePairs.push_back({std::string(Name), Accessor.AsHash().ToHexString()}); + } + break; + case CbFieldType::Uuid: + { + StringBuilder<Oid::StringLength + 1> Builder; + Accessor.AsUuid().ToString(Builder); + NameStringValuePairs.push_back({std::string(Name), Builder.ToString()}); + } + break; + case CbFieldType::DateTime: + { + ExtendableStringBuilder<64> Builder; + Builder << DateTime(Accessor.AsDateTimeTicks()).ToIso8601(); + NameStringValuePairs.push_back({std::string(Name), Builder.ToString()}); + } + break; + case CbFieldType::TimeSpan: + { + ExtendableStringBuilder<64> Builder; + const TimeSpan Span(Accessor.AsTimeSpanTicks()); + if (Span.GetDays() == 0) + { + Builder << Span.ToString("%h:%m:%s.%n"); + } + else + { + Builder << Span.ToString("%d.%h:%m:%s.%n"); + } + NameStringValuePairs.push_back({std::string(Name), Builder.ToString()}); + break; + } + case CbFieldType::ObjectId: + NameStringValuePairs.push_back({std::string(Name), Accessor.AsObjectId().ToString()}); + break; + } + } + std::string::size_type LongestKey = 0; + for (const std::pair<std::string, std::string>& KeyValue : NameStringValuePairs) + { + LongestKey = Max(KeyValue.first.length(), LongestKey); + } + for (const std::pair<std::string, std::string>& KeyValue : NameStringValuePairs) + { + SB.Append(fmt::format("{}{:<{}}: {}{}", Prefix, KeyValue.first, LongestKey, KeyValue.second, Suffix)); + } + return SB.ToString(); + } + std::vector<std::pair<Oid, std::string>> ResolveBuildPartNames(BuildStorage& Storage, const Oid& BuildId, const std::vector<Oid>& BuildPartIds, @@ -6140,17 +7175,13 @@ namespace { std::vector<std::pair<Oid, std::string>> Result; { Stopwatch GetBuildTimer; - - std::vector<std::pair<Oid, std::string>> AvailableParts; - - CbObject BuildObject = Storage.GetBuild(BuildId); - + CbObject BuildObject = Storage.GetBuild(BuildId); ZEN_CONSOLE("GetBuild took {}. Name: '{}', Payload size: {}", NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()), - BuildObject["BuildName"sv].AsString(), + BuildObject["name"sv].AsString(), NiceBytes(BuildObject.GetSize())); - ZEN_DEBUG("Build object: {}", BuildObject); + ZEN_CONSOLE("{}", GetCbObjectAsNiceString(BuildObject, " "sv, "\n"sv)); CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); if (!PartsObject) @@ -6160,6 +7191,8 @@ namespace { OutPreferredMultipartChunkSize = BuildObject["chunkSize"sv].AsUInt64(OutPreferredMultipartChunkSize); + std::vector<std::pair<Oid, std::string>> AvailableParts; + for (CbFieldView PartView : PartsObject) { const std::string BuildPartName = std::string(PartView.GetName()); @@ -6221,7 +7254,7 @@ namespace { return Result; } - ChunkedFolderContent GetRemoteContent(BuildStorage& Storage, + ChunkedFolderContent GetRemoteContent(StorageInstance& Storage, const Oid& BuildId, const std::vector<std::pair<Oid, std::string>>& BuildParts, std::unique_ptr<ChunkingController>& OutChunkController, @@ -6234,12 +7267,13 @@ namespace { Stopwatch GetBuildPartTimer; const Oid BuildPartId = BuildParts[0].first; const std::string_view BuildPartName = BuildParts[0].second; - CbObject BuildPartManifest = Storage.GetBuildPart(BuildId, BuildPartId); + CbObject BuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId); ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}", BuildPartId, BuildPartName, NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()), NiceBytes(BuildPartManifest.GetSize())); + ZEN_CONSOLE("{}", GetCbObjectAsNiceString(BuildPartManifest, " "sv, "\n"sv)); { CbObjectView Chunker = BuildPartManifest["chunker"sv].AsObjectView(); @@ -6248,7 +7282,7 @@ namespace { OutChunkController = CreateChunkingController(ChunkerName, Parameters); } - auto ParseBuildPartManifest = [](BuildStorage& Storage, + auto ParseBuildPartManifest = [](StorageInstance& Storage, const Oid& BuildId, const Oid& BuildPartId, CbObject BuildPartManifest, @@ -6274,12 +7308,103 @@ namespace { // TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them - Stopwatch GetBlockMetadataTimer; - OutBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockRawHashes); - ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks", - BuildPartId, - NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), - OutBlockDescriptions.size()); + { + Stopwatch GetBlockMetadataTimer; + + std::vector<ChunkBlockDescription> UnorderedList; + tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup; + if (Storage.BuildCacheStorage) + { + std::vector<CbObject> CacheBlockMetadatas = Storage.BuildCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes); + UnorderedList.reserve(CacheBlockMetadatas.size()); + for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size(); + CacheBlockMetadataIndex++) + { + const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex]; + ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata); + if (Description.BlockHash == IoHash::Zero) + { + ZEN_WARN("Unexpected/invalid block metadata received from remote cache, skipping block"); + } + else + { + UnorderedList.emplace_back(std::move(Description)); + } + } + for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) + { + const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); + } + } + + if (UnorderedList.size() < BlockRawHashes.size()) + { + std::vector<IoHash> RemainingBlockHashes; + RemainingBlockHashes.reserve(BlockRawHashes.size() - UnorderedList.size()); + for (const IoHash& BlockRawHash : BlockRawHashes) + { + if (!BlockDescriptionLookup.contains(BlockRawHash)) + { + RemainingBlockHashes.push_back(BlockRawHash); + } + } + CbObject BlockMetadatas = Storage.BuildStorage->GetBlockMetadatas(BuildId, RemainingBlockHashes); + std::vector<ChunkBlockDescription> RemainingList; + { + CbArrayView BlocksArray = BlockMetadatas["blocks"sv].AsArrayView(); + std::vector<IoHash> FoundBlockHashes; + std::vector<CbObject> FoundBlockMetadatas; + for (CbFieldView Block : BlocksArray) + { + ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView()); + + if (Description.BlockHash == IoHash::Zero) + { + ZEN_WARN("Unexpected/invalid block metadata received from remote store, skipping block"); + } + else + { + if (Storage.BuildCacheStorage) + { + UniqueBuffer MetaBuffer = UniqueBuffer::Alloc(Block.GetSize()); + Block.CopyTo(MetaBuffer.GetMutableView()); + CbObject BlockMetadata(MetaBuffer.MoveToShared()); + + FoundBlockHashes.push_back(Description.BlockHash); + FoundBlockMetadatas.push_back(BlockMetadata); + } + RemainingList.emplace_back(std::move(Description)); + } + } + if (Storage.BuildCacheStorage && !FoundBlockHashes.empty()) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, FoundBlockHashes, FoundBlockMetadatas); + } + } + + for (size_t DescriptionIndex = 0; DescriptionIndex < RemainingList.size(); DescriptionIndex++) + { + const ChunkBlockDescription& Description = RemainingList[DescriptionIndex]; + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size() + DescriptionIndex); + } + UnorderedList.insert(UnorderedList.end(), RemainingList.begin(), RemainingList.end()); + } + + OutBlockDescriptions.reserve(BlockDescriptionLookup.size()); + for (const IoHash& BlockHash : BlockRawHashes) + { + if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end()) + { + OutBlockDescriptions.push_back(std::move(UnorderedList[It->second])); + } + } + + ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks", + BuildPartId, + NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), + OutBlockDescriptions.size()); + } if (OutBlockDescriptions.size() != BlockRawHashes.size()) { @@ -6292,7 +7417,8 @@ namespace { ZEN_CONSOLE("{} Attemping fallback options.", ErrorDescription); std::vector<ChunkBlockDescription> AugmentedBlockDescriptions; AugmentedBlockDescriptions.reserve(BlockRawHashes.size()); - std::vector<ChunkBlockDescription> FoundBlocks = Storage.FindBlocks(BuildId); + std::vector<ChunkBlockDescription> FoundBlocks = + ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId, (uint64_t)-1)); for (const IoHash& BlockHash : BlockRawHashes) { @@ -6315,7 +7441,7 @@ namespace { } else { - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash); + IoBuffer BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockHash); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash)); @@ -6381,7 +7507,7 @@ namespace { const Oid& OverlayBuildPartId = BuildParts[PartIndex].first; const std::string& OverlayBuildPartName = BuildParts[PartIndex].second; Stopwatch GetOverlayBuildPartTimer; - CbObject OverlayBuildPartManifest = Storage.GetBuildPart(BuildId, OverlayBuildPartId); + CbObject OverlayBuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, OverlayBuildPartId); ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}", OverlayBuildPartId, OverlayBuildPartName, @@ -6447,64 +7573,21 @@ namespace { ChunkedFolderContent GetLocalContent(GetFolderContentStatistics& LocalFolderScanStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, - ChunkingController& ChunkController) + const std::filesystem::path& ZenFolderPath, + ChunkingController& ChunkController, + const ChunkedFolderContent& ReferenceContent, + FolderContent& OutLocalFolderContent) { + FolderContent LocalFolderState; ChunkedFolderContent LocalContent; - auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { - for (const std::string_view& ExcludeFolder : ExcludeFolders) - { - if (RelativePath.starts_with(ExcludeFolder)) - { - if (RelativePath.length() == ExcludeFolder.length()) - { - return false; - } - else if (RelativePath[ExcludeFolder.length()] == '/') - { - return false; - } - } - } - return true; - }; - - auto IsAcceptedFile = [ExcludeExtensions = - DefaultExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { - for (const std::string_view& ExcludeExtension : ExcludeExtensions) - { - if (RelativePath.ends_with(ExcludeExtension)) - { - return false; - } - } - return true; - }; - - FolderContent CurrentLocalFolderContent = GetFolderContent( - LocalFolderScanStats, - Path, - std::move(IsAcceptedFolder), - std::move(IsAcceptedFile), - GetMediumWorkerPool(EWorkloadType::Burst), - UsePlainProgress ? 5000 : 200, - [&](bool, std::ptrdiff_t) { ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); }, - AbortFlag); - if (AbortFlag) - { - return {}; - } - - FolderContent LocalFolderState; - - bool ScanContent = true; - std::vector<uint32_t> PathIndexesOufOfDate; - if (std::filesystem::is_regular_file(Path / ZenStateFilePath)) + bool HasLocalState = false; + if (IsFile(ZenStateFilePath(ZenFolderPath))) { try { Stopwatch ReadStateTimer; - CbObject CurrentStateObject = LoadCompactBinaryObject(Path / ZenStateFilePath).Object; + CbObject CurrentStateObject = LoadCompactBinaryObject(ZenStateFilePath(ZenFolderPath)).Object; if (CurrentStateObject) { Oid CurrentBuildId; @@ -6530,124 +7613,240 @@ namespace { MergeChunkedFolderContents(SavedPartContents[0], std::span<const ChunkedFolderContent>(SavedPartContents).subspan(1)); } + HasLocalState = true; + } + } + } + ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs())); + } + catch (const std::exception& Ex) + { + ZEN_CONSOLE("Failed reading state file, falling back to scannning. Reason: {}", Ex.what()); + } + } - if (!LocalFolderState.AreKnownFilesEqual(CurrentLocalFolderContent)) - { - const size_t LocaStatePathCount = LocalFolderState.Paths.size(); - std::vector<std::filesystem::path> DeletedPaths; - FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, CurrentLocalFolderContent, DeletedPaths); - if (!DeletedPaths.empty()) - { - LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); - } + { + const uint32_t LocalPathCount = gsl::narrow<uint32_t>(ReferenceContent.Paths.size()); + const uint32_t RemotePathCount = gsl::narrow<uint32_t>(LocalFolderState.Paths.size()); - ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}", - DeletedPaths.size(), - UpdatedContent.Paths.size(), - LocaStatePathCount); - if (UpdatedContent.Paths.size() > 0) - { - uint64_t ByteCountToScan = 0; - for (const uint64_t RawSize : UpdatedContent.RawSizes) - { - ByteCountToScan += RawSize; - } - ProgressBar ProgressBar(false); - FilteredRate FilteredBytesHashed; - FilteredBytesHashed.Start(); - ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( - ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), - Path, - UpdatedContent, - ChunkController, - UsePlainProgress ? 5000 : 200, - [&](bool, std::ptrdiff_t) { - FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); - std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", - ChunkingStats.FilesProcessed.load(), - UpdatedContent.Paths.size(), - NiceBytes(ChunkingStats.BytesHashed.load()), - NiceBytes(ByteCountToScan), - NiceNum(FilteredBytesHashed.GetCurrent()), - ChunkingStats.UniqueChunksFound.load(), - NiceBytes(ChunkingStats.UniqueBytesFound.load())); - ProgressBar.UpdateState({.Task = "Scanning files ", - .Details = Details, - .TotalCount = ByteCountToScan, - .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()}, - false); - }, - AbortFlag); - if (AbortFlag) - { - return {}; - } - FilteredBytesHashed.Stop(); - ProgressBar.Finish(); - LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}}); - } - } - else + std::vector<std::filesystem::path> PathsToCheck; + PathsToCheck.reserve(LocalPathCount + RemotePathCount); + + tsl::robin_set<std::string> FileSet; + FileSet.reserve(LocalPathCount + RemotePathCount); + + for (const std::filesystem::path& LocalPath : LocalFolderState.Paths) + { + FileSet.insert(LocalPath.generic_string()); + PathsToCheck.push_back(LocalPath); + } + + for (const std::filesystem::path& RemotePath : ReferenceContent.Paths) + { + if (FileSet.insert(RemotePath.generic_string()).second) + { + PathsToCheck.push_back(RemotePath); + } + } + + const uint32_t PathCount = gsl::narrow<uint32_t>(PathsToCheck.size()); + + OutLocalFolderContent.Paths.resize(PathCount); + OutLocalFolderContent.RawSizes.resize(PathCount); + OutLocalFolderContent.Attributes.resize(PathCount); + OutLocalFolderContent.ModificationTicks.resize(PathCount); + + { + Stopwatch Timer; + auto _ = + MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); }); + + ProgressBar ProgressBar(UsePlainProgress); + + ParallellWork Work(AbortFlag); + std::atomic<uint64_t> CompletedPathCount = 0; + uint32_t PathIndex = 0; + + while (PathIndex < PathCount) + { + uint32_t PathRangeCount = Min(128u, PathCount - PathIndex); + Work.ScheduleWork( + GetIOWorkerPool(), + [PathIndex, + PathRangeCount, + &PathsToCheck, + &Path, + &OutLocalFolderContent, + &CompletedPathCount, + &LocalFolderScanStats](std::atomic<bool>&) { + for (uint32_t PathRangeIndex = PathIndex; PathRangeIndex < PathIndex + PathRangeCount; PathRangeIndex++) { - // Remove files from LocalContent no longer in LocalFolderState - tsl::robin_set<std::string> LocalFolderPaths; - LocalFolderPaths.reserve(LocalFolderState.Paths.size()); - for (const std::filesystem::path& LocalFolderPath : LocalFolderState.Paths) - { - LocalFolderPaths.insert(LocalFolderPath.generic_string()); - } - std::vector<std::filesystem::path> DeletedPaths; - for (const std::filesystem::path& LocalContentPath : LocalContent.Paths) + const std::filesystem::path& FilePath = PathsToCheck[PathRangeIndex]; + std::filesystem::path LocalFilePath = (Path / FilePath).make_preferred(); + if (TryGetFileProperties(LocalFilePath, + OutLocalFolderContent.RawSizes[PathRangeIndex], + OutLocalFolderContent.ModificationTicks[PathRangeIndex], + OutLocalFolderContent.Attributes[PathRangeIndex])) { - if (!LocalFolderPaths.contains(LocalContentPath.generic_string())) - { - DeletedPaths.push_back(LocalContentPath); - } + OutLocalFolderContent.Paths[PathRangeIndex] = std::move(FilePath); + LocalFolderScanStats.FoundFileCount++; + LocalFolderScanStats.FoundFileByteCount += OutLocalFolderContent.RawSizes[PathRangeIndex]; + LocalFolderScanStats.AcceptedFileCount++; + LocalFolderScanStats.AcceptedFileByteCount += OutLocalFolderContent.RawSizes[PathRangeIndex]; } - if (!DeletedPaths.empty()) - { - LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); - } - - ZEN_CONSOLE("Using cached local state"); + CompletedPathCount++; } - ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs())); - ScanContent = false; - } + }, + Work.DefaultErrorFunction()); + PathIndex += PathRangeCount; + } + Work.Wait(200, [&](bool, ptrdiff_t) { + // FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); + std::string Details = fmt::format("{}/{} checked, {} found", + CompletedPathCount.load(), + PathCount, + LocalFolderScanStats.FoundFileCount.load()); + ProgressBar.UpdateState({.Task = "Checking files ", + .Details = Details, + .TotalCount = PathCount, + .RemainingCount = PathCount - CompletedPathCount.load()}, + false); + }); + ProgressBar.Finish(); + } + + uint32_t WritePathIndex = 0; + for (uint32_t ReadPathIndex = 0; ReadPathIndex < PathCount; ReadPathIndex++) + { + if (!OutLocalFolderContent.Paths[ReadPathIndex].empty()) + { + if (WritePathIndex < ReadPathIndex) + { + OutLocalFolderContent.Paths[WritePathIndex] = std::move(OutLocalFolderContent.Paths[ReadPathIndex]); + OutLocalFolderContent.RawSizes[WritePathIndex] = OutLocalFolderContent.RawSizes[ReadPathIndex]; + OutLocalFolderContent.Attributes[WritePathIndex] = OutLocalFolderContent.Attributes[ReadPathIndex]; + OutLocalFolderContent.ModificationTicks[WritePathIndex] = OutLocalFolderContent.ModificationTicks[ReadPathIndex]; } + WritePathIndex++; } } - catch (const std::exception& Ex) + + OutLocalFolderContent.Paths.resize(WritePathIndex); + OutLocalFolderContent.RawSizes.resize(WritePathIndex); + OutLocalFolderContent.Attributes.resize(WritePathIndex); + OutLocalFolderContent.ModificationTicks.resize(WritePathIndex); + } + + bool ScanContent = true; + std::vector<uint32_t> PathIndexesOufOfDate; + if (HasLocalState) + { + if (!LocalFolderState.AreKnownFilesEqual(OutLocalFolderContent)) { - ZEN_CONSOLE("Failed reading state file, falling back to scannning. Reason: {}", Ex.what()); + const size_t LocaStatePathCount = LocalFolderState.Paths.size(); + std::vector<std::filesystem::path> DeletedPaths; + FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, OutLocalFolderContent, DeletedPaths); + if (!DeletedPaths.empty()) + { + LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); + } + + ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}", + DeletedPaths.size(), + UpdatedContent.Paths.size(), + LocaStatePathCount); + if (UpdatedContent.Paths.size() > 0) + { + uint64_t ByteCountToScan = 0; + for (const uint64_t RawSize : UpdatedContent.RawSizes) + { + ByteCountToScan += RawSize; + } + ProgressBar ProgressBar(false); + FilteredRate FilteredBytesHashed; + FilteredBytesHashed.Start(); + ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( + ChunkingStats, + GetIOWorkerPool(), + Path, + UpdatedContent, + ChunkController, + UsePlainProgress ? 5000 : 200, + [&](bool, std::ptrdiff_t) { + FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); + std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", + ChunkingStats.FilesProcessed.load(), + UpdatedContent.Paths.size(), + NiceBytes(ChunkingStats.BytesHashed.load()), + NiceBytes(ByteCountToScan), + NiceNum(FilteredBytesHashed.GetCurrent()), + ChunkingStats.UniqueChunksFound.load(), + NiceBytes(ChunkingStats.UniqueBytesFound.load())); + ProgressBar.UpdateState({.Task = "Scanning files ", + .Details = Details, + .TotalCount = ByteCountToScan, + .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()}, + false); + }, + AbortFlag); + if (AbortFlag) + { + return {}; + } + FilteredBytesHashed.Stop(); + ProgressBar.Finish(); + LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}}); + } + } + else + { + // Remove files from LocalContent no longer in LocalFolderState + tsl::robin_set<std::string> LocalFolderPaths; + LocalFolderPaths.reserve(LocalFolderState.Paths.size()); + for (const std::filesystem::path& LocalFolderPath : LocalFolderState.Paths) + { + LocalFolderPaths.insert(LocalFolderPath.generic_string()); + } + std::vector<std::filesystem::path> DeletedPaths; + for (const std::filesystem::path& LocalContentPath : LocalContent.Paths) + { + if (!LocalFolderPaths.contains(LocalContentPath.generic_string())) + { + DeletedPaths.push_back(LocalContentPath); + } + } + if (!DeletedPaths.empty()) + { + LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); + } } + ScanContent = false; } if (ScanContent) { uint64_t ByteCountToScan = 0; - for (const uint64_t RawSize : CurrentLocalFolderContent.RawSizes) + for (const uint64_t RawSize : OutLocalFolderContent.RawSizes) { ByteCountToScan += RawSize; } ProgressBar ProgressBar(false); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); - ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( + LocalContent = ChunkFolderContent( ChunkingStats, - GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), Path, - CurrentLocalFolderContent, + OutLocalFolderContent, ChunkController, UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), - CurrentLocalFolderContent.Paths.size(), + OutLocalFolderContent.Paths.size(), NiceBytes(ChunkingStats.BytesHashed.load()), - ByteCountToScan, + NiceBytes(ByteCountToScan), NiceNum(FilteredBytesHashed.GetCurrent()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load())); @@ -6670,31 +7869,35 @@ namespace { return LocalContent; } - void DownloadFolder(BuildStorage& Storage, + void DownloadFolder(StorageInstance& Storage, const Oid& BuildId, const std::vector<Oid>& BuildPartIds, std::span<const std::string> BuildPartNames, const std::filesystem::path& Path, + const std::filesystem::path& ZenFolderPath, bool AllowMultiparts, bool AllowPartialBlockRequests, bool WipeTargetFolder, - bool PostDownloadVerify) + bool PostDownloadVerify, + bool PrimeCacheOnly) { ZEN_TRACE_CPU("DownloadFolder"); + ZEN_ASSERT((!PrimeCacheOnly) || (PrimeCacheOnly && (!AllowPartialBlockRequests))); + Stopwatch DownloadTimer; - const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName; + const std::filesystem::path ZenTempFolder = ZenTempFolderPath(ZenFolderPath); CreateDirectories(ZenTempFolder); - CreateDirectories(Path / ZenTempBlockFolderName); - CreateDirectories(Path / ZenTempCacheFolderName); - CreateDirectories(Path / ZenTempDownloadFolderName); + CreateDirectories(ZenTempBlockFolderPath(ZenFolderPath)); + CreateDirectories(ZenTempCacheFolderPath(ZenFolderPath)); + CreateDirectories(ZenTempDownloadFolderPath(ZenFolderPath)); std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; std::vector<std::pair<Oid, std::string>> AllBuildParts = - ResolveBuildPartNames(Storage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); + ResolveBuildPartNames(*Storage.BuildStorage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); std::vector<ChunkedFolderContent> PartContents; @@ -6706,26 +7909,36 @@ namespace { ChunkedFolderContent RemoteContent = GetRemoteContent(Storage, BuildId, AllBuildParts, ChunkController, PartContents, BlockDescriptions, LooseChunkHashes); - const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; - if (!ChunkController) - { - ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default"); - ChunkController = CreateBasicChunkingController(); - } - + const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; GetFolderContentStatistics LocalFolderScanStats; ChunkingStatistics ChunkingStats; ChunkedFolderContent LocalContent; - if (std::filesystem::is_directory(Path)) + FolderContent LocalFolderContent; + if (!PrimeCacheOnly) { - if (!WipeTargetFolder) + if (IsDir(Path)) { - LocalContent = GetLocalContent(LocalFolderScanStats, ChunkingStats, Path, *ChunkController); + if (!WipeTargetFolder) + { + if (!ChunkController) + { + ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default"); + ChunkController = CreateBasicChunkingController(); + } + + LocalContent = GetLocalContent(LocalFolderScanStats, + ChunkingStats, + Path, + ZenFolderPath, + *ChunkController, + RemoteContent, + LocalFolderContent); + } + } + else + { + CreateDirectories(Path); } - } - else - { - CreateDirectories(Path); } if (AbortFlag) { @@ -6778,6 +7991,12 @@ namespace { { ZEN_CONSOLE("Local state is identical to build to download. All done. Completed in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); + + Stopwatch WriteStateTimer; + CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderContent); + CreateDirectories(ZenStateFilePath(ZenFolderPath).parent_path()); + TemporaryFile::SafeWriteFile(ZenStateFilePath(ZenFolderPath), StateObject.GetView()); + ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); } else { @@ -6786,7 +8005,7 @@ namespace { { BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); } - ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView()); + ZEN_CONSOLE("Downloading build {}, parts:{} to '{}'", BuildId, BuildPartString.ToView(), Path); FolderContent LocalFolderState; DiskStatistics DiskStats; @@ -6799,6 +8018,7 @@ namespace { UpdateFolder(Storage, BuildId, Path, + ZenFolderPath, LargeAttachmentSize, PreferredMultipartChunkSize, LocalContent, @@ -6807,6 +8027,7 @@ namespace { LooseChunkHashes, AllowPartialBlockRequests, WipeTargetFolder, + PrimeCacheOnly, LocalFolderState, DiskStats, CacheMappingStats, @@ -6816,20 +8037,23 @@ namespace { if (!AbortFlag) { - VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats); + if (!PrimeCacheOnly) + { + VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats); - Stopwatch WriteStateTimer; - CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState); + Stopwatch WriteStateTimer; + CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState); - CreateDirectories((Path / ZenStateFilePath).parent_path()); - TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView()); - ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); + CreateDirectories(ZenStateFilePath(ZenFolderPath).parent_path()); + TemporaryFile::SafeWriteFile(ZenStateFilePath(ZenFolderPath), StateObject.GetView()); + ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); #if 0 - ExtendableStringBuilder<1024> SB; - CompactBinaryToJson(StateObject, SB); - WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); + ExtendableStringBuilder<1024> SB; + CompactBinaryToJson(StateObject, SB); + WriteFile(ZenStateFileJsonPath(ZenFolderPath), IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); #endif // 0 + } const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() + DownloadStats.DownloadedPartialBlockCount.load(); const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() + @@ -6863,9 +8087,26 @@ namespace { NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000)); } } + if (PrimeCacheOnly) + { + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->Flush(5000, [](intptr_t Remaining) { + if (Remaining == 0) + { + ZEN_CONSOLE("Build cache upload complete"); + } + else + { + ZEN_CONSOLE("Waiting for build cache to complete uploading. {} blobs remaining", Remaining); + } + return !AbortFlag; + }); + } + } if (CleanDirectory(ZenTempFolder, {})) { - std::filesystem::remove(ZenTempFolder); + RemoveDirWithRetry(ZenTempFolder); } } @@ -7109,7 +8350,14 @@ BuildsCommand::BuildsCommand() auto AddCloudOptions = [this, &AddAuthOptions](cxxopts::Options& Ops) { AddAuthOptions(Ops); - Ops.add_option("cloud build", "", "url", "Cloud Builds URL", cxxopts::value(m_BuildsUrl), "<url>"); + Ops.add_option("cloud build", "", "override-host", "Cloud Builds URL", cxxopts::value(m_OverrideHost), "<override-host>"); + Ops.add_option("cloud build", + "", + "url", + "Cloud Builds host url (legacy - use --override-host)", + cxxopts::value(m_OverrideHost), + "<url>"); + Ops.add_option("cloud build", "", "host", "Cloud Builds host", cxxopts::value(m_Host), "<host>"); Ops.add_option("cloud build", "", "assume-http2", @@ -7131,11 +8379,32 @@ BuildsCommand::BuildsCommand() "<jsonmetadata>"); }; + auto AddCacheOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("cache", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>"); + }; + auto AddOutputOptions = [this](cxxopts::Options& Ops) { Ops.add_option("output", "", "plain-progress", "Show progress using plain output", cxxopts::value(m_PlainProgress), "<progress>"); Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>"); }; + auto AddWorkerOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("", + "", + "boost-workers", + "Increase the number of worker threads - may cause computer to less responsive", + cxxopts::value(m_BoostWorkerThreads), + "<boostworkers>"); + }; + + auto AddZenFolderOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("", + "", + "zen-folder-path", + fmt::format("Path to zen state and temp folders. Defaults to [--local-path/]{}", ZenFolderName), + cxxopts::value(m_ZenFolderPath), + "<boostworkers>"); + }; m_Options.add_option("", "v", "verb", @@ -7149,6 +8418,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_ListOptions); AddFileOptions(m_ListOptions); AddOutputOptions(m_ListOptions); + AddZenFolderOptions(m_ListOptions); m_ListOptions.add_options()("h,help", "Print help"); m_ListOptions.add_option("", "", @@ -7169,6 +8439,9 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_UploadOptions); AddFileOptions(m_UploadOptions); AddOutputOptions(m_UploadOptions); + AddCacheOptions(m_UploadOptions); + AddWorkerOptions(m_UploadOptions); + AddZenFolderOptions(m_UploadOptions); m_UploadOptions.add_options()("h,help", "Print help"); m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>"); m_UploadOptions.add_option("", @@ -7224,6 +8497,12 @@ BuildsCommand::BuildsCommand() "<manifestpath>"); m_UploadOptions .add_option("", "", "verify", "Enable post upload verify of all uploaded data", cxxopts::value(m_PostUploadVerify), "<verify>"); + m_UploadOptions.add_option("", + "", + "find-max-block-count", + "The maximum number of blocks we search for in the build context", + cxxopts::value(m_FindBlockMaxCount), + "<maxblockcount>"); m_UploadOptions.parse_positional({"local-path", "build-id"}); m_UploadOptions.positional_help("local-path build-id"); @@ -7232,6 +8511,16 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_DownloadOptions); AddFileOptions(m_DownloadOptions); AddOutputOptions(m_DownloadOptions); + AddCacheOptions(m_DownloadOptions); + AddZenFolderOptions(m_DownloadOptions); + AddWorkerOptions(m_DownloadOptions); + m_DownloadOptions.add_option("cache", + "", + "cache-prime-only", + "Only download blobs missing in cache and upload to cache", + cxxopts::value(m_PrimeCacheOnly), + "<cacheprimeonly>"); + m_DownloadOptions.add_options()("h,help", "Print help"); m_DownloadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>"); m_DownloadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); @@ -7263,12 +8552,14 @@ BuildsCommand::BuildsCommand() "Allow request for partial chunk blocks. Defaults to true.", cxxopts::value(m_AllowPartialBlockRequests), "<allowpartialblockrequests>"); + m_DownloadOptions .add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), "<verify>"); m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"}); m_DownloadOptions.positional_help("local-path build-id build-part-name"); AddOutputOptions(m_DiffOptions); + AddWorkerOptions(m_DiffOptions); m_DiffOptions.add_options()("h,help", "Print help"); m_DiffOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_DiffOptions.add_option("", "c", "compare-path", "Root file system folder used as diff", cxxopts::value(m_DiffPath), "<diff-path>"); @@ -7284,6 +8575,8 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_TestOptions); AddFileOptions(m_TestOptions); AddOutputOptions(m_TestOptions); + AddCacheOptions(m_TestOptions); + AddWorkerOptions(m_TestOptions); m_TestOptions.add_options()("h,help", "Print help"); m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_TestOptions.add_option("", @@ -7304,6 +8597,8 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_FetchBlobOptions); AddFileOptions(m_FetchBlobOptions); AddOutputOptions(m_FetchBlobOptions); + AddCacheOptions(m_FetchBlobOptions); + AddZenFolderOptions(m_FetchBlobOptions); m_FetchBlobOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); m_FetchBlobOptions .add_option("", "", "blob-hash", "IoHash in hex form identifying the blob to download", cxxopts::value(m_BlobHash), "<blob-hash>"); @@ -7313,6 +8608,8 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_ValidateBuildPartOptions); AddFileOptions(m_ValidateBuildPartOptions); AddOutputOptions(m_ValidateBuildPartOptions); + AddWorkerOptions(m_ValidateBuildPartOptions); + AddZenFolderOptions(m_ValidateBuildPartOptions); m_ValidateBuildPartOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); m_ValidateBuildPartOptions.add_option("", "", @@ -7333,6 +8630,8 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_MultiTestDownloadOptions); AddFileOptions(m_MultiTestDownloadOptions); AddOutputOptions(m_MultiTestDownloadOptions); + AddCacheOptions(m_MultiTestDownloadOptions); + AddWorkerOptions(m_MultiTestDownloadOptions); m_MultiTestDownloadOptions .add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), "<ids>"); @@ -7373,7 +8672,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } auto ParseStorageOptions = [&]() { - if (!m_BuildsUrl.empty()) + if (!m_OverrideHost.empty() || !m_Host.empty()) { if (!m_StoragePath.empty()) { @@ -7385,16 +8684,23 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) fmt::format("namespace and bucket options are required for url option\n{}", m_Options.help())); } } + else if (m_StoragePath.empty()) + { + throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); + } }; std::unique_ptr<AuthMgr> Auth; - HttpClientSettings ClientSettings{.AssumeHttp2 = m_AssumeHttp2, .AllowResume = true, .RetryCount = 2}; + HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", + .AssumeHttp2 = m_AssumeHttp2, + .AllowResume = true, + .RetryCount = 2}; auto CreateAuthMgr = [&]() { if (!Auth) { - std::filesystem::path DataRoot = m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : StringToPath(m_SystemRootDir); - + std::filesystem::path DataRoot = + m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : MakeSafeAbsolutePath(m_SystemRootDir); if (m_EncryptionKey.empty()) { m_EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; @@ -7448,7 +8754,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else if (!m_AccessTokenPath.empty()) { - std::string ResolvedAccessToken = ReadAccessTokenFromFile(StringToPath(m_AccessTokenPath)); + std::string ResolvedAccessToken = ReadAccessTokenFromFile(MakeSafeAbsolutePath(m_AccessTokenPath)); if (!ResolvedAccessToken.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(ResolvedAccessToken); @@ -7474,7 +8780,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ClientSettings.AccessTokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(*Auth); } - if (!m_BuildsUrl.empty() && !ClientSettings.AccessTokenProvider) + if (!ClientSettings.AccessTokenProvider) { ZEN_CONSOLE("Warning: No auth provider given, attempting operation without credentials."); } @@ -7486,15 +8792,243 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) }; ParseOutputOptions(); - try - { - if (SubOption == &m_ListOptions) + auto CreateBuildStorage = [&](BuildStorage::Statistics& StorageStats, + BuildStorageCache::Statistics& StorageCacheStats, + const std::filesystem::path& TempPath) -> StorageInstance { + ParseStorageOptions(); + + StorageInstance Result; + + std::string BuildStorageName = ZEN_CLOUD_STORAGE; + std::string BuildCacheName; + bool CacheAssumeHttp2 = false; + std::string StorageDescription; + std::string CacheDescription; + + if (!m_Host.empty() || !m_OverrideHost.empty()) { - ParseStorageOptions(); ParseAuthOptions(); + } + + std::string CloudHost; + + if (!m_Host.empty()) + { + if (m_OverrideHost.empty() || m_ZenCacheHost.empty()) + { + HttpClient DiscoveryHttpClient(m_Host, ClientSettings); + HttpClient::Response ServerInfoResponse = + DiscoveryHttpClient.Get("/api/v1/status/servers", HttpClient::Accept(HttpContentType::kJSON)); + if (!ServerInfoResponse.IsSuccess()) + { + throw std::runtime_error(fmt::format("Failed to get list of servers from discovery url '{}'. Reason: '{}'", + m_Host, + ServerInfoResponse.ErrorMessage(""))); + } + + std::string_view JsonResponse = ServerInfoResponse.AsText(); + CbObject ResponseObjectView = LoadCompactBinaryFromJson(JsonResponse).AsObject(); - HttpClient Http(m_BuildsUrl, ClientSettings); + if (m_OverrideHost.empty()) + { + CbArrayView ServerEndpointsArray = ResponseObjectView["serverEndpoints"sv].AsArrayView(); + std::uint64_t ServerCount = ServerEndpointsArray.Num(); + if (ServerCount == 0) + { + throw std::runtime_error(fmt::format("Failed to find any builds hosts at {}", m_Host)); + } + for (CbFieldView ServerEndpointView : ServerEndpointsArray) + { + CbObjectView ServerEndpointObject = ServerEndpointView.AsObjectView(); + std::string_view BaseUrl = ServerEndpointObject["baseUrl"sv].AsString(); + if (!BaseUrl.empty()) + { + const bool AssumeHttp2 = ServerEndpointObject["assumeHttp2"sv].AsBool(false); + std::string_view Name = ServerEndpointObject["name"sv].AsString(); + + HttpClientSettings TestClientSettings{.LogCategory = "httpbuildsclient", + .ConnectTimeout = std::chrono::milliseconds{1000}, + .Timeout = std::chrono::milliseconds{2000}, + .AssumeHttp2 = AssumeHttp2, + .AllowResume = true, + .RetryCount = 0}; + + HttpClient TestHttpClient(BaseUrl, TestClientSettings); + HttpClient::Response TestResponse = TestHttpClient.Get("/health/live"); + if (TestResponse.IsSuccess()) + { + CloudHost = BaseUrl; + m_AssumeHttp2 = AssumeHttp2; + BuildStorageName = Name; + break; + } + } + } + if (CloudHost.empty()) + { + throw std::runtime_error( + fmt::format("Failed to find any usable builds hosts out of {} using {}", ServerCount, m_Host)); + } + } + + auto TestCacheEndpoint = [](std::string_view BaseUrl, const bool AssumeHttp2) -> bool { + HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{1000}, + .Timeout = std::chrono::milliseconds{2000}, + .AssumeHttp2 = AssumeHttp2, + .AllowResume = true, + .RetryCount = 0}; + HttpClient TestHttpClient(BaseUrl, TestClientSettings); + HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds"); + if (TestResponse.IsSuccess()) + { + return true; + } + return false; + }; + + if (m_ZenCacheHost.empty()) + { + CbArrayView CacheEndpointsArray = ResponseObjectView["cacheEndpoints"sv].AsArrayView(); + std::uint64_t CacheCount = CacheEndpointsArray.Num(); + for (CbFieldView CacheEndpointView : CacheEndpointsArray) + { + CbObjectView CacheEndpointObject = CacheEndpointView.AsObjectView(); + + std::string_view BaseUrl = CacheEndpointObject["baseUrl"sv].AsString(); + if (!BaseUrl.empty()) + { + const bool AssumeHttp2 = CacheEndpointObject["assumeHttp2"sv].AsBool(false); + std::string_view Name = CacheEndpointObject["name"sv].AsString(); + + if (TestCacheEndpoint(BaseUrl, AssumeHttp2)) + { + m_ZenCacheHost = BaseUrl; + CacheAssumeHttp2 = AssumeHttp2; + BuildCacheName = Name; + break; + } + } + } + if (m_ZenCacheHost.empty()) + { + ZenServerState State; + if (State.InitializeReadOnly()) + { + State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { + if (m_ZenCacheHost.empty()) + { + std::string ZenServerLocalHostUrl = + fmt::format("http://127.0.0.1:{}", Entry.EffectiveListenPort.load()); + if (TestCacheEndpoint(ZenServerLocalHostUrl, false)) + { + m_ZenCacheHost = ZenServerLocalHostUrl; + CacheAssumeHttp2 = false; + BuildCacheName = "localhost"; + } + } + }); + } + if (m_ZenCacheHost.empty()) + { + ZEN_CONSOLE("Warning: Failed to find any usable cache hosts out of {} using {}", CacheCount, m_Host); + } + } + } + else if (TestCacheEndpoint(m_ZenCacheHost, false)) + { + std::string::size_type HostnameStart = 0; + std::string::size_type HostnameLength = std::string::npos; + if (auto StartPos = m_ZenCacheHost.find("//"); StartPos != std::string::npos) + { + HostnameStart = StartPos + 2; + } + if (auto EndPos = m_ZenCacheHost.find("/", HostnameStart); EndPos != std::string::npos) + { + HostnameLength = EndPos - HostnameStart; + } + BuildCacheName = m_ZenCacheHost.substr(HostnameStart, HostnameLength); + } + } + } + else + { + CloudHost = m_OverrideHost; + } + + if (!CloudHost.empty()) + { + Result.BuildStorageHttp = std::make_unique<HttpClient>(CloudHost, ClientSettings); + StorageDescription = fmt::format("Cloud {}{}. SessionId: '{}'. Namespace '{}', Bucket '{}'", + BuildStorageName.empty() ? "" : fmt::format("{}, ", BuildStorageName), + CloudHost, + Result.BuildStorageHttp->GetSessionId(), + m_Namespace, + m_Bucket); + Result.BuildStorage = + CreateJupiterBuildStorage(Log(), *Result.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, TempPath / "storage"); + Result.StorageName = BuildStorageName; + } + else if (!m_StoragePath.empty()) + { + std::filesystem::path StoragePath = MakeSafeAbsolutePath(m_StoragePath); + StorageDescription = fmt::format("folder {}", StoragePath); + Result.BuildStorage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); + Result.StorageName = fmt::format("Disk {}", StoragePath.stem()); + } + else + { + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); + } + if (!m_ZenCacheHost.empty()) + { + Result.CacheHttp = std::make_unique<HttpClient>(m_ZenCacheHost, + HttpClientSettings{.LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = CacheAssumeHttp2, + .AllowResume = true, + .RetryCount = 0}); + Result.BuildCacheStorage = CreateZenBuildStorageCache(*Result.CacheHttp, + StorageCacheStats, + m_Namespace, + m_Bucket, + TempPath / "zencache", + m_PrimeCacheOnly); + CacheDescription = fmt::format("Zen {}{}. SessionId: '{}'", + BuildCacheName.empty() ? "" : fmt::format("{}, ", BuildCacheName), + m_ZenCacheHost, + Result.CacheHttp->GetSessionId()); + + if (!m_Namespace.empty()) + { + CacheDescription += fmt::format(". Namespace '{}'", m_Namespace); + } + if (!m_Bucket.empty()) + { + CacheDescription += fmt::format(" Bucket '{}'", m_Bucket); + } + Result.CacheName = BuildCacheName; + } + ZEN_CONSOLE("Remote: {}", StorageDescription); + if (!Result.CacheName.empty()) + { + ZEN_CONSOLE("Cache : {}", CacheDescription); + } + return Result; + }; + + BoostWorkerThreads = m_BoostWorkerThreads; + + try + { + if (SubOption == &m_ListOptions) + { + if (!m_ListResultPath.empty()) + { + ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + } CbObject QueryObject; if (m_ListQueryPath.empty()) { @@ -7505,7 +9039,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else { - std::filesystem::path ListQueryPath = StringToPath(m_ListQueryPath); + std::filesystem::path ListQueryPath = MakeSafeAbsolutePath(m_ListQueryPath); if (ToLower(ListQueryPath.extension().string()) == ".cbo") { QueryObject = LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(ListQueryPath)); @@ -7525,28 +9059,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE_VERBOSE("Querying builds in cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, std::filesystem::path{}); - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE_VERBOSE("Querying builds in folder '{}'.", StoragePath); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; - CbObject Response = Storage->ListBuilds(QueryObject); + const std::filesystem::path ZenFolderPath = + m_ZenFolderPath.empty() ? MakeSafeAbsolutePath(".") / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath); + CreateDirectories(ZenFolderPath); + auto _ = MakeGuard([ZenFolderPath]() { + if (CleanDirectory(ZenFolderPath, {})) + { + std::error_code DummyEc; + RemoveDir(ZenFolderPath, DummyEc); + } + }); + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath)); + + CbObject Response = Storage.BuildStorage->ListBuilds(QueryObject); ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::All) == CbValidateError::None); if (m_ListResultPath.empty()) { @@ -7556,7 +9084,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else { - std::filesystem::path ListResultPath = StringToPath(m_ListResultPath); + std::filesystem::path ListResultPath = MakeSafeAbsolutePath(m_ListResultPath); if (ToLower(ListResultPath.extension().string()) == ".cbo") { MemoryView ResponseView = Response.GetView(); @@ -7575,10 +9103,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_UploadOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); + ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); if (m_Path.empty()) { @@ -7610,7 +9135,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = MakeSafeAbsolutePath(m_Path); if (m_BuildPartName.empty()) { @@ -7645,48 +9170,31 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help())); } + const Oid BuildId = Oid::FromHexString(m_BuildId); + const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); + BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'", - m_BuildPartName, - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - GeneratedBuildId ? "Generated " : "", - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'", - m_BuildPartName, - Path, - StoragePath, - GeneratedBuildId ? "Generated " : "", - BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; + + const std::filesystem::path ZenFolderPath = + m_ZenFolderPath.empty() ? MakeSafeAbsolutePath(".") / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath); + CreateDirectories(ZenFolderPath); + auto _ = MakeGuard([ZenFolderPath]() { + if (CleanDirectory(ZenFolderPath, {})) + { + std::error_code DummyEc; + RemoveDir(ZenFolderPath, DummyEc); + } + }); + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath)); CbObject MetaData; if (m_CreateBuild) { if (!m_BuildMetadataPath.empty()) { - std::filesystem::path MetadataPath = StringToPath(m_BuildMetadataPath); + std::filesystem::path MetadataPath = MakeSafeAbsolutePath(m_BuildMetadataPath); IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten(); std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize()); std::string JsonError; @@ -7713,12 +9221,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - UploadFolder(*Storage, + UploadFolder(Storage, BuildId, BuildPartId, m_BuildPartName, Path, - StringToPath(m_ManifestPath), + ZenFolderPath, + MakeSafeAbsolutePath(m_ManifestPath), + m_FindBlockMaxCount, m_BlockReuseMinPercentLimit, m_AllowMultiparts, MetaData, @@ -7735,7 +9245,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", - StorageName, + Storage.StorageName, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), @@ -7751,10 +9261,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_DownloadOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); + ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); if (m_Path.empty()) { @@ -7775,6 +9282,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); } + if (m_PostDownloadVerify && m_PrimeCacheOnly) + { + throw zen::OptionParseException( + fmt::format("'cache-prime-only' option is not compatible with 'verify' option\n{}", m_DownloadOptions.help())); + } + + if (m_Clean && m_PrimeCacheOnly) + { + ZEN_WARN("ignoring 'clean' option when 'cache-prime-only' is enabled"); + } + + if (m_AllowPartialBlockRequests && m_PrimeCacheOnly) + { + ZEN_WARN("ignoring 'allow-partial-block-requests' option when 'cache-prime-only' is enabled"); + } + std::vector<Oid> BuildPartIds; for (const std::string& BuildPartId : m_BuildPartIds) { @@ -7786,45 +9309,27 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = MakeSafeAbsolutePath(m_Path); BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Downloading '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - BuildId, - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, Path, StoragePath, BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; + + const std::filesystem::path ZenFolderPath = + m_ZenFolderPath.empty() ? Path / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath); - DownloadFolder(*Storage, + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath)); + + DownloadFolder(Storage, BuildId, BuildPartIds, m_BuildPartNames, Path, + ZenFolderPath, m_AllowMultiparts, - m_AllowPartialBlockRequests, + m_AllowPartialBlockRequests && !m_PrimeCacheOnly, m_Clean, - m_PostDownloadVerify); + m_PostDownloadVerify, + m_PrimeCacheOnly); if (false) { @@ -7835,7 +9340,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", - StorageName, + Storage.StorageName, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), @@ -7859,8 +9364,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help())); } - std::filesystem::path Path = StringToPath(m_Path); - std::filesystem::path DiffPath = StringToPath(m_DiffPath); + std::filesystem::path Path = MakeSafeAbsolutePath(m_Path); + std::filesystem::path DiffPath = MakeSafeAbsolutePath(m_DiffPath); DiffFolders(Path, DiffPath, m_OnlyChunked); return AbortFlag ? 11 : 0; } @@ -7872,10 +9377,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); // m_StoragePath = "D:\\buildstorage"; // m_Path = "F:\\Saved\\DownloadedBuilds\\++Fortnite+Main-CL-XXXXXXXX\\WindowsClient"; // std::vector<std::string> BuildIdStrings{"07d3942f0e7f4ca1b13b0587", @@ -7886,34 +9387,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) // "07d3964f919d577a321a1fdd", // "07d396a6ce875004e16b9528"}; - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = MakeSafeAbsolutePath(m_Path); BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Downloading {} to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'", - FormatArray<std::string>(m_BuildIds, " "sv), - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Downloading {}'to '{}' from folder {}", FormatArray<std::string>(m_BuildIds, " "sv), Path, StoragePath); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorageCache::Statistics StorageCacheStats; + + const std::filesystem::path ZenFolderPath = + m_ZenFolderPath.empty() ? Path / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath); + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath)); Stopwatch Timer; for (const std::string& BuildIdString : m_BuildIds) @@ -7923,15 +9405,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_DownloadOptions.help())); } - DownloadFolder(*Storage, + DownloadFolder(Storage, BuildId, {}, {}, Path, + ZenFolderPath, m_AllowMultiparts, m_AllowPartialBlockRequests, BuildIdString == m_BuildIds.front(), - true); + true, + false); if (AbortFlag) { ZEN_CONSOLE("Download cancelled"); @@ -7945,71 +9429,45 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_TestOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } - std::filesystem::path Path = StringToPath(m_Path); + std::filesystem::path Path = MakeSafeAbsolutePath(m_Path); m_BuildId = Oid::NewOid().ToString(); m_BuildPartName = Path.filename().string(); m_BuildPartId = Oid::NewOid().ToString(); m_CreateBuild = true; - BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + const Oid BuildId = Oid::FromHexString(m_BuildId); + const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::filesystem::path StoragePath = StringToPath(m_StoragePath); + std::filesystem::path StoragePath = MakeSafeAbsolutePath(m_StoragePath); - if (m_BuildsUrl.empty() && StoragePath.empty()) + if (m_OverrideHost.empty() && StoragePath.empty()) { StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").make_preferred(); CreateDirectories(StoragePath); CleanDirectory(StoragePath, {}); + m_StoragePath = StoragePath.generic_string(); } auto _ = MakeGuard([&]() { - if (m_BuildsUrl.empty() && StoragePath.empty()) + if (m_OverrideHost.empty() && StoragePath.empty()) { DeleteDirectories(StoragePath); } }); - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName, - Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!StoragePath.empty()) - { - ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'", - m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName, - Path, - StoragePath, - BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + BuildStorage::Statistics StorageStats; + BuildStorageCache::Statistics StorageCacheStats; + + const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_test"); + const std::filesystem::path ZenFolderPath = + m_ZenFolderPath.empty() ? DownloadPath / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath); + + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath)); auto MakeMetaData = [](const Oid& BuildId) -> CbObject { CbObjectWriter BuildMetaDataWriter; @@ -8032,12 +9490,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView()); } - UploadFolder(*Storage, + UploadFolder(Storage, BuildId, BuildPartId, m_BuildPartName, Path, + ZenFolderPath, {}, + m_FindBlockMaxCount, m_BlockReuseMinPercentLimit, m_AllowMultiparts, MetaData, @@ -8050,9 +9510,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 11; } - const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download"); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true); + DownloadFolder(Storage, + BuildId, + {BuildPartId}, + {}, + DownloadPath, + ZenFolderPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + true, + true, + false); if (AbortFlag) { ZEN_CONSOLE("Download failed."); @@ -8064,7 +9533,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, + BuildId, + {BuildPartId}, + {}, + DownloadPath, + ZenFolderPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + false, + true, + false); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (identical target)"); @@ -8115,11 +9594,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SourceSize > 256) { Work.ScheduleWork( - GetMediumWorkerPool(EWorkloadType::Burst), - [SourceSize, FilePath](std::atomic<bool>&) { + GetIOWorkerPool(), + [SourceSize, FilePath = std::filesystem::path(FilePath)](std::atomic<bool>&) { if (!AbortFlag) { - bool IsReadOnly = SetFileReadOnly(FilePath, false); + bool IsReadOnly = SetFileReadOnlyWithRetry(FilePath, false); { BasicFile Source(FilePath, BasicFile::Mode::kWrite); uint64_t RangeSize = Min(SourceSize / 3, 512u * 1024u); @@ -8146,7 +9625,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } break; case 1: - std::filesystem::remove(FilePath); + { + (void)SetFileReadOnlyWithRetry(FilePath, false); + RemoveFileWithRetry(FilePath); + } break; default: break; @@ -8168,7 +9650,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, + BuildId, + {BuildPartId}, + {}, + DownloadPath, + ZenFolderPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + false, + true, + false); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (scrambled target)"); @@ -8187,12 +9679,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView()); } - UploadFolder(*Storage, + UploadFolder(Storage, BuildId2, BuildPartId2, m_BuildPartName, DownloadPath, + ZenFolderPath, {}, + m_FindBlockMaxCount, m_BlockReuseMinPercentLimit, m_AllowMultiparts, MetaData2, @@ -8206,7 +9700,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, + BuildId, + {BuildPartId}, + {}, + DownloadPath, + ZenFolderPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + false, + true, + false); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -8214,15 +9718,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, + DownloadFolder(Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, + ZenFolderPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, - true); + true, + false); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -8230,15 +9736,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, + DownloadFolder(Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, + ZenFolderPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, - true); + true, + false); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -8250,11 +9758,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_FetchBlobOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - + ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); if (m_BlobHash.empty()) { throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help())); @@ -8266,44 +9770,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help())); } - if (m_BuildsUrl.empty() && m_StoragePath.empty()) - { - throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); - } + const Oid BuildId = Oid::FromHexString(m_BuildId); + + std::filesystem::path Path = MakeSafeAbsolutePath(m_Path); BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + BuildStorageCache::Statistics StorageCacheStats; - std::filesystem::path Path = StringToPath(m_Path); + const std::filesystem::path ZenFolderPath = + m_ZenFolderPath.empty() ? MakeSafeAbsolutePath(".") / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath); + CreateDirectories(ZenFolderPath); + auto _ = MakeGuard([ZenFolderPath]() { + if (CleanDirectory(ZenFolderPath, {})) + { + std::error_code DummyEc; + RemoveDir(ZenFolderPath, DummyEc); + } + }); - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath)); uint64_t CompressedSize; uint64_t DecompressedSize; - ValidateBlob(*Storage, BuildId, BlobHash, CompressedSize, DecompressedSize); + ValidateBlob(*Storage.BuildStorage, BuildId, BlobHash, CompressedSize, DecompressedSize); if (AbortFlag) { return 11; @@ -8317,15 +9806,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_ValidateBuildPartOptions) { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - - if (m_BuildsUrl.empty() && m_StoragePath.empty()) - { - throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); - } + ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); if (m_BuildId.empty()) { @@ -8342,39 +9823,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); } + std::filesystem::path Path = MakeSafeAbsolutePath(m_Path); + BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + BuildStorageCache::Statistics StorageCacheStats; + + const std::filesystem::path ZenFolderPath = + m_ZenFolderPath.empty() ? MakeSafeAbsolutePath(".") / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath); + CreateDirectories(ZenFolderPath); + auto _ = MakeGuard([ZenFolderPath]() { + if (CleanDirectory(ZenFolderPath, {})) + { + std::error_code DummyEc; + RemoveDir(ZenFolderPath, DummyEc); + } + }); - std::filesystem::path Path = StringToPath(m_Path); + StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath)); - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - std::filesystem::path StoragePath = StringToPath(m_StoragePath); - ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId); - Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - StorageName = fmt::format("Disk {}", StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId); ValidateStatistics ValidateStats; DownloadStatistics DownloadStats; - ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats); + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats); return AbortFlag ? 13 : 0; } |