aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-02-24 16:10:36 +0100
committerGitHub Enterprise <[email protected]>2026-02-24 16:10:36 +0100
commiteb3079e2ec2969829cbc5b6921575d53df351f0f (patch)
treedb00660bd9132988abf66d43b43ac76d737b3723
parentAdd `zen ui` command (#779) (diff)
downloadzen-eb3079e2ec2969829cbc5b6921575d53df351f0f.tar.xz
zen-eb3079e2ec2969829cbc5b6921575d53df351f0f.zip
use partial blocks for oplog import (#780)
Feature: Add --allow-partial-block-requests to zen oplog-import Improvement: zen oplog-import now uses partial block requests to reduce download size Improvement: Use latency to Cloud Storage host and Zen Cache host when calculating partial block requests
-rw-r--r--CHANGELOG.md3
-rw-r--r--src/zen/cmds/builds_cmd.cpp28
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp28
-rw-r--r--src/zen/cmds/projectstore_cmd.h2
-rw-r--r--src/zenhttp/httpclient.cpp38
-rw-r--r--src/zenhttp/include/zenhttp/httpclient.h9
-rw-r--r--src/zenremotestore/builds/buildstoragecache.cpp8
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp45
-rw-r--r--src/zenremotestore/builds/buildstorageutil.cpp19
-rw-r--r--src/zenremotestore/chunking/chunkblock.cpp79
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h1
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h12
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h4
-rw-r--r--src/zenremotestore/include/zenremotestore/chunking/chunkblock.h25
-rw-r--r--src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h1
-rw-r--r--src/zenremotestore/include/zenremotestore/operationlogoutput.h5
-rw-r--r--src/zenremotestore/include/zenremotestore/partialblockrequestmode.h20
-rw-r--r--src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h4
-rw-r--r--src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h67
-rw-r--r--src/zenremotestore/jupiter/jupiterhost.cpp8
-rw-r--r--src/zenremotestore/operationlogoutput.cpp2
-rw-r--r--src/zenremotestore/partialblockrequestmode.cpp27
-rw-r--r--src/zenremotestore/projectstore/buildsremoteprojectstore.cpp122
-rw-r--r--src/zenremotestore/projectstore/fileremoteprojectstore.cpp24
-rw-r--r--src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp19
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp946
-rw-r--r--src/zenremotestore/projectstore/zenremoteprojectstore.cpp29
-rw-r--r--src/zenserver/storage/projectstore/httpprojectstore.cpp33
28 files changed, 1246 insertions, 362 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3670e451e..e9d3b79c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
##
+- Feature: Add `--allow-partial-block-requests` to `zen oplog-import`
- Feature: `zen ui` can be used to open dashboards for local instances
+- Improvement: `zen oplog-import` now uses partial block requests to reduce download size
+- Improvement: Use latency to Cloud Storage host and Zen Cache host when calculating partial block requests
- Bugfix: `--plain-progress` style progress bar should now show elapsed time correctly
- Bugfix: Time spent indexing local and remote state during `zen builds download` now show the correct time
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 849259013..5254ef3cf 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -2842,13 +2842,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
TempPath / "storage");
Result.StorageName = ResolveRes.HostName;
- StorageDescription = fmt::format("Cloud {}{}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
- ResolveRes.HostName,
- (ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl),
- Result.BuildStorageHttp->GetSessionId(),
- m_Namespace,
- m_Bucket);
- ;
+ uint64_t HostLatencyNs = ResolveRes.HostLatencySec >= 0 ? uint64_t(ResolveRes.HostLatencySec * 1000000000.0) : 0;
+
+ StorageDescription = fmt::format("Cloud {}{}. SessionId: '{}'. Namespace '{}', Bucket '{}'. Latency: {}",
+ ResolveRes.HostName,
+ (ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl),
+ Result.BuildStorageHttp->GetSessionId(),
+ m_Namespace,
+ m_Bucket,
+ NiceLatencyNs(HostLatencyNs));
+ Result.BuildStorageLatencySec = ResolveRes.HostLatencySec;
if (!ResolveRes.CacheUrl.empty())
{
@@ -2874,12 +2877,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
: GetTinyWorkerPool(EWorkloadType::Background));
Result.CacheName = ResolveRes.CacheName;
+ uint64_t CacheLatencyNs = ResolveRes.CacheLatencySec >= 0 ? uint64_t(ResolveRes.CacheLatencySec * 1000000000.0) : 0;
+
CacheDescription =
- fmt::format("Zen {}{}. SessionId: '{}'",
+ fmt::format("Zen {}{}. SessionId: '{}'. Latency: {}",
ResolveRes.CacheName,
(ResolveRes.CacheUrl == ResolveRes.CacheName) ? "" : fmt::format(" {}", ResolveRes.CacheUrl),
- Result.CacheHttp->GetSessionId());
- ;
+ Result.CacheHttp->GetSessionId(),
+ NiceLatencyNs(CacheLatencyNs));
+
+ Result.CacheLatencySec = ResolveRes.CacheLatencySec;
+
if (!m_Namespace.empty())
{
CacheDescription += fmt::format(". Namespace '{}'", m_Namespace);
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 4de6ad25c..bedab3cfd 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -1469,6 +1469,20 @@ ImportOplogCommand::ImportOplogCommand()
"Enables both 'boost-worker-count' and 'boost-worker-memory' - may cause computer to be less responsive",
cxxopts::value(m_BoostWorkers),
"<boostworkermemory>");
+ m_Options.add_option(
+ "",
+ "",
+ "allow-partial-block-requests",
+ "Allow request for partial chunk blocks.\n"
+ " false = only full block requests allowed\n"
+ " mixed = multiple partial block ranges requests per block allowed to zen cache, single partial block range "
+ "request per block to host\n"
+ " zencacheonly = multiple partial block ranges requests per block allowed to zen cache, only full block requests "
+ "allowed to host\n"
+ " true = multiple partial block ranges requests per block allowed to zen cache and host\n"
+ "Defaults to 'mixed'.",
+ cxxopts::value(m_AllowPartialBlockRequests),
+ "<allowpartialblockrequests>");
m_Options.parse_positional({"project", "oplog", "gcpath"});
m_Options.positional_help("[<projectid> <oplogid> [<gcpath>]]");
@@ -1513,6 +1527,13 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
throw OptionParseException("'--oplog' is required", m_Options.help());
}
+ EPartialBlockRequestMode Mode = PartialBlockRequestModeFromString(m_AllowPartialBlockRequests);
+ if (Mode == EPartialBlockRequestMode::Invalid)
+ {
+ throw OptionParseException(fmt::format("'--allow-partial-block-requests' ('{}') is invalid", m_AllowPartialBlockRequests),
+ m_Options.help());
+ }
+
HttpClient Http(m_HostName);
m_ProjectName = ResolveProject(Http, m_ProjectName);
if (m_ProjectName.empty())
@@ -1649,6 +1670,9 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
Writer.AddBool("boostworkermemory"sv, true);
}
+
+ Writer.AddString("partialblockrequestmode", m_AllowPartialBlockRequests);
+
if (!m_FileDirectoryPath.empty())
{
Writer.BeginObject("file"sv);
@@ -2571,6 +2595,7 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
ClientSettings.AssumeHttp2 = ResolveRes.HostAssumeHttp2;
ClientSettings.MaximumInMemoryDownloadSize = m_BoostWorkerMemory ? RemoteStoreOptions::DefaultMaxBlockSize : 1024u * 1024u;
Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings);
+ Storage.BuildStorageLatencySec = ResolveRes.HostLatencySec;
BuildStorageCache::Statistics StorageCacheStats;
@@ -2589,7 +2614,8 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
.RetryCount = 0,
.MaximumInMemoryDownloadSize = m_BoostWorkerMemory ? RemoteStoreOptions::DefaultMaxBlockSize : 1024u * 1024u},
[&AbortFlag]() { return AbortFlag.load(); });
- Storage.CacheName = ResolveRes.CacheName;
+ Storage.CacheName = ResolveRes.CacheName;
+ Storage.CacheLatencySec = ResolveRes.CacheLatencySec;
}
if (!m_Quiet)
diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h
index e415b41b7..17fd76e9f 100644
--- a/src/zen/cmds/projectstore_cmd.h
+++ b/src/zen/cmds/projectstore_cmd.h
@@ -209,6 +209,8 @@ private:
bool m_BoostWorkerCount = false;
bool m_BoostWorkerMemory = false;
bool m_BoostWorkers = false;
+
+ std::string m_AllowPartialBlockRequests = "mixed";
};
class SnapshotOplogCommand : public ProjectStoreCommand
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index d3b59df2b..078e27b34 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -21,6 +21,8 @@
#include "clients/httpclientcommon.h"
+#include <numeric>
+
#if ZEN_WITH_TESTS
# include <zencore/scopeguard.h>
# include <zencore/testing.h>
@@ -340,6 +342,42 @@ HttpClient::Authenticate()
return m_Inner->Authenticate();
}
+LatencyTestResult
+MeasureLatency(HttpClient& Client, std::string_view Url)
+{
+ std::vector<double> MeasurementTimes;
+ std::string ErrorMessage;
+
+ for (uint32_t AttemptCount = 0; AttemptCount < 20 && MeasurementTimes.size() < 5; AttemptCount++)
+ {
+ HttpClient::Response MeasureResponse = Client.Get(Url);
+ if (MeasureResponse.IsSuccess())
+ {
+ MeasurementTimes.push_back(MeasureResponse.ElapsedSeconds);
+ Sleep(5);
+ }
+ else
+ {
+ ErrorMessage = MeasureResponse.ErrorMessage(fmt::format("Unable to measure latency using {}", Url));
+ }
+ }
+
+ if (MeasurementTimes.empty())
+ {
+ return {.Success = false, .FailureReason = ErrorMessage};
+ }
+
+ if (MeasurementTimes.size() > 2)
+ {
+ std::sort(MeasurementTimes.begin(), MeasurementTimes.end());
+ MeasurementTimes.pop_back(); // Remove the worst time
+ }
+
+ double AverageLatency = std::accumulate(MeasurementTimes.begin(), MeasurementTimes.end(), 0.0) / MeasurementTimes.size();
+
+ return {.Success = true, .LatencySeconds = AverageLatency};
+}
+
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h
index 9a9b74d72..7a129a98c 100644
--- a/src/zenhttp/include/zenhttp/httpclient.h
+++ b/src/zenhttp/include/zenhttp/httpclient.h
@@ -260,6 +260,15 @@ private:
const HttpClientSettings m_ConnectionSettings;
};
+struct LatencyTestResult
+{
+ bool Success = false;
+ std::string FailureReason;
+ double LatencySeconds = -1.0;
+};
+
+LatencyTestResult MeasureLatency(HttpClient& Client, std::string_view Url);
+
void httpclient_forcelink(); // internal
} // namespace zen
diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp
index 07fcd62ba..faa85f81b 100644
--- a/src/zenremotestore/builds/buildstoragecache.cpp
+++ b/src/zenremotestore/builds/buildstoragecache.cpp
@@ -474,7 +474,13 @@ TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const boo
HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds");
if (TestResponse.IsSuccess())
{
- return {.Success = true};
+ LatencyTestResult LatencyResult = MeasureLatency(TestHttpClient, "/health");
+
+ if (!LatencyResult.Success)
+ {
+ return {.Success = false, .FailureReason = LatencyResult.FailureReason};
+ }
+ return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds};
}
return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")};
};
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 4f1b07c37..5219e86d8 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -484,24 +484,6 @@ private:
uint64_t FilteredPerSecond = 0;
};
-EPartialBlockRequestMode
-PartialBlockRequestModeFromString(const std::string_view ModeString)
-{
- switch (HashStringAsLowerDjb2(ModeString))
- {
- case HashStringDjb2("false"):
- return EPartialBlockRequestMode::Off;
- case HashStringDjb2("zencacheonly"):
- return EPartialBlockRequestMode::ZenCacheOnly;
- case HashStringDjb2("mixed"):
- return EPartialBlockRequestMode::Mixed;
- case HashStringDjb2("true"):
- return EPartialBlockRequestMode::All;
- default:
- return EPartialBlockRequestMode::Invalid;
- }
-}
-
std::filesystem::path
ZenStateFilePath(const std::filesystem::path& ZenFolderPath)
{
@@ -903,7 +885,10 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
ChunkBlockAnalyser BlockAnalyser(m_LogOutput,
m_BlockDescriptions,
- ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, .IsVerbose = m_Options.IsVerbose});
+ ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet,
+ .IsVerbose = m_Options.IsVerbose,
+ .HostLatencySec = m_Storage.BuildStorageLatencySec,
+ .HostHighSpeedLatencySec = m_Storage.CacheLatencySec});
std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded(
m_RemoteLookup.ChunkHashToChunkIndex,
@@ -1034,25 +1019,29 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
}
- else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All)
- {
- BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::On);
- }
else
{
BlockPartialDownloadModes.reserve(m_BlockDescriptions.size());
for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++)
{
const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash);
- if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly)
+ if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All)
+ {
+ BlockPartialDownloadModes.push_back(BlockExistInCache
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange);
+ }
+ else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly)
{
- BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::On
- : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ BlockPartialDownloadModes.push_back(BlockExistInCache
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
}
else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed)
{
- BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::On
- : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange);
+ BlockPartialDownloadModes.push_back(BlockExistInCache
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange);
}
}
}
diff --git a/src/zenremotestore/builds/buildstorageutil.cpp b/src/zenremotestore/builds/buildstorageutil.cpp
index 36b45e800..b249d7d52 100644
--- a/src/zenremotestore/builds/buildstorageutil.cpp
+++ b/src/zenremotestore/builds/buildstorageutil.cpp
@@ -63,11 +63,13 @@ ResolveBuildStorage(OperationLogOutput& Output,
std::string HostUrl;
std::string HostName;
+ double HostLatencySec = -1.0;
std::string CacheUrl;
std::string CacheName;
bool HostAssumeHttp2 = ClientSettings.AssumeHttp2;
bool CacheAssumeHttp2 = ClientSettings.AssumeHttp2;
+ double CacheLatencySec = -1.0;
JupiterServerDiscovery DiscoveryResponse;
const std::string_view DiscoveryHost = Host.empty() ? OverrideHost : Host;
@@ -98,8 +100,9 @@ ResolveBuildStorage(OperationLogOutput& Output,
{
ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", OverrideHost);
}
- HostUrl = OverrideHost;
- HostName = GetHostNameFromUrl(OverrideHost);
+ HostUrl = OverrideHost;
+ HostName = GetHostNameFromUrl(OverrideHost);
+ HostLatencySec = TestResult.LatencySeconds;
}
else
{
@@ -137,6 +140,7 @@ ResolveBuildStorage(OperationLogOutput& Output,
HostUrl = ServerEndpoint.BaseUrl;
HostAssumeHttp2 = ServerEndpoint.AssumeHttp2;
HostName = ServerEndpoint.Name;
+ HostLatencySec = TestResult.LatencySeconds;
break;
}
else
@@ -183,6 +187,7 @@ ResolveBuildStorage(OperationLogOutput& Output,
CacheUrl = CacheEndpoint.BaseUrl;
CacheAssumeHttp2 = CacheEndpoint.AssumeHttp2;
CacheName = CacheEndpoint.Name;
+ CacheLatencySec = TestResult.LatencySeconds;
break;
}
}
@@ -204,6 +209,7 @@ ResolveBuildStorage(OperationLogOutput& Output,
CacheUrl = ZenServerLocalHostUrl;
CacheAssumeHttp2 = false;
CacheName = "localhost";
+ CacheLatencySec = TestResult.LatencySeconds;
}
}
});
@@ -219,8 +225,9 @@ ResolveBuildStorage(OperationLogOutput& Output,
if (ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(ZenCacheHost, /*AssumeHttp2*/ false, ClientSettings.Verbose);
TestResult.Success)
{
- CacheUrl = ZenCacheHost;
- CacheName = GetHostNameFromUrl(ZenCacheHost);
+ CacheUrl = ZenCacheHost;
+ CacheName = GetHostNameFromUrl(ZenCacheHost);
+ CacheLatencySec = TestResult.LatencySeconds;
}
else
{
@@ -231,10 +238,12 @@ ResolveBuildStorage(OperationLogOutput& Output,
return BuildStorageResolveResult{.HostUrl = HostUrl,
.HostName = HostName,
.HostAssumeHttp2 = HostAssumeHttp2,
+ .HostLatencySec = HostLatencySec,
.CacheUrl = CacheUrl,
.CacheName = CacheName,
- .CacheAssumeHttp2 = CacheAssumeHttp2};
+ .CacheAssumeHttp2 = CacheAssumeHttp2,
+ .CacheLatencySec = CacheLatencySec};
}
std::vector<ChunkBlockDescription>
diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp
index 06cedae3f..d203e0292 100644
--- a/src/zenremotestore/chunking/chunkblock.cpp
+++ b/src/zenremotestore/chunking/chunkblock.cpp
@@ -597,7 +597,7 @@ ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span<const NeededBlock>
if (MaybeBlockRanges.has_value())
{
- const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value();
+ std::vector<BlockRangeDescriptor> BlockRanges = MaybeBlockRanges.value();
ZEN_ASSERT(!BlockRanges.empty());
uint64_t RequestedSize =
@@ -606,12 +606,54 @@ ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span<const NeededBlock>
uint64_t(0),
[](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
- if ((PartialBlockDownloadMode != EPartialBlockDownloadMode::Exact) && ((RequestedSize * 100) / TotalBlockSize) >= 200)
+ if (PartialBlockDownloadMode != EPartialBlockDownloadMode::Exact && BlockRanges.size() > 1)
+ {
+ // TODO: Once we have support in our http client to request multiple ranges in one request this
+ // logic would need to change as the per-request overhead would go away
+
+ const double LatencySec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed
+ ? m_Options.HostHighSpeedLatencySec
+ : m_Options.HostLatencySec;
+ if (LatencySec > 0)
+ {
+ const uint64_t BytesPerSec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed
+ ? m_Options.HostHighSpeedBytesPerSec
+ : m_Options.HostSpeedBytesPerSec;
+
+ const double ExtraRequestTimeSec = (BlockRanges.size() - 1) * LatencySec;
+ const uint64_t ExtraRequestTimeBytes = uint64_t(ExtraRequestTimeSec * BytesPerSec);
+
+ const uint64_t FullRangeSize =
+ BlockRanges.back().RangeStart + BlockRanges.back().RangeLength - BlockRanges.front().RangeStart;
+
+ if (ExtraRequestTimeBytes + RequestedSize >= FullRangeSize)
+ {
+ BlockRanges = std::vector<BlockRangeDescriptor>{MergeBlockRanges(BlockRanges)};
+
+ if (m_Options.IsVerbose)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Merging {} chunks ({}) from block {} ({}) to single request (extra bytes {})",
+ NeededBlock.ChunkIndexes.size(),
+ NiceBytes(RequestedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ NiceBytes(BlockRanges.front().RangeLength - RequestedSize));
+ }
+
+ RequestedSize = BlockRanges.front().RangeLength;
+ }
+ }
+ }
+
+ if ((PartialBlockDownloadMode != EPartialBlockDownloadMode::Exact) &&
+ ((TotalBlockSize - RequestedSize) < (512u * 1024u)))
{
if (m_Options.IsVerbose)
{
ZEN_OPERATION_LOG_INFO(m_LogOutput,
- "Requesting {} chunks ({}) from block {} ({}) using full block request (extra bytes {})",
+ "Requesting {} chunks ({}) from block {} ({}) using full block request due to small "
+ "total slack (extra bytes {})",
NeededBlock.ChunkIndexes.size(),
NiceBytes(RequestedSize),
BlockDescription.BlockHash,
@@ -624,19 +666,16 @@ ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span<const NeededBlock>
{
Result.BlockRanges.insert(Result.BlockRanges.end(), BlockRanges.begin(), BlockRanges.end());
- if (RequestedSize > TotalWantedChunksSize)
+ if (m_Options.IsVerbose)
{
- if (m_Options.IsVerbose)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput,
- "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})",
- NeededBlock.ChunkIndexes.size(),
- NiceBytes(RequestedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- BlockRanges.size(),
- NiceBytes(RequestedSize - TotalWantedChunksSize));
- }
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})",
+ NeededBlock.ChunkIndexes.size(),
+ NiceBytes(RequestedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ BlockRanges.size(),
+ NiceBytes(RequestedSize - TotalWantedChunksSize));
}
}
}
@@ -786,7 +825,7 @@ ChunkBlockAnalyser::CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std:
};
uint64_t
-ChunkBlockAnalyser::CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges)
+ChunkBlockAnalyser::CalculateNextGap(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges)
{
ZEN_ASSERT(BlockRanges.size() > 1);
uint64_t AcceptableGap = (uint64_t)-1;
@@ -798,7 +837,7 @@ ChunkBlockAnalyser::CalculateNextGap(std::span<const BlockRangeDescriptor> Block
const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength);
AcceptableGap = Min(Gap, AcceptableGap);
}
- AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u);
+ AcceptableGap = RoundUp(AcceptableGap, AlwaysAcceptableGap);
return AcceptableGap;
};
@@ -949,10 +988,12 @@ ChunkBlockAnalyser::CalculateBlockRanges(uint32_t BlockIndex,
return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
}
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges);
+ const uint64_t AlwaysAcceptableGap = 4u * 1024u;
+
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(AlwaysAcceptableGap, BlockRanges);
while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges))
{
- CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges);
+ CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(AlwaysAcceptableGap, CollapsedBlockRanges), CollapsedBlockRanges);
}
const std::uint64_t WantedCollapsedSize =
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
index bb5b1c5f4..f25ce5b5e 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
@@ -65,6 +65,7 @@ struct ZenCacheEndpointTestResult
{
bool Success = false;
std::string FailureReason;
+ double LatencySeconds = -1.0;
};
ZenCacheEndpointTestResult TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpVerbose);
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index 6800444e0..31733569e 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -9,6 +9,7 @@
#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/chunking/chunkblock.h>
#include <zenremotestore/chunking/chunkedcontent.h>
+#include <zenremotestore/partialblockrequestmode.h>
#include <zenutil/bufferedwritefilecache.h>
#include <atomic>
@@ -109,17 +110,6 @@ struct RebuildFolderStateStatistics
uint64_t FinalizeTreeElapsedWallTimeUs = 0;
};
-enum EPartialBlockRequestMode
-{
- Off,
- ZenCacheOnly,
- Mixed,
- All,
- Invalid
-};
-
-EPartialBlockRequestMode PartialBlockRequestModeFromString(const std::string_view ModeString);
-
std::filesystem::path ZenStateFilePath(const std::filesystem::path& ZenFolderPath);
std::filesystem::path ZenTempFolderPath(const std::filesystem::path& ZenFolderPath);
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h
index ab3037c89..4b85d8f1e 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h
@@ -17,10 +17,12 @@ struct BuildStorageResolveResult
std::string HostUrl;
std::string HostName;
bool HostAssumeHttp2 = false;
+ double HostLatencySec = -1.0;
std::string CacheUrl;
std::string CacheName;
bool CacheAssumeHttp2 = false;
+ double CacheLatencySec = -1.0;
};
enum class ZenCacheResolveMode
@@ -54,9 +56,11 @@ struct StorageInstance
std::unique_ptr<HttpClient> BuildStorageHttp;
std::unique_ptr<BuildStorageBase> BuildStorage;
std::string StorageName;
+ double BuildStorageLatencySec = -1.0;
std::unique_ptr<HttpClient> CacheHttp;
std::unique_ptr<BuildStorageCache> BuildCacheStorage;
std::string CacheName;
+ double CacheLatencySec = -1.0;
};
} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
index 57710fcf5..5a17ef79c 100644
--- a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
+++ b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
@@ -82,8 +82,12 @@ class ChunkBlockAnalyser
public:
struct Options
{
- bool IsQuiet = false;
- bool IsVerbose = false;
+ bool IsQuiet = false;
+ bool IsVerbose = false;
+ double HostLatencySec = -1.0;
+ double HostHighSpeedLatencySec = -1.0;
+ uint64_t HostSpeedBytesPerSec = (1u * 1024u * 1024u * 1024u) / 8u; // 1GBit
+ uint64_t HostHighSpeedBytesPerSec = (2u * 1024u * 1024u * 1024u) / 8u; // 2GBit
};
ChunkBlockAnalyser(OperationLogOutput& LogOutput, std::span<const ChunkBlockDescription> BlockDescriptions, const Options& Options);
@@ -110,7 +114,8 @@ public:
{
Off,
SingleRange,
- On,
+ MultiRange,
+ MultiRangeHighSpeed,
Exact
};
@@ -130,14 +135,14 @@ private:
uint16_t MaxRangeCount;
};
- static constexpr uint16_t FullBlockRangePercentLimit = 95;
+ static constexpr uint16_t FullBlockRangePercentLimit = 98;
static constexpr BlockRangeLimit ForceMergeLimits[] = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1},
- {.SizePercent = 90, .MaxRangeCount = 2},
- {.SizePercent = 85, .MaxRangeCount = 8},
- {.SizePercent = 80, .MaxRangeCount = 16},
- {.SizePercent = 75, .MaxRangeCount = 32},
- {.SizePercent = 70, .MaxRangeCount = 48},
+ {.SizePercent = 90, .MaxRangeCount = 4},
+ {.SizePercent = 85, .MaxRangeCount = 16},
+ {.SizePercent = 80, .MaxRangeCount = 32},
+ {.SizePercent = 75, .MaxRangeCount = 48},
+ {.SizePercent = 70, .MaxRangeCount = 64},
{.SizePercent = 4, .MaxRangeCount = 82},
{.SizePercent = 0, .MaxRangeCount = 96}};
@@ -149,7 +154,7 @@ private:
std::span<const BlockRangeDescriptor> Ranges);
std::vector<BlockRangeDescriptor> CollapseBlockRanges(const uint64_t AlwaysAcceptableGap,
std::span<const BlockRangeDescriptor> BlockRanges);
- uint64_t CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges);
+ uint64_t CalculateNextGap(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges);
std::optional<std::vector<BlockRangeDescriptor>> CalculateBlockRanges(uint32_t BlockIndex,
const ChunkBlockDescription& BlockDescription,
std::span<const uint32_t> BlockChunkIndexNeeded,
diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h
index 432496bc1..7bbf40dfa 100644
--- a/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h
+++ b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h
@@ -28,6 +28,7 @@ struct JupiterEndpointTestResult
{
bool Success = false;
std::string FailureReason;
+ double LatencySeconds = -1.0;
};
JupiterEndpointTestResult TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpVerbose);
diff --git a/src/zenremotestore/include/zenremotestore/operationlogoutput.h b/src/zenremotestore/include/zenremotestore/operationlogoutput.h
index 9693e69cf..6f10ab156 100644
--- a/src/zenremotestore/include/zenremotestore/operationlogoutput.h
+++ b/src/zenremotestore/include/zenremotestore/operationlogoutput.h
@@ -3,6 +3,7 @@
#pragma once
#include <zencore/fmtutils.h>
+#include <zencore/logbase.h>
namespace zen {
@@ -57,9 +58,7 @@ public:
virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) = 0;
};
-struct LoggerRef;
-
-OperationLogOutput* CreateStandardLogOutput(LoggerRef& Log);
+OperationLogOutput* CreateStandardLogOutput(LoggerRef Log);
#define ZEN_OPERATION_LOG(OutputTarget, InLevel, fmtstr, ...) \
do \
diff --git a/src/zenremotestore/include/zenremotestore/partialblockrequestmode.h b/src/zenremotestore/include/zenremotestore/partialblockrequestmode.h
new file mode 100644
index 000000000..54adea2b2
--- /dev/null
+++ b/src/zenremotestore/include/zenremotestore/partialblockrequestmode.h
@@ -0,0 +1,20 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <string_view>
+
+namespace zen {
+
+enum EPartialBlockRequestMode
+{
+ Off,
+ ZenCacheOnly,
+ Mixed,
+ All,
+ Invalid
+};
+
+EPartialBlockRequestMode PartialBlockRequestModeFromString(const std::string_view ModeString);
+
+} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h
index e8b7c15c0..66dfcc62d 100644
--- a/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h
+++ b/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h
@@ -34,6 +34,8 @@ std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(LoggerRef
bool Quiet,
bool Unattended,
bool Hidden,
- WorkerThreadPool& CacheBackgroundWorkerPool);
+ WorkerThreadPool& CacheBackgroundWorkerPool,
+ double& OutHostLatencySec,
+ double& OutCacheLatencySec);
} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
index 008f94351..152c02ee2 100644
--- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
+++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
@@ -6,6 +6,7 @@
#include <zenstore/projectstore.h>
#include <zenremotestore/chunking/chunkblock.h>
+#include <zenremotestore/partialblockrequestmode.h>
#include <unordered_set>
@@ -73,6 +74,16 @@ public:
std::vector<ChunkBlockDescription> Blocks;
};
+ struct GetBlockDescriptionsResult : public Result
+ {
+ std::vector<ChunkBlockDescription> Blocks;
+ };
+
+ struct AttachmentExistsInCacheResult : public Result
+ {
+ std::vector<bool> HasBody;
+ };
+
struct RemoteStoreInfo
{
bool CreateBlocks;
@@ -111,10 +122,20 @@ public:
virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0;
virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
- virtual LoadContainerResult LoadContainer() = 0;
- virtual GetKnownBlocksResult GetKnownBlocks() = 0;
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0;
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0;
+ virtual LoadContainerResult LoadContainer() = 0;
+ virtual GetKnownBlocksResult GetKnownBlocks() = 0;
+ virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) = 0;
+ virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) = 0;
+
+ struct AttachmentRange
+ {
+ uint64_t Offset = 0;
+ uint64_t Bytes = (uint64_t)-1;
+
+ inline operator bool() const { return Offset != 0 || Bytes != (uint64_t)-1; }
+ };
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) = 0;
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0;
virtual void Flush() = 0;
};
@@ -153,14 +174,15 @@ RemoteProjectStore::LoadContainerResult BuildContainer(
class JobContext;
-RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
- const CbObject& ContainerObject,
- const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo& Chunked)>& OnChunkedAttachment,
- JobContext* OptionalContext);
+RemoteProjectStore::Result SaveOplogContainer(
+ ProjectStore::Oplog& Oplog,
+ const CbObject& ContainerObject,
+ const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo& Chunked)>& OnChunkedAttachment,
+ JobContext* OptionalContext);
RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
@@ -177,15 +199,18 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
bool IgnoreMissingAttachments,
JobContext* OptionalContext);
-RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Oplog& Oplog,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- bool ForceDownload,
- bool IgnoreMissingAttachments,
- bool CleanOplog,
- JobContext* OptionalContext);
+RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ bool ForceDownload,
+ bool IgnoreMissingAttachments,
+ bool CleanOplog,
+ EPartialBlockRequestMode PartialBlockRequestMode,
+ double HostLatencySec,
+ double CacheLatencySec,
+ JobContext* OptionalContext);
std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject);
std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes);
diff --git a/src/zenremotestore/jupiter/jupiterhost.cpp b/src/zenremotestore/jupiter/jupiterhost.cpp
index 7706f00c2..2583cfc84 100644
--- a/src/zenremotestore/jupiter/jupiterhost.cpp
+++ b/src/zenremotestore/jupiter/jupiterhost.cpp
@@ -59,7 +59,13 @@ TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool
HttpClient::Response TestResponse = TestHttpClient.Get("/health/live");
if (TestResponse.IsSuccess())
{
- return {.Success = true};
+ LatencyTestResult LatencyResult = MeasureLatency(TestHttpClient, "/health/ready");
+
+ if (!LatencyResult.Success)
+ {
+ return {.Success = false, .FailureReason = LatencyResult.FailureReason};
+ }
+ return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds};
}
return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")};
}
diff --git a/src/zenremotestore/operationlogoutput.cpp b/src/zenremotestore/operationlogoutput.cpp
index 0837ed716..7ed93c947 100644
--- a/src/zenremotestore/operationlogoutput.cpp
+++ b/src/zenremotestore/operationlogoutput.cpp
@@ -95,7 +95,7 @@ StandardLogOutputProgressBar::Finish()
}
OperationLogOutput*
-CreateStandardLogOutput(LoggerRef& Log)
+CreateStandardLogOutput(LoggerRef Log)
{
return new StandardLogOutput(Log);
}
diff --git a/src/zenremotestore/partialblockrequestmode.cpp b/src/zenremotestore/partialblockrequestmode.cpp
new file mode 100644
index 000000000..b3edf515b
--- /dev/null
+++ b/src/zenremotestore/partialblockrequestmode.cpp
@@ -0,0 +1,27 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/partialblockrequestmode.h>
+
+#include <zencore/string.h>
+
+namespace zen {
+
+EPartialBlockRequestMode
+PartialBlockRequestModeFromString(const std::string_view ModeString)
+{
+ switch (HashStringAsLowerDjb2(ModeString))
+ {
+ case HashStringDjb2("false"):
+ return EPartialBlockRequestMode::Off;
+ case HashStringDjb2("zencacheonly"):
+ return EPartialBlockRequestMode::ZenCacheOnly;
+ case HashStringDjb2("mixed"):
+ return EPartialBlockRequestMode::Mixed;
+ case HashStringDjb2("true"):
+ return EPartialBlockRequestMode::All;
+ default:
+ return EPartialBlockRequestMode::Invalid;
+ }
+}
+
+} // namespace zen
diff --git a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp
index a8e883dde..c42373e4d 100644
--- a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp
@@ -441,7 +441,7 @@ public:
catch (const HttpClientError& Ex)
{
Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'",
m_BuildStorageHttp.GetBaseUri(),
m_Namespace,
m_Bucket,
@@ -451,7 +451,7 @@ public:
catch (const std::exception& Ex)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'",
m_BuildStorageHttp.GetBaseUri(),
m_Namespace,
m_Bucket,
@@ -462,7 +462,94 @@ public:
return Result;
}
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override
+ {
+ std::unique_ptr<OperationLogOutput> Output(CreateStandardLogOutput(Log()));
+
+ ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
+
+ GetBlockDescriptionsResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
+
+ try
+ {
+ Result.Blocks = zen::GetBlockDescriptions(*Output,
+ *m_BuildStorage,
+ m_BuildCacheStorage.get(),
+ m_BuildId,
+ m_OplogBuildPartId,
+ BlockHashes,
+ /*AttemptFallback*/ false,
+ /*IsQuiet*/ false,
+ /*IsVerbose)*/ false);
+ }
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'",
+ m_BuildStorageHttp.GetBaseUri(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'",
+ m_BuildStorageHttp.GetBaseUri(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+ return Result;
+ }
+
+ virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override
+ {
+ AttachmentExistsInCacheResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
+ try
+ {
+ const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ m_BuildCacheStorage->BlobsExists(m_BuildId, RawHashes);
+
+ if (CacheExistsResult.size() == RawHashes.size())
+ {
+ Result.HasBody.reserve(CacheExistsResult.size());
+ for (size_t BlobIndex = 0; BlobIndex < CacheExistsResult.size(); BlobIndex++)
+ {
+ Result.HasBody.push_back(CacheExistsResult[BlobIndex].HasBody);
+ }
+ }
+ }
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Remote cache: Failed finding known blobs for {}/{}/{}/{}. Reason: '{}'",
+ m_BuildStorageHttp.GetBaseUri(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Remote cache: Failed finding known blobs for {}/{}/{}/{}. Reason: '{}'",
+ m_BuildStorageHttp.GetBaseUri(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+ return Result;
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
@@ -474,7 +561,7 @@ public:
{
if (m_BuildCacheStorage)
{
- IoBuffer CachedBlob = m_BuildCacheStorage->GetBuildBlob(m_BuildId, RawHash);
+ IoBuffer CachedBlob = m_BuildCacheStorage->GetBuildBlob(m_BuildId, RawHash, Range.Offset, Range.Bytes);
if (CachedBlob)
{
Result.Bytes = std::move(CachedBlob);
@@ -482,20 +569,23 @@ public:
}
if (!Result.Bytes)
{
- Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash);
+ Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash, Range.Offset, Range.Bytes);
if (m_BuildCacheStorage && Result.Bytes && m_PopulateCache)
{
- m_BuildCacheStorage->PutBuildBlob(m_BuildId,
- RawHash,
- Result.Bytes.GetContentType(),
- CompositeBuffer(SharedBuffer(Result.Bytes)));
+ if (!Range)
+ {
+ m_BuildCacheStorage->PutBuildBlob(m_BuildId,
+ RawHash,
+ Result.Bytes.GetContentType(),
+ CompositeBuffer(SharedBuffer(Result.Bytes)));
+ }
}
}
}
catch (const HttpClientError& Ex)
{
Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'",
m_BuildStorageHttp.GetBaseUri(),
m_Namespace,
m_Bucket,
@@ -505,7 +595,7 @@ public:
catch (const std::exception& Ex)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'",
m_BuildStorageHttp.GetBaseUri(),
m_Namespace,
m_Bucket,
@@ -558,7 +648,7 @@ public:
for (const IoHash& Hash : AttachmentsLeftToFind)
{
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {});
if (ChunkResult.ErrorCode)
{
return LoadAttachmentsResult{ChunkResult};
@@ -623,7 +713,9 @@ CreateJupiterBuildsRemoteStore(LoggerRef InLog,
bool Quiet,
bool Unattended,
bool Hidden,
- WorkerThreadPool& CacheBackgroundWorkerPool)
+ WorkerThreadPool& CacheBackgroundWorkerPool,
+ double& OutHostLatencySec,
+ double& OutCacheLatencySec)
{
std::string Host = Options.Host;
if (!Host.empty() && Host.find("://"sv) == std::string::npos)
@@ -727,6 +819,10 @@ CreateJupiterBuildsRemoteStore(LoggerRef InLog,
Options.ForceDisableBlocks,
Options.ForceDisableTempBlocks,
Options.PopulateCache);
+
+ OutHostLatencySec = ResolveRes.HostLatencySec;
+ OutCacheLatencySec = ResolveRes.CacheLatencySec;
+
return RemoteStore;
}
diff --git a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp
index 3a67d3842..ec7fb7bbc 100644
--- a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp
@@ -217,7 +217,18 @@ public:
return Result;
}
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override
+ {
+ ZEN_UNUSED(BlockHashes);
+ return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}};
+ }
+
+ virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override
+ {
+ return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)};
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override
{
Stopwatch Timer;
LoadAttachmentResult Result;
@@ -232,7 +243,14 @@ public:
{
BasicFile ChunkFile;
ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead);
- Result.Bytes = ChunkFile.ReadAll();
+ if (Range)
+ {
+ Result.Bytes = ChunkFile.ReadRange(Range.Offset, Range.Bytes);
+ }
+ else
+ {
+ Result.Bytes = ChunkFile.ReadAll();
+ }
}
AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000);
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
@@ -245,7 +263,7 @@ public:
LoadAttachmentsResult Result;
for (const IoHash& Hash : RawHashes)
{
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {});
if (ChunkResult.ErrorCode)
{
ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
index 462de2988..f8179831c 100644
--- a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
@@ -212,7 +212,18 @@ public:
return Result;
}
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override
+ {
+ ZEN_UNUSED(BlockHashes);
+ return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}};
+ }
+
+ virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override
+ {
+ return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)};
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override
{
JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
@@ -227,6 +238,10 @@ public:
RawHash,
Result.Reason);
}
+ if (!Result.ErrorCode && Range)
+ {
+ Result.Bytes = IoBuffer(Result.Bytes, Range.Offset, Range.Bytes);
+ }
return Result;
}
@@ -235,7 +250,7 @@ public:
LoadAttachmentsResult Result;
for (const IoHash& Hash : RawHashes)
{
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {});
if (ChunkResult.ErrorCode)
{
return LoadAttachmentsResult{ChunkResult};
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 8be8eb0df..2a9da6f58 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
+#include <zenremotestore/chunking/chunkedcontent.h>
#include <zenremotestore/chunking/chunkedfile.h>
#include <zenremotestore/operationlogoutput.h>
#include <zenstore/cidstore.h>
@@ -229,29 +230,60 @@ namespace remotestore_impl {
struct DownloadInfo
{
- uint64_t OplogSizeBytes = 0;
- std::atomic<uint64_t> AttachmentsDownloaded = 0;
- std::atomic<uint64_t> AttachmentBlocksDownloaded = 0;
- std::atomic<uint64_t> AttachmentBytesDownloaded = 0;
- std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0;
- std::atomic<uint64_t> AttachmentsStored = 0;
- std::atomic<uint64_t> AttachmentBytesStored = 0;
- std::atomic_size_t MissingAttachmentCount = 0;
+ uint64_t OplogSizeBytes = 0;
+ std::atomic<uint64_t> AttachmentsDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlocksDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlocksRangesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBytesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlockRangeBytesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentsStored = 0;
+ std::atomic<uint64_t> AttachmentBytesStored = 0;
+ std::atomic_size_t MissingAttachmentCount = 0;
};
- void DownloadAndSaveBlockChunks(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
- DownloadInfo& Info,
- Stopwatch& LoadAttachmentsTimer,
- std::atomic_uint64_t& DownloadStartMS,
- const std::vector<IoHash>& Chunks)
+ class JobContextLogOutput : public OperationLogOutput
+ {
+ public:
+ JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {}
+ virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) override
+ {
+ ZEN_UNUSED(LogLevel);
+ if (m_OptionalContext)
+ {
+ fmt::basic_memory_buffer<char, 250> MessageBuffer;
+ fmt::vformat_to(fmt::appender(MessageBuffer), Format, Args);
+ remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size()));
+ }
+ }
+
+ virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); }
+ virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); }
+ virtual uint32_t GetProgressUpdateDelayMS() override { return 0; }
+ virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override
+ {
+ ZEN_UNUSED(InSubTask);
+ return nullptr;
+ }
+
+ private:
+ JobContext* m_OptionalContext;
+ };
+
+ void DownloadAndSaveBlockChunks(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ bool IgnoreMissingAttachments,
+ JobContext* OptionalContext,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ ThinChunkBlockDescription&& ThinBlockDescription,
+ std::vector<uint32_t>&& NeededChunkIndexes)
{
AttachmentsDownloadLatch.AddCount(1);
NetworkWorkerPool.ScheduleWork(
@@ -261,7 +293,8 @@ namespace remotestore_impl {
&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
&RemoteResult,
- Chunks = Chunks,
+ ThinBlockDescription = std::move(ThinBlockDescription),
+ NeededChunkIndexes = std::move(NeededChunkIndexes),
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
@@ -276,6 +309,13 @@ namespace remotestore_impl {
}
try
{
+ std::vector<IoHash> Chunks;
+ Chunks.reserve(NeededChunkIndexes.size());
+ for (uint32_t ChunkIndex : NeededChunkIndexes)
+ {
+ Chunks.push_back(ThinBlockDescription.ChunkRawHashes[ChunkIndex]);
+ }
+
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
@@ -293,7 +333,12 @@ namespace remotestore_impl {
}
return;
}
- Info.AttachmentsDownloaded.fetch_add(Chunks.size());
+ Info.AttachmentsDownloaded.fetch_add(Result.Chunks.size());
+ for (const auto& It : Result.Chunks)
+ {
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ }
ZEN_INFO("Loaded {} bulk attachments in {}",
Chunks.size(),
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
@@ -320,8 +365,6 @@ namespace remotestore_impl {
for (const auto& It : Chunks)
{
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
WriteRawHashes.push_back(It.first);
}
@@ -350,28 +393,29 @@ namespace remotestore_impl {
catch (const std::exception& Ex)
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk load {} attachments", Chunks.size()),
+ fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()),
Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
};
- void DownloadAndSaveBlock(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
- DownloadInfo& Info,
- Stopwatch& LoadAttachmentsTimer,
- std::atomic_uint64_t& DownloadStartMS,
- const IoHash& BlockHash,
- const std::vector<IoHash>& Chunks,
- uint32_t RetriesLeft)
+ void DownloadAndSaveBlock(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ bool IgnoreMissingAttachments,
+ JobContext* OptionalContext,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ const IoHash& BlockHash,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& AllNeededPartialChunkHashesLookup,
+ std::span<std::atomic<bool>> ChunkDownloadedFlags,
+ uint32_t RetriesLeft)
{
AttachmentsDownloadLatch.AddCount(1);
NetworkWorkerPool.ScheduleWork(
@@ -381,7 +425,6 @@ namespace remotestore_impl {
&RemoteStore,
&NetworkWorkerPool,
&WorkerPool,
- BlockHash,
&RemoteResult,
&Info,
&LoadAttachmentsTimer,
@@ -389,7 +432,9 @@ namespace remotestore_impl {
IgnoreMissingAttachments,
OptionalContext,
RetriesLeft,
- Chunks = std::vector<IoHash>(Chunks)]() {
+ BlockHash = IoHash(BlockHash),
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags]() {
ZEN_TRACE_CPU("DownloadBlock");
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
@@ -401,7 +446,7 @@ namespace remotestore_impl {
{
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
+ RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash, {});
if (BlockResult.ErrorCode)
{
ReportMessage(OptionalContext,
@@ -422,10 +467,10 @@ namespace remotestore_impl {
}
uint64_t BlockSize = BlockResult.Bytes.GetSize();
Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_INFO("Loaded block attachment '{}' in {} ({})",
- BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
- NiceBytes(BlockSize));
+ ZEN_DEBUG("Loaded block attachment '{}' in {} ({})",
+ BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlockSize));
Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
AttachmentsWriteLatch.AddCount(1);
@@ -436,7 +481,6 @@ namespace remotestore_impl {
&RemoteStore,
&NetworkWorkerPool,
&WorkerPool,
- BlockHash,
&RemoteResult,
&Info,
&LoadAttachmentsTimer,
@@ -444,8 +488,10 @@ namespace remotestore_impl {
IgnoreMissingAttachments,
OptionalContext,
RetriesLeft,
- Chunks = std::move(Chunks),
- Bytes = std::move(BlockResult.Bytes)]() {
+ BlockHash = IoHash(BlockHash),
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ Bytes = std::move(BlockResult.Bytes)]() {
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -454,9 +500,6 @@ namespace remotestore_impl {
try
{
ZEN_ASSERT(Bytes.Size() > 0);
- std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
- WantedChunks.reserve(Chunks.size());
- WantedChunks.insert(Chunks.begin(), Chunks.end());
std::vector<IoBuffer> WriteAttachmentBuffers;
std::vector<IoHash> WriteRawHashes;
@@ -485,7 +528,8 @@ namespace remotestore_impl {
LoadAttachmentsTimer,
DownloadStartMS,
BlockHash,
- std::move(Chunks),
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
RetriesLeft - 1);
}
ReportMessage(
@@ -519,7 +563,8 @@ namespace remotestore_impl {
LoadAttachmentsTimer,
DownloadStartMS,
BlockHash,
- std::move(Chunks),
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
RetriesLeft - 1);
}
ReportMessage(OptionalContext,
@@ -546,28 +591,36 @@ namespace remotestore_impl {
uint64_t BlockSize = BlockPayload.GetSize();
uint64_t BlockHeaderSize = 0;
- bool StoreChunksOK = IterateChunkBlock(
- BlockPayload.Flatten(),
- [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize](
- CompressedBuffer&& Chunk,
- const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- IoHash RawHash;
- uint64_t RawSize;
- ZEN_ASSERT(
- CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
- RawHash,
- RawSize,
- /*OutOptionalTotalCompressedSize*/ nullptr));
- ZEN_ASSERT(RawHash == AttachmentRawHash);
- WriteRawHashes.emplace_back(AttachmentRawHash);
- WantedChunks.erase(AttachmentRawHash);
- PotentialSize += WriteAttachmentBuffers.back().GetSize();
- }
- },
- BlockHeaderSize);
+
+ bool StoreChunksOK = IterateChunkBlock(
+ BlockPayload.Flatten(),
+ [&AllNeededPartialChunkHashesLookup,
+ &ChunkDownloadedFlags,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
+ &Info,
+ &PotentialSize](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash);
+ if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end())
+ {
+ bool Expected = false;
+ if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(
+ CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ PotentialSize += WriteAttachmentBuffers.back().GetSize();
+ }
+ }
+ },
+ BlockHeaderSize);
if (!StoreChunksOK)
{
@@ -582,8 +635,6 @@ namespace remotestore_impl {
return;
}
- ZEN_ASSERT(WantedChunks.empty());
-
if (!WriteAttachmentBuffers.empty())
{
auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
@@ -625,6 +676,293 @@ namespace remotestore_impl {
WorkerThreadPool::EMode::EnableBacklog);
};
+ void DownloadAndSavePartialBlock(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ bool IgnoreMissingAttachments,
+ JobContext* OptionalContext,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors,
+ size_t BlockRangeIndexStart,
+ size_t BlockRangeCount,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& AllNeededPartialChunkHashesLookup,
+ std::span<std::atomic<bool>> ChunkDownloadedFlags,
+ uint32_t RetriesLeft)
+ {
+ AttachmentsDownloadLatch.AddCount(1);
+ NetworkWorkerPool.ScheduleWork(
+ [&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &ChunkStore,
+ &RemoteStore,
+ &NetworkWorkerPool,
+ &WorkerPool,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ RetriesLeft,
+ BlockDescription,
+ BlockRangeDescriptors,
+ BlockRangeIndexStart,
+ BlockRangeCount,
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags]() {
+ ZEN_TRACE_CPU("DownloadBlockRanges");
+
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+
+ double DownloadElapsedSeconds = 0;
+ uint64_t DownloadedBytes = 0;
+
+ for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount;
+ BlockRangeIndex++)
+ {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+
+ const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeIndex];
+
+ RemoteProjectStore::LoadAttachmentResult BlockResult =
+ RemoteStore.LoadAttachment(BlockDescription.BlockHash,
+ {.Offset = BlockRange.RangeStart, .Bytes = BlockRange.RangeLength});
+ if (BlockResult.ErrorCode)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download block attachment '{}' range {},{} ({}): {}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength,
+ BlockResult.ErrorCode,
+ BlockResult.Reason));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ }
+ return;
+ }
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ uint64_t BlockPartSize = BlockResult.Bytes.GetSize();
+ if (BlockPartSize != BlockRange.RangeLength)
+ {
+ std::string ErrorString =
+ fmt::format("Failed to download block attachment '{}' range {},{}, got {} bytes ({}): {}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength,
+ BlockPartSize,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+
+ ReportMessage(OptionalContext, ErrorString);
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+ "Mismatching block part range received",
+ ErrorString);
+ }
+ return;
+ }
+ Info.AttachmentBlocksRangesDownloaded.fetch_add(1);
+
+ DownloadElapsedSeconds += BlockResult.ElapsedSeconds;
+ DownloadedBytes += BlockPartSize;
+
+ Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize);
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &ChunkStore,
+ &RemoteStore,
+ &NetworkWorkerPool,
+ &WorkerPool,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ RetriesLeft,
+ BlockDescription,
+ BlockRange,
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ BlockPayload = std::move(BlockResult.Bytes)]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ ZEN_ASSERT(BlockPayload.Size() > 0);
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ uint64_t PotentialSize = 0;
+ uint64_t UsedSize = 0;
+ uint64_t BlockPartSize = BlockPayload.GetSize();
+
+ uint32_t OffsetInBlock = 0;
+ for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart;
+ ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount;
+ ChunkBlockIndex++)
+ {
+ const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+
+ if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash);
+ ChunkIndexIt != AllNeededPartialChunkHashesLookup.end())
+ {
+ bool Expected = false;
+ if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true))
+ {
+ IoHash VerifyChunkHash;
+ uint64_t VerifyChunkSize;
+ CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(
+ SharedBuffer(IoBuffer(BlockPayload, OffsetInBlock, ChunkCompressedSize)),
+ VerifyChunkHash,
+ VerifyChunkSize);
+ if (!CompressedChunk)
+ {
+ std::string ErrorString = fmt::format(
+ "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer",
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockDescription.BlockHash);
+ ReportMessage(OptionalContext, ErrorString);
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+ "Malformed chunk block",
+ ErrorString);
+ }
+ continue;
+ }
+ if (VerifyChunkHash != ChunkHash)
+ {
+ std::string ErrorString = fmt::format(
+ "Chunk at {},{} in block attachment '{}' has mismatching hash, expected {}, got {}",
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockDescription.BlockHash,
+ ChunkHash,
+ VerifyChunkHash);
+ ReportMessage(OptionalContext, ErrorString);
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+ "Malformed chunk block",
+ ErrorString);
+ }
+ continue;
+ }
+ if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex])
+ {
+ std::string ErrorString = fmt::format(
+ "Chunk at {},{} in block attachment '{}' has mismatching raw size, expected {}, "
+ "got {}",
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockDescription.BlockHash,
+ BlockDescription.ChunkRawLengths[ChunkBlockIndex],
+ VerifyChunkSize);
+ ReportMessage(OptionalContext, ErrorString);
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+ "Malformed chunk block",
+ ErrorString);
+ }
+ continue;
+ }
+
+ WriteAttachmentBuffers.emplace_back(CompressedChunk.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.emplace_back(ChunkHash);
+ PotentialSize += WriteAttachmentBuffers.back().GetSize();
+ }
+ }
+ OffsetInBlock += ChunkCompressedSize;
+ }
+
+ if (!WriteAttachmentBuffers.empty())
+ {
+ auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const auto& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ UsedSize += WriteAttachmentBuffers[Index].GetSize();
+ }
+ }
+ ZEN_DEBUG("Used {} (matching {}) out of {} for block {} range {}, {} ({} %) (use of matching {}%)",
+ NiceBytes(UsedSize),
+ NiceBytes(PotentialSize),
+ NiceBytes(BlockPartSize),
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength,
+ (100 * UsedSize) / BlockPartSize,
+ PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed save block attachment {} range {}, {}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+
+ ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})",
+ BlockRangeCount,
+ BlockDescription.BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(DownloadElapsedSeconds * 1000)),
+ NiceBytes(DownloadedBytes));
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ };
+
void DownloadAndSaveAttachment(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
bool IgnoreMissingAttachments,
@@ -664,7 +1002,7 @@ namespace remotestore_impl {
{
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash, {});
if (AttachmentResult.ErrorCode)
{
ReportMessage(OptionalContext,
@@ -680,10 +1018,10 @@ namespace remotestore_impl {
return;
}
uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
- ZEN_INFO("Loaded large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
- NiceBytes(AttachmentSize));
+ ZEN_DEBUG("Loaded large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(AttachmentSize));
Info.AttachmentsDownloaded.fetch_add(1);
if (RemoteResult.IsError())
{
@@ -1224,35 +1562,7 @@ BuildContainer(CidStore& ChunkStore,
{
using namespace std::literals;
- class JobContextLogOutput : public OperationLogOutput
- {
- public:
- JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {}
- virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) override
- {
- ZEN_UNUSED(LogLevel);
- if (m_OptionalContext)
- {
- fmt::basic_memory_buffer<char, 250> MessageBuffer;
- fmt::vformat_to(fmt::appender(MessageBuffer), Format, Args);
- remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size()));
- }
- }
-
- virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); }
- virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); }
- virtual uint32_t GetProgressUpdateDelayMS() override { return 0; }
- virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override
- {
- ZEN_UNUSED(InSubTask);
- return nullptr;
- }
-
- private:
- JobContext* m_OptionalContext;
- };
-
- std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<JobContextLogOutput>(OptionalContext));
+ std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext));
size_t OpCount = 0;
@@ -2768,14 +3078,15 @@ SaveOplog(CidStore& ChunkStore,
};
RemoteProjectStore::Result
-ParseOplogContainer(const CbObject& ContainerObject,
- const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
- CbObject& OutOplogSection,
- JobContext* OptionalContext)
+ParseOplogContainer(
+ const CbObject& ContainerObject,
+ const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
+ CbObject& OutOplogSection,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2801,12 +3112,12 @@ ParseOplogContainer(const CbObject& ContainerObject,
"Section has unexpected data type",
"Failed to save oplog container"};
}
- std::unordered_set<IoHash, IoHash::Hasher> OpsAttachments;
+ std::unordered_set<IoHash, IoHash::Hasher> NeededAttachments;
{
CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView();
for (CbFieldView OpEntry : OpsArray)
{
- OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); });
+ OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); });
if (remotestore_impl::IsCancelled(OptionalContext))
{
return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
@@ -2816,7 +3127,7 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
}
{
- std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end());
+ std::vector<IoHash> ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end());
OnReferencedAttachments(ReferencedAttachments);
}
@@ -2827,24 +3138,27 @@ ParseOplogContainer(const CbObject& ContainerObject,
.Reason = "Operation cancelled"};
}
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size()));
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", NeededAttachments.size()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
for (CbFieldView ChunkedFileField : ChunkedFilesArray)
{
CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView();
IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash();
- if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash)))
+ if (NeededAttachments.erase(RawHash) == 1)
{
- ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView);
-
- OnReferencedAttachments(Chunked.ChunkHashes);
- OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end());
- OnChunkedAttachment(Chunked);
- ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- Chunked.ChunkHashes.size());
+ if (!HasAttachment(RawHash))
+ {
+ ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView);
+
+ OnReferencedAttachments(Chunked.ChunkHashes);
+ NeededAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end());
+ OnChunkedAttachment(Chunked);
+ ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ Chunked.ChunkHashes.size());
+ }
}
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -2854,6 +3168,8 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
}
+ std::vector<ThinChunkBlockDescription> ThinBlocksDescriptions;
+
size_t NeedBlockCount = 0;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
@@ -2863,45 +3179,38 @@ ParseOplogContainer(const CbObject& ContainerObject,
CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
- std::vector<IoHash> NeededChunks;
- NeededChunks.reserve(ChunksArray.Num());
- if (BlockHash == IoHash::Zero)
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkField : ChunksArray)
{
- for (CbFieldView ChunkField : ChunksArray)
- {
- IoHash ChunkHash = ChunkField.AsBinaryAttachment();
- if (OpsAttachments.erase(ChunkHash) == 1)
- {
- if (!HasAttachment(ChunkHash))
- {
- NeededChunks.emplace_back(ChunkHash);
- }
- }
- }
+ ChunkHashes.push_back(ChunkField.AsHash());
}
- else
+ ThinBlocksDescriptions.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)});
+ }
+
+ for (ThinChunkBlockDescription& ThinBlockDescription : ThinBlocksDescriptions)
+ {
+ std::vector<uint32_t> NeededBlockChunkIndexes;
+ for (uint32_t ChunkIndex = 0; ChunkIndex < ThinBlockDescription.ChunkRawHashes.size(); ChunkIndex++)
{
- for (CbFieldView ChunkField : ChunksArray)
+ const IoHash& ChunkHash = ThinBlockDescription.ChunkRawHashes[ChunkIndex];
+ if (NeededAttachments.erase(ChunkHash) == 1)
{
- const IoHash ChunkHash = ChunkField.AsHash();
- if (OpsAttachments.erase(ChunkHash) == 1)
+ if (!HasAttachment(ChunkHash))
{
- if (!HasAttachment(ChunkHash))
- {
- NeededChunks.emplace_back(ChunkHash);
- }
+ NeededBlockChunkIndexes.push_back(ChunkIndex);
}
}
}
-
- if (!NeededChunks.empty())
+ if (!NeededBlockChunkIndexes.empty())
{
- OnNeedBlock(BlockHash, std::move(NeededChunks));
- if (BlockHash != IoHash::Zero)
+ if (ThinBlockDescription.BlockHash != IoHash::Zero)
{
NeedBlockCount++;
}
+ OnNeedBlock(std::move(ThinBlockDescription), std::move(NeededBlockChunkIndexes));
}
+
if (remotestore_impl::IsCancelled(OptionalContext))
{
return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
@@ -2909,6 +3218,7 @@ ParseOplogContainer(const CbObject& ContainerObject,
.Reason = "Operation cancelled"};
}
}
+
remotestore_impl::ReportMessage(OptionalContext,
fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
@@ -2918,7 +3228,7 @@ ParseOplogContainer(const CbObject& ContainerObject,
{
IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
- if (OpsAttachments.erase(AttachmentHash) == 1)
+ if (NeededAttachments.erase(AttachmentHash) == 1)
{
if (!HasAttachment(AttachmentHash))
{
@@ -2941,14 +3251,15 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
RemoteProjectStore::Result
-SaveOplogContainer(ProjectStore::Oplog& Oplog,
- const CbObject& ContainerObject,
- const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
- JobContext* OptionalContext)
+SaveOplogContainer(
+ ProjectStore::Oplog& Oplog,
+ const CbObject& ContainerObject,
+ const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2972,18 +3283,23 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
RemoteProjectStore::Result
-LoadOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Oplog& Oplog,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- bool ForceDownload,
- bool IgnoreMissingAttachments,
- bool CleanOplog,
- JobContext* OptionalContext)
+LoadOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ bool ForceDownload,
+ bool IgnoreMissingAttachments,
+ bool CleanOplog,
+ EPartialBlockRequestMode PartialBlockRequestMode,
+ double HostLatencySec,
+ double CacheLatencySec,
+ JobContext* OptionalContext)
{
using namespace std::literals;
+ std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext));
+
remotestore_impl::DownloadInfo Info;
Stopwatch Timer;
@@ -3035,6 +3351,14 @@ LoadOplog(CidStore& ChunkStore,
return false;
};
+ struct NeededBlockDownload
+ {
+ ThinChunkBlockDescription ThinBlockDescription;
+ std::vector<uint32_t> NeededChunkIndexes;
+ };
+
+ std::vector<NeededBlockDownload> NeededBlockDownloads;
+
auto OnNeedBlock = [&RemoteStore,
&ChunkStore,
&NetworkWorkerPool,
@@ -3047,8 +3371,9 @@ LoadOplog(CidStore& ChunkStore,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
+ &NeededBlockDownloads,
IgnoreMissingAttachments,
- OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) {
+ OptionalContext](ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes) {
if (RemoteResult.IsError())
{
return;
@@ -3056,7 +3381,7 @@ LoadOplog(CidStore& ChunkStore,
BlockCountToDownload++;
AttachmentCount.fetch_add(1);
- if (BlockHash == IoHash::Zero)
+ if (ThinBlockDescription.BlockHash == IoHash::Zero)
{
DownloadAndSaveBlockChunks(ChunkStore,
RemoteStore,
@@ -3070,25 +3395,13 @@ LoadOplog(CidStore& ChunkStore,
Info,
LoadAttachmentsTimer,
DownloadStartMS,
- Chunks);
+ std::move(ThinBlockDescription),
+ std::move(NeededChunkIndexes));
}
else
{
- DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- Chunks,
- 3);
+ NeededBlockDownloads.push_back(NeededBlockDownload{.ThinBlockDescription = std::move(ThinBlockDescription),
+ .NeededChunkIndexes = std::move(NeededChunkIndexes)});
}
};
@@ -3132,12 +3445,7 @@ LoadOplog(CidStore& ChunkStore,
};
std::vector<ChunkedInfo> FilesToDechunk;
- auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) {
- if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash))
- {
- FilesToDechunk.push_back(Chunked);
- }
- };
+ auto OnChunkedAttachment = [&FilesToDechunk](const ChunkedInfo& Chunked) { FilesToDechunk.push_back(Chunked); };
auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); };
@@ -3165,6 +3473,185 @@ LoadOplog(CidStore& ChunkStore,
BlockCountToDownload,
FilesToDechunk.size()));
+ std::vector<IoHash> BlockHashes;
+ std::vector<IoHash> AllNeededChunkHashes;
+ BlockHashes.reserve(NeededBlockDownloads.size());
+ for (const NeededBlockDownload& BlockDownload : NeededBlockDownloads)
+ {
+ BlockHashes.push_back(BlockDownload.ThinBlockDescription.BlockHash);
+ for (uint32_t ChunkIndex : BlockDownload.NeededChunkIndexes)
+ {
+ AllNeededChunkHashes.push_back(BlockDownload.ThinBlockDescription.ChunkRawHashes[ChunkIndex]);
+ }
+ }
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllNeededPartialChunkHashesLookup = BuildHashLookup(AllNeededChunkHashes);
+ std::vector<std::atomic<bool>> ChunkDownloadedFlags(AllNeededChunkHashes.size());
+ std::vector<bool> DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false);
+ ChunkBlockAnalyser::BlockResult PartialBlocksResult;
+
+ RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = RemoteStore.GetBlockDescriptions(BlockHashes);
+ std::vector<IoHash> BlocksWithDescription;
+ BlocksWithDescription.reserve(BlockDescriptions.Blocks.size());
+ for (const ChunkBlockDescription& BlockDescription : BlockDescriptions.Blocks)
+ {
+ BlocksWithDescription.push_back(BlockDescription.BlockHash);
+ }
+ {
+ auto WantIt = NeededBlockDownloads.begin();
+ auto FindIt = BlockDescriptions.Blocks.begin();
+ while (WantIt != NeededBlockDownloads.end())
+ {
+ if (FindIt == BlockDescriptions.Blocks.end())
+ {
+ // Fall back to full download as we can't get enough information about the block
+ DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ WantIt->ThinBlockDescription.BlockHash,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ 3);
+ for (uint32_t BlockChunkIndex : WantIt->NeededChunkIndexes)
+ {
+ const IoHash& ChunkHash = WantIt->ThinBlockDescription.ChunkRawHashes[BlockChunkIndex];
+ auto It = AllNeededPartialChunkHashesLookup.find(ChunkHash);
+ ZEN_ASSERT(It != AllNeededPartialChunkHashesLookup.end());
+ uint32_t ChunkIndex = It->second;
+ DownloadedViaLegacyChunkFlag[ChunkIndex] = true;
+ }
+ WantIt++;
+ }
+ else if (WantIt->ThinBlockDescription.BlockHash == FindIt->BlockHash)
+ {
+ // Found
+ FindIt++;
+ WantIt++;
+ }
+ else
+ {
+ // Not a requested block?
+ ZEN_ASSERT(false);
+ }
+ }
+ }
+ if (!AllNeededChunkHashes.empty())
+ {
+ std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes;
+
+ if (PartialBlockRequestMode == EPartialBlockRequestMode::Off)
+ {
+ PartialBlockDownloadModes.resize(BlocksWithDescription.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ }
+ else
+ {
+ RemoteProjectStore::AttachmentExistsInCacheResult CacheExistsResult =
+ RemoteStore.AttachmentExistsInCache(BlocksWithDescription);
+ if (CacheExistsResult.ErrorCode != 0 || CacheExistsResult.HasBody.size() != BlocksWithDescription.size())
+ {
+ CacheExistsResult.HasBody.resize(BlocksWithDescription.size(), false);
+ }
+
+ PartialBlockDownloadModes.reserve(BlocksWithDescription.size());
+
+ for (bool ExistsInCache : CacheExistsResult.HasBody)
+ {
+ if (PartialBlockRequestMode == EPartialBlockRequestMode::All)
+ {
+ PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange);
+ }
+ else if (PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly)
+ {
+ PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ }
+ else if (PartialBlockRequestMode == EPartialBlockRequestMode::Mixed)
+ {
+ PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange);
+ }
+ }
+ }
+
+ ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size());
+
+ ChunkBlockAnalyser PartialAnalyser(*LogOutput,
+ BlockDescriptions.Blocks,
+ ChunkBlockAnalyser::Options{.IsQuiet = false,
+ .IsVerbose = false,
+ .HostLatencySec = HostLatencySec,
+ .HostHighSpeedLatencySec = CacheLatencySec});
+
+ std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks =
+ PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup,
+ [&](uint32_t ChunkIndex) { return !DownloadedViaLegacyChunkFlag[ChunkIndex]; });
+
+ PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes);
+ for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes)
+ {
+ DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockDescriptions.Blocks[FullBlockIndex].BlockHash,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ 3);
+ }
+
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();)
+ {
+ size_t RangeCount = 1;
+ size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex;
+ const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex];
+ while (RangeCount < RangesLeft &&
+ CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex)
+ {
+ RangeCount++;
+ }
+
+ DownloadAndSavePartialBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex],
+ PartialBlocksResult.BlockRanges,
+ BlockRangeIndex,
+ RangeCount,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ 3);
+
+ BlockRangeIndex += RangeCount;
+ }
+ }
+
AttachmentsDownloadLatch.CountDown();
while (!AttachmentsDownloadLatch.Wait(1000))
{
@@ -3478,21 +3965,30 @@ LoadOplog(CidStore& ChunkStore,
}
}
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}",
- RemoteStoreInfo.ContainerName,
- Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- NiceBytes(Info.OplogSizeBytes),
- Info.AttachmentBlocksDownloaded.load(),
- NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
- Info.AttachmentsDownloaded.load(),
- NiceBytes(Info.AttachmentBytesDownloaded.load()),
- Info.AttachmentsStored.load(),
- NiceBytes(Info.AttachmentBytesStored.load()),
- Info.MissingAttachmentCount.load(),
- remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
+ uint64_t TotalDownloads =
+ 1 + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load();
+ uint64_t TotalBytesDownloaded = Info.OplogSizeBytes + Info.AttachmentBlockBytesDownloaded.load() +
+ Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load();
+
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} "
+ "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}",
+ RemoteStoreInfo.ContainerName,
+ Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ NiceBytes(Info.OplogSizeBytes),
+ Info.AttachmentBlocksDownloaded.load(),
+ NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
+ Info.AttachmentBlocksRangesDownloaded.load(),
+ NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()),
+ Info.AttachmentsDownloaded.load(),
+ NiceBytes(Info.AttachmentBytesDownloaded.load()),
+ TotalDownloads,
+ NiceBytes(TotalBytesDownloaded),
+ Info.AttachmentsStored.load(),
+ NiceBytes(Info.AttachmentBytesStored.load()),
+ Info.MissingAttachmentCount.load(),
+ remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
return Result;
}
@@ -3697,6 +4193,9 @@ TEST_CASE_TEMPLATE("project.store.export",
/*Force*/ false,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ false,
+ EPartialBlockRequestMode::Mixed,
+ /*HostLatencySec*/ -1.0,
+ /*CacheLatencySec*/ -1.0,
nullptr);
CHECK(ImportResult.ErrorCode == 0);
@@ -3708,6 +4207,9 @@ TEST_CASE_TEMPLATE("project.store.export",
/*Force*/ true,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ false,
+ EPartialBlockRequestMode::Mixed,
+ /*HostLatencySec*/ -1.0,
+ /*CacheLatencySec*/ -1.0,
nullptr);
CHECK(ImportForceResult.ErrorCode == 0);
@@ -3719,6 +4221,9 @@ TEST_CASE_TEMPLATE("project.store.export",
/*Force*/ false,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ true,
+ EPartialBlockRequestMode::Mixed,
+ /*HostLatencySec*/ -1.0,
+ /*CacheLatencySec*/ -1.0,
nullptr);
CHECK(ImportCleanResult.ErrorCode == 0);
@@ -3730,6 +4235,9 @@ TEST_CASE_TEMPLATE("project.store.export",
/*Force*/ true,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ true,
+ EPartialBlockRequestMode::Mixed,
+ /*HostLatencySec*/ -1.0,
+ /*CacheLatencySec*/ -1.0,
nullptr);
CHECK(ImportForceCleanResult.ErrorCode == 0);
}
diff --git a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
index ab82edbef..b4c1156ac 100644
--- a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
@@ -249,7 +249,18 @@ public:
return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
}
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override
+ {
+ ZEN_UNUSED(BlockHashes);
+ return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}};
+ }
+
+ virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override
+ {
+ return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)};
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override
{
std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
HttpClient::Response Response =
@@ -257,12 +268,7 @@ public:
AddStats(Response);
LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
- if (!Result.ErrorCode)
- {
- Result.Bytes = Response.ResponsePayload;
- Result.Bytes.MakeOwned();
- }
- if (!Result.ErrorCode)
+ if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'",
m_ProjectStoreUrl,
@@ -271,6 +277,15 @@ public:
RawHash,
Result.Reason);
}
+ if (!Result.ErrorCode && Range)
+ {
+ Result.Bytes = IoBuffer(Response.ResponsePayload, Range.Offset, Range.Bytes);
+ }
+ else
+ {
+ Result.Bytes = Response.ResponsePayload;
+ }
+ Result.Bytes.MakeOwned();
return Result;
}
diff --git a/src/zenserver/storage/projectstore/httpprojectstore.cpp b/src/zenserver/storage/projectstore/httpprojectstore.cpp
index 416e2ed69..2b5474d00 100644
--- a/src/zenserver/storage/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/storage/projectstore/httpprojectstore.cpp
@@ -244,6 +244,8 @@ namespace {
{
std::shared_ptr<RemoteProjectStore> Store;
std::string Description;
+ double HostLatencySec = -1.0;
+ double CacheLatencySec = -1.0;
};
CreateRemoteStoreResult CreateRemoteStore(LoggerRef InLog,
@@ -261,6 +263,8 @@ namespace {
using namespace std::literals;
std::shared_ptr<RemoteProjectStore> RemoteStore;
+ double HostLatencySec = -1.0;
+ double CacheLatencySec = -1.0;
if (CbObjectView File = Params["file"sv].AsObjectView(); File)
{
@@ -495,7 +499,9 @@ namespace {
/*Quiet*/ false,
/*Unattended*/ false,
/*Hidden*/ true,
- GetTinyWorkerPool(EWorkloadType::Background));
+ GetTinyWorkerPool(EWorkloadType::Background),
+ HostLatencySec,
+ CacheLatencySec);
}
if (!RemoteStore)
@@ -503,7 +509,10 @@ namespace {
return {nullptr, "Unknown remote store type"};
}
- return {std::move(RemoteStore), ""};
+ return CreateRemoteStoreResult{.Store = std::move(RemoteStore),
+ .Description = "",
+ .HostLatencySec = HostLatencySec,
+ .CacheLatencySec = CacheLatencySec};
}
std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
@@ -2356,15 +2365,19 @@ HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req)
tsl::robin_set<IoHash, IoHash::Hasher> Attachments;
auto HasAttachment = [this](const IoHash& RawHash) { return m_CidStore.ContainsChunk(RawHash); };
- auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) {
+ auto OnNeedBlock = [&AttachmentsLock, &Attachments](ThinChunkBlockDescription&& ThinBlockDescription,
+ std::vector<uint32_t>&& NeededChunkIndexes) {
RwLock::ExclusiveLockScope _(AttachmentsLock);
- if (BlockHash != IoHash::Zero)
+ if (ThinBlockDescription.BlockHash != IoHash::Zero)
{
- Attachments.insert(BlockHash);
+ Attachments.insert(ThinBlockDescription.BlockHash);
}
else
{
- Attachments.insert(ChunkHashes.begin(), ChunkHashes.end());
+ for (uint32_t ChunkIndex : NeededChunkIndexes)
+ {
+ Attachments.insert(ThinBlockDescription.ChunkRawHashes[ChunkIndex]);
+ }
}
};
auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) {
@@ -2663,6 +2676,8 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req)
bool CleanOplog = Params["clean"].AsBool(false);
bool BoostWorkerCount = Params["boostworkercount"].AsBool(false);
bool BoostWorkerMemory = Params["boostworkermemory"sv].AsBool(false);
+ EPartialBlockRequestMode PartialBlockRequestMode =
+ PartialBlockRequestModeFromString(Params["partialblockrequestmode"sv].AsString("true"));
CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Log(),
Params,
@@ -2688,6 +2703,9 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req)
Force,
IgnoreMissingAttachments,
CleanOplog,
+ PartialBlockRequestMode,
+ HostLatencySec = RemoteStoreResult.HostLatencySec,
+ CacheLatencySec = RemoteStoreResult.CacheLatencySec,
BoostWorkerCount](JobContext& Context) {
Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}",
Oplog->GetOuterProjectIdentifier(),
@@ -2709,6 +2727,9 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req)
Force,
IgnoreMissingAttachments,
CleanOplog,
+ PartialBlockRequestMode,
+ HostLatencySec,
+ CacheLatencySec,
&Context);
auto Response = ConvertResult(Result);
ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);