diff options
| author | Dan Engelbrecht <[email protected]> | 2026-04-15 09:53:10 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-15 09:53:10 +0200 |
| commit | d687f23deeb9ffe6514586193cd247bc11863ff1 (patch) | |
| tree | 82932fb822622afc5bd18b87cc2119d9ed1488fe /src | |
| parent | fix OAuth client credentials content type override (#957) (diff) | |
| download | zen-d687f23deeb9ffe6514586193cd247bc11863ff1.tar.xz zen-d687f23deeb9ffe6514586193cd247bc11863ff1.zip | |
fix memory usage for cache upload (#959)
* remove obsolete prime-cache-only flag
* if a downloaded blob should be sent to cache, make sure it is disk-based;
keeping it in memory overloads memory when boost-worker-memory is enabled
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 248 | ||||
| -rw-r--r-- | src/zen/cmds/builds_cmd.h | 3 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstoragecache.cpp | 63 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 504 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h | 5 |
5 files changed, 326 insertions, 497 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 9a1ec2378..0db729bce 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -1232,7 +1232,6 @@ namespace builds_impl { EPartialBlockRequestMode PartialBlockRequestMode = EPartialBlockRequestMode::Mixed; bool CleanTargetFolder = false; bool PostDownloadVerify = false; - bool PrimeCacheOnly = false; bool EnableOtherDownloadsScavenging = true; bool EnableTargetFolderScavenging = true; bool AllowFileClone = true; @@ -1272,9 +1271,6 @@ namespace builds_impl { auto EndProgress = MakeGuard([&]() { ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::StepCount, TaskSteps::StepCount); }); - ZEN_ASSERT((!Options.PrimeCacheOnly) || - (Options.PrimeCacheOnly && (Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off))); - Stopwatch DownloadTimer; ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::CheckState, TaskSteps::StepCount); @@ -1327,75 +1323,67 @@ namespace builds_impl { BuildSaveState LocalState; - if (!Options.PrimeCacheOnly) + if (IsDir(Path)) { - if (IsDir(Path)) + if (!ChunkController && !IsQuiet) { - if (!ChunkController && !IsQuiet) - { - ZEN_CONSOLE_INFO("Unspecified chunking algorithm, using default"); - ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{}); - } - std::unique_ptr<ChunkingCache> ChunkCache(CreateNullChunkingCache()); - - LocalState = GetLocalContent(Workers, - LocalFolderScanStats, - ChunkingStats, - Path, - ZenStateFilePath(Path / ZenFolderName), - *ChunkController, - *ChunkCache); - - std::vector<std::filesystem::path> UntrackedPaths = GetNewPaths(LocalState.State.ChunkedContent.Paths, RemoteContent.Paths); - - BuildSaveState UntrackedLocalContent = GetLocalStateFromPaths(Workers, - LocalFolderScanStats, - ChunkingStats, - Path, - *ChunkController, - *ChunkCache, - UntrackedPaths); - - if (!UntrackedLocalContent.State.ChunkedContent.Paths.empty()) - { - 
LocalState.State.ChunkedContent = - MergeChunkedFolderContents(LocalState.State.ChunkedContent, - std::vector<ChunkedFolderContent>{UntrackedLocalContent.State.ChunkedContent}); - - // TODO: Helper - LocalState.FolderState.Paths.insert(LocalState.FolderState.Paths.begin(), - UntrackedLocalContent.FolderState.Paths.begin(), - UntrackedLocalContent.FolderState.Paths.end()); - LocalState.FolderState.RawSizes.insert(LocalState.FolderState.RawSizes.begin(), - UntrackedLocalContent.FolderState.RawSizes.begin(), - UntrackedLocalContent.FolderState.RawSizes.end()); - LocalState.FolderState.Attributes.insert(LocalState.FolderState.Attributes.begin(), - UntrackedLocalContent.FolderState.Attributes.begin(), - UntrackedLocalContent.FolderState.Attributes.end()); - LocalState.FolderState.ModificationTicks.insert(LocalState.FolderState.ModificationTicks.begin(), - UntrackedLocalContent.FolderState.ModificationTicks.begin(), - UntrackedLocalContent.FolderState.ModificationTicks.end()); - } + ZEN_CONSOLE_INFO("Unspecified chunking algorithm, using default"); + ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{}); + } + std::unique_ptr<ChunkingCache> ChunkCache(CreateNullChunkingCache()); - if (Options.AppendNewContent) - { - RemoteContent = ApplyChunkedContentOverlay(LocalState.State.ChunkedContent, - RemoteContent, - Options.IncludeWildcards, - Options.ExcludeWildcards); - } -#if ZEN_BUILD_DEBUG - ValidateChunkedFolderContent(RemoteContent, - BlockDescriptions, - LooseChunkHashes, - Options.IncludeWildcards, - Options.ExcludeWildcards); -#endif // ZEN_BUILD_DEBUG + LocalState = GetLocalContent(Workers, + LocalFolderScanStats, + ChunkingStats, + Path, + ZenStateFilePath(Path / ZenFolderName), + *ChunkController, + *ChunkCache); + + std::vector<std::filesystem::path> UntrackedPaths = GetNewPaths(LocalState.State.ChunkedContent.Paths, RemoteContent.Paths); + + BuildSaveState UntrackedLocalContent = + GetLocalStateFromPaths(Workers, 
LocalFolderScanStats, ChunkingStats, Path, *ChunkController, *ChunkCache, UntrackedPaths); + + if (!UntrackedLocalContent.State.ChunkedContent.Paths.empty()) + { + LocalState.State.ChunkedContent = + MergeChunkedFolderContents(LocalState.State.ChunkedContent, + std::vector<ChunkedFolderContent>{UntrackedLocalContent.State.ChunkedContent}); + + // TODO: Helper + LocalState.FolderState.Paths.insert(LocalState.FolderState.Paths.begin(), + UntrackedLocalContent.FolderState.Paths.begin(), + UntrackedLocalContent.FolderState.Paths.end()); + LocalState.FolderState.RawSizes.insert(LocalState.FolderState.RawSizes.begin(), + UntrackedLocalContent.FolderState.RawSizes.begin(), + UntrackedLocalContent.FolderState.RawSizes.end()); + LocalState.FolderState.Attributes.insert(LocalState.FolderState.Attributes.begin(), + UntrackedLocalContent.FolderState.Attributes.begin(), + UntrackedLocalContent.FolderState.Attributes.end()); + LocalState.FolderState.ModificationTicks.insert(LocalState.FolderState.ModificationTicks.begin(), + UntrackedLocalContent.FolderState.ModificationTicks.begin(), + UntrackedLocalContent.FolderState.ModificationTicks.end()); } - else + + if (Options.AppendNewContent) { - CreateDirectories(Path); + RemoteContent = ApplyChunkedContentOverlay(LocalState.State.ChunkedContent, + RemoteContent, + Options.IncludeWildcards, + Options.ExcludeWildcards); } +#if ZEN_BUILD_DEBUG + ValidateChunkedFolderContent(RemoteContent, + BlockDescriptions, + LooseChunkHashes, + Options.IncludeWildcards, + Options.ExcludeWildcards); +#endif // ZEN_BUILD_DEBUG + } + else + { + CreateDirectories(Path); } if (AbortFlag) { @@ -1504,7 +1492,6 @@ namespace builds_impl { .PreferredMultipartChunkSize = PreferredMultipartChunkSize, .PartialBlockRequestMode = Options.PartialBlockRequestMode, .WipeTargetFolder = Options.CleanTargetFolder, - .PrimeCacheOnly = Options.PrimeCacheOnly, .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, .EnableTargetFolderScavenging = 
Options.EnableTargetFolderScavenging || Options.AppendNewContent, .ValidateCompletedSequences = Options.PostDownloadVerify, @@ -1524,40 +1511,37 @@ namespace builds_impl { VerifyFolderStatistics VerifyFolderStats; if (!AbortFlag) { - if (!Options.PrimeCacheOnly) + AddDownloadedPath(Options.SystemRootDir, + BuildsDownloadInfo{.Selection = LocalState.State.Selection, + .LocalPath = Path, + .StateFilePath = ZenStateFilePath(Options.ZenFolderPath), + .Iso8601Date = DateTime::Now().ToIso8601()}); + + ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Verify, TaskSteps::StepCount); + + VerifyFolder(Workers, + RemoteContent, + RemoteLookup, + Path, + Options.ExcludeFolders, + Options.PostDownloadVerify, + VerifyFolderStats); + + Stopwatch WriteStateTimer; + CbObject StateObject = CreateBuildSaveStateObject(LocalState); + + CreateDirectories(ZenStateFilePath(Options.ZenFolderPath).parent_path()); + TemporaryFile::SafeWriteFile(ZenStateFilePath(Options.ZenFolderPath), StateObject.GetView()); + if (!IsQuiet) { - AddDownloadedPath(Options.SystemRootDir, - BuildsDownloadInfo{.Selection = LocalState.State.Selection, - .LocalPath = Path, - .StateFilePath = ZenStateFilePath(Options.ZenFolderPath), - .Iso8601Date = DateTime::Now().ToIso8601()}); - - ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Verify, TaskSteps::StepCount); - - VerifyFolder(Workers, - RemoteContent, - RemoteLookup, - Path, - Options.ExcludeFolders, - Options.PostDownloadVerify, - VerifyFolderStats); - - Stopwatch WriteStateTimer; - CbObject StateObject = CreateBuildSaveStateObject(LocalState); - - CreateDirectories(ZenStateFilePath(Options.ZenFolderPath).parent_path()); - TemporaryFile::SafeWriteFile(ZenStateFilePath(Options.ZenFolderPath), StateObject.GetView()); - if (!IsQuiet) - { - ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); - } + ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); + } #if 0 
ExtendableStringBuilder<1024> SB; CompactBinaryToJson(StateObject, SB); WriteFile(ZenStateFileJsonPath(Options.ZenFolderPath), IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); #endif // 0 - } const uint64_t DownloadCount = Updater.m_DownloadStats.DownloadedChunkCount.load() + Updater.m_DownloadStats.DownloadedBlockCount.load() + Updater.m_DownloadStats.DownloadedPartialBlockCount.load(); @@ -1647,26 +1631,6 @@ namespace builds_impl { } } } - if (Options.PrimeCacheOnly) - { - if (Storage.CacheStorage) - { - Storage.CacheStorage->Flush(5000, [](intptr_t Remaining) { - if (!IsQuiet) - { - if (Remaining == 0) - { - ZEN_CONSOLE("Build cache upload complete"); - } - else - { - ZEN_CONSOLE("Waiting for build cache to complete uploading. {} blobs remaining", Remaining); - } - } - return !AbortFlag; - }); - } - } ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Cleanup, TaskSteps::StepCount); @@ -2794,12 +2758,8 @@ BuildsCommand::ResolveZenFolderPath(const std::filesystem::path& DefaultPath) } EPartialBlockRequestMode -BuildsCommand::ParseAllowPartialBlockRequests(bool PrimeCacheOnly, cxxopts::Options& SubOpts) +BuildsCommand::ParseAllowPartialBlockRequests(cxxopts::Options& SubOpts) { - if (PrimeCacheOnly) - { - return EPartialBlockRequestMode::Off; - } EPartialBlockRequestMode Mode = PartialBlockRequestModeFromString(m_AllowPartialBlockRequests); if (Mode == EPartialBlockRequestMode::Invalid) { @@ -3365,13 +3325,6 @@ BuildsDownloadSubCmd::BuildsDownloadSubCmd(BuildsCommand& Parent) Parent.AddAppendNewContentOptions(Opts); Parent.AddExcludeFolderOption(Opts); - Opts.add_option("cache", - "", - "cache-prime-only", - "Only download blobs missing in cache and upload to cache", - cxxopts::value(m_PrimeCacheOnly), - "<cacheprimeonly>"); - Opts.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>"); Opts.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); Opts.add_option("", @@ 
-3471,36 +3424,16 @@ BuildsDownloadSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) m_BuildId, /*RequireNamespace*/ true, /*RequireBucket*/ true, - /*BoostCacheBackgroundWorkerPool*/ m_PrimeCacheOnly, + /*BoostCacheBackgroundWorkerPool*/ false, Auth, Opts); const Oid BuildId = m_Parent.ParseBuildId(m_BuildId, Opts); - if (m_PostDownloadVerify && m_PrimeCacheOnly) - { - throw OptionParseException("'--cache-prime-only' conflicts with '--verify'", Opts.help()); - } - - if (m_Clean && m_PrimeCacheOnly) - { - ZEN_CONSOLE_WARN("Ignoring '--clean' option when '--cache-prime-only' is enabled"); - } - - if (m_Force && m_PrimeCacheOnly) - { - ZEN_CONSOLE_WARN("Ignoring '--force' option when '--cache-prime-only' is enabled"); - } - - if (m_Parent.m_AllowPartialBlockRequests != "false" && m_PrimeCacheOnly) - { - ZEN_CONSOLE_WARN("Ignoring '--allow-partial-block-requests' option when '--cache-prime-only' is enabled"); - } - std::vector<Oid> BuildPartIds = m_Parent.ParseBuildPartIds(m_BuildPartIds, Opts); std::vector<std::string> BuildPartNames = m_Parent.ParseBuildPartNames(m_BuildPartNames, Opts); - EPartialBlockRequestMode PartialBlockRequestMode = m_Parent.ParseAllowPartialBlockRequests(m_PrimeCacheOnly, Opts); + EPartialBlockRequestMode PartialBlockRequestMode = m_Parent.ParseAllowPartialBlockRequests(Opts); if (m_Parent.m_AppendNewContent && m_Clean) { @@ -3529,7 +3462,6 @@ BuildsDownloadSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = m_Clean, .PostDownloadVerify = m_PostDownloadVerify, - .PrimeCacheOnly = m_PrimeCacheOnly, .EnableOtherDownloadsScavenging = m_EnableScavenging && !m_Force, .EnableTargetFolderScavenging = !m_Force, .AllowFileClone = m_AllowFileClone, @@ -4132,7 +4064,7 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) } }); - EPartialBlockRequestMode PartialBlockRequestMode = m_Parent.ParseAllowPartialBlockRequests(false, Opts); + EPartialBlockRequestMode 
PartialBlockRequestMode = m_Parent.ParseAllowPartialBlockRequests(Opts); BuildStorageBase::Statistics StorageStats; BuildStorageCache::Statistics StorageCacheStats; @@ -4284,7 +4216,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = true, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = false, .AllowFileClone = m_AllowFileClone, @@ -4316,7 +4247,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = true, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = true, .AllowFileClone = m_AllowFileClone, @@ -4344,7 +4274,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = false, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = true, .AllowFileClone = m_AllowFileClone, @@ -4374,7 +4303,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = true, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = false, .AllowFileClone = m_AllowFileClone}); @@ -4400,7 +4328,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = false, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = true, .AllowFileClone = m_AllowFileClone}); @@ -4518,7 +4445,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) 
.PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = false, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = true, .AllowFileClone = m_AllowFileClone}); @@ -4582,7 +4508,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = false, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = true, .AllowFileClone = m_AllowFileClone}); @@ -4607,7 +4532,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = false, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = true, .AllowFileClone = m_AllowFileClone}); @@ -4632,7 +4556,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = false, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = true, .AllowFileClone = m_AllowFileClone}); @@ -4657,7 +4580,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = false, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = true, .AllowFileClone = m_AllowFileClone}); @@ -4682,7 +4604,6 @@ BuildsTestSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = false, .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = true, .AllowFileClone = 
m_AllowFileClone}); @@ -4742,7 +4663,7 @@ BuildsMultiTestDownloadSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) m_Parent.ResolveZenFolderPath(m_Path / ZenFolderName); - EPartialBlockRequestMode PartialBlockRequestMode = m_Parent.ParseAllowPartialBlockRequests(false, Opts); + EPartialBlockRequestMode PartialBlockRequestMode = m_Parent.ParseAllowPartialBlockRequests(Opts); BuildStorageBase::Statistics StorageStats; BuildStorageCache::Statistics StorageCacheStats; @@ -4784,7 +4705,6 @@ BuildsMultiTestDownloadSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) .PartialBlockRequestMode = PartialBlockRequestMode, .CleanTargetFolder = BuildIdString == m_BuildIds.front(), .PostDownloadVerify = true, - .PrimeCacheOnly = false, .EnableOtherDownloadsScavenging = m_EnableScavenging, .EnableTargetFolderScavenging = false, .AllowFileClone = m_AllowFileClone}); diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index 7ef71e176..ef7500fd6 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -94,7 +94,6 @@ private: bool m_EnableScavenging = true; std::filesystem::path m_DownloadSpecPath; bool m_UploadToZenCache = true; - bool m_PrimeCacheOnly = false; bool m_AllowFileClone = true; }; @@ -280,7 +279,7 @@ public: cxxopts::Options& SubOpts); void ParsePath(std::filesystem::path& Path, cxxopts::Options& SubOpts); IoHash ParseBlobHash(const std::string& BlobHashStr, cxxopts::Options& SubOpts); - EPartialBlockRequestMode ParseAllowPartialBlockRequests(bool PrimeCacheOnly, cxxopts::Options& SubOpts); + EPartialBlockRequestMode ParseAllowPartialBlockRequests(cxxopts::Options& SubOpts); void ParseZenProcessId(int& ZenProcessId); void ParseFileFilters(std::vector<std::string>& OutIncludeWildcards, std::vector<std::string>& OutExcludeWildcards); void ParseExcludeFolderAndExtension(std::vector<std::string>& OutExcludeFolders, std::vector<std::string>& OutExcludeExtensions); diff --git a/src/zenremotestore/builds/buildstoragecache.cpp 
b/src/zenremotestore/builds/buildstoragecache.cpp index 0e0b14dca..40ea757eb 100644 --- a/src/zenremotestore/builds/buildstoragecache.cpp +++ b/src/zenremotestore/builds/buildstoragecache.cpp @@ -96,7 +96,8 @@ public: ZEN_ASSERT(!IsFlushed); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - // Move all segments in Payload to be file handle based so if Payload is materialized it does not affect buffers in queue + // Move all segments in Payload to be file handle based unless they are very small so if Payload is materialized it does not affect + // buffers in queue std::vector<SharedBuffer> FileBasedSegments; std::span<const SharedBuffer> Segments = Payload.GetSegments(); FileBasedSegments.reserve(Segments.size()); @@ -104,42 +105,56 @@ public: tsl::robin_map<void*, std::filesystem::path> HandleToPath; for (const SharedBuffer& Segment : Segments) { - std::filesystem::path FilePath; - IoBufferFileReference Ref; - if (Segment.AsIoBuffer().GetFileReference(Ref)) + const uint64_t SegmentSize = Segment.GetSize(); + if (SegmentSize < 16u * 1024u) { - if (auto It = HandleToPath.find(Ref.FileHandle); It != HandleToPath.end()) - { - FilePath = It->second; - } - else + FileBasedSegments.push_back(Segment); + } + else + { + std::filesystem::path FilePath; + IoBufferFileReference Ref; + if (Segment.AsIoBuffer().GetFileReference(Ref)) { - std::error_code Ec; - std::filesystem::path Path = PathFromHandle(Ref.FileHandle, Ec); - if (!Ec && !Path.empty()) + if (auto It = HandleToPath.find(Ref.FileHandle); It != HandleToPath.end()) { - HandleToPath.insert_or_assign(Ref.FileHandle, Path); - FilePath = std::move(Path); + FilePath = It->second; + } + else + { + std::error_code Ec; + std::filesystem::path Path = PathFromHandle(Ref.FileHandle, Ec); + if (!Ec && !Path.empty()) + { + HandleToPath.insert_or_assign(Ref.FileHandle, Path); + FilePath = std::move(Path); + } + else + { + ZEN_WARN("Failed getting path for chunk to upload to cache. 
Skipping upload."); + return; + } } } - } - if (!FilePath.empty()) - { - IoBuffer BufferFromFile = IoBufferBuilder::MakeFromFile(FilePath, Ref.FileChunkOffset, Ref.FileChunkSize); - if (BufferFromFile) + if (!FilePath.empty()) { - FileBasedSegments.push_back(SharedBuffer(std::move(BufferFromFile))); + IoBuffer BufferFromFile = IoBufferBuilder::MakeFromFile(FilePath, Ref.FileChunkOffset, Ref.FileChunkSize); + if (BufferFromFile) + { + FileBasedSegments.push_back(SharedBuffer(std::move(BufferFromFile))); + } + else + { + ZEN_WARN("Failed opening file '{}' to upload to cache. Skipping upload.", FilePath); + return; + } } else { FileBasedSegments.push_back(Segment); } } - else - { - FileBasedSegments.push_back(Segment); - } } } diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 3a41cd7eb..389f8614d 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -382,6 +382,46 @@ namespace { return CompositeBuffer{}; } + std::filesystem::path TryMoveDownloadedChunk(IoBuffer& BlockBuffer, const std::filesystem::path& Path, bool ForceDiskBased) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("MoveTempFullBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + RenameFile(TempBlobPath, Path, Ec); + if (Ec) + { + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } + else + { + return Path; + } + } + } + + if (ForceDiskBased) + { + // Could not be moved and rather large, lets 
store it on disk + ZEN_TRACE_CPU("WriteTempFullBlock"); + TemporaryFile::SafeWriteFile(Path, BlockBuffer); + BlockBuffer = {}; + return Path; + } + + return {}; + } + } // namespace class ReadFileCache @@ -673,9 +713,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); - ZEN_ASSERT((!m_Options.PrimeCacheOnly) || - (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off))); - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ScanExistingData, (uint32_t)TaskSteps::StepCount); CreateDirectories(m_CacheFolderPath); @@ -690,20 +727,14 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; - if (!m_Options.PrimeCacheOnly) - { - ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); - } + ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; - if (!m_Options.PrimeCacheOnly) - { - ScanTempBlocksFolder(CachedBlocksFound); - } + ScanTempBlocksFolder(CachedBlocksFound); tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex; - if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) + if (m_Options.EnableTargetFolderScavenging) { // Pick up all whole files we can use from current local state ZEN_TRACE_CPU("GetLocalSequences"); @@ -738,7 +769,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations; uint64_t ScavengedPathsCount = 0; - if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) + if (m_Options.EnableOtherDownloadsScavenging) { 
ZEN_TRACE_CPU("GetScavengedSequences"); @@ -882,7 +913,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex; std::vector<CopyChunkData> CopyChunkDatas; - if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) + if (m_Options.EnableTargetFolderScavenging) { ZEN_TRACE_CPU("GetLocalChunks"); @@ -902,7 +933,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); } - if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) + if (m_Options.EnableOtherDownloadsScavenging) { ZEN_TRACE_CPU("GetScavengeChunks"); @@ -1025,30 +1056,23 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) ZEN_TRACE_CPU("BlockCacheFileExists"); for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks) { - if (m_Options.PrimeCacheOnly) + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; + bool UsingCachedBlock = false; + if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) { - FetchBlockIndexes.push_back(NeededBlock.BlockIndex); - } - else - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; - bool UsingCachedBlock = false; - if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) - { - TotalPartWriteCount++; + TotalPartWriteCount++; - std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - if (IsFile(BlockPath)) - { - CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex); - UsingCachedBlock = true; - } - } - if (!UsingCachedBlock) + std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); + if (IsFile(BlockPath)) { - 
FetchBlockIndexes.push_back(NeededBlock.BlockIndex); + CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex); + UsingCachedBlock = true; } } + if (!UsingCachedBlock) + { + FetchBlockIndexes.push_back(NeededBlock.BlockIndex); + } } } @@ -1229,10 +1253,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; - std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr( - m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing")); - OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr(m_LogOutput.CreateProgressBar("Writing")); + OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); TotalPartWriteCount += CopyChunkDatas.size(); TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); @@ -1245,37 +1268,34 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { break; } - if (!m_Options.PrimeCacheOnly) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &ScavengedPaths, - &ScavengedSequenceCopyOperations, - &ScavengedContents, - &FilteredWrittenBytesPerSecond, - ScavengeOpIndex, - &WritePartsComplete, - TotalPartWriteCount](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WriteScavenged"); + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &ScavengedPaths, + &ScavengedSequenceCopyOperations, + &ScavengedContents, + &FilteredWrittenBytesPerSecond, + ScavengeOpIndex, + &WritePartsComplete, + TotalPartWriteCount](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteScavenged"); - FilteredWrittenBytesPerSecond.Start(); + FilteredWrittenBytesPerSecond.Start(); - 
const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex]; - const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; - const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; + const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex]; + const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; + const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; - WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); + WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); - if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } + if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); } - }); - } + } + }); } for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) @@ -1285,16 +1305,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) break; } - if (m_Options.PrimeCacheOnly) - { - const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; - if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) - { - m_DownloadStats.RequestsCompleteCount++; - continue; - } - } - Work.ScheduleWork( m_IOWorkerPool, [this, @@ -1338,7 +1348,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++) { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; @@ -1397,7 +1406,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) for (uint32_t BlockIndex : CachedChunkBlockIndexes) { - 
ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; @@ -1464,7 +1472,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();) { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; @@ -1627,12 +1634,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) break; } - if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash)) - { - m_DownloadStats.RequestsCompleteCount++; - continue; - } - Work.ScheduleWork( m_NetworkPool, [this, @@ -1684,14 +1685,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } - if (m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - BlockDescription.BlockHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BlockBuffer))); - } - uint64_t BlockSize = BlockBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += BlockSize; @@ -1700,47 +1693,27 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) FilteredDownloadedBytesPerSecond.Stop(); } - if (!m_Options.PrimeCacheOnly) - { - std::filesystem::path BlockChunkPath; + const bool PutInCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) - { - ZEN_TRACE_CPU("MoveTempFullBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath 
= m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; + std::filesystem::path BlockChunkPath = TryMoveDownloadedChunk( + BlockBuffer, + m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(), + /* ForceDiskBased */ PutInCache || (BlockSize > m_Options.MaximumInMemoryPayloadSize)); - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } - } - } - - if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize)) + if (PutInCache) + { + ZEN_ASSERT(!BlockChunkPath.empty()); + IoBuffer CacheBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (CacheBuffer) { - ZEN_TRACE_CPU("WriteTempFullBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(CacheBuffer))); } + } + { if (!m_AbortFlag) { Work.ScheduleWork( @@ -1840,12 +1813,11 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); } - std::string WriteDetails = m_Options.PrimeCacheOnly ? 
"" - : fmt::format(" {}/{} ({}B/s) written{}", - NiceBytes(m_WrittenChunkByteCount.load()), - NiceBytes(BytesToWrite), - NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), - CloneDetails); + std::string WriteDetails = fmt::format(" {}/{} ({}B/s) written{}", + NiceBytes(m_WrittenChunkByteCount.load()), + NiceBytes(BytesToWrite), + NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), + CloneDetails); std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", m_DownloadStats.RequestsCompleteCount.load(), @@ -1855,11 +1827,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) WriteDetails); std::string Task; - if (m_Options.PrimeCacheOnly) - { - Task = "Downloading "; - } - else if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0)) + if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0)) { Task = "Writing chunks "; } @@ -1868,15 +1836,13 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) Task = "Verifying chunks "; } - WriteProgressBar.UpdateState( - {.Task = Task, - .Details = Details, - .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : (BytesToWrite + BytesToValidate), - .RemainingCount = m_Options.PrimeCacheOnly ? 
(TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load()) - : ((BytesToWrite + BytesToValidate) - - (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); + WriteProgressBar.UpdateState({.Task = Task, + .Details = Details, + .TotalCount = (BytesToWrite + BytesToValidate), + .RemainingCount = ((BytesToWrite + BytesToValidate) - + (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())), + .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); }); } @@ -1891,34 +1857,31 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) return; } - if (!m_Options.PrimeCacheOnly) + uint32_t RawSequencesMissingWriteCount = 0; + for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) { - uint32_t RawSequencesMissingWriteCount = 0; - for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) + const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; + if (SequenceIndexChunksLeftToWriteCounter.load() != 0) { - const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; - if (SequenceIndexChunksLeftToWriteCounter.load() != 0) + RawSequencesMissingWriteCount++; + const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex]; + ZEN_ASSERT(!IncompletePath.empty()); + const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; + if (!m_Options.IsQuiet) { - RawSequencesMissingWriteCount++; - const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const std::filesystem::path& IncompletePath = 
m_RemoteContent.Paths[PathIndex]; - ZEN_ASSERT(!IncompletePath.empty()); - const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "{}: Max count {}, Current count {}", - IncompletePath, - ExpectedSequenceCount, - SequenceIndexChunksLeftToWriteCounter.load()); - } - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "{}: Max count {}, Current count {}", + IncompletePath, + ExpectedSequenceCount, + SequenceIndexChunksLeftToWriteCounter.load()); } + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); } - ZEN_ASSERT(RawSequencesMissingWriteCount == 0); - ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite); - ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate); } + ZEN_ASSERT(RawSequencesMissingWriteCount == 0); + ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite); + ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate); const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load() + @@ -1948,11 +1911,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); } - if (m_Options.PrimeCacheOnly) - { - return; - } - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::PrepareTarget, (uint32_t)TaskSteps::StepCount); tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex; @@ -3104,17 +3062,13 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd FilteredRate& FilteredDownloadedBytesPerSecond, FilteredRate& FilteredWrittenBytesPerSecond) { - std::filesystem::path ExistingCompressedChunkPath; - if (!m_Options.PrimeCacheOnly) + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + std::filesystem::path 
ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); + if (!ExistingCompressedChunkPath.empty()) { - const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); - if (!ExistingCompressedChunkPath.empty()) + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) { - if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } + FilteredDownloadedBytesPerSecond.Stop(); } } if (!m_AbortFlag) @@ -3220,10 +3174,9 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd RemoteChunkIndex, &FilteredWrittenBytesPerSecond, ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable { - IoBufferFileReference FileRef; - bool EnableBacklog = Payload.GetFileReference(FileRef); AsyncWriteDownloadedChunk(m_Options.ZenFolderPath, RemoteChunkIndex, + ExistsResult, std::move(ChunkTargetPtrs), WriteCache, Work, @@ -3231,8 +3184,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd SequenceIndexChunksLeftToWriteCounters, WritePartsComplete, TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - EnableBacklog); + FilteredWrittenBytesPerSecond); }); } }); @@ -3293,14 +3245,6 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde FilteredDownloadedBytesPerSecond.Stop(); } - if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - OnDownloaded(std::move(Payload)); }); } @@ -3320,31 +3264,22 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde } if (!m_AbortFlag) { - if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, 
- CompositeBuffer(SharedBuffer(BuildBlob))); - } if (!BuildBlob) { throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); } - if (!m_Options.PrimeCacheOnly) + + if (!m_AbortFlag) { - if (!m_AbortFlag) + uint64_t BlobSize = BuildBlob.GetSize(); + m_DownloadStats.DownloadedChunkCount++; + m_DownloadStats.DownloadedChunkByteCount += BlobSize; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) { - uint64_t BlobSize = BuildBlob.GetSize(); - m_DownloadStats.DownloadedChunkCount++; - m_DownloadStats.DownloadedChunkByteCount += BlobSize; - if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - OnDownloaded(std::move(BuildBlob)); + FilteredDownloadedBytesPerSecond.Stop(); } + + OnDownloaded(std::move(BuildBlob)); } } } @@ -3388,61 +3323,17 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( FilteredDownloadedBytesPerSecond.Stop(); } - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it + IoHashStream RangeId; + for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) { - IoBufferFileReference FileRef; - if (BlockRangeBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockRangeBufferSize)) - { - ZEN_TRACE_CPU("MoveTempPartialBlock"); - - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockRangeBuffer.SetDeleteOnClose(false); - BlockRangeBuffer = {}; - - IoHashStream RangeId; - for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) - { - RangeId.Append(&Range.first, sizeof(uint64_t)); - RangeId.Append(&Range.second, sizeof(uint64_t)); - } - - BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - 
BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockRangeBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockRangeBufferSize, true); - BlockRangeBuffer.SetDeleteOnClose(true); - } - } - } + RangeId.Append(&Range.first, sizeof(uint64_t)); + RangeId.Append(&Range.second, sizeof(uint64_t)); } + std::filesystem::path BlockChunkPath = + TryMoveDownloadedChunk(BlockRangeBuffer, + m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()), + /* ForceDiskBased */ BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize); - if (BlockChunkPath.empty() && (BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize)) - { - ZEN_TRACE_CPU("WriteTempPartialBlock"); - - IoHashStream RangeId; - for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) - { - RangeId.Append(&Range.first, sizeof(uint64_t)); - RangeId.Append(&Range.second, sizeof(uint64_t)); - } - - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockRangeBuffer); - BlockRangeBuffer = {}; - } if (!m_AbortFlag) { OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths); @@ -3570,16 +3461,36 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 // Upload to cache (if enabled) and use the whole payload for the remaining ranges - if (m_Storage.CacheStorage && m_Options.PopulateCache) + const uint64_t Size = RangeBuffers.PayloadBuffer.GetSize(); + + const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; + + std::filesystem::path BlockPath = + TryMoveDownloadedChunk(RangeBuffers.PayloadBuffer, + 
m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(), + /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize); + if (!BlockPath.empty()) + { + RangeBuffers.PayloadBuffer = IoBufferBuilder::MakeFromFile(BlockPath); + if (!RangeBuffers.PayloadBuffer) + { + throw std::runtime_error( + fmt::format("Failed to read block {} from temporary path '{}'", BlockDescription.BlockHash, BlockPath)); + } + RangeBuffers.PayloadBuffer.SetDeleteOnClose(true); + } + + if (PopulateCache) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockDescription.BlockHash, ZenContentType::kCompressedBinary, - CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer})); - if (m_AbortFlag) - { - break; - } + CompositeBuffer(SharedBuffer(RangeBuffers.PayloadBuffer))); + } + + if (m_AbortFlag) + { + break; } SubRangeCount = Ranges.size() - SubRangeCountComplete; @@ -4331,6 +4242,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc void BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, uint32_t RemoteChunkIndex, + const BlobsExistsResult& ExistsResult, std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, ParallelWork& Work, @@ -4338,8 +4250,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, - FilteredRate& FilteredWrittenBytesPerSecond, - bool EnableBacklog) + FilteredRate& FilteredWrittenBytesPerSecond) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); @@ -4347,43 +4258,28 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa const uint64_t Size = Payload.GetSize(); - std::filesystem::path CompressedChunkPath; + const bool ExistsInCache = m_Storage.CacheStorage && 
ExistsResult.ExistingBlobs.contains(ChunkHash); + + const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; - // Check if the dowloaded chunk is file based and we can move it directly without rewriting it + std::filesystem::path CompressedChunkPath = + TryMoveDownloadedChunk(Payload, + m_TempDownloadFolderPath / ChunkHash.ToHexString(), + /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize); + if (PopulateCache) { - IoBufferFileReference FileRef; - if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size)) + IoBuffer CacheBlob = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (CacheBlob) { - ZEN_TRACE_CPU("MoveTempChunk"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - Payload.SetDeleteOnClose(false); - Payload = {}; - CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); - RenameFile(TempBlobPath, CompressedChunkPath, Ec); - if (Ec) - { - CompressedChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true); - Payload.SetDeleteOnClose(true); - } - } + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(CacheBlob))); } } - if (CompressedChunkPath.empty() && (Size > m_Options.MaximumInMemoryPayloadSize)) - { - ZEN_TRACE_CPU("WriteTempChunk"); - // Could not be moved and rather large, lets store it on disk - CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); - TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); - Payload = {}; - } + IoBufferFileReference FileRef; + bool EnableBacklog = !CompressedChunkPath.empty() || Payload.GetFileReference(FileRef); Work.ScheduleWork( m_IOWorkerPool, diff --git 
a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 27dc9de86..b35231776 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -128,7 +128,6 @@ public: std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; EPartialBlockRequestMode PartialBlockRequestMode = EPartialBlockRequestMode::Mixed; bool WipeTargetFolder = false; - bool PrimeCacheOnly = false; bool EnableOtherDownloadsScavenging = true; bool EnableTargetFolderScavenging = true; bool ValidateCompletedSequences = true; @@ -329,6 +328,7 @@ private: void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, uint32_t RemoteChunkIndex, + const BlobsExistsResult& ExistsResult, std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, ParallelWork& Work, @@ -336,8 +336,7 @@ private: std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, - FilteredRate& FilteredWrittenBytesPerSecond, - bool EnableBacklog); + FilteredRate& FilteredWrittenBytesPerSecond); void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work); bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters); |