From eb3079e2ec2969829cbc5b6921575d53df351f0f Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 24 Feb 2026 16:10:36 +0100 Subject: use partial blocks for oplog import (#780) Feature: Add --allow-partial-block-requests to zen oplog-import Improvement: zen oplog-import now uses partial block requests to reduce download size Improvement: Use latency to Cloud Storage host and Zen Cache host when calculating partial block requests --- CHANGELOG.md | 3 + src/zen/cmds/builds_cmd.cpp | 28 +- src/zen/cmds/projectstore_cmd.cpp | 28 +- src/zen/cmds/projectstore_cmd.h | 2 + src/zenhttp/httpclient.cpp | 38 + src/zenhttp/include/zenhttp/httpclient.h | 9 + src/zenremotestore/builds/buildstoragecache.cpp | 8 +- .../builds/buildstorageoperations.cpp | 45 +- src/zenremotestore/builds/buildstorageutil.cpp | 19 +- src/zenremotestore/chunking/chunkblock.cpp | 79 +- .../zenremotestore/builds/buildstoragecache.h | 1 + .../zenremotestore/builds/buildstorageoperations.h | 12 +- .../zenremotestore/builds/buildstorageutil.h | 4 + .../include/zenremotestore/chunking/chunkblock.h | 25 +- .../include/zenremotestore/jupiter/jupiterhost.h | 1 + .../include/zenremotestore/operationlogoutput.h | 5 +- .../zenremotestore/partialblockrequestmode.h | 20 + .../projectstore/buildsremoteprojectstore.h | 4 +- .../projectstore/remoteprojectstore.h | 67 +- src/zenremotestore/jupiter/jupiterhost.cpp | 8 +- src/zenremotestore/operationlogoutput.cpp | 2 +- src/zenremotestore/partialblockrequestmode.cpp | 27 + .../projectstore/buildsremoteprojectstore.cpp | 122 ++- .../projectstore/fileremoteprojectstore.cpp | 24 +- .../projectstore/jupiterremoteprojectstore.cpp | 19 +- .../projectstore/remoteprojectstore.cpp | 946 ++++++++++++++++----- .../projectstore/zenremoteprojectstore.cpp | 29 +- .../storage/projectstore/httpprojectstore.cpp | 33 +- 28 files changed, 1246 insertions(+), 362 deletions(-) create mode 100644 src/zenremotestore/include/zenremotestore/partialblockrequestmode.h create mode 100644 src/zenremotestore/partialblockrequestmode.cpp diff --git a/CHANGELOG.md b/CHANGELOG.md index 3670e451e..e9d3b79c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## +- Feature: Add `--allow-partial-block-requests` to `zen oplog-import` - Feature: `zen ui` can be used to open dashboards for local instances +- Improvement: `zen oplog-import` now uses partial block requests to reduce download size +- Improvement: Use latency to Cloud Storage host and Zen Cache host when calculating partial block requests - Bugfix: `--plain-progress` style progress bar should now show elapsed time correctly - Bugfix: Time spent indexing local and remote state during `zen builds download` now show the correct time diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 849259013..5254ef3cf 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -2842,13 +2842,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) TempPath / "storage"); Result.StorageName = ResolveRes.HostName; - StorageDescription = fmt::format("Cloud {}{}. SessionId: '{}'. Namespace '{}', Bucket '{}'", - ResolveRes.HostName, - (ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl), - Result.BuildStorageHttp->GetSessionId(), - m_Namespace, - m_Bucket); - ; + uint64_t HostLatencyNs = ResolveRes.HostLatencySec >= 0 ? uint64_t(ResolveRes.HostLatencySec * 1000000000.0) : 0; + + StorageDescription = fmt::format("Cloud {}{}. SessionId: '{}'. Namespace '{}', Bucket '{}'. Latency: {}", + ResolveRes.HostName, + (ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl), + Result.BuildStorageHttp->GetSessionId(), + m_Namespace, + m_Bucket, + NiceLatencyNs(HostLatencyNs)); + Result.BuildStorageLatencySec = ResolveRes.HostLatencySec; if (!ResolveRes.CacheUrl.empty()) { @@ -2874,12 +2877,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) : GetTinyWorkerPool(EWorkloadType::Background)); Result.CacheName = ResolveRes.CacheName; + uint64_t CacheLatencyNs = ResolveRes.CacheLatencySec >= 0 ? uint64_t(ResolveRes.CacheLatencySec * 1000000000.0) : 0; + CacheDescription = - fmt::format("Zen {}{}. SessionId: '{}'", + fmt::format("Zen {}{}. SessionId: '{}'. Latency: {}", ResolveRes.CacheName, (ResolveRes.CacheUrl == ResolveRes.CacheName) ? "" : fmt::format(" {}", ResolveRes.CacheUrl), - Result.CacheHttp->GetSessionId()); - ; + Result.CacheHttp->GetSessionId(), + NiceLatencyNs(CacheLatencyNs)); + + Result.CacheLatencySec = ResolveRes.CacheLatencySec; + if (!m_Namespace.empty()) { CacheDescription += fmt::format(". Namespace '{}'", m_Namespace); diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 4de6ad25c..bedab3cfd 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -1469,6 +1469,20 @@ ImportOplogCommand::ImportOplogCommand() "Enables both 'boost-worker-count' and 'boost-worker-memory' - may cause computer to be less responsive", cxxopts::value(m_BoostWorkers), ""); + m_Options.add_option( + "", + "", + "allow-partial-block-requests", + "Allow request for partial chunk blocks.\n" + " false = only full block requests allowed\n" + " mixed = multiple partial block ranges requests per block allowed to zen cache, single partial block range " + "request per block to host\n" + " zencacheonly = multiple partial block ranges requests per block allowed to zen cache, only full block requests " + "allowed to host\n" + " true = multiple partial block ranges requests per block allowed to zen cache and host\n" + "Defaults to 'mixed'.", + cxxopts::value(m_AllowPartialBlockRequests), + ""); m_Options.parse_positional({"project", "oplog", "gcpath"}); m_Options.positional_help("[ []]"); @@ -1513,6 +1527,13 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg throw OptionParseException("'--oplog' is required", m_Options.help()); } + EPartialBlockRequestMode Mode = PartialBlockRequestModeFromString(m_AllowPartialBlockRequests); + if (Mode == EPartialBlockRequestMode::Invalid) + { + throw OptionParseException(fmt::format("'--allow-partial-block-requests' ('{}') is invalid", m_AllowPartialBlockRequests), + m_Options.help()); + } + HttpClient Http(m_HostName); m_ProjectName = ResolveProject(Http, m_ProjectName); if (m_ProjectName.empty()) @@ -1649,6 +1670,9 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { Writer.AddBool("boostworkermemory"sv, true); } + + Writer.AddString("partialblockrequestmode", m_AllowPartialBlockRequests); + if (!m_FileDirectoryPath.empty()) { Writer.BeginObject("file"sv); @@ -2571,6 +2595,7 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a ClientSettings.AssumeHttp2 = ResolveRes.HostAssumeHttp2; ClientSettings.MaximumInMemoryDownloadSize = m_BoostWorkerMemory ? RemoteStoreOptions::DefaultMaxBlockSize : 1024u * 1024u; Storage.BuildStorageHttp = std::make_unique(ResolveRes.HostUrl, ClientSettings); + Storage.BuildStorageLatencySec = ResolveRes.HostLatencySec; BuildStorageCache::Statistics StorageCacheStats; @@ -2589,7 +2614,8 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a .RetryCount = 0, .MaximumInMemoryDownloadSize = m_BoostWorkerMemory ? RemoteStoreOptions::DefaultMaxBlockSize : 1024u * 1024u}, [&AbortFlag]() { return AbortFlag.load(); }); - Storage.CacheName = ResolveRes.CacheName; + Storage.CacheName = ResolveRes.CacheName; + Storage.CacheLatencySec = ResolveRes.CacheLatencySec; } if (!m_Quiet) diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h index e415b41b7..17fd76e9f 100644 --- a/src/zen/cmds/projectstore_cmd.h +++ b/src/zen/cmds/projectstore_cmd.h @@ -209,6 +209,8 @@ private: bool m_BoostWorkerCount = false; bool m_BoostWorkerMemory = false; bool m_BoostWorkers = false; + + std::string m_AllowPartialBlockRequests = "mixed"; }; class SnapshotOplogCommand : public ProjectStoreCommand diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index d3b59df2b..078e27b34 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -21,6 +21,8 @@ #include "clients/httpclientcommon.h" +#include + #if ZEN_WITH_TESTS # include # include @@ -340,6 +342,42 @@ HttpClient::Authenticate() return m_Inner->Authenticate(); } +LatencyTestResult +MeasureLatency(HttpClient& Client, std::string_view Url) +{ + std::vector MeasurementTimes; + std::string ErrorMessage; + + for (uint32_t AttemptCount = 0; AttemptCount < 20 && MeasurementTimes.size() < 5; AttemptCount++) + { + HttpClient::Response MeasureResponse = Client.Get(Url); + if (MeasureResponse.IsSuccess()) + { + MeasurementTimes.push_back(MeasureResponse.ElapsedSeconds); + Sleep(5); + } + else + { + ErrorMessage = MeasureResponse.ErrorMessage(fmt::format("Unable to measure latency using {}", Url)); + } + } + + if (MeasurementTimes.empty()) + { + return {.Success = false, .FailureReason = ErrorMessage}; + } + + if (MeasurementTimes.size() > 2) + { + std::sort(MeasurementTimes.begin(), MeasurementTimes.end()); + MeasurementTimes.pop_back(); // Remove the worst time + } + + double AverageLatency = std::accumulate(MeasurementTimes.begin(), MeasurementTimes.end(), 0.0) / MeasurementTimes.size(); + + return {.Success = true, .LatencySeconds = AverageLatency}; +} + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index 9a9b74d72..7a129a98c 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -260,6 +260,15 @@ private: const HttpClientSettings m_ConnectionSettings; }; +struct LatencyTestResult +{ + bool Success = false; + std::string FailureReason; + double LatencySeconds = -1.0; +}; + +LatencyTestResult MeasureLatency(HttpClient& Client, std::string_view Url); + void httpclient_forcelink(); // internal } // namespace zen diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp index 07fcd62ba..faa85f81b 100644 --- a/src/zenremotestore/builds/buildstoragecache.cpp +++ b/src/zenremotestore/builds/buildstoragecache.cpp @@ -474,7 +474,13 @@ TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const boo HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds"); if (TestResponse.IsSuccess()) { - return {.Success = true}; + LatencyTestResult LatencyResult = MeasureLatency(TestHttpClient, "/health"); + + if (!LatencyResult.Success) + { + return {.Success = false, .FailureReason = LatencyResult.FailureReason}; + } + return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds}; } return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")}; }; diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 4f1b07c37..5219e86d8 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -484,24 +484,6 @@ private: uint64_t FilteredPerSecond = 0; }; -EPartialBlockRequestMode -PartialBlockRequestModeFromString(const std::string_view ModeString) -{ - switch (HashStringAsLowerDjb2(ModeString)) - { - case HashStringDjb2("false"): - return EPartialBlockRequestMode::Off; - case HashStringDjb2("zencacheonly"): - return EPartialBlockRequestMode::ZenCacheOnly; - case HashStringDjb2("mixed"): - return EPartialBlockRequestMode::Mixed; - case HashStringDjb2("true"): - return EPartialBlockRequestMode::All; - default: - return EPartialBlockRequestMode::Invalid; - } -} - std::filesystem::path ZenStateFilePath(const std::filesystem::path& ZenFolderPath) { @@ -903,7 +885,10 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { ChunkBlockAnalyser BlockAnalyser(m_LogOutput, m_BlockDescriptions, - ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, .IsVerbose = m_Options.IsVerbose}); + ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, + .IsVerbose = m_Options.IsVerbose, + .HostLatencySec = m_Storage.BuildStorageLatencySec, + .HostHighSpeedLatencySec = m_Storage.CacheLatencySec}); std::vector NeededBlocks = BlockAnalyser.GetNeeded( m_RemoteLookup.ChunkHashToChunkIndex, @@ -1034,25 +1019,29 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); } - else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All) - { - BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::On); - } else { BlockPartialDownloadModes.reserve(m_BlockDescriptions.size()); for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) { const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash); - if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) + if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All) + { + BlockPartialDownloadModes.push_back(BlockExistInCache + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange); + } + else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) { - BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::On - : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + BlockPartialDownloadModes.push_back(BlockExistInCache + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); } else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed) { - BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::On - : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange); + BlockPartialDownloadModes.push_back(BlockExistInCache + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange); } } } diff --git a/src/zenremotestore/builds/buildstorageutil.cpp b/src/zenremotestore/builds/buildstorageutil.cpp index 36b45e800..b249d7d52 100644 --- a/src/zenremotestore/builds/buildstorageutil.cpp +++ b/src/zenremotestore/builds/buildstorageutil.cpp @@ -63,11 +63,13 @@ ResolveBuildStorage(OperationLogOutput& Output, std::string HostUrl; std::string HostName; + double HostLatencySec = -1.0; std::string CacheUrl; std::string CacheName; bool HostAssumeHttp2 = ClientSettings.AssumeHttp2; bool CacheAssumeHttp2 = ClientSettings.AssumeHttp2; + double CacheLatencySec = -1.0; JupiterServerDiscovery DiscoveryResponse; const std::string_view DiscoveryHost = Host.empty() ? OverrideHost : Host; @@ -98,8 +100,9 @@ ResolveBuildStorage(OperationLogOutput& Output, { ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", OverrideHost); } - HostUrl = OverrideHost; - HostName = GetHostNameFromUrl(OverrideHost); + HostUrl = OverrideHost; + HostName = GetHostNameFromUrl(OverrideHost); + HostLatencySec = TestResult.LatencySeconds; } else { @@ -137,6 +140,7 @@ ResolveBuildStorage(OperationLogOutput& Output, HostUrl = ServerEndpoint.BaseUrl; HostAssumeHttp2 = ServerEndpoint.AssumeHttp2; HostName = ServerEndpoint.Name; + HostLatencySec = TestResult.LatencySeconds; break; } else @@ -183,6 +187,7 @@ ResolveBuildStorage(OperationLogOutput& Output, CacheUrl = CacheEndpoint.BaseUrl; CacheAssumeHttp2 = CacheEndpoint.AssumeHttp2; CacheName = CacheEndpoint.Name; + CacheLatencySec = TestResult.LatencySeconds; break; } } @@ -204,6 +209,7 @@ ResolveBuildStorage(OperationLogOutput& Output, CacheUrl = ZenServerLocalHostUrl; CacheAssumeHttp2 = false; CacheName = "localhost"; + CacheLatencySec = TestResult.LatencySeconds; } } }); @@ -219,8 +225,9 @@ ResolveBuildStorage(OperationLogOutput& Output, if (ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(ZenCacheHost, /*AssumeHttp2*/ false, ClientSettings.Verbose); TestResult.Success) { - CacheUrl = ZenCacheHost; - CacheName = GetHostNameFromUrl(ZenCacheHost); + CacheUrl = ZenCacheHost; + CacheName = GetHostNameFromUrl(ZenCacheHost); + CacheLatencySec = TestResult.LatencySeconds; } else { @@ -231,10 +238,12 @@ ResolveBuildStorage(OperationLogOutput& Output, return BuildStorageResolveResult{.HostUrl = HostUrl, .HostName = HostName, .HostAssumeHttp2 = HostAssumeHttp2, + .HostLatencySec = HostLatencySec, .CacheUrl = CacheUrl, .CacheName = CacheName, - .CacheAssumeHttp2 = CacheAssumeHttp2}; + .CacheAssumeHttp2 = CacheAssumeHttp2, + .CacheLatencySec = CacheLatencySec}; } std::vector diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp index 06cedae3f..d203e0292 100644 --- a/src/zenremotestore/chunking/chunkblock.cpp +++ b/src/zenremotestore/chunking/chunkblock.cpp @@ -597,7 +597,7 @@ ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span if (MaybeBlockRanges.has_value()) { - const std::vector& BlockRanges = MaybeBlockRanges.value(); + std::vector BlockRanges = MaybeBlockRanges.value(); ZEN_ASSERT(!BlockRanges.empty()); uint64_t RequestedSize = @@ -606,12 +606,54 @@ ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - if ((PartialBlockDownloadMode != EPartialBlockDownloadMode::Exact) && ((RequestedSize * 100) / TotalBlockSize) >= 200) + if (PartialBlockDownloadMode != EPartialBlockDownloadMode::Exact && BlockRanges.size() > 1) + { + // TODO: Once we have support in our http client to request multiple ranges in one request this + // logic would need to change as the per-request overhead would go away + + const double LatencySec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed + ? m_Options.HostHighSpeedLatencySec + : m_Options.HostLatencySec; + if (LatencySec > 0) + { + const uint64_t BytesPerSec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed + ? m_Options.HostHighSpeedBytesPerSec + : m_Options.HostSpeedBytesPerSec; + + const double ExtraRequestTimeSec = (BlockRanges.size() - 1) * LatencySec; + const uint64_t ExtraRequestTimeBytes = uint64_t(ExtraRequestTimeSec * BytesPerSec); + + const uint64_t FullRangeSize = + BlockRanges.back().RangeStart + BlockRanges.back().RangeLength - BlockRanges.front().RangeStart; + + if (ExtraRequestTimeBytes + RequestedSize >= FullRangeSize) + { + BlockRanges = std::vector{MergeBlockRanges(BlockRanges)}; + + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Merging {} chunks ({}) from block {} ({}) to single request (extra bytes {})", + NeededBlock.ChunkIndexes.size(), + NiceBytes(RequestedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + NiceBytes(BlockRanges.front().RangeLength - RequestedSize)); + } + + RequestedSize = BlockRanges.front().RangeLength; + } + } + } + + if ((PartialBlockDownloadMode != EPartialBlockDownloadMode::Exact) && + ((TotalBlockSize - RequestedSize) < (512u * 1024u))) { if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Requesting {} chunks ({}) from block {} ({}) using full block request (extra bytes {})", + "Requesting {} chunks ({}) from block {} ({}) using full block request due to small " + "total slack (extra bytes {})", NeededBlock.ChunkIndexes.size(), NiceBytes(RequestedSize), BlockDescription.BlockHash, @@ -624,19 +666,16 @@ ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span { Result.BlockRanges.insert(Result.BlockRanges.end(), BlockRanges.begin(), BlockRanges.end()); - if (RequestedSize > TotalWantedChunksSize) + if (m_Options.IsVerbose) { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})", - NeededBlock.ChunkIndexes.size(), - NiceBytes(RequestedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - BlockRanges.size(), - NiceBytes(RequestedSize - TotalWantedChunksSize)); - } + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})", + NeededBlock.ChunkIndexes.size(), + NiceBytes(RequestedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + BlockRanges.size(), + NiceBytes(RequestedSize - TotalWantedChunksSize)); } } } @@ -786,7 +825,7 @@ ChunkBlockAnalyser::CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std: }; uint64_t -ChunkBlockAnalyser::CalculateNextGap(std::span BlockRanges) +ChunkBlockAnalyser::CalculateNextGap(const uint64_t AlwaysAcceptableGap, std::span BlockRanges) { ZEN_ASSERT(BlockRanges.size() > 1); uint64_t AcceptableGap = (uint64_t)-1; @@ -798,7 +837,7 @@ ChunkBlockAnalyser::CalculateNextGap(std::span Block const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength); AcceptableGap = Min(Gap, AcceptableGap); } - AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u); + AcceptableGap = RoundUp(AcceptableGap, AlwaysAcceptableGap); return AcceptableGap; }; @@ -949,10 +988,12 @@ ChunkBlockAnalyser::CalculateBlockRanges(uint32_t BlockIndex, return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); } - std::vector CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges); + const uint64_t AlwaysAcceptableGap = 4u * 1024u; + + std::vector CollapsedBlockRanges = CollapseBlockRanges(AlwaysAcceptableGap, BlockRanges); while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges)) { - CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges); + CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(AlwaysAcceptableGap, CollapsedBlockRanges), CollapsedBlockRanges); } const std::uint64_t WantedCollapsedSize = diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h index bb5b1c5f4..f25ce5b5e 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h @@ -65,6 +65,7 @@ struct ZenCacheEndpointTestResult { bool Success = false; std::string FailureReason; + double LatencySeconds = -1.0; }; ZenCacheEndpointTestResult TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpVerbose); diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 6800444e0..31733569e 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -109,17 +110,6 @@ struct RebuildFolderStateStatistics uint64_t FinalizeTreeElapsedWallTimeUs = 0; }; -enum EPartialBlockRequestMode -{ - Off, - ZenCacheOnly, - Mixed, - All, - Invalid -}; - -EPartialBlockRequestMode PartialBlockRequestModeFromString(const std::string_view ModeString); - std::filesystem::path ZenStateFilePath(const std::filesystem::path& ZenFolderPath); std::filesystem::path ZenTempFolderPath(const std::filesystem::path& ZenFolderPath); diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h index ab3037c89..4b85d8f1e 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h @@ -17,10 +17,12 @@ struct BuildStorageResolveResult std::string HostUrl; std::string HostName; bool HostAssumeHttp2 = false; + double HostLatencySec = -1.0; std::string CacheUrl; std::string CacheName; bool CacheAssumeHttp2 = false; + double CacheLatencySec = -1.0; }; enum class ZenCacheResolveMode @@ -54,9 +56,11 @@ struct StorageInstance std::unique_ptr BuildStorageHttp; std::unique_ptr BuildStorage; std::string StorageName; + double BuildStorageLatencySec = -1.0; std::unique_ptr CacheHttp; std::unique_ptr BuildCacheStorage; std::string CacheName; + double CacheLatencySec = -1.0; }; } // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h index 57710fcf5..5a17ef79c 100644 --- a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h +++ b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h @@ -82,8 +82,12 @@ class ChunkBlockAnalyser public: struct Options { - bool IsQuiet = false; - bool IsVerbose = false; + bool IsQuiet = false; + bool IsVerbose = false; + double HostLatencySec = -1.0; + double HostHighSpeedLatencySec = -1.0; + uint64_t HostSpeedBytesPerSec = (1u * 1024u * 1024u * 1024u) / 8u; // 1GBit + uint64_t HostHighSpeedBytesPerSec = (2u * 1024u * 1024u * 1024u) / 8u; // 2GBit }; ChunkBlockAnalyser(OperationLogOutput& LogOutput, std::span BlockDescriptions, const Options& Options); @@ -110,7 +114,8 @@ public: { Off, SingleRange, - On, + MultiRange, + MultiRangeHighSpeed, Exact }; @@ -130,14 +135,14 @@ private: uint16_t MaxRangeCount; }; - static constexpr uint16_t FullBlockRangePercentLimit = 95; + static constexpr uint16_t FullBlockRangePercentLimit = 98; static constexpr BlockRangeLimit ForceMergeLimits[] = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1}, - {.SizePercent = 90, .MaxRangeCount = 2}, - {.SizePercent = 85, .MaxRangeCount = 8}, - {.SizePercent = 80, .MaxRangeCount = 16}, - {.SizePercent = 75, .MaxRangeCount = 32}, - {.SizePercent = 70, .MaxRangeCount = 48}, + {.SizePercent = 90, .MaxRangeCount = 4}, + {.SizePercent = 85, .MaxRangeCount = 16}, + {.SizePercent = 80, .MaxRangeCount = 32}, + {.SizePercent = 75, .MaxRangeCount = 48}, + {.SizePercent = 70, .MaxRangeCount = 64}, {.SizePercent = 4, .MaxRangeCount = 82}, {.SizePercent = 0, .MaxRangeCount = 96}}; @@ -149,7 +154,7 @@ private: std::span Ranges); std::vector CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span BlockRanges); - uint64_t CalculateNextGap(std::span BlockRanges); + uint64_t CalculateNextGap(const uint64_t AlwaysAcceptableGap, std::span BlockRanges); std::optional> CalculateBlockRanges(uint32_t BlockIndex, const ChunkBlockDescription& BlockDescription, std::span BlockChunkIndexNeeded, diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h index 432496bc1..7bbf40dfa 100644 --- a/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h +++ b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h @@ -28,6 +28,7 @@ struct JupiterEndpointTestResult { bool Success = false; std::string FailureReason; + double LatencySeconds = -1.0; }; JupiterEndpointTestResult TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpVerbose); diff --git a/src/zenremotestore/include/zenremotestore/operationlogoutput.h b/src/zenremotestore/include/zenremotestore/operationlogoutput.h index 9693e69cf..6f10ab156 100644 --- a/src/zenremotestore/include/zenremotestore/operationlogoutput.h +++ b/src/zenremotestore/include/zenremotestore/operationlogoutput.h @@ -3,6 +3,7 @@ #pragma once #include +#include namespace zen { @@ -57,9 +58,7 @@ public: virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) = 0; }; -struct LoggerRef; - -OperationLogOutput* CreateStandardLogOutput(LoggerRef& Log); +OperationLogOutput* CreateStandardLogOutput(LoggerRef Log); #define ZEN_OPERATION_LOG(OutputTarget, InLevel, fmtstr, ...) \ do \ diff --git a/src/zenremotestore/include/zenremotestore/partialblockrequestmode.h b/src/zenremotestore/include/zenremotestore/partialblockrequestmode.h new file mode 100644 index 000000000..54adea2b2 --- /dev/null +++ b/src/zenremotestore/include/zenremotestore/partialblockrequestmode.h @@ -0,0 +1,20 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include + +namespace zen { + +enum EPartialBlockRequestMode +{ + Off, + ZenCacheOnly, + Mixed, + All, + Invalid +}; + +EPartialBlockRequestMode PartialBlockRequestModeFromString(const std::string_view ModeString); + +} // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h index e8b7c15c0..66dfcc62d 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h @@ -34,6 +34,8 @@ std::shared_ptr CreateJupiterBuildsRemoteStore(LoggerRef bool Quiet, bool Unattended, bool Hidden, - WorkerThreadPool& CacheBackgroundWorkerPool); + WorkerThreadPool& CacheBackgroundWorkerPool, + double& OutHostLatencySec, + double& OutCacheLatencySec); } // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h index 008f94351..152c02ee2 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h @@ -6,6 +6,7 @@ #include #include +#include #include @@ -73,6 +74,16 @@ public: std::vector Blocks; }; + struct GetBlockDescriptionsResult : public Result + { + std::vector Blocks; + }; + + struct AttachmentExistsInCacheResult : public Result + { + std::vector HasBody; + }; + struct RemoteStoreInfo { bool CreateBlocks; @@ -111,10 +122,20 @@ public: virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; virtual SaveAttachmentsResult SaveAttachments(const std::vector& Payloads) = 0; - virtual LoadContainerResult LoadContainer() = 0; - virtual GetKnownBlocksResult GetKnownBlocks() = 0; - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0; - virtual LoadAttachmentsResult LoadAttachments(const std::vector& RawHashes) = 0; + virtual LoadContainerResult LoadContainer() = 0; + virtual GetKnownBlocksResult GetKnownBlocks() = 0; + virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span BlockHashes) = 0; + virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span RawHashes) = 0; + + struct AttachmentRange + { + uint64_t Offset = 0; + uint64_t Bytes = (uint64_t)-1; + + inline operator bool() const { return Offset != 0 || Bytes != (uint64_t)-1; } + }; + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) = 0; + virtual LoadAttachmentsResult LoadAttachments(const std::vector& RawHashes) = 0; virtual void Flush() = 0; }; @@ -153,14 +174,15 @@ RemoteProjectStore::LoadContainerResult BuildContainer( class JobContext; -RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function RawHashes)>& OnReferencedAttachments, - const std::function& HasAttachment, - const std::function&& Chunks)>& OnNeedBlock, - const std::function& OnNeedAttachment, - const std::function& OnChunkedAttachment, - JobContext* OptionalContext); +RemoteProjectStore::Result SaveOplogContainer( + ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function RawHashes)>& OnReferencedAttachments, + const std::function& HasAttachment, + const std::function&& NeededChunkIndexes)>& OnNeedBlock, + const std::function& OnNeedAttachment, + const std::function& OnChunkedAttachment, + JobContext* OptionalContext); RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, @@ -177,15 +199,18 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, bool IgnoreMissingAttachments, JobContext* OptionalContext); -RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - bool ForceDownload, - bool IgnoreMissingAttachments, - bool CleanOplog, - JobContext* OptionalContext); +RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + bool ForceDownload, + bool IgnoreMissingAttachments, + bool CleanOplog, + EPartialBlockRequestMode PartialBlockRequestMode, + double HostLatencySec, + double CacheLatencySec, + JobContext* OptionalContext); std::vector GetBlockHashesFromOplog(CbObjectView ContainerObject); std::vector GetBlocksFromOplog(CbObjectView ContainerObject, std::span IncludeBlockHashes); diff --git a/src/zenremotestore/jupiter/jupiterhost.cpp b/src/zenremotestore/jupiter/jupiterhost.cpp index 7706f00c2..2583cfc84 100644 --- a/src/zenremotestore/jupiter/jupiterhost.cpp +++ b/src/zenremotestore/jupiter/jupiterhost.cpp @@ -59,7 +59,13 @@ TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpClient::Response TestResponse = TestHttpClient.Get("/health/live"); if (TestResponse.IsSuccess()) { - return {.Success = true}; + LatencyTestResult LatencyResult = MeasureLatency(TestHttpClient, "/health/ready"); + + if (!LatencyResult.Success) + { + return {.Success = false, .FailureReason = LatencyResult.FailureReason}; + } + return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds}; } return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")}; } diff --git a/src/zenremotestore/operationlogoutput.cpp b/src/zenremotestore/operationlogoutput.cpp index 0837ed716..7ed93c947 100644 --- a/src/zenremotestore/operationlogoutput.cpp +++ b/src/zenremotestore/operationlogoutput.cpp @@ -95,7 +95,7 @@ StandardLogOutputProgressBar::Finish() } OperationLogOutput* -CreateStandardLogOutput(LoggerRef& Log) +CreateStandardLogOutput(LoggerRef Log) { return new StandardLogOutput(Log); } diff --git a/src/zenremotestore/partialblockrequestmode.cpp b/src/zenremotestore/partialblockrequestmode.cpp new file mode 100644 index 000000000..b3edf515b --- /dev/null +++ b/src/zenremotestore/partialblockrequestmode.cpp @@ -0,0 +1,27 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include + +#include + +namespace zen { + +EPartialBlockRequestMode +PartialBlockRequestModeFromString(const std::string_view ModeString) +{ + switch (HashStringAsLowerDjb2(ModeString)) + { + case HashStringDjb2("false"): + return EPartialBlockRequestMode::Off; + case HashStringDjb2("zencacheonly"): + return EPartialBlockRequestMode::ZenCacheOnly; + case HashStringDjb2("mixed"): + return EPartialBlockRequestMode::Mixed; + case HashStringDjb2("true"): + return EPartialBlockRequestMode::All; + default: + return EPartialBlockRequestMode::Invalid; + } +} + +} // namespace zen diff --git a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp index a8e883dde..c42373e4d 100644 --- a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp @@ -441,7 +441,7 @@ public: catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, @@ -451,7 +451,7 @@ public: catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, @@ -462,7 +462,94 @@ public: return Result; } - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span BlockHashes) override + { + std::unique_ptr Output(CreateStandardLogOutput(Log())); + + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + + GetBlockDescriptionsResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); + + try + { + Result.Blocks = zen::GetBlockDescriptions(*Output, + *m_BuildStorage, + m_BuildCacheStorage.get(), + m_BuildId, + m_OplogBuildPartId, + BlockHashes, + /*AttemptFallback*/ false, + /*IsQuiet*/ false, + /*IsVerbose)*/ false); + } + catch (const HttpClientError& Ex) + { + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", + m_BuildStorageHttp.GetBaseUri(), + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", + m_BuildStorageHttp.GetBaseUri(), + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } + return Result; + } + + virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span RawHashes) override + { + AttachmentExistsInCacheResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); + try + { + const std::vector CacheExistsResult = + m_BuildCacheStorage->BlobsExists(m_BuildId, RawHashes); + + if (CacheExistsResult.size() == RawHashes.size()) + { + Result.HasBody.reserve(CacheExistsResult.size()); + for (size_t BlobIndex = 0; BlobIndex < CacheExistsResult.size(); BlobIndex++) + { + Result.HasBody.push_back(CacheExistsResult[BlobIndex].HasBody); + } + } + } + catch (const HttpClientError& Ex) + { + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Remote cache: Failed finding known blobs for {}/{}/{}/{}. Reason: '{}'", + m_BuildStorageHttp.GetBaseUri(), + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Remote cache: Failed finding known blobs for {}/{}/{}/{}. Reason: '{}'", + m_BuildStorageHttp.GetBaseUri(), + m_Namespace, + m_Bucket, + m_BuildId, + Ex.what()); + } + return Result; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); @@ -474,7 +561,7 @@ public: { if (m_BuildCacheStorage) { - IoBuffer CachedBlob = m_BuildCacheStorage->GetBuildBlob(m_BuildId, RawHash); + IoBuffer CachedBlob = m_BuildCacheStorage->GetBuildBlob(m_BuildId, RawHash, Range.Offset, Range.Bytes); if (CachedBlob) { Result.Bytes = std::move(CachedBlob); @@ -482,20 +569,23 @@ public: } if (!Result.Bytes) { - Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash); + Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash, Range.Offset, Range.Bytes); if (m_BuildCacheStorage && Result.Bytes && m_PopulateCache) { - m_BuildCacheStorage->PutBuildBlob(m_BuildId, - RawHash, - Result.Bytes.GetContentType(), - CompositeBuffer(SharedBuffer(Result.Bytes))); + if (!Range) + { + m_BuildCacheStorage->PutBuildBlob(m_BuildId, + RawHash, + Result.Bytes.GetContentType(), + CompositeBuffer(SharedBuffer(Result.Bytes))); + } } } } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, @@ -505,7 +595,7 @@ public: catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, @@ -558,7 +648,7 @@ public: for (const IoHash& Hash : AttachmentsLeftToFind) { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {}); if (ChunkResult.ErrorCode) { return LoadAttachmentsResult{ChunkResult}; @@ -623,7 +713,9 @@ CreateJupiterBuildsRemoteStore(LoggerRef InLog, bool Quiet, bool Unattended, bool Hidden, - WorkerThreadPool& CacheBackgroundWorkerPool) + WorkerThreadPool& CacheBackgroundWorkerPool, + double& OutHostLatencySec, + double& OutCacheLatencySec) { std::string Host = Options.Host; if (!Host.empty() && Host.find("://"sv) == std::string::npos) @@ -727,6 +819,10 @@ CreateJupiterBuildsRemoteStore(LoggerRef InLog, Options.ForceDisableBlocks, Options.ForceDisableTempBlocks, Options.PopulateCache); + + OutHostLatencySec = ResolveRes.HostLatencySec; + OutCacheLatencySec = ResolveRes.CacheLatencySec; + return RemoteStore; } diff --git a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp index 3a67d3842..ec7fb7bbc 100644 --- a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp @@ -217,7 +217,18 @@ public: return Result; } - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span BlockHashes) override + { + ZEN_UNUSED(BlockHashes); + return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}}; + } + + virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span RawHashes) override + { + return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector(RawHashes.size(), false)}; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override { Stopwatch Timer; LoadAttachmentResult Result; @@ -232,7 +243,14 @@ public: { BasicFile ChunkFile; ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); - Result.Bytes = ChunkFile.ReadAll(); + if (Range) + { + Result.Bytes = ChunkFile.ReadRange(Range.Offset, Range.Bytes); + } + else + { + Result.Bytes = ChunkFile.ReadAll(); + } } AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; @@ -245,7 +263,7 @@ public: LoadAttachmentsResult Result; for (const IoHash& Hash : RawHashes) { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {}); if (ChunkResult.ErrorCode) { ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp index 462de2988..f8179831c 100644 --- a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp @@ -212,7 +212,18 @@ public: return Result; } - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span BlockHashes) override + { + ZEN_UNUSED(BlockHashes); + return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}}; + } + + virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span RawHashes) override + { + return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector(RawHashes.size(), false)}; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override { JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); @@ -227,6 +238,10 @@ public: RawHash, Result.Reason); } + if (!Result.ErrorCode && Range) + { + Result.Bytes = IoBuffer(Result.Bytes, Range.Offset, Range.Bytes); + } return Result; } @@ -235,7 +250,7 @@ public: LoadAttachmentsResult Result; for (const IoHash& Hash : RawHashes) { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {}); if (ChunkResult.ErrorCode) { return LoadAttachmentsResult{ChunkResult}; diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 8be8eb0df..2a9da6f58 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -229,29 +230,60 @@ namespace remotestore_impl { struct DownloadInfo { - uint64_t OplogSizeBytes = 0; - std::atomic AttachmentsDownloaded = 0; - std::atomic AttachmentBlocksDownloaded = 0; - std::atomic AttachmentBytesDownloaded = 0; - std::atomic AttachmentBlockBytesDownloaded = 0; - std::atomic AttachmentsStored = 0; - std::atomic AttachmentBytesStored = 0; - std::atomic_size_t MissingAttachmentCount = 0; + uint64_t OplogSizeBytes = 0; + std::atomic AttachmentsDownloaded = 0; + std::atomic AttachmentBlocksDownloaded = 0; + std::atomic AttachmentBlocksRangesDownloaded = 0; + std::atomic AttachmentBytesDownloaded = 0; + std::atomic AttachmentBlockBytesDownloaded = 0; + std::atomic AttachmentBlockRangeBytesDownloaded = 0; + std::atomic AttachmentsStored = 0; + std::atomic AttachmentBytesStored = 0; + std::atomic_size_t MissingAttachmentCount = 0; }; - void DownloadAndSaveBlockChunks(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, - DownloadInfo& Info, - Stopwatch& LoadAttachmentsTimer, - std::atomic_uint64_t& DownloadStartMS, - const std::vector& Chunks) + class JobContextLogOutput : public OperationLogOutput + { + public: + JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {} + virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) override + { + ZEN_UNUSED(LogLevel); + if (m_OptionalContext) + { + fmt::basic_memory_buffer MessageBuffer; + fmt::vformat_to(fmt::appender(MessageBuffer), Format, Args); + remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size())); + } + } + + virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); } + virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); } + virtual uint32_t GetProgressUpdateDelayMS() override { return 0; } + virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override + { + ZEN_UNUSED(InSubTask); + return nullptr; + } + + private: + JobContext* m_OptionalContext; + }; + + void DownloadAndSaveBlockChunks(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + bool IgnoreMissingAttachments, + JobContext* OptionalContext, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + ThinChunkBlockDescription&& ThinBlockDescription, + std::vector&& NeededChunkIndexes) { AttachmentsDownloadLatch.AddCount(1); NetworkWorkerPool.ScheduleWork( @@ -261,7 +293,8 @@ namespace remotestore_impl { &AttachmentsDownloadLatch, &AttachmentsWriteLatch, &RemoteResult, - Chunks = Chunks, + ThinBlockDescription = std::move(ThinBlockDescription), + NeededChunkIndexes = std::move(NeededChunkIndexes), &Info, &LoadAttachmentsTimer, &DownloadStartMS, @@ -276,6 +309,13 @@ namespace remotestore_impl { } try { + std::vector Chunks; + Chunks.reserve(NeededChunkIndexes.size()); + for (uint32_t ChunkIndex : NeededChunkIndexes) + { + Chunks.push_back(ThinBlockDescription.ChunkRawHashes[ChunkIndex]); + } + uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); @@ -293,7 +333,12 @@ namespace remotestore_impl { } return; } - Info.AttachmentsDownloaded.fetch_add(Chunks.size()); + Info.AttachmentsDownloaded.fetch_add(Result.Chunks.size()); + for (const auto& It : Result.Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + } ZEN_INFO("Loaded {} bulk attachments in {}", Chunks.size(), NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000))); @@ -320,8 +365,6 @@ namespace remotestore_impl { for (const auto& It : Chunks) { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); WriteRawHashes.push_back(It.first); } @@ -350,28 +393,29 @@ namespace remotestore_impl { catch (const std::exception& Ex) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk load {} attachments", Chunks.size()), + fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }; - void DownloadAndSaveBlock(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, - DownloadInfo& Info, - Stopwatch& LoadAttachmentsTimer, - std::atomic_uint64_t& DownloadStartMS, - const IoHash& BlockHash, - const std::vector& Chunks, - uint32_t RetriesLeft) + void DownloadAndSaveBlock(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + bool IgnoreMissingAttachments, + JobContext* OptionalContext, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + const IoHash& BlockHash, + const tsl::robin_map& AllNeededPartialChunkHashesLookup, + std::span> ChunkDownloadedFlags, + uint32_t RetriesLeft) { AttachmentsDownloadLatch.AddCount(1); NetworkWorkerPool.ScheduleWork( @@ -381,7 +425,6 @@ namespace remotestore_impl { &RemoteStore, &NetworkWorkerPool, &WorkerPool, - BlockHash, &RemoteResult, &Info, &LoadAttachmentsTimer, @@ -389,7 +432,9 @@ namespace remotestore_impl { IgnoreMissingAttachments, OptionalContext, RetriesLeft, - Chunks = std::vector(Chunks)]() { + BlockHash = IoHash(BlockHash), + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags]() { ZEN_TRACE_CPU("DownloadBlock"); auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); @@ -401,7 +446,7 @@ namespace remotestore_impl { { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); + RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash, {}); if (BlockResult.ErrorCode) { ReportMessage(OptionalContext, @@ -422,10 +467,10 @@ namespace remotestore_impl { } uint64_t BlockSize = BlockResult.Bytes.GetSize(); Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_INFO("Loaded block attachment '{}' in {} ({})", - BlockHash, - NiceTimeSpanMs(static_cast(BlockResult.ElapsedSeconds * 1000)), - NiceBytes(BlockSize)); + ZEN_DEBUG("Loaded block attachment '{}' in {} ({})", + BlockHash, + NiceTimeSpanMs(static_cast(BlockResult.ElapsedSeconds * 1000)), + NiceBytes(BlockSize)); Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); AttachmentsWriteLatch.AddCount(1); @@ -436,7 +481,6 @@ namespace remotestore_impl { &RemoteStore, &NetworkWorkerPool, &WorkerPool, - BlockHash, &RemoteResult, &Info, &LoadAttachmentsTimer, @@ -444,8 +488,10 @@ namespace remotestore_impl { IgnoreMissingAttachments, OptionalContext, RetriesLeft, - Chunks = std::move(Chunks), - Bytes = std::move(BlockResult.Bytes)]() { + BlockHash = IoHash(BlockHash), + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + Bytes = std::move(BlockResult.Bytes)]() { auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -454,9 +500,6 @@ namespace remotestore_impl { try { ZEN_ASSERT(Bytes.Size() > 0); - std::unordered_set WantedChunks; - WantedChunks.reserve(Chunks.size()); - WantedChunks.insert(Chunks.begin(), Chunks.end()); std::vector WriteAttachmentBuffers; std::vector WriteRawHashes; @@ -485,7 +528,8 @@ namespace remotestore_impl { LoadAttachmentsTimer, DownloadStartMS, BlockHash, - std::move(Chunks), + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, RetriesLeft - 1); } ReportMessage( @@ -519,7 +563,8 @@ namespace remotestore_impl { LoadAttachmentsTimer, DownloadStartMS, BlockHash, - std::move(Chunks), + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, RetriesLeft - 1); } ReportMessage(OptionalContext, @@ -546,28 +591,36 @@ namespace remotestore_impl { uint64_t BlockSize = BlockPayload.GetSize(); uint64_t BlockHeaderSize = 0; - bool StoreChunksOK = IterateChunkBlock( - BlockPayload.Flatten(), - [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize]( - CompressedBuffer&& Chunk, - const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT( - CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), - RawHash, - RawSize, - /*OutOptionalTotalCompressedSize*/ nullptr)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - WantedChunks.erase(AttachmentRawHash); - PotentialSize += WriteAttachmentBuffers.back().GetSize(); - } - }, - BlockHeaderSize); + + bool StoreChunksOK = IterateChunkBlock( + BlockPayload.Flatten(), + [&AllNeededPartialChunkHashesLookup, + &ChunkDownloadedFlags, + &WriteAttachmentBuffers, + &WriteRawHashes, + &Info, + &PotentialSize](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash); + if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) + { + bool Expected = false; + if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT( + CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + PotentialSize += WriteAttachmentBuffers.back().GetSize(); + } + } + }, + BlockHeaderSize); if (!StoreChunksOK) { @@ -582,8 +635,6 @@ namespace remotestore_impl { return; } - ZEN_ASSERT(WantedChunks.empty()); - if (!WriteAttachmentBuffers.empty()) { auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); @@ -625,6 +676,293 @@ namespace remotestore_impl { WorkerThreadPool::EMode::EnableBacklog); }; + void DownloadAndSavePartialBlock(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + bool IgnoreMissingAttachments, + JobContext* OptionalContext, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + const ChunkBlockDescription& BlockDescription, + std::span BlockRangeDescriptors, + size_t BlockRangeIndexStart, + size_t BlockRangeCount, + const tsl::robin_map& AllNeededPartialChunkHashesLookup, + std::span> ChunkDownloadedFlags, + uint32_t RetriesLeft) + { + AttachmentsDownloadLatch.AddCount(1); + NetworkWorkerPool.ScheduleWork( + [&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + RetriesLeft, + BlockDescription, + BlockRangeDescriptors, + BlockRangeIndexStart, + BlockRangeCount, + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags]() { + ZEN_TRACE_CPU("DownloadBlockRanges"); + + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + + double DownloadElapsedSeconds = 0; + uint64_t DownloadedBytes = 0; + + for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount; + BlockRangeIndex++) + { + if (RemoteResult.IsError()) + { + return; + } + + const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeIndex]; + + RemoteProjectStore::LoadAttachmentResult BlockResult = + RemoteStore.LoadAttachment(BlockDescription.BlockHash, + {.Offset = BlockRange.RangeStart, .Bytes = BlockRange.RangeLength}); + if (BlockResult.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to download block attachment '{}' range {},{} ({}): {}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength, + BlockResult.ErrorCode, + BlockResult.Reason)); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + } + return; + } + if (RemoteResult.IsError()) + { + return; + } + uint64_t BlockPartSize = BlockResult.Bytes.GetSize(); + if (BlockPartSize != BlockRange.RangeLength) + { + std::string ErrorString = + fmt::format("Failed to download block attachment '{}' range {},{}, got {} bytes ({}): {}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength, + BlockPartSize, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + + ReportMessage(OptionalContext, ErrorString); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(gsl::narrow(HttpResponseCode::NotFound), + "Mismatching block part range received", + ErrorString); + } + return; + } + Info.AttachmentBlocksRangesDownloaded.fetch_add(1); + + DownloadElapsedSeconds += BlockResult.ElapsedSeconds; + DownloadedBytes += BlockPartSize; + + Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize); + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + RetriesLeft, + BlockDescription, + BlockRange, + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + BlockPayload = std::move(BlockResult.Bytes)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + ZEN_ASSERT(BlockPayload.Size() > 0); + std::vector WriteAttachmentBuffers; + std::vector WriteRawHashes; + + uint64_t PotentialSize = 0; + uint64_t UsedSize = 0; + uint64_t BlockPartSize = BlockPayload.GetSize(); + + uint32_t OffsetInBlock = 0; + for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart; + ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount; + ChunkBlockIndex++) + { + const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + + if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash); + ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) + { + bool Expected = false; + if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true)) + { + IoHash VerifyChunkHash; + uint64_t VerifyChunkSize; + CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed( + SharedBuffer(IoBuffer(BlockPayload, OffsetInBlock, ChunkCompressedSize)), + VerifyChunkHash, + VerifyChunkSize); + if (!CompressedChunk) + { + std::string ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash); + ReportMessage(OptionalContext, ErrorString); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(gsl::narrow(HttpResponseCode::NotFound), + "Malformed chunk block", + ErrorString); + } + continue; + } + if (VerifyChunkHash != ChunkHash) + { + std::string ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' has mismatching hash, expected {}, got {}", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash, + ChunkHash, + VerifyChunkHash); + ReportMessage(OptionalContext, ErrorString); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(gsl::narrow(HttpResponseCode::NotFound), + "Malformed chunk block", + ErrorString); + } + continue; + } + if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex]) + { + std::string ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' has mismatching raw size, expected {}, " + "got {}", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash, + BlockDescription.ChunkRawLengths[ChunkBlockIndex], + VerifyChunkSize); + ReportMessage(OptionalContext, ErrorString); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(gsl::narrow(HttpResponseCode::NotFound), + "Malformed chunk block", + ErrorString); + } + continue; + } + + WriteAttachmentBuffers.emplace_back(CompressedChunk.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.emplace_back(ChunkHash); + PotentialSize += WriteAttachmentBuffers.back().GetSize(); + } + } + OffsetInBlock += ChunkCompressedSize; + } + + if (!WriteAttachmentBuffers.empty()) + { + auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const auto& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + UsedSize += WriteAttachmentBuffers[Index].GetSize(); + } + } + ZEN_DEBUG("Used {} (matching {}) out of {} for block {} range {}, {} ({} %) (use of matching {}%)", + NiceBytes(UsedSize), + NiceBytes(PotentialSize), + NiceBytes(BlockPartSize), + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength, + (100 * UsedSize) / BlockPartSize, + PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0); + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow(HttpResponseCode::InternalServerError), + fmt::format("Failed save block attachment {} range {}, {}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + + ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})", + BlockRangeCount, + BlockDescription.BlockHash, + NiceTimeSpanMs(static_cast(DownloadElapsedSeconds * 1000)), + NiceBytes(DownloadedBytes)); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow(HttpResponseCode::InternalServerError), + fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + }; + void DownloadAndSaveAttachment(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, bool IgnoreMissingAttachments, @@ -664,7 +1002,7 @@ namespace remotestore_impl { { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash, {}); if (AttachmentResult.ErrorCode) { ReportMessage(OptionalContext, @@ -680,10 +1018,10 @@ namespace remotestore_impl { return; } uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); - ZEN_INFO("Loaded large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast(AttachmentResult.ElapsedSeconds * 1000)), - NiceBytes(AttachmentSize)); + ZEN_DEBUG("Loaded large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast(AttachmentResult.ElapsedSeconds * 1000)), + NiceBytes(AttachmentSize)); Info.AttachmentsDownloaded.fetch_add(1); if (RemoteResult.IsError()) { @@ -1224,35 +1562,7 @@ BuildContainer(CidStore& ChunkStore, { using namespace std::literals; - class JobContextLogOutput : public OperationLogOutput - { - public: - JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {} - virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) override - { - ZEN_UNUSED(LogLevel); - if (m_OptionalContext) - { - fmt::basic_memory_buffer MessageBuffer; - fmt::vformat_to(fmt::appender(MessageBuffer), Format, Args); - remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size())); - } - } - - virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); } - virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); } - virtual uint32_t GetProgressUpdateDelayMS() override { return 0; } - virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override - { - ZEN_UNUSED(InSubTask); - return nullptr; - } - - private: - JobContext* m_OptionalContext; - }; - - std::unique_ptr LogOutput(std::make_unique(OptionalContext)); + std::unique_ptr LogOutput(std::make_unique(OptionalContext)); size_t OpCount = 0; @@ -2768,14 +3078,15 @@ SaveOplog(CidStore& ChunkStore, }; RemoteProjectStore::Result -ParseOplogContainer(const CbObject& ContainerObject, - const std::function RawHashes)>& OnReferencedAttachments, - const std::function& HasAttachment, - const std::function&& Chunks)>& OnNeedBlock, - const std::function& OnNeedAttachment, - const std::function& OnChunkedAttachment, - CbObject& OutOplogSection, - JobContext* OptionalContext) +ParseOplogContainer( + const CbObject& ContainerObject, + const std::function RawHashes)>& OnReferencedAttachments, + const std::function& HasAttachment, + const std::function&& NeededChunkIndexes)>& OnNeedBlock, + const std::function& OnNeedAttachment, + const std::function& OnChunkedAttachment, + CbObject& OutOplogSection, + JobContext* OptionalContext) { using namespace std::literals; @@ -2801,12 +3112,12 @@ ParseOplogContainer(const CbObject& ContainerObject, "Section has unexpected data type", "Failed to save oplog container"}; } - std::unordered_set OpsAttachments; + std::unordered_set NeededAttachments; { CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView(); for (CbFieldView OpEntry : OpsArray) { - OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); }); + OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); }); if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow(HttpResponseCode::OK), @@ -2816,7 +3127,7 @@ ParseOplogContainer(const CbObject& ContainerObject, } } { - std::vector ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end()); + std::vector ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end()); OnReferencedAttachments(ReferencedAttachments); } @@ -2827,24 +3138,27 @@ ParseOplogContainer(const CbObject& ContainerObject, .Reason = "Operation cancelled"}; } - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", NeededAttachments.size())); CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); for (CbFieldView ChunkedFileField : ChunkedFilesArray) { CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView(); IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash(); - if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash))) + if (NeededAttachments.erase(RawHash) == 1) { - ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView); - - OnReferencedAttachments(Chunked.ChunkHashes); - OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end()); - OnChunkedAttachment(Chunked); - ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - Chunked.ChunkHashes.size()); + if (!HasAttachment(RawHash)) + { + ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView); + + OnReferencedAttachments(Chunked.ChunkHashes); + NeededAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end()); + OnChunkedAttachment(Chunked); + ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + Chunked.ChunkHashes.size()); + } } if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -2854,6 +3168,8 @@ ParseOplogContainer(const CbObject& ContainerObject, } } + std::vector ThinBlocksDescriptions; + size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) @@ -2863,45 +3179,38 @@ ParseOplogContainer(const CbObject& ContainerObject, CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); - std::vector NeededChunks; - NeededChunks.reserve(ChunksArray.Num()); - if (BlockHash == IoHash::Zero) + std::vector ChunkHashes; + ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkField : ChunksArray) { - for (CbFieldView ChunkField : ChunksArray) - { - IoHash ChunkHash = ChunkField.AsBinaryAttachment(); - if (OpsAttachments.erase(ChunkHash) == 1) - { - if (!HasAttachment(ChunkHash)) - { - NeededChunks.emplace_back(ChunkHash); - } - } - } + ChunkHashes.push_back(ChunkField.AsHash()); } - else + ThinBlocksDescriptions.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)}); + } + + for (ThinChunkBlockDescription& ThinBlockDescription : ThinBlocksDescriptions) + { + std::vector NeededBlockChunkIndexes; + for (uint32_t ChunkIndex = 0; ChunkIndex < ThinBlockDescription.ChunkRawHashes.size(); ChunkIndex++) { - for (CbFieldView ChunkField : ChunksArray) + const IoHash& ChunkHash = ThinBlockDescription.ChunkRawHashes[ChunkIndex]; + if (NeededAttachments.erase(ChunkHash) == 1) { - const IoHash ChunkHash = ChunkField.AsHash(); - if (OpsAttachments.erase(ChunkHash) == 1) + if (!HasAttachment(ChunkHash)) { - if (!HasAttachment(ChunkHash)) - { - NeededChunks.emplace_back(ChunkHash); - } + NeededBlockChunkIndexes.push_back(ChunkIndex); } } } - - if (!NeededChunks.empty()) + if (!NeededBlockChunkIndexes.empty()) { - OnNeedBlock(BlockHash, std::move(NeededChunks)); - if (BlockHash != IoHash::Zero) + if (ThinBlockDescription.BlockHash != IoHash::Zero) { NeedBlockCount++; } + OnNeedBlock(std::move(ThinBlockDescription), std::move(NeededBlockChunkIndexes)); } + if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow(HttpResponseCode::OK), @@ -2909,6 +3218,7 @@ ParseOplogContainer(const CbObject& ContainerObject, .Reason = "Operation cancelled"}; } } + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); @@ -2918,7 +3228,7 @@ ParseOplogContainer(const CbObject& ContainerObject, { IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); - if (OpsAttachments.erase(AttachmentHash) == 1) + if (NeededAttachments.erase(AttachmentHash) == 1) { if (!HasAttachment(AttachmentHash)) { @@ -2941,14 +3251,15 @@ ParseOplogContainer(const CbObject& ContainerObject, } RemoteProjectStore::Result -SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function RawHashes)>& OnReferencedAttachments, - const std::function& HasAttachment, - const std::function&& Chunks)>& OnNeedBlock, - const std::function& OnNeedAttachment, - const std::function& OnChunkedAttachment, - JobContext* OptionalContext) +SaveOplogContainer( + ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function RawHashes)>& OnReferencedAttachments, + const std::function& HasAttachment, + const std::function&& NeededChunkIndexes)>& OnNeedBlock, + const std::function& OnNeedAttachment, + const std::function& OnChunkedAttachment, + JobContext* OptionalContext) { using namespace std::literals; @@ -2972,18 +3283,23 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } RemoteProjectStore::Result -LoadOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - bool ForceDownload, - bool IgnoreMissingAttachments, - bool CleanOplog, - JobContext* OptionalContext) +LoadOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + bool ForceDownload, + bool IgnoreMissingAttachments, + bool CleanOplog, + EPartialBlockRequestMode PartialBlockRequestMode, + double HostLatencySec, + double CacheLatencySec, + JobContext* OptionalContext) { using namespace std::literals; + std::unique_ptr LogOutput(std::make_unique(OptionalContext)); + remotestore_impl::DownloadInfo Info; Stopwatch Timer; @@ -3035,6 +3351,14 @@ LoadOplog(CidStore& ChunkStore, return false; }; + struct NeededBlockDownload + { + ThinChunkBlockDescription ThinBlockDescription; + std::vector NeededChunkIndexes; + }; + + std::vector NeededBlockDownloads; + auto OnNeedBlock = [&RemoteStore, &ChunkStore, &NetworkWorkerPool, @@ -3047,8 +3371,9 @@ LoadOplog(CidStore& ChunkStore, &Info, &LoadAttachmentsTimer, &DownloadStartMS, + &NeededBlockDownloads, IgnoreMissingAttachments, - OptionalContext](const IoHash& BlockHash, std::vector&& Chunks) { + OptionalContext](ThinChunkBlockDescription&& ThinBlockDescription, std::vector&& NeededChunkIndexes) { if (RemoteResult.IsError()) { return; @@ -3056,7 +3381,7 @@ LoadOplog(CidStore& ChunkStore, BlockCountToDownload++; AttachmentCount.fetch_add(1); - if (BlockHash == IoHash::Zero) + if (ThinBlockDescription.BlockHash == IoHash::Zero) { DownloadAndSaveBlockChunks(ChunkStore, RemoteStore, @@ -3070,25 +3395,13 @@ LoadOplog(CidStore& ChunkStore, Info, LoadAttachmentsTimer, DownloadStartMS, - Chunks); + std::move(ThinBlockDescription), + std::move(NeededChunkIndexes)); } else { - DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - Chunks, - 3); + NeededBlockDownloads.push_back(NeededBlockDownload{.ThinBlockDescription = std::move(ThinBlockDescription), + .NeededChunkIndexes = std::move(NeededChunkIndexes)}); } }; @@ -3132,12 +3445,7 @@ LoadOplog(CidStore& ChunkStore, }; std::vector FilesToDechunk; - auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) { - if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash)) - { - FilesToDechunk.push_back(Chunked); - } - }; + auto OnChunkedAttachment = [&FilesToDechunk](const ChunkedInfo& Chunked) { FilesToDechunk.push_back(Chunked); }; auto OnReferencedAttachments = [&Oplog](std::span RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); }; @@ -3165,6 +3473,185 @@ LoadOplog(CidStore& ChunkStore, BlockCountToDownload, FilesToDechunk.size())); + std::vector BlockHashes; + std::vector AllNeededChunkHashes; + BlockHashes.reserve(NeededBlockDownloads.size()); + for (const NeededBlockDownload& BlockDownload : NeededBlockDownloads) + { + BlockHashes.push_back(BlockDownload.ThinBlockDescription.BlockHash); + for (uint32_t ChunkIndex : BlockDownload.NeededChunkIndexes) + { + AllNeededChunkHashes.push_back(BlockDownload.ThinBlockDescription.ChunkRawHashes[ChunkIndex]); + } + } + + tsl::robin_map AllNeededPartialChunkHashesLookup = BuildHashLookup(AllNeededChunkHashes); + std::vector> ChunkDownloadedFlags(AllNeededChunkHashes.size()); + std::vector DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false); + ChunkBlockAnalyser::BlockResult PartialBlocksResult; + + RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = RemoteStore.GetBlockDescriptions(BlockHashes); + std::vector BlocksWithDescription; + BlocksWithDescription.reserve(BlockDescriptions.Blocks.size()); + for (const ChunkBlockDescription& BlockDescription : BlockDescriptions.Blocks) + { + BlocksWithDescription.push_back(BlockDescription.BlockHash); + } + { + auto WantIt = NeededBlockDownloads.begin(); + auto FindIt = BlockDescriptions.Blocks.begin(); + while (WantIt != NeededBlockDownloads.end()) + { + if (FindIt == BlockDescriptions.Blocks.end()) + { + // Fall back to full download as we can't get enough information about the block + DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + WantIt->ThinBlockDescription.BlockHash, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + 3); + for (uint32_t BlockChunkIndex : WantIt->NeededChunkIndexes) + { + const IoHash& ChunkHash = WantIt->ThinBlockDescription.ChunkRawHashes[BlockChunkIndex]; + auto It = AllNeededPartialChunkHashesLookup.find(ChunkHash); + ZEN_ASSERT(It != AllNeededPartialChunkHashesLookup.end()); + uint32_t ChunkIndex = It->second; + DownloadedViaLegacyChunkFlag[ChunkIndex] = true; + } + WantIt++; + } + else if (WantIt->ThinBlockDescription.BlockHash == FindIt->BlockHash) + { + // Found + FindIt++; + WantIt++; + } + else + { + // Not a requested block? + ZEN_ASSERT(false); + } + } + } + if (!AllNeededChunkHashes.empty()) + { + std::vector PartialBlockDownloadModes; + + if (PartialBlockRequestMode == EPartialBlockRequestMode::Off) + { + PartialBlockDownloadModes.resize(BlocksWithDescription.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + } + else + { + RemoteProjectStore::AttachmentExistsInCacheResult CacheExistsResult = + RemoteStore.AttachmentExistsInCache(BlocksWithDescription); + if (CacheExistsResult.ErrorCode != 0 || CacheExistsResult.HasBody.size() != BlocksWithDescription.size()) + { + CacheExistsResult.HasBody.resize(BlocksWithDescription.size(), false); + } + + PartialBlockDownloadModes.reserve(BlocksWithDescription.size()); + + for (bool ExistsInCache : CacheExistsResult.HasBody) + { + if (PartialBlockRequestMode == EPartialBlockRequestMode::All) + { + PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange); + } + else if (PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) + { + PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + } + else if (PartialBlockRequestMode == EPartialBlockRequestMode::Mixed) + { + PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange); + } + } + } + + ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size()); + + ChunkBlockAnalyser PartialAnalyser(*LogOutput, + BlockDescriptions.Blocks, + ChunkBlockAnalyser::Options{.IsQuiet = false, + .IsVerbose = false, + .HostLatencySec = HostLatencySec, + .HostHighSpeedLatencySec = CacheLatencySec}); + + std::vector NeededBlocks = + PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup, + [&](uint32_t ChunkIndex) { return !DownloadedViaLegacyChunkFlag[ChunkIndex]; }); + + PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes); + for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes) + { + DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockDescriptions.Blocks[FullBlockIndex].BlockHash, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + 3); + } + + for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();) + { + size_t RangeCount = 1; + size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex; + const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex]; + while (RangeCount < RangesLeft && + CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) + { + RangeCount++; + } + + DownloadAndSavePartialBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex], + PartialBlocksResult.BlockRanges, + BlockRangeIndex, + RangeCount, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + 3); + + BlockRangeIndex += RangeCount; + } + } + AttachmentsDownloadLatch.CountDown(); while (!AttachmentsDownloadLatch.Wait(1000)) { @@ -3478,21 +3965,30 @@ LoadOplog(CidStore& ChunkStore, } } - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}", - RemoteStoreInfo.ContainerName, - Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000.0)), - NiceBytes(Info.OplogSizeBytes), - Info.AttachmentBlocksDownloaded.load(), - NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), - Info.AttachmentsDownloaded.load(), - NiceBytes(Info.AttachmentBytesDownloaded.load()), - Info.AttachmentsStored.load(), - NiceBytes(Info.AttachmentBytesStored.load()), - Info.MissingAttachmentCount.load(), - remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); + uint64_t TotalDownloads = + 1 + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); + uint64_t TotalBytesDownloaded = Info.OplogSizeBytes + Info.AttachmentBlockBytesDownloaded.load() + + Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); + + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} " + "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}", + RemoteStoreInfo.ContainerName, + Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksDownloaded.load(), + NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), + Info.AttachmentBlocksRangesDownloaded.load(), + NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()), + Info.AttachmentsDownloaded.load(), + NiceBytes(Info.AttachmentBytesDownloaded.load()), + TotalDownloads, + NiceBytes(TotalBytesDownloaded), + Info.AttachmentsStored.load(), + NiceBytes(Info.AttachmentBytesStored.load()), + Info.MissingAttachmentCount.load(), + remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); return Result; } @@ -3697,6 +4193,9 @@ TEST_CASE_TEMPLATE("project.store.export", /*Force*/ false, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ false, + EPartialBlockRequestMode::Mixed, + /*HostLatencySec*/ -1.0, + /*CacheLatencySec*/ -1.0, nullptr); CHECK(ImportResult.ErrorCode == 0); @@ -3708,6 +4207,9 @@ TEST_CASE_TEMPLATE("project.store.export", /*Force*/ true, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ false, + EPartialBlockRequestMode::Mixed, + /*HostLatencySec*/ -1.0, + /*CacheLatencySec*/ -1.0, nullptr); CHECK(ImportForceResult.ErrorCode == 0); @@ -3719,6 +4221,9 @@ TEST_CASE_TEMPLATE("project.store.export", /*Force*/ false, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ true, + EPartialBlockRequestMode::Mixed, + /*HostLatencySec*/ -1.0, + /*CacheLatencySec*/ -1.0, nullptr); CHECK(ImportCleanResult.ErrorCode == 0); @@ -3730,6 +4235,9 @@ TEST_CASE_TEMPLATE("project.store.export", /*Force*/ true, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ true, + EPartialBlockRequestMode::Mixed, + /*HostLatencySec*/ -1.0, + /*CacheLatencySec*/ -1.0, nullptr); CHECK(ImportForceCleanResult.ErrorCode == 0); } diff --git a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp index ab82edbef..b4c1156ac 100644 --- a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp @@ -249,7 +249,18 @@ public: return GetKnownBlocksResult{{.ErrorCode = static_cast(HttpResponseCode::NoContent)}}; } - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span BlockHashes) override + { + ZEN_UNUSED(BlockHashes); + return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}}; + } + + virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span RawHashes) override + { + return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector(RawHashes.size(), false)}; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override { std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); HttpClient::Response Response = @@ -257,12 +268,7 @@ public: AddStats(Response); LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; - if (!Result.ErrorCode) - { - Result.Bytes = Response.ResponsePayload; - Result.Bytes.MakeOwned(); - } - if (!Result.ErrorCode) + if (Result.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, @@ -271,6 +277,15 @@ public: RawHash, Result.Reason); } + if (!Result.ErrorCode && Range) + { + Result.Bytes = IoBuffer(Response.ResponsePayload, Range.Offset, Range.Bytes); + } + else + { + Result.Bytes = Response.ResponsePayload; + } + Result.Bytes.MakeOwned(); return Result; } diff --git a/src/zenserver/storage/projectstore/httpprojectstore.cpp b/src/zenserver/storage/projectstore/httpprojectstore.cpp index 416e2ed69..2b5474d00 100644 --- a/src/zenserver/storage/projectstore/httpprojectstore.cpp +++ b/src/zenserver/storage/projectstore/httpprojectstore.cpp @@ -244,6 +244,8 @@ namespace { { std::shared_ptr Store; std::string Description; + double HostLatencySec = -1.0; + double CacheLatencySec = -1.0; }; CreateRemoteStoreResult CreateRemoteStore(LoggerRef InLog, @@ -261,6 +263,8 @@ namespace { using namespace std::literals; std::shared_ptr RemoteStore; + double HostLatencySec = -1.0; + double CacheLatencySec = -1.0; if (CbObjectView File = Params["file"sv].AsObjectView(); File) { @@ -495,7 +499,9 @@ namespace { /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true, - GetTinyWorkerPool(EWorkloadType::Background)); + GetTinyWorkerPool(EWorkloadType::Background), + HostLatencySec, + CacheLatencySec); } if (!RemoteStore) @@ -503,7 +509,10 @@ namespace { return {nullptr, "Unknown remote store type"}; } - return {std::move(RemoteStore), ""}; + return CreateRemoteStoreResult{.Store = std::move(RemoteStore), + .Description = "", + .HostLatencySec = HostLatencySec, + .CacheLatencySec = CacheLatencySec}; } std::pair ConvertResult(const RemoteProjectStore::Result& Result) @@ -2356,15 +2365,19 @@ HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req) tsl::robin_set Attachments; auto HasAttachment = [this](const IoHash& RawHash) { return m_CidStore.ContainsChunk(RawHash); }; - auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector&& ChunkHashes) { + auto OnNeedBlock = [&AttachmentsLock, &Attachments](ThinChunkBlockDescription&& ThinBlockDescription, + std::vector&& NeededChunkIndexes) { RwLock::ExclusiveLockScope _(AttachmentsLock); - if (BlockHash != IoHash::Zero) + if (ThinBlockDescription.BlockHash != IoHash::Zero) { - Attachments.insert(BlockHash); + Attachments.insert(ThinBlockDescription.BlockHash); } else { - Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); + for (uint32_t ChunkIndex : NeededChunkIndexes) + { + Attachments.insert(ThinBlockDescription.ChunkRawHashes[ChunkIndex]); + } } }; auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { @@ -2663,6 +2676,8 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) bool CleanOplog = Params["clean"].AsBool(false); bool BoostWorkerCount = Params["boostworkercount"].AsBool(false); bool BoostWorkerMemory = Params["boostworkermemory"sv].AsBool(false); + EPartialBlockRequestMode PartialBlockRequestMode = + PartialBlockRequestModeFromString(Params["partialblockrequestmode"sv].AsString("true")); CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Log(), Params, @@ -2688,6 +2703,9 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) Force, IgnoreMissingAttachments, CleanOplog, + PartialBlockRequestMode, + HostLatencySec = RemoteStoreResult.HostLatencySec, + CacheLatencySec = RemoteStoreResult.CacheLatencySec, BoostWorkerCount](JobContext& Context) { Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}", Oplog->GetOuterProjectIdentifier(), @@ -2709,6 +2727,9 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) Force, IgnoreMissingAttachments, CleanOplog, + PartialBlockRequestMode, + HostLatencySec, + CacheLatencySec, &Context); auto Response = ConvertResult(Result); ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); -- cgit v1.2.3