aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-03 20:49:01 +0100
committerGitHub Enterprise <[email protected]>2026-03-03 20:49:01 +0100
commit463a0fde16b827c0ec44c9e88fe3c8c8098aa5ea (patch)
tree736553b3ded853fe945bdeea7585631617d171c3 /src
parentfix objectstore uri path parsing (#801) (diff)
downloadzen-463a0fde16b827c0ec44c9e88fe3c8c8098aa5ea.tar.xz
zen-463a0fde16b827c0ec44c9e88fe3c8c8098aa5ea.zip
use multi range requests (#800)
- Improvement: `zen builds download` now uses multi-range requests for blocks to reduce download size - Improvement: `zen oplog-import` now uses partial block with multi-range requests for blocks to reduce download size - Improvement: Improved feedback in log/console during `zen oplog-import` - Improvement: `--allow-partial-block-requests` now defaults to `true` for `zen builds download` and `zen oplog-import` (was `mixed`) - Improvement: Improved range merging analysis when downloading partial blocks
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.h2
-rw-r--r--src/zen/cmds/projectstore_cmd.h2
-rw-r--r--src/zen/cmds/workspaces_cmd.cpp2
-rw-r--r--src/zen/progressbar.cpp5
-rw-r--r--src/zenhttp/clients/httpclientcommon.cpp33
-rw-r--r--src/zenhttp/httpclient.cpp11
-rw-r--r--src/zenhttp/include/zenhttp/httpclient.h4
-rw-r--r--src/zenremotestore/builds/buildstoragecache.cpp72
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp581
-rw-r--r--src/zenremotestore/builds/buildstorageutil.cpp17
-rw-r--r--src/zenremotestore/builds/filebuildstorage.cpp39
-rw-r--r--src/zenremotestore/builds/jupiterbuildstorage.cpp22
-rw-r--r--src/zenremotestore/chunking/chunkblock.cpp63
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorage.h21
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h8
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h11
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h1
-rw-r--r--src/zenremotestore/include/zenremotestore/chunking/chunkblock.h31
-rw-r--r--src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h12
-rw-r--r--src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h26
-rw-r--r--src/zenremotestore/jupiter/jupitersession.cpp65
-rw-r--r--src/zenremotestore/projectstore/buildsremoteprojectstore.cpp100
-rw-r--r--src/zenremotestore/projectstore/fileremoteprojectstore.cpp68
-rw-r--r--src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp60
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp879
-rw-r--r--src/zenremotestore/projectstore/zenremoteprojectstore.cpp145
-rw-r--r--src/zenserver/storage/buildstore/httpbuildstore.cpp24
27 files changed, 1582 insertions, 722 deletions
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index f5c44ab55..5c80beed5 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -71,7 +71,7 @@ private:
bool m_AppendNewContent = false;
uint8_t m_BlockReuseMinPercentLimit = 85;
bool m_AllowMultiparts = true;
- std::string m_AllowPartialBlockRequests = "mixed";
+ std::string m_AllowPartialBlockRequests = "true";
AuthCommandLineOptions m_AuthOptions;
diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h
index 17fd76e9f..1ba98b39e 100644
--- a/src/zen/cmds/projectstore_cmd.h
+++ b/src/zen/cmds/projectstore_cmd.h
@@ -210,7 +210,7 @@ private:
bool m_BoostWorkerMemory = false;
bool m_BoostWorkers = false;
- std::string m_AllowPartialBlockRequests = "mixed";
+ std::string m_AllowPartialBlockRequests = "true";
};
class SnapshotOplogCommand : public ProjectStoreCommand
diff --git a/src/zen/cmds/workspaces_cmd.cpp b/src/zen/cmds/workspaces_cmd.cpp
index 2661ac9da..af265d898 100644
--- a/src/zen/cmds/workspaces_cmd.cpp
+++ b/src/zen/cmds/workspaces_cmd.cpp
@@ -815,7 +815,7 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
if (Results.size() != m_ChunkIds.size())
{
throw std::runtime_error(
- fmt::format("failed to get workspace share batch - invalid result count recevied (expected: {}, received: {}",
+ fmt::format("failed to get workspace share batch - invalid result count received (expected: {}, received: {}",
m_ChunkIds.size(),
Results.size()));
}
diff --git a/src/zen/progressbar.cpp b/src/zen/progressbar.cpp
index 732f16e81..9467ed60d 100644
--- a/src/zen/progressbar.cpp
+++ b/src/zen/progressbar.cpp
@@ -207,8 +207,9 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak)
size_t ProgressBarCount = (ProgressBarSize * PercentDone) / 100;
uint64_t Completed = NewState.TotalCount - NewState.RemainingCount;
uint64_t ETAElapsedMS = ElapsedTimeMS -= m_PausedMS;
- uint64_t ETAMS =
- (NewState.Status == State::EStatus::Running) && (PercentDone > 5) ? (ETAElapsedMS * NewState.RemainingCount) / Completed : 0;
+ uint64_t ETAMS = ((m_State.TotalCount == NewState.TotalCount) && (NewState.Status == State::EStatus::Running)) && (PercentDone > 5)
+ ? (ETAElapsedMS * NewState.RemainingCount) / Completed
+ : 0;
uint32_t ConsoleColumns = TuiConsoleColumns(1024);
diff --git a/src/zenhttp/clients/httpclientcommon.cpp b/src/zenhttp/clients/httpclientcommon.cpp
index 248ae9d70..9ded23375 100644
--- a/src/zenhttp/clients/httpclientcommon.cpp
+++ b/src/zenhttp/clients/httpclientcommon.cpp
@@ -394,31 +394,28 @@ namespace detail {
{
// Yes, we do a substring of the non-lowercase value string as we want the exact boundary string
std::string_view BoundaryName = std::string_view(ContentTypeHeaderValue).substr(BoundaryPos + 9);
+ size_t BoundaryEnd = std::string::npos;
+ while (!BoundaryName.empty() && BoundaryName[0] == ' ')
+ {
+ BoundaryName = BoundaryName.substr(1);
+ }
if (!BoundaryName.empty())
{
- size_t BoundaryEnd = std::string::npos;
- while (BoundaryName[0] == ' ')
- {
- BoundaryName = BoundaryName.substr(1);
- }
- if (!BoundaryName.empty())
+ if (BoundaryName.size() > 2 && BoundaryName.front() == '"' && BoundaryName.back() == '"')
{
- if (BoundaryName.size() > 2 && BoundaryName.front() == '"' && BoundaryName.back() == '"')
+ BoundaryEnd = BoundaryName.find('"', 1);
+ if (BoundaryEnd != std::string::npos)
{
- BoundaryEnd = BoundaryName.find('"', 1);
- if (BoundaryEnd != std::string::npos)
- {
- BoundaryBeginMatcher.Init(fmt::format("\r\n--{}", BoundaryName.substr(1, BoundaryEnd - 1)));
- return true;
- }
- }
- else
- {
- BoundaryEnd = BoundaryName.find_first_of(" \r\n");
- BoundaryBeginMatcher.Init(fmt::format("\r\n--{}", BoundaryName.substr(0, BoundaryEnd)));
+ BoundaryBeginMatcher.Init(fmt::format("\r\n--{}", BoundaryName.substr(1, BoundaryEnd - 1)));
return true;
}
}
+ else
+ {
+ BoundaryEnd = BoundaryName.find_first_of(" \r\n");
+ BoundaryBeginMatcher.Init(fmt::format("\r\n--{}", BoundaryName.substr(0, BoundaryEnd)));
+ return true;
+ }
}
}
}
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index f94c58581..281d512cf 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -107,17 +107,14 @@ HttpClientBase::GetAccessToken()
std::vector<std::pair<uint64_t, uint64_t>>
HttpClient::Response::GetRanges(std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengthPairs) const
{
- std::vector<std::pair<uint64_t, uint64_t>> Result;
- Result.reserve(OffsetAndLengthPairs.size());
if (Ranges.empty())
{
- for (const std::pair<uint64_t, uint64_t>& Range : OffsetAndLengthPairs)
- {
- Result.emplace_back(std::make_pair(Range.first, Range.second));
- }
- return Result;
+ return {};
}
+ std::vector<std::pair<uint64_t, uint64_t>> Result;
+ Result.reserve(OffsetAndLengthPairs.size());
+
auto BoundaryIt = Ranges.begin();
auto OffsetAndLengthPairIt = OffsetAndLengthPairs.begin();
while (OffsetAndLengthPairIt != OffsetAndLengthPairs.end())
diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h
index f00bbec63..53be36b9a 100644
--- a/src/zenhttp/include/zenhttp/httpclient.h
+++ b/src/zenhttp/include/zenhttp/httpclient.h
@@ -190,10 +190,12 @@ public:
HttpContentType ContentType;
};
- // Ranges will map out all recevied ranges, both single and multi-range responses
+ // Ranges will map out all received ranges, both single and multi-range responses
// If no range was requested Ranges will be empty
std::vector<MultipartBoundary> Ranges;
+ // Map the absolute OffsetAndLengthPairs into ResponsePayload from the ranges received (Ranges).
+ // If the response was not a partial response, an empty vector will be returned
std::vector<std::pair<uint64_t, uint64_t>> GetRanges(std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengthPairs) const;
// This contains any errors from the HTTP stack. It won't contain information on
diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp
index faa85f81b..53d33bd7e 100644
--- a/src/zenremotestore/builds/buildstoragecache.cpp
+++ b/src/zenremotestore/builds/buildstoragecache.cpp
@@ -151,7 +151,7 @@ public:
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
HttpClient::Response CacheResponse =
- m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
+ m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash),
Payload,
ContentType);
@@ -180,7 +180,7 @@ public:
}
CreateDirectories(m_TempFolderPath);
HttpClient::Response CacheResponse =
- m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
+ m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash),
m_TempFolderPath,
Headers);
AddStatistic(CacheResponse);
@@ -191,6 +191,74 @@ public:
return {};
}
+ virtual BuildBlobRanges GetBuildBlobRanges(const Oid& BuildId,
+ const IoHash& RawHash,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges) override
+ {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlobRanges");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ {
+ for (const std::pair<uint64_t, uint64_t>& Range : Ranges)
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddInteger("offset"sv, Range.first);
+ Writer.AddInteger("length"sv, Range.second);
+ }
+ Writer.EndObject();
+ }
+ }
+ Writer.EndArray(); // ranges
+
+ CreateDirectories(m_TempFolderPath);
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStatistic(CacheResponse);
+ if (CacheResponse.IsSuccess())
+ {
+ CbPackage ResponsePackage = ParsePackageMessage(CacheResponse.ResponsePayload);
+ CbObjectView ResponseObject = ResponsePackage.GetObject();
+
+ CbArrayView RangeArray = ResponseObject["ranges"sv].AsArrayView();
+
+ std::vector<std::pair<uint64_t, uint64_t>> ReceivedRanges;
+ ReceivedRanges.reserve(RangeArray.Num());
+
+ uint64_t OffsetInPayloadRanges = 0;
+
+ for (CbFieldView View : RangeArray)
+ {
+ CbObjectView RangeView = View.AsObjectView();
+ uint64_t Offset = RangeView["offset"sv].AsUInt64();
+ uint64_t Length = RangeView["length"sv].AsUInt64();
+
+ const std::pair<uint64_t, uint64_t>& Range = Ranges[ReceivedRanges.size()];
+
+ if (Offset != Range.first || Length != Range.second)
+ {
+ return {};
+ }
+ ReceivedRanges.push_back(std::make_pair(OffsetInPayloadRanges, Length));
+ OffsetInPayloadRanges += Length;
+ }
+
+ const CbAttachment* DataAttachment = ResponsePackage.FindAttachment(RawHash);
+ if (DataAttachment)
+ {
+ SharedBuffer PayloadRanges = DataAttachment->AsBinary();
+ return BuildBlobRanges{.PayloadBuffer = PayloadRanges.AsIoBuffer(), .Ranges = std::move(ReceivedRanges)};
+ }
+ }
+ return {};
+ }
+
virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override
{
ZEN_ASSERT(!IsFlushed);
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 5deb00707..f4b4d592b 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -38,6 +38,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zenhttp/httpclientauth.h>
# include <zenremotestore/builds/filebuildstorage.h>
#endif // ZEN_WITH_TESTS
@@ -883,12 +884,14 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
BlobsExistsResult ExistsResult;
{
- ChunkBlockAnalyser BlockAnalyser(m_LogOutput,
- m_BlockDescriptions,
- ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet,
- .IsVerbose = m_Options.IsVerbose,
- .HostLatencySec = m_Storage.BuildStorageLatencySec,
- .HostHighSpeedLatencySec = m_Storage.CacheLatencySec});
+ ChunkBlockAnalyser BlockAnalyser(
+ m_LogOutput,
+ m_BlockDescriptions,
+ ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet,
+ .IsVerbose = m_Options.IsVerbose,
+ .HostLatencySec = m_Storage.BuildStorageLatencySec,
+ .HostHighSpeedLatencySec = m_Storage.CacheLatencySec,
+ .HostMaxRangeCountPerRequest = BuildStorageBase::MaxRangeCountPerRequest});
std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded(
m_RemoteLookup.ChunkHashToChunkIndex,
@@ -1027,15 +1030,13 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash);
if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All)
{
- BlockPartialDownloadModes.push_back(BlockExistInCache
- ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
- : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange);
+ BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange);
}
else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly)
{
- BlockPartialDownloadModes.push_back(BlockExistInCache
- ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
- : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
}
else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed)
{
@@ -1045,6 +1046,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
}
}
}
+
ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size());
ChunkBlockAnalyser::BlockResult PartialBlocks =
@@ -1356,90 +1358,105 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
&Work,
&PartialBlocks,
BlockRangeStartIndex = BlockRangeIndex,
- RangeCount](std::atomic<bool>&) {
+ RangeCount = RangeCount](std::atomic<bool>&) {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_GetPartialBlockRanges");
FilteredDownloadedBytesPerSecond.Start();
- for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + RangeCount;
- BlockRangeIndex++)
- {
- ZEN_TRACE_CPU("GetPartialBlock");
-
- const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = PartialBlocks.BlockRanges[BlockRangeIndex];
-
- DownloadPartialBlock(
- BlockRange,
- ExistsResult,
- [this,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &WritePartsComplete,
- &WriteCache,
- &Work,
- TotalRequestCount,
- TotalPartWriteCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond,
- &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
+ DownloadPartialBlock(
+ PartialBlocks.BlockRanges,
+ BlockRangeStartIndex,
+ RangeCount,
+ ExistsResult,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WritePartsComplete,
+ &WriteCache,
+ &Work,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ &PartialBlocks](IoBuffer&& InMemoryBuffer,
+ const std::filesystem::path& OnDiskPath,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) {
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- if (!m_AbortFlag)
- {
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &WritePartsComplete,
- &WriteCache,
- &Work,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- &BlockRange,
- BlockChunkPath = std::filesystem::path(OnDiskPath),
- BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_WritePartialBlock");
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WritePartsComplete,
+ &WriteCache,
+ &Work,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ &PartialBlocks,
+ BlockRangeStartIndex,
+ BlockChunkPath = std::filesystem::path(OnDiskPath),
+ BlockPartialBuffer = std::move(InMemoryBuffer),
+ OffsetAndLengths = std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(),
+ OffsetAndLengths.end())](
+ std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WritePartialBlock");
- const uint32_t BlockIndex = BlockRange.BlockIndex;
+ const uint32_t BlockIndex = PartialBlocks.BlockRanges[BlockRangeStartIndex].BlockIndex;
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockPartialBuffer);
- }
- else
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
{
- ZEN_ASSERT(!BlockPartialBuffer);
- BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockPartialBuffer)
- {
- throw std::runtime_error(
- fmt::format("Could not open downloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
- }
+ throw std::runtime_error(
+ fmt::format("Could not open downloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
}
+ }
+
+ FilteredWrittenBytesPerSecond.Start();
- FilteredWrittenBytesPerSecond.Start();
-
- if (!WritePartialBlockChunksToCache(
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache))
+ size_t RangeCount = OffsetAndLengths.size();
+
+ for (size_t PartialRangeIndex = 0; PartialRangeIndex < RangeCount; PartialRangeIndex++)
+ {
+ const std::pair<uint64_t, uint64_t>& OffsetAndLength =
+ OffsetAndLengths[PartialRangeIndex];
+ IoBuffer BlockRangeBuffer(BlockPartialBuffer,
+ OffsetAndLength.first,
+ OffsetAndLength.second);
+
+ const ChunkBlockAnalyser::BlockRangeDescriptor& RangeDescriptor =
+ PartialBlocks.BlockRanges[BlockRangeStartIndex + PartialRangeIndex];
+
+ if (!WritePartialBlockChunksToCache(BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ CompositeBuffer(std::move(BlockRangeBuffer)),
+ RangeDescriptor.ChunkBlockIndexStart,
+ RangeDescriptor.ChunkBlockIndexStart +
+ RangeDescriptor.ChunkBlockIndexCount - 1,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache))
{
std::error_code DummyEc;
RemoveFile(BlockChunkPath, DummyEc);
@@ -1447,28 +1464,27 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
}
- std::error_code Ec = TryRemoveFile(BlockChunkPath);
- if (Ec)
- {
- ZEN_OPERATION_LOG_DEBUG(m_LogOutput,
- "Failed removing file '{}', reason: ({}) {}",
- BlockChunkPath,
- Ec.value(),
- Ec.message());
- }
-
WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
}
}
- },
- OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog
- : WorkerThreadPool::EMode::EnableBacklog);
- }
- });
- }
+ std::error_code Ec = TryRemoveFile(BlockChunkPath);
+ if (Ec)
+ {
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput,
+ "Failed removing file '{}', reason: ({}) {}",
+ BlockChunkPath,
+ Ec.value(),
+ Ec.message());
+ }
+ }
+ },
+ OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog
+ : WorkerThreadPool::EMode::EnableBacklog);
+ }
+ });
}
});
BlockRangeIndex += RangeCount;
@@ -3161,45 +3177,40 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
void
BuildsOperationUpdateFolder::DownloadPartialBlock(
- const ChunkBlockAnalyser::BlockRangeDescriptor BlockRange,
- const BlobsExistsResult& ExistsResult,
- std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded)
+ std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges,
+ size_t BlockRangeStartIndex,
+ size_t BlockRangeCount,
+ const BlobsExistsResult& ExistsResult,
+ std::function<void(IoBuffer&& InMemoryBuffer,
+ const std::filesystem::path& OnDiskPath,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded)
{
- const uint32_t BlockIndex = BlockRange.BlockIndex;
+ const uint32_t BlockIndex = BlockRanges[BlockRangeStartIndex].BlockIndex;
const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- IoBuffer BlockBuffer;
- if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
- {
- BlockBuffer =
- m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- }
- if (!BlockBuffer)
- {
- BlockBuffer =
- m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- }
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeStart + BlockRange.RangeLength));
- }
- if (!m_AbortFlag)
- {
- uint64_t BlockSize = BlockBuffer.GetSize();
+ auto ProcessDownload = [this](
+ const ChunkBlockDescription& BlockDescription,
+ IoBuffer&& BlockRangeBuffer,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths,
+ const std::function<void(IoBuffer && InMemoryBuffer,
+ const std::filesystem::path& OnDiskPath,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>& OnDownloaded) {
+ uint64_t BlockRangeBufferSize = BlockRangeBuffer.GetSize();
m_DownloadStats.DownloadedBlockCount++;
- m_DownloadStats.DownloadedBlockByteCount += BlockSize;
- m_DownloadStats.RequestsCompleteCount++;
+ m_DownloadStats.DownloadedBlockByteCount += BlockRangeBufferSize;
+ m_DownloadStats.RequestsCompleteCount += BlockOffsetAndLengths.size();
std::filesystem::path BlockChunkPath;
// Check if the dowloaded block is file based and we can move it directly without rewriting it
{
IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize))
+ if (BlockRangeBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockRangeBufferSize))
{
ZEN_TRACE_CPU("MoveTempPartialBlock");
@@ -3207,10 +3218,17 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
if (!Ec)
{
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = m_TempBlockFolderPath /
- fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ BlockRangeBuffer.SetDeleteOnClose(false);
+ BlockRangeBuffer = {};
+
+ IoHashStream RangeId;
+ for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
+ {
+ RangeId.Append(&Range.first, sizeof(uint64_t));
+ RangeId.Append(&Range.second, sizeof(uint64_t));
+ }
+
+ BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash());
RenameFile(TempBlobPath, BlockChunkPath, Ec);
if (Ec)
{
@@ -3218,27 +3236,137 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
// Re-open the temp file again
BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
+ BlockRangeBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockRangeBufferSize, true);
+ BlockRangeBuffer.SetDeleteOnClose(true);
}
}
}
}
- if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize))
+ if (BlockChunkPath.empty() && (BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize))
{
ZEN_TRACE_CPU("WriteTempPartialBlock");
+
+ IoHashStream RangeId;
+ for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
+ {
+ RangeId.Append(&Range.first, sizeof(uint64_t));
+ RangeId.Append(&Range.second, sizeof(uint64_t));
+ }
+
// Could not be moved and rather large, lets store it on disk
- BlockChunkPath = m_TempBlockFolderPath /
- fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
+ BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash());
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockRangeBuffer);
+ BlockRangeBuffer = {};
}
if (!m_AbortFlag)
{
- OnDownloaded(std::move(BlockBuffer), std::move(BlockChunkPath));
+ OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths);
+ }
+ };
+
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+ Ranges.reserve(BlockRangeCount);
+ for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + BlockRangeCount; BlockRangeIndex++)
+ {
+ const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRanges[BlockRangeIndex];
+ Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength));
+ }
+
+ if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ {
+ BuildStorageCache::BuildBlobRanges RangeBuffers =
+ m_Storage.BuildCacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, Ranges);
+ if (RangeBuffers.PayloadBuffer)
+ {
+ if (!m_AbortFlag)
+ {
+ if (RangeBuffers.Ranges.size() != Ranges.size())
+ {
+ throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges",
+ Ranges.size(),
+ BlockDescription.BlockHash,
+ RangeBuffers.Ranges.size()));
+ }
+
+ std::vector<std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths = std::move(RangeBuffers.Ranges);
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ BlockRangeStartIndex,
+ BlockOffsetAndLengths,
+ OnDownloaded);
+ }
+ return;
}
}
+
+ const size_t MaxRangesPerRequestToJupiter = BuildStorageBase::MaxRangeCountPerRequest;
+
+ size_t SubBlockRangeCount = BlockRangeCount;
+ size_t SubRangeCountComplete = 0;
+ std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges);
+ while (SubRangeCountComplete < SubBlockRangeCount)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, MaxRangesPerRequestToJupiter);
+ size_t SubRangeStartIndex = BlockRangeStartIndex + SubRangeCountComplete;
+
+ auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
+
+ BuildStorageBase::BuildBlobRanges RangeBuffers =
+ m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges);
+ if (RangeBuffers.PayloadBuffer)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ if (RangeBuffers.Ranges.empty())
+ {
+ // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3
+ // Upload to cache (if enabled) and use the whole payload for the remaining ranges
+
+ if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer}));
+ }
+
+ SubRangeCount = Ranges.size() - SubRangeCountComplete;
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangesSpan.subspan(SubRangeCountComplete, SubRangeCount),
+ OnDownloaded);
+ }
+ else
+ {
+ if (RangeBuffers.Ranges.size() != SubRanges.size())
+ {
+ throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges",
+ SubRanges.size(),
+ BlockDescription.BlockHash,
+ RangeBuffers.Ranges.size()));
+ }
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangeBuffers.Ranges,
+ OnDownloaded);
+ }
+ }
+ else
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing when fetching {} ranges", BlockDescription.BlockHash, SubRangeCount));
+ }
+
+ SubRangeCountComplete += SubRangeCount;
+ }
}
std::vector<uint32_t>
@@ -7083,16 +7211,31 @@ GetRemoteContent(OperationLogOutput& Output,
// TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them
{
+ if (!IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(Output, "Fetching metadata for {} blocks", BlockRawHashes.size());
+ }
+
+ Stopwatch GetBlockMetadataTimer;
+
bool AttemptFallback = false;
OutBlockDescriptions = GetBlockDescriptions(Output,
*Storage.BuildStorage,
Storage.BuildCacheStorage.get(),
BuildId,
- BuildPartId,
BlockRawHashes,
AttemptFallback,
IsQuiet,
IsVerbose);
+
+ if (!IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(Output,
+ "GetBlockMetadata for {} took {}. Found {} blocks",
+ BuildPartId,
+ NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
+ OutBlockDescriptions.size());
+ }
}
CalculateLocalChunkOrders(AbsoluteChunkOrders,
@@ -7935,6 +8078,164 @@ TEST_CASE("buildstorageoperations.upload.multipart")
}
}
+TEST_CASE("buildstorageoperations.partial.block.download" * doctest::skip(true))
+{
+ const std::string OidcExecutableName = "OidcToken" ZEN_EXE_SUFFIX_LITERAL;
+ std::filesystem::path OidcTokenExePath = (GetRunningExecutablePath().parent_path() / OidcExecutableName).make_preferred();
+
+ HttpClientSettings ClientSettings{
+ .LogCategory = "httpbuildsclient",
+ .AccessTokenProvider =
+ httpclientauth::CreateFromOidcTokenExecutable(OidcTokenExePath, "https://jupiter.devtools.epicgames.com", true, false, false),
+ .AssumeHttp2 = false,
+ .AllowResume = true,
+ .RetryCount = 0,
+ .Verbose = false};
+
+ HttpClient HttpClient("https://euc.jupiter.devtools.epicgames.com", ClientSettings);
+
+ const std::string_view Namespace = "fortnite.oplog";
+ const std::string_view Bucket = "fortnitegame.staged-build.fortnite-main.ps4-client";
+ const Oid BuildId = Oid::FromHexString("09a76ea92ad301d4724fafad");
+
+ {
+ HttpClient::Response Response = HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, Bucket, BuildId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ CbValidateError ValidateResult = CbValidateError::None;
+ CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(Response.ResponsePayload), ValidateResult);
+ REQUIRE(ValidateResult == CbValidateError::None);
+ }
+
+ std::vector<ChunkBlockDescription> BlockDescriptions;
+ {
+ CbObjectWriter Request;
+
+ Request.BeginArray("blocks"sv);
+ {
+ Request.AddHash(IoHash::FromHexString("7c353ed782675a5e8f968e61e51fc797ecdc2882"));
+ }
+ Request.EndArray();
+
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+
+ HttpClient::Response BlockDescriptionsResponse =
+ HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, Bucket, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ REQUIRE(BlockDescriptionsResponse.IsSuccess());
+
+ CbValidateError ValidateResult = CbValidateError::None;
+ CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(BlockDescriptionsResponse.ResponsePayload), ValidateResult);
+ REQUIRE(ValidateResult == CbValidateError::None);
+
+ {
+ CbArrayView BlocksArray = Object["blocks"sv].AsArrayView();
+ for (CbFieldView Block : BlocksArray)
+ {
+ ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView());
+ BlockDescriptions.emplace_back(std::move(Description));
+ }
+ }
+ }
+
+ REQUIRE(!BlockDescriptions.empty());
+
+ const IoHash BlockHash = BlockDescriptions.back().BlockHash;
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions.front();
+ REQUIRE(!BlockDescription.ChunkRawHashes.empty());
+ REQUIRE(!BlockDescription.ChunkCompressedLengths.empty());
+
+ std::vector<std::pair<uint64_t, uint64_t>> ChunkOffsetAndSizes;
+ uint64_t Offset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+
+ for (uint32_t ChunkCompressedSize : BlockDescription.ChunkCompressedLengths)
+ {
+ ChunkOffsetAndSizes.push_back(std::make_pair(Offset, ChunkCompressedSize));
+ Offset += ChunkCompressedSize;
+ }
+
+ ScopedTemporaryDirectory SourceFolder;
+
+ auto Validate = [&](std::span<const uint32_t> ChunkIndexesToFetch) {
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+ for (uint32_t ChunkIndex : ChunkIndexesToFetch)
+ {
+ Ranges.push_back(ChunkOffsetAndSizes[ChunkIndex]);
+ }
+
+ HttpClient::KeyValueMap Headers;
+ if (!Ranges.empty())
+ {
+ ExtendableStringBuilder<512> SB;
+ for (const std::pair<uint64_t, uint64_t>& R : Ranges)
+ {
+ if (SB.Size() > 0)
+ {
+ SB << ", ";
+ }
+ SB << R.first << "-" << R.first + R.second - 1;
+ }
+ Headers.Entries.insert({"Range", fmt::format("bytes={}", SB.ToView())});
+ }
+
+ HttpClient::Response GetBlobRangesResponse = HttpClient.Download(
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect=false", Namespace, Bucket, BuildId, BlockHash),
+ SourceFolder.Path(),
+ Headers);
+
+ REQUIRE(GetBlobRangesResponse.IsSuccess());
+ MemoryView RangesMemoryView = GetBlobRangesResponse.ResponsePayload.GetView();
+
+ std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges = GetBlobRangesResponse.GetRanges(Ranges);
+ if (PayloadRanges.empty())
+ {
+ // We got the whole blob, use the ranges as is
+ PayloadRanges = Ranges;
+ }
+
+ REQUIRE(PayloadRanges.size() == Ranges.size());
+
+ for (uint32_t RangeIndex = 0; RangeIndex < PayloadRanges.size(); RangeIndex++)
+ {
+ const std::pair<uint64_t, uint64_t>& PayloadRange = PayloadRanges[RangeIndex];
+
+ CHECK_EQ(PayloadRange.second, Ranges[RangeIndex].second);
+
+ IoBuffer ChunkPayload(GetBlobRangesResponse.ResponsePayload, PayloadRange.first, PayloadRange.second);
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(ChunkPayload), RawHash, RawSize);
+ CHECK(CompressedChunk);
+ CHECK_EQ(RawHash, BlockDescription.ChunkRawHashes[ChunkIndexesToFetch[RangeIndex]]);
+ CHECK_EQ(RawSize, BlockDescription.ChunkRawLengths[ChunkIndexesToFetch[RangeIndex]]);
+ }
+ };
+
+ {
+ // Single
+ std::vector<uint32_t> ChunkIndexesToFetch{uint32_t(BlockDescription.ChunkCompressedLengths.size() / 2)};
+ Validate(ChunkIndexesToFetch);
+ }
+ {
+ // Many
+ std::vector<uint32_t> ChunkIndexesToFetch;
+ for (uint32_t Index = 0; Index < BlockDescription.ChunkCompressedLengths.size() / 16; Index++)
+ {
+ ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7));
+ ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7 + 1));
+ ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7 + 3));
+ }
+ Validate(ChunkIndexesToFetch);
+ }
+
+ {
+ // First and last
+ std::vector<uint32_t> ChunkIndexesToFetch{0, uint32_t(BlockDescription.ChunkCompressedLengths.size() - 1)};
+ Validate(ChunkIndexesToFetch);
+ }
+}
TEST_SUITE_END();
void
diff --git a/src/zenremotestore/builds/buildstorageutil.cpp b/src/zenremotestore/builds/buildstorageutil.cpp
index b249d7d52..d65f18b9a 100644
--- a/src/zenremotestore/builds/buildstorageutil.cpp
+++ b/src/zenremotestore/builds/buildstorageutil.cpp
@@ -251,7 +251,6 @@ GetBlockDescriptions(OperationLogOutput& Output,
BuildStorageBase& Storage,
BuildStorageCache* OptionalCacheStorage,
const Oid& BuildId,
- const Oid& BuildPartId,
std::span<const IoHash> BlockRawHashes,
bool AttemptFallback,
bool IsQuiet,
@@ -259,13 +258,6 @@ GetBlockDescriptions(OperationLogOutput& Output,
{
using namespace std::literals;
- if (!IsQuiet)
- {
- ZEN_OPERATION_LOG_INFO(Output, "Fetching metadata for {} blocks", BlockRawHashes.size());
- }
-
- Stopwatch GetBlockMetadataTimer;
-
std::vector<ChunkBlockDescription> UnorderedList;
tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup;
if (OptionalCacheStorage && !BlockRawHashes.empty())
@@ -355,15 +347,6 @@ GetBlockDescriptions(OperationLogOutput& Output,
}
}
- if (!IsQuiet)
- {
- ZEN_OPERATION_LOG_INFO(Output,
- "GetBlockMetadata for {} took {}. Found {} blocks",
- BuildPartId,
- NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
- Result.size());
- }
-
if (Result.size() != BlockRawHashes.size())
{
std::string ErrorDescription =
diff --git a/src/zenremotestore/builds/filebuildstorage.cpp b/src/zenremotestore/builds/filebuildstorage.cpp
index 55e69de61..2f4904449 100644
--- a/src/zenremotestore/builds/filebuildstorage.cpp
+++ b/src/zenremotestore/builds/filebuildstorage.cpp
@@ -432,6 +432,45 @@ public:
return IoBuffer{};
}
+ virtual BuildBlobRanges GetBuildBlobRanges(const Oid& BuildId,
+ const IoHash& RawHash,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlobRanges");
+ ZEN_UNUSED(BuildId);
+ ZEN_ASSERT(!Ranges.empty());
+
+ uint64_t ReceivedBytes = 0;
+ uint64_t SentBytes = Ranges.size() * 2 * 8;
+
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
+
+ BuildBlobRanges Result;
+
+ const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
+ if (IsFile(BlockPath))
+ {
+ BasicFile File(BlockPath, BasicFile::Mode::kRead);
+
+ uint64_t RangeOffset = Ranges.front().first;
+ uint64_t RangeBytes = Ranges.back().first + Ranges.back().second - RangeOffset;
+ Result.PayloadBuffer = IoBufferBuilder::MakeFromFileHandle(File.Detach(), RangeOffset, RangeBytes);
+
+ Result.Ranges.reserve(Ranges.size());
+
+ for (const std::pair<uint64_t, uint64_t>& Range : Ranges)
+ {
+ Result.Ranges.push_back(std::make_pair(Range.first - RangeOffset, Range.second));
+ }
+ ReceivedBytes = Result.PayloadBuffer.GetSize();
+ }
+ return Result;
+ }
+
virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
const IoHash& RawHash,
uint64_t ChunkSize,
diff --git a/src/zenremotestore/builds/jupiterbuildstorage.cpp b/src/zenremotestore/builds/jupiterbuildstorage.cpp
index 23d0ddd4c..8e16da1a9 100644
--- a/src/zenremotestore/builds/jupiterbuildstorage.cpp
+++ b/src/zenremotestore/builds/jupiterbuildstorage.cpp
@@ -21,7 +21,7 @@ namespace zen {
using namespace std::literals;
namespace {
- void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix)
+ [[noreturn]] void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix)
{
int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0;
HttpResponseCode Status =
@@ -295,6 +295,26 @@ public:
return std::move(GetBuildBlobResult.Response);
}
+ virtual BuildBlobRanges GetBuildBlobRanges(const Oid& BuildId,
+ const IoHash& RawHash,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges) override
+ {
+ ZEN_TRACE_CPU("Jupiter::GetBuildBlob");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ CreateDirectories(m_TempFolderPath);
+
+ BuildBlobRangesResult GetBuildBlobResult =
+ m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, Ranges);
+ AddStatistic(GetBuildBlobResult);
+ if (!GetBuildBlobResult.Success)
+ {
+ ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob ranges"sv);
+ }
+ return BuildBlobRanges{.PayloadBuffer = std::move(GetBuildBlobResult.Response), .Ranges = std::move(GetBuildBlobResult.Ranges)};
+ }
+
virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
const IoHash& RawHash,
uint64_t ChunkSize,
diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp
index 3a4e6011d..9c3fe8a0b 100644
--- a/src/zenremotestore/chunking/chunkblock.cpp
+++ b/src/zenremotestore/chunking/chunkblock.cpp
@@ -608,40 +608,49 @@ ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span<const NeededBlock>
if (PartialBlockDownloadMode != EPartialBlockDownloadMode::Exact && BlockRanges.size() > 1)
{
- // TODO: Once we have support in our http client to request multiple ranges in one request this
- // logic would need to change as the per-request overhead would go away
+ const uint64_t MaxRangeCountPerRequest = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed
+ ? m_Options.HostHighSpeedMaxRangeCountPerRequest
+ : m_Options.HostMaxRangeCountPerRequest;
- const double LatencySec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed
- ? m_Options.HostHighSpeedLatencySec
- : m_Options.HostLatencySec;
- if (LatencySec > 0)
+ ZEN_ASSERT(MaxRangeCountPerRequest != 0);
+
+ if (MaxRangeCountPerRequest != (uint64_t)-1)
{
- const uint64_t BytesPerSec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed
- ? m_Options.HostHighSpeedBytesPerSec
- : m_Options.HostSpeedBytesPerSec;
+ const uint64_t ExtraRequestCount = BlockRanges.size() / MaxRangeCountPerRequest;
- const double ExtraRequestTimeSec = (BlockRanges.size() - 1) * LatencySec;
- const uint64_t ExtraRequestTimeBytes = uint64_t(ExtraRequestTimeSec * BytesPerSec);
+ const double LatencySec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed
+ ? m_Options.HostHighSpeedLatencySec
+ : m_Options.HostLatencySec;
+ if (LatencySec > 0)
+ {
+ const uint64_t BytesPerSec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed
+ ? m_Options.HostHighSpeedBytesPerSec
+ : m_Options.HostSpeedBytesPerSec;
- const uint64_t FullRangeSize =
- BlockRanges.back().RangeStart + BlockRanges.back().RangeLength - BlockRanges.front().RangeStart;
+ const double ExtraRequestTimeSec = ExtraRequestCount * LatencySec;
+ const uint64_t ExtraRequestTimeBytes = uint64_t(ExtraRequestTimeSec * BytesPerSec);
- if (ExtraRequestTimeBytes + RequestedSize >= FullRangeSize)
- {
- BlockRanges = std::vector<BlockRangeDescriptor>{MergeBlockRanges(BlockRanges)};
+ const uint64_t FullRangeSize =
+ BlockRanges.back().RangeStart + BlockRanges.back().RangeLength - BlockRanges.front().RangeStart;
- if (m_Options.IsVerbose)
+ if (ExtraRequestTimeBytes + RequestedSize >= FullRangeSize)
{
- ZEN_OPERATION_LOG_INFO(m_LogOutput,
- "Merging {} chunks ({}) from block {} ({}) to single request (extra bytes {})",
- NeededBlock.ChunkIndexes.size(),
- NiceBytes(RequestedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- NiceBytes(BlockRanges.front().RangeLength - RequestedSize));
+ BlockRanges = std::vector<BlockRangeDescriptor>{MergeBlockRanges(BlockRanges)};
+
+ if (m_Options.IsVerbose)
+ {
+ ZEN_OPERATION_LOG_INFO(
+ m_LogOutput,
+ "Merging {} chunks ({}) from block {} ({}) to single request (extra bytes {})",
+ NeededBlock.ChunkIndexes.size(),
+ NiceBytes(RequestedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ NiceBytes(BlockRanges.front().RangeLength - RequestedSize));
+ }
+
+ RequestedSize = BlockRanges.front().RangeLength;
}
-
- RequestedSize = BlockRanges.front().RangeLength;
}
}
}
@@ -730,7 +739,7 @@ ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span<const NeededBlock>
ZEN_OPERATION_LOG_INFO(m_LogOutput,
"Analysis of partial block requests saves download of {} out of {}, {:.1f}% of possible {} using {} extra "
- "requests. Completed in {}",
+ "ranges. Completed in {}",
NiceBytes(ActualSkippedSize),
NiceBytes(AllBlocksTotalBlocksSize),
PercentOfIdealPartialSkippedSize,
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorage.h b/src/zenremotestore/include/zenremotestore/builds/buildstorage.h
index 85dabc59f..ce3da41c1 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorage.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorage.h
@@ -53,15 +53,26 @@ public:
std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
std::function<void(uint64_t, bool)>&& OnSentBytes) = 0;
- virtual IoBuffer GetBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t RangeOffset = 0,
- uint64_t RangeBytes = (uint64_t)-1) = 0;
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t RangeOffset = 0,
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
+
+ static constexpr size_t MaxRangeCountPerRequest = 128u;
+
+ struct BuildBlobRanges
+ {
+ IoBuffer PayloadBuffer;
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+ };
+ virtual BuildBlobRanges GetBuildBlobRanges(const Oid& BuildId,
+ const IoHash& RawHash,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges) = 0;
virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
const IoHash& RawHash,
uint64_t ChunkSize,
std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
- std::function<void()>&& OnComplete) = 0;
+ std::function<void()>&& OnComplete) = 0;
[[nodiscard]] virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0;
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
index f25ce5b5e..67c93480b 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
@@ -37,6 +37,14 @@ public:
const IoHash& RawHash,
uint64_t RangeOffset = 0,
uint64_t RangeBytes = (uint64_t)-1) = 0;
+ struct BuildBlobRanges
+ {
+ IoBuffer PayloadBuffer;
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+ };
+ virtual BuildBlobRanges GetBuildBlobRanges(const Oid& BuildId,
+ const IoHash& RawHash,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges) = 0;
virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0;
virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index 31733569e..875b8593b 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -263,9 +263,14 @@ private:
ParallelWork& Work,
std::function<void(IoBuffer&& Payload)>&& OnDownloaded);
- void DownloadPartialBlock(const ChunkBlockAnalyser::BlockRangeDescriptor BlockRange,
- const BlobsExistsResult& ExistsResult,
- std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded);
+ void DownloadPartialBlock(std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges,
+ size_t BlockRangeIndex,
+ size_t BlockRangeCount,
+ const BlobsExistsResult& ExistsResult,
+ std::function<void(IoBuffer&& InMemoryBuffer,
+ const std::filesystem::path& OnDiskPath,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded);
std::vector<uint32_t> WriteLocalChunkToCache(CloneQueryInterface* CloneQuery,
const CopyChunkData& CopyData,
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h
index 4b85d8f1e..764a24e61 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h
@@ -45,7 +45,6 @@ std::vector<ChunkBlockDescription> GetBlockDescriptions(OperationLogOutput& Out
BuildStorageBase& Storage,
BuildStorageCache* OptionalCacheStorage,
const Oid& BuildId,
- const Oid& BuildPartId,
std::span<const IoHash> BlockRawHashes,
bool AttemptFallback,
bool IsQuiet,
diff --git a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
index 5a17ef79c..7aae1442e 100644
--- a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
+++ b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
@@ -82,12 +82,14 @@ class ChunkBlockAnalyser
public:
struct Options
{
- bool IsQuiet = false;
- bool IsVerbose = false;
- double HostLatencySec = -1.0;
- double HostHighSpeedLatencySec = -1.0;
- uint64_t HostSpeedBytesPerSec = (1u * 1024u * 1024u * 1024u) / 8u; // 1GBit
- uint64_t HostHighSpeedBytesPerSec = (2u * 1024u * 1024u * 1024u) / 8u; // 2GBit
+ bool IsQuiet = false;
+ bool IsVerbose = false;
+ double HostLatencySec = -1.0;
+ double HostHighSpeedLatencySec = -1.0;
+ uint64_t HostSpeedBytesPerSec = (1u * 1024u * 1024u * 1024u) / 8u; // 1GBit
+ uint64_t HostHighSpeedBytesPerSec = (2u * 1024u * 1024u * 1024u) / 8u; // 2GBit
+ uint64_t HostMaxRangeCountPerRequest = (uint64_t)-1;
+ uint64_t HostHighSpeedMaxRangeCountPerRequest = (uint64_t)-1; // No limit
};
ChunkBlockAnalyser(OperationLogOutput& LogOutput, std::span<const ChunkBlockDescription> BlockDescriptions, const Options& Options);
@@ -137,14 +139,15 @@ private:
static constexpr uint16_t FullBlockRangePercentLimit = 98;
- static constexpr BlockRangeLimit ForceMergeLimits[] = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1},
- {.SizePercent = 90, .MaxRangeCount = 4},
- {.SizePercent = 85, .MaxRangeCount = 16},
- {.SizePercent = 80, .MaxRangeCount = 32},
- {.SizePercent = 75, .MaxRangeCount = 48},
- {.SizePercent = 70, .MaxRangeCount = 64},
- {.SizePercent = 4, .MaxRangeCount = 82},
- {.SizePercent = 0, .MaxRangeCount = 96}};
+ static constexpr BlockRangeLimit ForceMergeLimits[] = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 8},
+ {.SizePercent = 90, .MaxRangeCount = 16},
+ {.SizePercent = 85, .MaxRangeCount = 32},
+ {.SizePercent = 80, .MaxRangeCount = 48},
+ {.SizePercent = 75, .MaxRangeCount = 64},
+ {.SizePercent = 70, .MaxRangeCount = 92},
+ {.SizePercent = 50, .MaxRangeCount = 128},
+ {.SizePercent = 4, .MaxRangeCount = 192},
+ {.SizePercent = 0, .MaxRangeCount = 256}};
BlockRangeDescriptor MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges);
std::optional<std::vector<BlockRangeDescriptor>> MakeOptionalBlockRangeVector(uint64_t TotalBlockSize,
diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h
index eaf6962fd..8721bc37f 100644
--- a/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h
+++ b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h
@@ -56,6 +56,11 @@ struct FinalizeBuildPartResult : JupiterResult
std::vector<IoHash> Needs;
};
+struct BuildBlobRangesResult : JupiterResult
+{
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+};
+
/**
* Context for performing Jupiter operations
*
@@ -135,6 +140,13 @@ public:
uint64_t Offset = 0,
uint64_t Size = (uint64_t)-1);
+ BuildBlobRangesResult GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges);
+
JupiterResult PutMultipartBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
index 152c02ee2..2cf10c664 100644
--- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
+++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
@@ -84,6 +84,12 @@ public:
std::vector<bool> HasBody;
};
+ struct LoadAttachmentRangesResult : public Result
+ {
+ IoBuffer Bytes;
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+ };
+
struct RemoteStoreInfo
{
bool CreateBlocks;
@@ -127,15 +133,21 @@ public:
virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) = 0;
virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) = 0;
- struct AttachmentRange
+ enum ESourceMode
{
- uint64_t Offset = 0;
- uint64_t Bytes = (uint64_t)-1;
-
- inline operator bool() const { return Offset != 0 || Bytes != (uint64_t)-1; }
+ kAny,
+ kCacheOnly,
+ kHostOnly
};
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) = 0;
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0;
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode = ESourceMode::kAny) = 0;
+
+ static constexpr size_t MaxRangeCountPerRequest = 128u;
+
+ virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges,
+ ESourceMode SourceMode = ESourceMode::kAny) = 0;
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode = ESourceMode::kAny) = 0;
virtual void Flush() = 0;
};
diff --git a/src/zenremotestore/jupiter/jupitersession.cpp b/src/zenremotestore/jupiter/jupitersession.cpp
index 1bc6564ce..52f9eb678 100644
--- a/src/zenremotestore/jupiter/jupitersession.cpp
+++ b/src/zenremotestore/jupiter/jupitersession.cpp
@@ -852,6 +852,71 @@ JupiterSession::GetBuildBlob(std::string_view Namespace,
return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
}
+BuildBlobRangesResult
+JupiterSession::GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges)
+{
+ HttpClient::KeyValueMap Headers;
+ if (!Ranges.empty())
+ {
+ ExtendableStringBuilder<512> SB;
+ for (const std::pair<uint64_t, uint64_t>& R : Ranges)
+ {
+ if (SB.Size() > 0)
+ {
+ SB << ", ";
+ }
+ SB << R.first << "-" << R.first + R.second - 1;
+ }
+ Headers.Entries.insert({"Range", fmt::format("bytes={}", SB.ToView())});
+ }
+ std::string Url = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv);
+
+ HttpClient::Response Response = m_HttpClient.Download(Url, TempFolderPath, Headers);
+ if (Response.StatusCode == HttpResponseCode::RangeNotSatisfiable && Ranges.size() > 1)
+ {
+ // Requests to Jupiter that is not served via nginx (content not stored locally in the file system) can not serve multi-range
+ // requests (asp.net limitation) This rejection is not implemented as of 2026-03-02, it is in the backlog (@joakim.lindqvist)
+ // If we encounter this error we fall back to a single range which covers all the requested ranges
+ uint64_t RangeStart = Ranges.front().first;
+ uint64_t RangeEnd = Ranges.back().first + Ranges.back().second - 1;
+ Headers.Entries.insert_or_assign("Range", fmt::format("bytes={}-{}", RangeStart, RangeEnd));
+ Response = m_HttpClient.Download(Url, TempFolderPath, Headers);
+ }
+ if (Response.IsSuccess())
+ {
+ // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it
+ if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary))
+ {
+ IoHash ValidateRawHash;
+ uint64_t ValidateRawSize = 0;
+ if (!Headers.Entries.contains("Range"))
+ {
+ ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload,
+ ValidateRawHash,
+ ValidateRawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr));
+ ZEN_ASSERT_SLOW(ValidateRawHash == Hash);
+ ZEN_ASSERT_SLOW(ValidateRawSize > 0);
+ ZEN_UNUSED(ValidateRawHash, ValidateRawSize);
+ Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary);
+ }
+ }
+ }
+ BuildBlobRangesResult Result = {detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv)};
+ Result.Ranges = Response.GetRanges(Ranges);
+ return Result;
+}
+
JupiterResult
JupiterSession::PutBlockMetadata(std::string_view Namespace,
std::string_view BucketId,
diff --git a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp
index c42373e4d..3400cdbf5 100644
--- a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp
@@ -478,7 +478,6 @@ public:
*m_BuildStorage,
m_BuildCacheStorage.get(),
m_BuildId,
- m_OplogBuildPartId,
BlockHashes,
/*AttemptFallback*/ false,
/*IsQuiet*/ false,
@@ -549,7 +548,7 @@ public:
return Result;
}
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
@@ -559,25 +558,90 @@ public:
try
{
- if (m_BuildCacheStorage)
+ if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly)
{
- IoBuffer CachedBlob = m_BuildCacheStorage->GetBuildBlob(m_BuildId, RawHash, Range.Offset, Range.Bytes);
+ IoBuffer CachedBlob = m_BuildCacheStorage->GetBuildBlob(m_BuildId, RawHash);
if (CachedBlob)
{
Result.Bytes = std::move(CachedBlob);
}
}
- if (!Result.Bytes)
+ if (!Result.Bytes && SourceMode != ESourceMode::kCacheOnly)
{
- Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash, Range.Offset, Range.Bytes);
+ Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash);
if (m_BuildCacheStorage && Result.Bytes && m_PopulateCache)
{
- if (!Range)
+ m_BuildCacheStorage->PutBuildBlob(m_BuildId,
+ RawHash,
+ Result.Bytes.GetContentType(),
+ CompositeBuffer(SharedBuffer(Result.Bytes)));
+ }
+ }
+ }
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Failed getting blob {}/{}/{}/{}/{}. Reason: '{}'",
+ m_BuildStorageHttp.GetBaseUri(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ RawHash,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed getting blob {}/{}/{}/{}/{}. Reason: '{}'",
+ m_BuildStorageHttp.GetBaseUri(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ RawHash,
+ Ex.what());
+ }
+
+ return Result;
+ }
+
+ virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges,
+ ESourceMode SourceMode) override
+ {
+ LoadAttachmentRangesResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
+
+ try
+ {
+ if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly)
+ {
+ BuildStorageCache::BuildBlobRanges BlobRanges = m_BuildCacheStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges);
+ if (BlobRanges.PayloadBuffer)
+ {
+ Result.Bytes = std::move(BlobRanges.PayloadBuffer);
+ Result.Ranges = std::move(BlobRanges.Ranges);
+ }
+ }
+ if (!Result.Bytes && SourceMode != ESourceMode::kCacheOnly)
+ {
+ BuildStorageBase::BuildBlobRanges BlobRanges = m_BuildStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges);
+ if (BlobRanges.PayloadBuffer)
+ {
+ Result.Bytes = std::move(BlobRanges.PayloadBuffer);
+ Result.Ranges = std::move(BlobRanges.Ranges);
+
+ if (Result.Ranges.empty())
{
- m_BuildCacheStorage->PutBuildBlob(m_BuildId,
- RawHash,
- Result.Bytes.GetContentType(),
- CompositeBuffer(SharedBuffer(Result.Bytes)));
+ // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3/Replicated
+ // Upload to cache (if enabled)
+ if (m_BuildCacheStorage && Result.Bytes && m_PopulateCache)
+ {
+ m_BuildCacheStorage->PutBuildBlob(m_BuildId,
+ RawHash,
+ Result.Bytes.GetContentType(),
+ CompositeBuffer(SharedBuffer(Result.Bytes)));
+ }
}
}
}
@@ -585,28 +649,32 @@ public:
catch (const HttpClientError& Ex)
{
Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed getting {} ranges for blob {}/{}/{}/{}/{}. Reason: '{}'",
+ Ranges.size(),
m_BuildStorageHttp.GetBaseUri(),
m_Namespace,
m_Bucket,
m_BuildId,
+ RawHash,
Ex.what());
}
catch (const std::exception& Ex)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed getting {} ranges for blob {}/{}/{}/{}/{}. Reason: '{}'",
+ Ranges.size(),
m_BuildStorageHttp.GetBaseUri(),
m_Namespace,
m_Bucket,
m_BuildId,
+ RawHash,
Ex.what());
}
return Result;
}
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override
{
LoadAttachmentsResult Result;
Stopwatch Timer;
@@ -614,7 +682,7 @@ public:
std::vector<IoHash> AttachmentsLeftToFind = RawHashes;
- if (m_BuildCacheStorage)
+ if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly)
{
std::vector<BuildStorageCache::BlobExistsResult> ExistCheck = m_BuildCacheStorage->BlobsExists(m_BuildId, RawHashes);
if (ExistCheck.size() == RawHashes.size())
@@ -648,7 +716,7 @@ public:
for (const IoHash& Hash : AttachmentsLeftToFind)
{
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {});
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode);
if (ChunkResult.ErrorCode)
{
return LoadAttachmentsResult{ChunkResult};
diff --git a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp
index ec7fb7bbc..f950fd46c 100644
--- a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp
@@ -228,28 +228,62 @@ public:
return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)};
}
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override
{
- Stopwatch Timer;
- LoadAttachmentResult Result;
- std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
- if (!IsFile(ChunkPath))
+ Stopwatch Timer;
+ LoadAttachmentResult Result;
+ if (SourceMode != ESourceMode::kCacheOnly)
{
- Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
- Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string());
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
+ std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
+ if (!IsFile(ChunkPath))
+ {
+ Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
+ Result.Reason =
+ fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string());
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ return Result;
+ }
+ {
+ BasicFile ChunkFile;
+ ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead);
+ Result.Bytes = ChunkFile.ReadAll();
+ }
}
+ AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ return Result;
+ }
+
+ virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges,
+ ESourceMode SourceMode) override
+ {
+ Stopwatch Timer;
+ LoadAttachmentRangesResult Result;
+ if (SourceMode != ESourceMode::kCacheOnly)
{
- BasicFile ChunkFile;
- ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead);
- if (Range)
+ std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
+ if (!IsFile(ChunkPath))
{
- Result.Bytes = ChunkFile.ReadRange(Range.Offset, Range.Bytes);
+ Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
+ Result.Reason =
+ fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string());
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ return Result;
}
- else
{
- Result.Bytes = ChunkFile.ReadAll();
+ BasicFile ChunkFile;
+ ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead);
+
+ uint64_t Start = Ranges.front().first;
+ uint64_t Length = Ranges.back().first + Ranges.back().second - Ranges.front().first;
+
+ Result.Bytes = ChunkFile.ReadRange(Start, Length);
+ Result.Ranges.reserve(Ranges.size());
+ for (const std::pair<uint64_t, uint64_t>& Range : Ranges)
+ {
+ Result.Ranges.push_back(std::make_pair(Range.first - Start, Range.second));
+ }
}
}
AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000);
@@ -257,13 +291,13 @@ public:
return Result;
}
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override
{
Stopwatch Timer;
LoadAttachmentsResult Result;
for (const IoHash& Hash : RawHashes)
{
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {});
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode);
if (ChunkResult.ErrorCode)
{
ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
index f8179831c..514484f30 100644
--- a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
@@ -223,34 +223,62 @@ public:
return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)};
}
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override
{
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
- AddStats(GetResult);
-
- LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
- if (GetResult.ErrorCode)
+ LoadAttachmentResult Result;
+ if (SourceMode != ESourceMode::kCacheOnly)
{
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- RawHash,
- Result.Reason);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
+ JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
+ AddStats(GetResult);
+
+ Result = {ConvertResult(GetResult), std::move(GetResult.Response)};
+ if (GetResult.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ RawHash,
+ Result.Reason);
+ }
}
- if (!Result.ErrorCode && Range)
+ return Result;
+ }
+
+ virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges,
+ ESourceMode SourceMode) override
+ {
+ LoadAttachmentRangesResult Result;
+ if (SourceMode != ESourceMode::kCacheOnly)
{
- Result.Bytes = IoBuffer(Result.Bytes, Range.Offset, Range.Bytes);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
+ JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
+ AddStats(GetResult);
+
+ Result = LoadAttachmentRangesResult{ConvertResult(GetResult), std::move(GetResult.Response)};
+ if (GetResult.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ RawHash,
+ Result.Reason);
+ }
+ else
+ {
+ Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end());
+ }
}
return Result;
}
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override
{
LoadAttachmentsResult Result;
for (const IoHash& Hash : RawHashes)
{
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {});
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode);
if (ChunkResult.ErrorCode)
{
return LoadAttachmentsResult{ChunkResult};
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 2a9da6f58..1882f599a 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -339,9 +339,10 @@ namespace remotestore_impl {
uint64_t ChunkSize = It.second.GetCompressedSize();
Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
}
- ZEN_INFO("Loaded {} bulk attachments in {}",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Loaded {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))));
if (RemoteResult.IsError())
{
return;
@@ -446,7 +447,7 @@ namespace remotestore_impl {
{
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash, {});
+ RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
if (BlockResult.ErrorCode)
{
ReportMessage(OptionalContext,
@@ -506,50 +507,100 @@ namespace remotestore_impl {
IoHash RawHash;
uint64_t RawSize;
CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize);
+
+ std::string ErrorString;
+
if (!Compressed)
{
- if (RetriesLeft > 0)
+ ErrorString =
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash);
+ }
+ else if (RawHash != BlockHash)
+ {
+ ErrorString = fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash);
+ }
+ else if (CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); !BlockPayload)
+ {
+ ErrorString = fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash);
+ }
+ else
+ {
+ uint64_t PotentialSize = 0;
+ uint64_t UsedSize = 0;
+ uint64_t BlockSize = BlockPayload.GetSize();
+
+ uint64_t BlockHeaderSize = 0;
+
+ bool StoreChunksOK = IterateChunkBlock(
+ BlockPayload.Flatten(),
+ [&AllNeededPartialChunkHashesLookup,
+ &ChunkDownloadedFlags,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
+ &Info,
+ &PotentialSize](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash);
+ if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end())
+ {
+ bool Expected = false;
+ if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(
+ WriteAttachmentBuffers.back(),
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ PotentialSize += WriteAttachmentBuffers.back().GetSize();
+ }
+ }
+ },
+ BlockHeaderSize);
+
+ if (!StoreChunksOK)
{
- ReportMessage(
- OptionalContext,
- fmt::format(
- "Block attachment {} is malformed, can't parse as compressed binary, retrying download",
- BlockHash));
- return DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- AllNeededPartialChunkHashesLookup,
- ChunkDownloadedFlags,
- RetriesLeft - 1);
+ ErrorString = fmt::format("Invalid format for block {}", BlockHash);
+ }
+ else
+ {
+ if (!WriteAttachmentBuffers.empty())
+ {
+ std::vector<CidStore::InsertResult> Results =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const CidStore::InsertResult& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ UsedSize += WriteAttachmentBuffers[Index].GetSize();
+ }
+ }
+ if (UsedSize < BlockSize)
+ {
+ ZEN_DEBUG("Used {} (skipping {}) out of {} for block {} ({} %) (use of matching {}%)",
+ NiceBytes(UsedSize),
+ NiceBytes(BlockSize - UsedSize),
+ NiceBytes(BlockSize),
+ BlockHash,
+ (100 * UsedSize) / BlockSize,
+ PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0);
+ }
+ }
}
- ReportMessage(
- OptionalContext,
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
- {});
- return;
}
- CompositeBuffer BlockPayload = Compressed.DecompressToComposite();
- if (!BlockPayload)
+
+ if (!ErrorString.empty())
{
if (RetriesLeft > 0)
{
- ReportMessage(
- OptionalContext,
- fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download",
- BlockHash));
+ ReportMessage(OptionalContext, fmt::format("{}, retrying download", ErrorString));
+
return DownloadAndSaveBlock(ChunkStore,
RemoteStore,
IgnoreMissingAttachments,
@@ -567,94 +618,12 @@ namespace remotestore_impl {
ChunkDownloadedFlags,
RetriesLeft - 1);
}
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash),
- {});
- return;
- }
- if (RawHash != BlockHash)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
- {});
- return;
- }
-
- uint64_t PotentialSize = 0;
- uint64_t UsedSize = 0;
- uint64_t BlockSize = BlockPayload.GetSize();
-
- uint64_t BlockHeaderSize = 0;
-
- bool StoreChunksOK = IterateChunkBlock(
- BlockPayload.Flatten(),
- [&AllNeededPartialChunkHashesLookup,
- &ChunkDownloadedFlags,
- &WriteAttachmentBuffers,
- &WriteRawHashes,
- &Info,
- &PotentialSize](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash);
- if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end())
- {
- bool Expected = false;
- if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true))
- {
- WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- IoHash RawHash;
- uint64_t RawSize;
- ZEN_ASSERT(
- CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
- RawHash,
- RawSize,
- /*OutOptionalTotalCompressedSize*/ nullptr));
- ZEN_ASSERT(RawHash == AttachmentRawHash);
- WriteRawHashes.emplace_back(AttachmentRawHash);
- PotentialSize += WriteAttachmentBuffers.back().GetSize();
- }
- }
- },
- BlockHeaderSize);
-
- if (!StoreChunksOK)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Invalid format for block {}", BlockHash),
- {});
- return;
- }
-
- if (!WriteAttachmentBuffers.empty())
- {
- auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (size_t Index = 0; Index < Results.size(); Index++)
+ else
{
- const auto& Result = Results[Index];
- if (Result.New)
- {
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
- UsedSize += WriteAttachmentBuffers[Index].GetSize();
- }
+ ReportMessage(OptionalContext, ErrorString);
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), ErrorString, {});
+ return;
}
- ZEN_DEBUG("Used {} (matching {}) out of {} for block {} ({} %) (use of matching {}%)",
- NiceBytes(UsedSize),
- NiceBytes(PotentialSize),
- NiceBytes(BlockSize),
- BlockHash,
- (100 * UsedSize) / BlockSize,
- PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0);
}
}
catch (const std::exception& Ex)
@@ -676,6 +645,119 @@ namespace remotestore_impl {
WorkerThreadPool::EMode::EnableBacklog);
};
+ bool DownloadPartialBlock(RemoteProjectStore& RemoteStore,
+ bool IgnoreMissingAttachments,
+ JobContext* OptionalContext,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ double& DownloadTimeSeconds,
+ const ChunkBlockDescription& BlockDescription,
+ bool BlockExistsInCache,
+ std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors,
+ size_t BlockRangeIndexStart,
+ size_t BlockRangeCount,
+ std::function<void(IoBuffer&& Buffer,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded)
+ {
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+ Ranges.reserve(BlockRangeDescriptors.size());
+ for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount; BlockRangeIndex++)
+ {
+ const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeIndex];
+ Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength));
+ }
+
+ if (BlockExistsInCache)
+ {
+ RemoteProjectStore::LoadAttachmentRangesResult BlockResult =
+ RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, Ranges, RemoteProjectStore::ESourceMode::kCacheOnly);
+ DownloadTimeSeconds += BlockResult.ElapsedSeconds;
+ if (RemoteResult.IsError())
+ {
+ return false;
+ }
+ if (!BlockResult.ErrorCode && BlockResult.Bytes)
+ {
+ if (BlockResult.Ranges.size() != Ranges.size())
+ {
+ throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges",
+ Ranges.size(),
+ BlockDescription.BlockHash,
+ BlockResult.Ranges.size()));
+ }
+ OnDownloaded(std::move(BlockResult.Bytes), BlockRangeIndexStart, BlockResult.Ranges);
+ return true;
+ }
+ }
+
+ const size_t MaxRangesPerRequestToJupiter = RemoteProjectStore::MaxRangeCountPerRequest;
+
+ size_t SubBlockRangeCount = BlockRangeCount;
+ size_t SubRangeCountComplete = 0;
+ std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges);
+ while (SubRangeCountComplete < SubBlockRangeCount)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, MaxRangesPerRequestToJupiter);
+ size_t SubRangeStartIndex = BlockRangeIndexStart + SubRangeCountComplete;
+
+ auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
+
+ RemoteProjectStore::LoadAttachmentRangesResult BlockResult =
+ RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges, RemoteProjectStore::ESourceMode::kHostOnly);
+ DownloadTimeSeconds += BlockResult.ElapsedSeconds;
+ if (RemoteResult.IsError())
+ {
+ return false;
+ }
+ if (BlockResult.ErrorCode || !BlockResult.Bytes)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download {} ranges from block attachment '{}' ({}): {}",
+ SubRanges.size(),
+ BlockDescription.BlockHash,
+ BlockResult.ErrorCode,
+ BlockResult.Reason));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ return false;
+ }
+ }
+ else
+ {
+ if (BlockResult.Ranges.empty())
+ {
+ // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3
+ // Use the whole payload for the remaining ranges
+ SubRangeCount = Ranges.size() - SubRangeCountComplete;
+ OnDownloaded(std::move(BlockResult.Bytes),
+ SubRangeStartIndex,
+ RangesSpan.subspan(SubRangeCountComplete, SubRangeCount));
+ }
+ else
+ {
+ if (BlockResult.Ranges.size() != SubRanges.size())
+ {
+ throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges",
+ SubRanges.size(),
+ BlockDescription.BlockHash,
+ BlockResult.Ranges.size()));
+ }
+ OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, BlockResult.Ranges);
+ }
+ }
+
+ SubRangeCountComplete += SubRangeCount;
+ }
+ return true;
+ }
+
void DownloadAndSavePartialBlock(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
bool IgnoreMissingAttachments,
@@ -689,6 +771,7 @@ namespace remotestore_impl {
Stopwatch& LoadAttachmentsTimer,
std::atomic_uint64_t& DownloadStartMS,
const ChunkBlockDescription& BlockDescription,
+ bool BlockExistsInCache,
std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors,
size_t BlockRangeIndexStart,
size_t BlockRangeCount,
@@ -710,13 +793,14 @@ namespace remotestore_impl {
&DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext,
- RetriesLeft,
BlockDescription,
+ BlockExistsInCache,
BlockRangeDescriptors,
BlockRangeIndexStart,
BlockRangeCount,
&AllNeededPartialChunkHashesLookup,
- ChunkDownloadedFlags]() {
+ ChunkDownloadedFlags,
+ RetriesLeft]() {
ZEN_TRACE_CPU("DownloadBlockRanges");
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
@@ -728,230 +812,240 @@ namespace remotestore_impl {
double DownloadElapsedSeconds = 0;
uint64_t DownloadedBytes = 0;
- for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount;
- BlockRangeIndex++)
- {
- if (RemoteResult.IsError())
- {
- return;
- }
-
- const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeIndex];
+ bool Success = DownloadPartialBlock(
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ RemoteResult,
+ Info,
+ DownloadElapsedSeconds,
+ BlockDescription,
+ BlockExistsInCache,
+ BlockRangeDescriptors,
+ BlockRangeIndexStart,
+ BlockRangeCount,
+ [&](IoBuffer&& Buffer,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) {
+ uint64_t BlockPartSize = Buffer.GetSize();
+ DownloadedBytes += BlockPartSize;
+
+ Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize);
+ Info.AttachmentBlocksRangesDownloaded++;
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&AttachmentsWriteLatch,
+ &ChunkStore,
+ &RemoteStore,
+ &NetworkWorkerPool,
+ &WorkerPool,
+ &AttachmentsDownloadLatch,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ BlockDescription,
+ BlockExistsInCache,
+ BlockRangeDescriptors,
+ BlockRangeStartIndex,
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ RetriesLeft,
+ BlockPayload = std::move(Buffer),
+ OffsetAndLengths =
+ std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ try
+ {
+ ZEN_ASSERT(BlockPayload.Size() > 0);
- RemoteProjectStore::LoadAttachmentResult BlockResult =
- RemoteStore.LoadAttachment(BlockDescription.BlockHash,
- {.Offset = BlockRange.RangeStart, .Bytes = BlockRange.RangeLength});
- if (BlockResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download block attachment '{}' range {},{} ({}): {}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength,
- BlockResult.ErrorCode,
- BlockResult.Reason));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
- }
- return;
- }
- if (RemoteResult.IsError())
- {
- return;
- }
- uint64_t BlockPartSize = BlockResult.Bytes.GetSize();
- if (BlockPartSize != BlockRange.RangeLength)
- {
- std::string ErrorString =
- fmt::format("Failed to download block attachment '{}' range {},{}, got {} bytes ({}): {}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength,
- BlockPartSize,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
-
- ReportMessage(OptionalContext, ErrorString);
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
- "Mismatching block part range received",
- ErrorString);
- }
- return;
- }
- Info.AttachmentBlocksRangesDownloaded.fetch_add(1);
+ size_t RangeCount = OffsetAndLengths.size();
+ for (size_t RangeOffset = 0; RangeOffset < RangeCount; RangeOffset++)
+ {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
- DownloadElapsedSeconds += BlockResult.ElapsedSeconds;
- DownloadedBytes += BlockPartSize;
+ const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange =
+ BlockRangeDescriptors[BlockRangeStartIndex + RangeOffset];
+ const std::pair<uint64_t, uint64_t>& OffsetAndLength = OffsetAndLengths[RangeOffset];
+ IoBuffer BlockRangeBuffer(BlockPayload, OffsetAndLength.first, OffsetAndLength.second);
- Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize);
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
- &RemoteResult,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
- RetriesLeft,
- BlockDescription,
- BlockRange,
- &AllNeededPartialChunkHashesLookup,
- ChunkDownloadedFlags,
- BlockPayload = std::move(BlockResult.Bytes)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- ZEN_ASSERT(BlockPayload.Size() > 0);
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
+ uint64_t PotentialSize = 0;
+ uint64_t UsedSize = 0;
+ uint64_t BlockPartSize = BlockRangeBuffer.GetSize();
- uint64_t PotentialSize = 0;
- uint64_t UsedSize = 0;
- uint64_t BlockPartSize = BlockPayload.GetSize();
+ uint32_t OffsetInBlock = 0;
+ for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart;
+ ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount;
+ ChunkBlockIndex++)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
- uint32_t OffsetInBlock = 0;
- for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart;
- ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount;
- ChunkBlockIndex++)
- {
- const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
- const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+ const uint32_t ChunkCompressedSize =
+ BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
- if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash);
- ChunkIndexIt != AllNeededPartialChunkHashesLookup.end())
- {
- bool Expected = false;
- if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true))
- {
- IoHash VerifyChunkHash;
- uint64_t VerifyChunkSize;
- CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(
- SharedBuffer(IoBuffer(BlockPayload, OffsetInBlock, ChunkCompressedSize)),
- VerifyChunkHash,
- VerifyChunkSize);
- if (!CompressedChunk)
+ if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash);
+ ChunkIndexIt != AllNeededPartialChunkHashesLookup.end())
{
- std::string ErrorString = fmt::format(
- "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer",
- OffsetInBlock,
- ChunkCompressedSize,
- BlockDescription.BlockHash);
- ReportMessage(OptionalContext, ErrorString);
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ if (!ChunkDownloadedFlags[ChunkIndexIt->second])
{
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
- "Malformed chunk block",
- ErrorString);
+ IoHash VerifyChunkHash;
+ uint64_t VerifyChunkSize;
+ CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(
+ SharedBuffer(IoBuffer(BlockRangeBuffer, OffsetInBlock, ChunkCompressedSize)),
+ VerifyChunkHash,
+ VerifyChunkSize);
+
+ std::string ErrorString;
+
+ if (!CompressedChunk)
+ {
+ ErrorString = fmt::format(
+ "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer",
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockDescription.BlockHash);
+ }
+ else if (VerifyChunkHash != ChunkHash)
+ {
+ ErrorString = fmt::format(
+ "Chunk at {},{} in block attachment '{}' has mismatching hash, expected "
+ "{}, got {}",
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockDescription.BlockHash,
+ ChunkHash,
+ VerifyChunkHash);
+ }
+ else if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex])
+ {
+ ErrorString = fmt::format(
+ "Chunk at {},{} in block attachment '{}' has mismatching raw size, "
+ "expected {}, "
+ "got {}",
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockDescription.BlockHash,
+ BlockDescription.ChunkRawLengths[ChunkBlockIndex],
+ VerifyChunkSize);
+ }
+
+ if (!ErrorString.empty())
+ {
+ if (RetriesLeft > 0)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("{}, retrying download", ErrorString));
+ return DownloadAndSavePartialBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockDescription,
+ BlockExistsInCache,
+ BlockRangeDescriptors,
+ BlockRangeStartIndex,
+ RangeCount,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ RetriesLeft - 1);
+ }
+
+ ReportMessage(OptionalContext, ErrorString);
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+ "Malformed chunk block",
+ ErrorString);
+ }
+ }
+ else
+ {
+ bool Expected = false;
+ if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected,
+ true))
+ {
+ WriteAttachmentBuffers.emplace_back(
+ CompressedChunk.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.emplace_back(ChunkHash);
+ PotentialSize += WriteAttachmentBuffers.back().GetSize();
+ }
+ }
}
- continue;
}
- if (VerifyChunkHash != ChunkHash)
+ OffsetInBlock += ChunkCompressedSize;
+ }
+
+ if (!WriteAttachmentBuffers.empty())
+ {
+ std::vector<CidStore::InsertResult> Results =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
{
- std::string ErrorString = fmt::format(
- "Chunk at {},{} in block attachment '{}' has mismatching hash, expected {}, got {}",
- OffsetInBlock,
- ChunkCompressedSize,
- BlockDescription.BlockHash,
- ChunkHash,
- VerifyChunkHash);
- ReportMessage(OptionalContext, ErrorString);
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ const CidStore::InsertResult& Result = Results[Index];
+ if (Result.New)
{
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
- "Malformed chunk block",
- ErrorString);
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ UsedSize += WriteAttachmentBuffers[Index].GetSize();
}
- continue;
}
- if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex])
+ if (UsedSize < BlockPartSize)
{
- std::string ErrorString = fmt::format(
- "Chunk at {},{} in block attachment '{}' has mismatching raw size, expected {}, "
- "got {}",
- OffsetInBlock,
- ChunkCompressedSize,
+ ZEN_DEBUG(
+ "Used {} (skipping {}) out of {} for block {} range {}, {} ({} %) (use of matching "
+ "{}%)",
+ NiceBytes(UsedSize),
+ NiceBytes(BlockPartSize - UsedSize),
+ NiceBytes(BlockPartSize),
BlockDescription.BlockHash,
- BlockDescription.ChunkRawLengths[ChunkBlockIndex],
- VerifyChunkSize);
- ReportMessage(OptionalContext, ErrorString);
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
- "Malformed chunk block",
- ErrorString);
- }
- continue;
+ BlockRange.RangeStart,
+ BlockRange.RangeLength,
+ (100 * UsedSize) / BlockPartSize,
+ PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0);
}
-
- WriteAttachmentBuffers.emplace_back(CompressedChunk.GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.emplace_back(ChunkHash);
- PotentialSize += WriteAttachmentBuffers.back().GetSize();
}
}
- OffsetInBlock += ChunkCompressedSize;
}
-
- if (!WriteAttachmentBuffers.empty())
+ catch (const std::exception& Ex)
{
- auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (size_t Index = 0; Index < Results.size(); Index++)
- {
- const auto& Result = Results[Index];
- if (Result.New)
- {
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
- UsedSize += WriteAttachmentBuffers[Index].GetSize();
- }
- }
- ZEN_DEBUG("Used {} (matching {}) out of {} for block {} range {}, {} ({} %) (use of matching {}%)",
- NiceBytes(UsedSize),
- NiceBytes(PotentialSize),
- NiceBytes(BlockPartSize),
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength,
- (100 * UsedSize) / BlockPartSize,
- PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0);
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed saving {} ranges from block attachment {}",
+ OffsetAndLengths.size(),
+ BlockDescription.BlockHash),
+ Ex.what());
}
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed save block attachment {} range {}, {}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ });
+ if (Success)
+ {
+ ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})",
+ BlockRangeCount,
+ BlockDescription.BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(DownloadElapsedSeconds * 1000)),
+ NiceBytes(DownloadedBytes));
}
-
- ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})",
- BlockRangeCount,
- BlockDescription.BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(DownloadElapsedSeconds * 1000)),
- NiceBytes(DownloadedBytes));
}
catch (const std::exception& Ex)
{
@@ -1002,7 +1096,7 @@ namespace remotestore_impl {
{
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash, {});
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
if (AttachmentResult.ErrorCode)
{
ReportMessage(OptionalContext,
@@ -3115,6 +3209,12 @@ ParseOplogContainer(
std::unordered_set<IoHash, IoHash::Hasher> NeededAttachments;
{
CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView();
+
+ size_t OpCount = OpsArray.Num();
+ size_t OpsCompleteCount = 0;
+
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Scanning {} ops for attachments", OpCount));
+
for (CbFieldView OpEntry : OpsArray)
{
OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); });
@@ -3124,6 +3224,16 @@ ParseOplogContainer(
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
.Reason = "Operation cancelled"};
}
+ OpsCompleteCount++;
+ if ((OpsCompleteCount & 4095) == 0)
+ {
+ remotestore_impl::ReportProgress(
+ OptionalContext,
+ "Scanning oplog"sv,
+ fmt::format("{} attachments found, {} ops remaining...", NeededAttachments.size(), OpCount - OpsCompleteCount),
+ OpCount,
+ OpCount - OpsCompleteCount);
+ }
}
}
{
@@ -3151,13 +3261,27 @@ ParseOplogContainer(
{
ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView);
+ size_t NeededChunkAttachmentCount = 0;
+
OnReferencedAttachments(Chunked.ChunkHashes);
- NeededAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end());
+ for (const IoHash& ChunkHash : Chunked.ChunkHashes)
+ {
+ if (!HasAttachment(ChunkHash))
+ {
+ if (NeededAttachments.insert(ChunkHash).second)
+ {
+ NeededChunkAttachmentCount++;
+ }
+ }
+ }
OnChunkedAttachment(Chunked);
- ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- Chunked.ChunkHashes.size());
+
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Requesting chunked attachment '{}' ({}) built from {} chunks, need {} chunks",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ Chunked.ChunkHashes.size(),
+ NeededChunkAttachmentCount));
}
}
if (remotestore_impl::IsCancelled(OptionalContext))
@@ -3490,8 +3614,16 @@ LoadOplog(CidStore& ChunkStore,
std::vector<bool> DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false);
ChunkBlockAnalyser::BlockResult PartialBlocksResult;
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching descriptions for {} blocks", BlockHashes.size()));
+
RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = RemoteStore.GetBlockDescriptions(BlockHashes);
- std::vector<IoHash> BlocksWithDescription;
+
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("GetBlockDescriptions took {}. Found {} blocks",
+ NiceTimeSpanMs(uint64_t(BlockDescriptions.ElapsedSeconds * 1000)),
+ BlockDescriptions.Blocks.size()));
+
+ std::vector<IoHash> BlocksWithDescription;
BlocksWithDescription.reserve(BlockDescriptions.Blocks.size());
for (const ChunkBlockDescription& BlockDescription : BlockDescriptions.Blocks)
{
@@ -3547,6 +3679,7 @@ LoadOplog(CidStore& ChunkStore,
if (!AllNeededChunkHashes.empty())
{
std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes;
+ std::vector<bool> BlockExistsInCache;
if (PartialBlockRequestMode == EPartialBlockRequestMode::Off)
{
@@ -3558,21 +3691,25 @@ LoadOplog(CidStore& ChunkStore,
RemoteStore.AttachmentExistsInCache(BlocksWithDescription);
if (CacheExistsResult.ErrorCode != 0 || CacheExistsResult.HasBody.size() != BlocksWithDescription.size())
{
- CacheExistsResult.HasBody.resize(BlocksWithDescription.size(), false);
+ BlockExistsInCache.resize(BlocksWithDescription.size(), false);
+ }
+ else
+ {
+ BlockExistsInCache = std::move(CacheExistsResult.HasBody);
}
PartialBlockDownloadModes.reserve(BlocksWithDescription.size());
- for (bool ExistsInCache : CacheExistsResult.HasBody)
+ for (bool ExistsInCache : BlockExistsInCache)
{
if (PartialBlockRequestMode == EPartialBlockRequestMode::All)
{
- PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact
: ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange);
}
else if (PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly)
{
- PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact
: ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
}
else if (PartialBlockRequestMode == EPartialBlockRequestMode::Mixed)
@@ -3584,13 +3721,14 @@ LoadOplog(CidStore& ChunkStore,
}
ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size());
-
- ChunkBlockAnalyser PartialAnalyser(*LogOutput,
- BlockDescriptions.Blocks,
- ChunkBlockAnalyser::Options{.IsQuiet = false,
- .IsVerbose = false,
- .HostLatencySec = HostLatencySec,
- .HostHighSpeedLatencySec = CacheLatencySec});
+ ChunkBlockAnalyser PartialAnalyser(
+ *LogOutput,
+ BlockDescriptions.Blocks,
+ ChunkBlockAnalyser::Options{.IsQuiet = false,
+ .IsVerbose = false,
+ .HostLatencySec = HostLatencySec,
+ .HostHighSpeedLatencySec = CacheLatencySec,
+ .HostMaxRangeCountPerRequest = RemoteProjectStore::MaxRangeCountPerRequest});
std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks =
PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup,
@@ -3641,12 +3779,13 @@ LoadOplog(CidStore& ChunkStore,
LoadAttachmentsTimer,
DownloadStartMS,
BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex],
+ BlockExistsInCache[CurrentBlockRange.BlockIndex],
PartialBlocksResult.BlockRanges,
BlockRangeIndex,
RangeCount,
AllNeededPartialChunkHashesLookup,
ChunkDownloadedFlags,
- 3);
+ /* RetriesLeft*/ 3);
BlockRangeIndex += RangeCount;
}
@@ -3668,12 +3807,23 @@ LoadOplog(CidStore& ChunkStore,
{
PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
}
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Loading attachments"sv,
- fmt::format("{} remaining. {}", Remaining, remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
- AttachmentCount.load(),
- Remaining);
+
+ uint64_t AttachmentsDownloaded =
+ Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load();
+ uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + Info.AttachmentBlockRangeBytesDownloaded.load() +
+ Info.AttachmentBytesDownloaded.load();
+
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Loading attachments"sv,
+ fmt::format("{} ({}) downloaded, {} ({}) stored, {} remaining. {}",
+ AttachmentsDownloaded,
+ NiceBytes(AttachmentBytesDownloaded),
+ Info.AttachmentsStored.load(),
+ NiceBytes(Info.AttachmentBytesStored.load()),
+ Remaining,
+ remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
+ AttachmentCount.load(),
+ Remaining);
}
if (DownloadStartMS != (uint64_t)-1)
{
@@ -3700,11 +3850,12 @@ LoadOplog(CidStore& ChunkStore,
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
}
}
- remotestore_impl::ReportProgress(OptionalContext,
- "Writing attachments"sv,
- fmt::format("{} remaining.", Remaining),
- AttachmentCount.load(),
- Remaining);
+ remotestore_impl::ReportProgress(
+ OptionalContext,
+ "Writing attachments"sv,
+ fmt::format("{} ({}), {} remaining.", Info.AttachmentsStored.load(), NiceBytes(Info.AttachmentBytesStored.load()), Remaining),
+ AttachmentCount.load(),
+ Remaining);
}
if (AttachmentCount.load() > 0)
@@ -3867,18 +4018,20 @@ LoadOplog(CidStore& ChunkStore,
TmpFile.Close();
TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName);
}
+ uint64_t TmpBufferSize = TmpBuffer.GetSize();
CidStore::InsertResult InsertResult =
ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
if (InsertResult.New)
{
- Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize());
+ Info.AttachmentBytesStored.fetch_add(TmpBufferSize);
Info.AttachmentsStored.fetch_add(1);
}
- ZEN_INFO("Dechunked attachment {} ({}) in {}",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Dechunked attachment {} ({}) in {}",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
}
catch (const std::exception& Ex)
{
diff --git a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
index b4c1156ac..ef82c45e0 100644
--- a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
@@ -157,55 +157,59 @@ public:
return Result;
}
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override
{
- std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
-
- CbObject Request;
+ LoadAttachmentsResult Result;
+ if (SourceMode != ESourceMode::kCacheOnly)
{
- CbObjectWriter RequestWriter;
- RequestWriter.AddString("method"sv, "getchunks"sv);
- RequestWriter.BeginObject("Request"sv);
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
+
+ CbObject Request;
{
- RequestWriter.BeginArray("Chunks"sv);
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("method"sv, "getchunks"sv);
+ RequestWriter.BeginObject("Request"sv);
{
- for (const IoHash& RawHash : RawHashes)
+ RequestWriter.BeginArray("Chunks"sv);
{
- RequestWriter.BeginObject();
+ for (const IoHash& RawHash : RawHashes)
{
- RequestWriter.AddHash("RawHash", RawHash);
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.AddHash("RawHash", RawHash);
+ }
+ RequestWriter.EndObject();
}
- RequestWriter.EndObject();
}
+ RequestWriter.EndArray(); // "chunks"
}
- RequestWriter.EndArray(); // "chunks"
+ RequestWriter.EndObject();
+ Request = RequestWriter.Save();
}
- RequestWriter.EndObject();
- Request = RequestWriter.Save();
- }
- HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage));
- AddStats(Response);
+ HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
- LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'",
- RawHashes.size(),
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- Result.Reason);
- }
- else
- {
- CbPackage Package = Response.AsPackage();
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- Result.Chunks.reserve(Attachments.size());
- for (const CbAttachment& Attachment : Attachments)
+ Result = LoadAttachmentsResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
{
- Result.Chunks.emplace_back(
- std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
+ Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'",
+ RawHashes.size(),
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ Result.Reason);
+ }
+ else
+ {
+ CbPackage Package = Response.AsPackage();
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ Result.Chunks.reserve(Attachments.size());
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
+ }
}
}
return Result;
@@ -260,32 +264,59 @@ public:
return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)};
}
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override
{
- std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
- HttpClient::Response Response =
- m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
- AddStats(Response);
-
- LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
- if (Result.ErrorCode)
+ LoadAttachmentResult Result;
+ if (SourceMode != ESourceMode::kCacheOnly)
{
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'",
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- RawHash,
- Result.Reason);
- }
- if (!Result.ErrorCode && Range)
- {
- Result.Bytes = IoBuffer(Response.ResponsePayload, Range.Offset, Range.Bytes);
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
+ HttpClient::Response Response =
+ m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
+ AddStats(Response);
+
+ Result = LoadAttachmentResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ RawHash,
+ Result.Reason);
+ }
+ Result.Bytes = Response.ResponsePayload;
+ Result.Bytes.MakeOwned();
}
- else
+ return Result;
+ }
+
+ virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash,
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges,
+ ESourceMode SourceMode) override
+ {
+ LoadAttachmentRangesResult Result;
+ if (SourceMode != ESourceMode::kCacheOnly)
{
- Result.Bytes = Response.ResponsePayload;
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
+ HttpClient::Response Response =
+ m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
+ AddStats(Response);
+
+ Result = LoadAttachmentRangesResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ RawHash,
+ Result.Reason);
+ }
+ else
+ {
+ Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end());
+ }
}
- Result.Bytes.MakeOwned();
return Result;
}
diff --git a/src/zenserver/storage/buildstore/httpbuildstore.cpp b/src/zenserver/storage/buildstore/httpbuildstore.cpp
index 6ada085a5..459e044eb 100644
--- a/src/zenserver/storage/buildstore/httpbuildstore.cpp
+++ b/src/zenserver/storage/buildstore/httpbuildstore.cpp
@@ -233,16 +233,21 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req)
{
const uint64_t MaxBlobSize = Range.first < BlobSize ? BlobSize - Range.first : 0;
const uint64_t RangeSize = Min(Range.second, MaxBlobSize);
- if (Range.first + RangeSize <= BlobSize)
+ Writer.BeginObject();
{
- RangeBuffers.push_back(IoBuffer(Blob, Range.first, RangeSize));
- Writer.BeginObject();
+ if (Range.first + RangeSize <= BlobSize)
{
+ RangeBuffers.push_back(IoBuffer(Blob, Range.first, RangeSize));
Writer.AddInteger("offset"sv, Range.first);
Writer.AddInteger("length"sv, RangeSize);
}
- Writer.EndObject();
+ else
+ {
+ Writer.AddInteger("offset"sv, Range.first);
+ Writer.AddInteger("length"sv, 0);
+ }
}
+ Writer.EndObject();
}
Writer.EndArray();
@@ -262,7 +267,16 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req)
}
else
{
- ZEN_ASSERT(OffsetAndLengthPairs.size() == 1);
+ if (OffsetAndLengthPairs.size() != 1)
+ {
+ // Only a single http range is supported, we have limited support for http multirange responses
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Multiple ranges in blob request is only supported for {} accept type", ToString(HttpContentType::kCbPackage)));
+ }
+
const std::pair<uint64_t, uint64_t>& OffsetAndLength = OffsetAndLengthPairs.front();
const uint64_t BlobSize = Blob.GetSize();
const uint64_t MaxBlobSize = OffsetAndLength.first < BlobSize ? BlobSize - OffsetAndLength.first : 0;