diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-03 20:49:01 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-03 20:49:01 +0100 |
| commit | 463a0fde16b827c0ec44c9e88fe3c8c8098aa5ea (patch) | |
| tree | 736553b3ded853fe945bdeea7585631617d171c3 /src | |
| parent | fix objectstore uri path parsing (#801) (diff) | |
| download | zen-463a0fde16b827c0ec44c9e88fe3c8c8098aa5ea.tar.xz zen-463a0fde16b827c0ec44c9e88fe3c8c8098aa5ea.zip | |
use multi range requests (#800)
- Improvement: `zen builds download` now uses multi-range requests for blocks to reduce download size
- Improvement: `zen oplog-import` now downloads partial blocks using multi-range requests to reduce download size
- Improvement: Improved feedback in log/console during `zen oplog-import`
- Improvement: `--allow-partial-block-requests` now defaults to `true` for `zen builds download` and `zen oplog-import` (was `mixed`)
- Improvement: Improved range merging analysis when downloading partial blocks
Diffstat (limited to 'src')
27 files changed, 1582 insertions, 722 deletions
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index f5c44ab55..5c80beed5 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -71,7 +71,7 @@ private: bool m_AppendNewContent = false; uint8_t m_BlockReuseMinPercentLimit = 85; bool m_AllowMultiparts = true; - std::string m_AllowPartialBlockRequests = "mixed"; + std::string m_AllowPartialBlockRequests = "true"; AuthCommandLineOptions m_AuthOptions; diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h index 17fd76e9f..1ba98b39e 100644 --- a/src/zen/cmds/projectstore_cmd.h +++ b/src/zen/cmds/projectstore_cmd.h @@ -210,7 +210,7 @@ private: bool m_BoostWorkerMemory = false; bool m_BoostWorkers = false; - std::string m_AllowPartialBlockRequests = "mixed"; + std::string m_AllowPartialBlockRequests = "true"; }; class SnapshotOplogCommand : public ProjectStoreCommand diff --git a/src/zen/cmds/workspaces_cmd.cpp b/src/zen/cmds/workspaces_cmd.cpp index 2661ac9da..af265d898 100644 --- a/src/zen/cmds/workspaces_cmd.cpp +++ b/src/zen/cmds/workspaces_cmd.cpp @@ -815,7 +815,7 @@ WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** if (Results.size() != m_ChunkIds.size()) { throw std::runtime_error( - fmt::format("failed to get workspace share batch - invalid result count recevied (expected: {}, received: {}", + fmt::format("failed to get workspace share batch - invalid result count received (expected: {}, received: {}", m_ChunkIds.size(), Results.size())); } diff --git a/src/zen/progressbar.cpp b/src/zen/progressbar.cpp index 732f16e81..9467ed60d 100644 --- a/src/zen/progressbar.cpp +++ b/src/zen/progressbar.cpp @@ -207,8 +207,9 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak) size_t ProgressBarCount = (ProgressBarSize * PercentDone) / 100; uint64_t Completed = NewState.TotalCount - NewState.RemainingCount; uint64_t ETAElapsedMS = ElapsedTimeMS -= m_PausedMS; - uint64_t ETAMS = - (NewState.Status == 
State::EStatus::Running) && (PercentDone > 5) ? (ETAElapsedMS * NewState.RemainingCount) / Completed : 0; + uint64_t ETAMS = ((m_State.TotalCount == NewState.TotalCount) && (NewState.Status == State::EStatus::Running)) && (PercentDone > 5) + ? (ETAElapsedMS * NewState.RemainingCount) / Completed + : 0; uint32_t ConsoleColumns = TuiConsoleColumns(1024); diff --git a/src/zenhttp/clients/httpclientcommon.cpp b/src/zenhttp/clients/httpclientcommon.cpp index 248ae9d70..9ded23375 100644 --- a/src/zenhttp/clients/httpclientcommon.cpp +++ b/src/zenhttp/clients/httpclientcommon.cpp @@ -394,31 +394,28 @@ namespace detail { { // Yes, we do a substring of the non-lowercase value string as we want the exact boundary string std::string_view BoundaryName = std::string_view(ContentTypeHeaderValue).substr(BoundaryPos + 9); + size_t BoundaryEnd = std::string::npos; + while (!BoundaryName.empty() && BoundaryName[0] == ' ') + { + BoundaryName = BoundaryName.substr(1); + } if (!BoundaryName.empty()) { - size_t BoundaryEnd = std::string::npos; - while (BoundaryName[0] == ' ') - { - BoundaryName = BoundaryName.substr(1); - } - if (!BoundaryName.empty()) + if (BoundaryName.size() > 2 && BoundaryName.front() == '"' && BoundaryName.back() == '"') { - if (BoundaryName.size() > 2 && BoundaryName.front() == '"' && BoundaryName.back() == '"') + BoundaryEnd = BoundaryName.find('"', 1); + if (BoundaryEnd != std::string::npos) { - BoundaryEnd = BoundaryName.find('"', 1); - if (BoundaryEnd != std::string::npos) - { - BoundaryBeginMatcher.Init(fmt::format("\r\n--{}", BoundaryName.substr(1, BoundaryEnd - 1))); - return true; - } - } - else - { - BoundaryEnd = BoundaryName.find_first_of(" \r\n"); - BoundaryBeginMatcher.Init(fmt::format("\r\n--{}", BoundaryName.substr(0, BoundaryEnd))); + BoundaryBeginMatcher.Init(fmt::format("\r\n--{}", BoundaryName.substr(1, BoundaryEnd - 1))); return true; } } + else + { + BoundaryEnd = BoundaryName.find_first_of(" \r\n"); + 
BoundaryBeginMatcher.Init(fmt::format("\r\n--{}", BoundaryName.substr(0, BoundaryEnd))); + return true; + } } } } diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index f94c58581..281d512cf 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -107,17 +107,14 @@ HttpClientBase::GetAccessToken() std::vector<std::pair<uint64_t, uint64_t>> HttpClient::Response::GetRanges(std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengthPairs) const { - std::vector<std::pair<uint64_t, uint64_t>> Result; - Result.reserve(OffsetAndLengthPairs.size()); if (Ranges.empty()) { - for (const std::pair<uint64_t, uint64_t>& Range : OffsetAndLengthPairs) - { - Result.emplace_back(std::make_pair(Range.first, Range.second)); - } - return Result; + return {}; } + std::vector<std::pair<uint64_t, uint64_t>> Result; + Result.reserve(OffsetAndLengthPairs.size()); + auto BoundaryIt = Ranges.begin(); auto OffsetAndLengthPairIt = OffsetAndLengthPairs.begin(); while (OffsetAndLengthPairIt != OffsetAndLengthPairs.end()) diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index f00bbec63..53be36b9a 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -190,10 +190,12 @@ public: HttpContentType ContentType; }; - // Ranges will map out all recevied ranges, both single and multi-range responses + // Ranges will map out all received ranges, both single and multi-range responses // If no range was requested Ranges will be empty std::vector<MultipartBoundary> Ranges; + // Map the absolute OffsetAndLengthPairs into ResponsePayload from the ranges received (Ranges). + // If the response was not a partial response, an empty vector will be returned std::vector<std::pair<uint64_t, uint64_t>> GetRanges(std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengthPairs) const; // This contains any errors from the HTTP stack. 
It won't contain information on diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp index faa85f81b..53d33bd7e 100644 --- a/src/zenremotestore/builds/buildstoragecache.cpp +++ b/src/zenremotestore/builds/buildstoragecache.cpp @@ -151,7 +151,7 @@ public: auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); HttpClient::Response CacheResponse = - m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash), Payload, ContentType); @@ -180,7 +180,7 @@ public: } CreateDirectories(m_TempFolderPath); HttpClient::Response CacheResponse = - m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash), m_TempFolderPath, Headers); AddStatistic(CacheResponse); @@ -191,6 +191,74 @@ public: return {}; } + virtual BuildBlobRanges GetBuildBlobRanges(const Oid& BuildId, + const IoHash& RawHash, + std::span<const std::pair<uint64_t, uint64_t>> Ranges) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlobRanges"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Writer; + Writer.BeginArray("ranges"sv); + { + for (const std::pair<uint64_t, uint64_t>& Range : Ranges) + { + Writer.BeginObject(); + { + Writer.AddInteger("offset"sv, Range.first); + Writer.AddInteger("length"sv, Range.second); + } + Writer.EndObject(); + } + } + Writer.EndArray(); // ranges + + CreateDirectories(m_TempFolderPath); + HttpClient::Response CacheResponse = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash), + Writer.Save(), + 
HttpClient::Accept(ZenContentType::kCbPackage)); + AddStatistic(CacheResponse); + if (CacheResponse.IsSuccess()) + { + CbPackage ResponsePackage = ParsePackageMessage(CacheResponse.ResponsePayload); + CbObjectView ResponseObject = ResponsePackage.GetObject(); + + CbArrayView RangeArray = ResponseObject["ranges"sv].AsArrayView(); + + std::vector<std::pair<uint64_t, uint64_t>> ReceivedRanges; + ReceivedRanges.reserve(RangeArray.Num()); + + uint64_t OffsetInPayloadRanges = 0; + + for (CbFieldView View : RangeArray) + { + CbObjectView RangeView = View.AsObjectView(); + uint64_t Offset = RangeView["offset"sv].AsUInt64(); + uint64_t Length = RangeView["length"sv].AsUInt64(); + + const std::pair<uint64_t, uint64_t>& Range = Ranges[ReceivedRanges.size()]; + + if (Offset != Range.first || Length != Range.second) + { + return {}; + } + ReceivedRanges.push_back(std::make_pair(OffsetInPayloadRanges, Length)); + OffsetInPayloadRanges += Length; + } + + const CbAttachment* DataAttachment = ResponsePackage.FindAttachment(RawHash); + if (DataAttachment) + { + SharedBuffer PayloadRanges = DataAttachment->AsBinary(); + return BuildBlobRanges{.PayloadBuffer = PayloadRanges.AsIoBuffer(), .Ranges = std::move(ReceivedRanges)}; + } + } + return {}; + } + virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override { ZEN_ASSERT(!IsFlushed); diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 5deb00707..f4b4d592b 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -38,6 +38,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zenhttp/httpclientauth.h> # include <zenremotestore/builds/filebuildstorage.h> #endif // ZEN_WITH_TESTS @@ -883,12 +884,14 @@ 
BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) BlobsExistsResult ExistsResult; { - ChunkBlockAnalyser BlockAnalyser(m_LogOutput, - m_BlockDescriptions, - ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, - .IsVerbose = m_Options.IsVerbose, - .HostLatencySec = m_Storage.BuildStorageLatencySec, - .HostHighSpeedLatencySec = m_Storage.CacheLatencySec}); + ChunkBlockAnalyser BlockAnalyser( + m_LogOutput, + m_BlockDescriptions, + ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, + .IsVerbose = m_Options.IsVerbose, + .HostLatencySec = m_Storage.BuildStorageLatencySec, + .HostHighSpeedLatencySec = m_Storage.CacheLatencySec, + .HostMaxRangeCountPerRequest = BuildStorageBase::MaxRangeCountPerRequest}); std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded( m_RemoteLookup.ChunkHashToChunkIndex, @@ -1027,15 +1030,13 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash); if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All) { - BlockPartialDownloadModes.push_back(BlockExistInCache - ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed - : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange); + BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange); } else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) { - BlockPartialDownloadModes.push_back(BlockExistInCache - ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed - : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + BlockPartialDownloadModes.push_back(BlockExistInCache ? 
ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact + : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); } else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed) { @@ -1045,6 +1046,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) } } } + ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size()); ChunkBlockAnalyser::BlockResult PartialBlocks = @@ -1356,90 +1358,105 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) &Work, &PartialBlocks, BlockRangeStartIndex = BlockRangeIndex, - RangeCount](std::atomic<bool>&) { + RangeCount = RangeCount](std::atomic<bool>&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_GetPartialBlockRanges"); FilteredDownloadedBytesPerSecond.Start(); - for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + RangeCount; - BlockRangeIndex++) - { - ZEN_TRACE_CPU("GetPartialBlock"); - - const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = PartialBlocks.BlockRanges[BlockRangeIndex]; - - DownloadPartialBlock( - BlockRange, - ExistsResult, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalRequestCount, - TotalPartWriteCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } + DownloadPartialBlock( + PartialBlocks.BlockRanges, + BlockRangeStartIndex, + RangeCount, + ExistsResult, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalRequestCount, + TotalPartWriteCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + &PartialBlocks](IoBuffer&& InMemoryBuffer, + const 
std::filesystem::path& OnDiskPath, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) { + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - if (!m_AbortFlag) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &BlockRange, - BlockChunkPath = std::filesystem::path(OnDiskPath), - BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WritePartialBlock"); + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &PartialBlocks, + BlockRangeStartIndex, + BlockChunkPath = std::filesystem::path(OnDiskPath), + BlockPartialBuffer = std::move(InMemoryBuffer), + OffsetAndLengths = std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), + OffsetAndLengths.end())]( + std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WritePartialBlock"); - const uint32_t BlockIndex = BlockRange.BlockIndex; + const uint32_t BlockIndex = PartialBlocks.BlockRanges[BlockRangeStartIndex].BlockIndex; - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) { - 
ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) - { - throw std::runtime_error( - fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); - } + throw std::runtime_error( + fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); } + } + + FilteredWrittenBytesPerSecond.Start(); - FilteredWrittenBytesPerSecond.Start(); - - if (!WritePartialBlockChunksToCache( - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) + size_t RangeCount = OffsetAndLengths.size(); + + for (size_t PartialRangeIndex = 0; PartialRangeIndex < RangeCount; PartialRangeIndex++) + { + const std::pair<uint64_t, uint64_t>& OffsetAndLength = + OffsetAndLengths[PartialRangeIndex]; + IoBuffer BlockRangeBuffer(BlockPartialBuffer, + OffsetAndLength.first, + OffsetAndLength.second); + + const ChunkBlockAnalyser::BlockRangeDescriptor& RangeDescriptor = + PartialBlocks.BlockRanges[BlockRangeStartIndex + PartialRangeIndex]; + + if (!WritePartialBlockChunksToCache(BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + CompositeBuffer(std::move(BlockRangeBuffer)), + RangeDescriptor.ChunkBlockIndexStart, + RangeDescriptor.ChunkBlockIndexStart + + RangeDescriptor.ChunkBlockIndexCount - 1, + RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); @@ -1447,28 +1464,27 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); } - std::error_code Ec = TryRemoveFile(BlockChunkPath); - if (Ec) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - 
"Failed removing file '{}', reason: ({}) {}", - BlockChunkPath, - Ec.value(), - Ec.message()); - } - WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } } - }, - OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog - : WorkerThreadPool::EMode::EnableBacklog); - } - }); - } + std::error_code Ec = TryRemoveFile(BlockChunkPath); + if (Ec) + { + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, + "Failed removing file '{}', reason: ({}) {}", + BlockChunkPath, + Ec.value(), + Ec.message()); + } + } + }, + OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog + : WorkerThreadPool::EMode::EnableBacklog); + } + }); } }); BlockRangeIndex += RangeCount; @@ -3161,45 +3177,40 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde void BuildsOperationUpdateFolder::DownloadPartialBlock( - const ChunkBlockAnalyser::BlockRangeDescriptor BlockRange, - const BlobsExistsResult& ExistsResult, - std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded) + std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges, + size_t BlockRangeStartIndex, + size_t BlockRangeCount, + const BlobsExistsResult& ExistsResult, + std::function<void(IoBuffer&& InMemoryBuffer, + const std::filesystem::path& OnDiskPath, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded) { - const uint32_t BlockIndex = BlockRange.BlockIndex; + const uint32_t BlockIndex = BlockRanges[BlockRangeStartIndex].BlockIndex; const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - IoBuffer BlockBuffer; - if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) - { - BlockBuffer = - m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); - } - if (!BlockBuffer) - { - BlockBuffer = - 
m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); - } - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeStart + BlockRange.RangeLength)); - } - if (!m_AbortFlag) - { - uint64_t BlockSize = BlockBuffer.GetSize(); + auto ProcessDownload = [this]( + const ChunkBlockDescription& BlockDescription, + IoBuffer&& BlockRangeBuffer, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths, + const std::function<void(IoBuffer && InMemoryBuffer, + const std::filesystem::path& OnDiskPath, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>& OnDownloaded) { + uint64_t BlockRangeBufferSize = BlockRangeBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; - m_DownloadStats.DownloadedBlockByteCount += BlockSize; - m_DownloadStats.RequestsCompleteCount++; + m_DownloadStats.DownloadedBlockByteCount += BlockRangeBufferSize; + m_DownloadStats.RequestsCompleteCount += BlockOffsetAndLengths.size(); std::filesystem::path BlockChunkPath; // Check if the dowloaded block is file based and we can move it directly without rewriting it { IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize)) + if (BlockRangeBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockRangeBufferSize)) { ZEN_TRACE_CPU("MoveTempPartialBlock"); @@ -3207,10 +3218,17 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = m_TempBlockFolderPath / - fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, 
BlockRange.RangeStart, BlockRange.RangeLength); + BlockRangeBuffer.SetDeleteOnClose(false); + BlockRangeBuffer = {}; + + IoHashStream RangeId; + for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) + { + RangeId.Append(&Range.first, sizeof(uint64_t)); + RangeId.Append(&Range.second, sizeof(uint64_t)); + } + + BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()); RenameFile(TempBlobPath, BlockChunkPath, Ec); if (Ec) { @@ -3218,27 +3236,137 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( // Re-open the temp file again BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); + BlockRangeBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockRangeBufferSize, true); + BlockRangeBuffer.SetDeleteOnClose(true); } } } } - if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize)) + if (BlockChunkPath.empty() && (BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize)) { ZEN_TRACE_CPU("WriteTempPartialBlock"); + + IoHashStream RangeId; + for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) + { + RangeId.Append(&Range.first, sizeof(uint64_t)); + RangeId.Append(&Range.second, sizeof(uint64_t)); + } + // Could not be moved and rather large, lets store it on disk - BlockChunkPath = m_TempBlockFolderPath / - fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; + BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockRangeBuffer); + BlockRangeBuffer = {}; } if (!m_AbortFlag) { - OnDownloaded(std::move(BlockBuffer), std::move(BlockChunkPath)); + OnDownloaded(std::move(BlockRangeBuffer), 
std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths); + } + }; + + std::vector<std::pair<uint64_t, uint64_t>> Ranges; + Ranges.reserve(BlockRangeCount); + for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + BlockRangeCount; BlockRangeIndex++) + { + const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRanges[BlockRangeIndex]; + Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength)); + } + + if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) + { + BuildStorageCache::BuildBlobRanges RangeBuffers = + m_Storage.BuildCacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, Ranges); + if (RangeBuffers.PayloadBuffer) + { + if (!m_AbortFlag) + { + if (RangeBuffers.Ranges.size() != Ranges.size()) + { + throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", + Ranges.size(), + BlockDescription.BlockHash, + RangeBuffers.Ranges.size())); + } + + std::vector<std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths = std::move(RangeBuffers.Ranges); + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + BlockRangeStartIndex, + BlockOffsetAndLengths, + OnDownloaded); + } + return; } } + + const size_t MaxRangesPerRequestToJupiter = BuildStorageBase::MaxRangeCountPerRequest; + + size_t SubBlockRangeCount = BlockRangeCount; + size_t SubRangeCountComplete = 0; + std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges); + while (SubRangeCountComplete < SubBlockRangeCount) + { + if (m_AbortFlag) + { + break; + } + size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, MaxRangesPerRequestToJupiter); + size_t SubRangeStartIndex = BlockRangeStartIndex + SubRangeCountComplete; + + auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); + + BuildStorageBase::BuildBlobRanges RangeBuffers = + 
m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); + if (RangeBuffers.PayloadBuffer) + { + if (m_AbortFlag) + { + break; + } + if (RangeBuffers.Ranges.empty()) + { + // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 + // Upload to cache (if enabled) and use the whole payload for the remaining ranges + + if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer})); + } + + SubRangeCount = Ranges.size() - SubRangeCountComplete; + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), + OnDownloaded); + } + else + { + if (RangeBuffers.Ranges.size() != SubRanges.size()) + { + throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", + SubRanges.size(), + BlockDescription.BlockHash, + RangeBuffers.Ranges.size())); + } + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangeBuffers.Ranges, + OnDownloaded); + } + } + else + { + throw std::runtime_error(fmt::format("Block {} is missing when fetching {} ranges", BlockDescription.BlockHash, SubRangeCount)); + } + + SubRangeCountComplete += SubRangeCount; + } } std::vector<uint32_t> @@ -7083,16 +7211,31 @@ GetRemoteContent(OperationLogOutput& Output, // TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them { + if (!IsQuiet) + { + ZEN_OPERATION_LOG_INFO(Output, "Fetching metadata for {} blocks", BlockRawHashes.size()); + } + + Stopwatch GetBlockMetadataTimer; + bool AttemptFallback = false; OutBlockDescriptions = GetBlockDescriptions(Output, *Storage.BuildStorage, Storage.BuildCacheStorage.get(), BuildId, - 
BuildPartId, BlockRawHashes, AttemptFallback, IsQuiet, IsVerbose); + + if (!IsQuiet) + { + ZEN_OPERATION_LOG_INFO(Output, + "GetBlockMetadata for {} took {}. Found {} blocks", + BuildPartId, + NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), + OutBlockDescriptions.size()); + } } CalculateLocalChunkOrders(AbsoluteChunkOrders, @@ -7935,6 +8078,164 @@ TEST_CASE("buildstorageoperations.upload.multipart") } } +TEST_CASE("buildstorageoperations.partial.block.download" * doctest::skip(true)) +{ + const std::string OidcExecutableName = "OidcToken" ZEN_EXE_SUFFIX_LITERAL; + std::filesystem::path OidcTokenExePath = (GetRunningExecutablePath().parent_path() / OidcExecutableName).make_preferred(); + + HttpClientSettings ClientSettings{ + .LogCategory = "httpbuildsclient", + .AccessTokenProvider = + httpclientauth::CreateFromOidcTokenExecutable(OidcTokenExePath, "https://jupiter.devtools.epicgames.com", true, false, false), + .AssumeHttp2 = false, + .AllowResume = true, + .RetryCount = 0, + .Verbose = false}; + + HttpClient HttpClient("https://euc.jupiter.devtools.epicgames.com", ClientSettings); + + const std::string_view Namespace = "fortnite.oplog"; + const std::string_view Bucket = "fortnitegame.staged-build.fortnite-main.ps4-client"; + const Oid BuildId = Oid::FromHexString("09a76ea92ad301d4724fafad"); + + { + HttpClient::Response Response = HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, Bucket, BuildId), + HttpClient::Accept(ZenContentType::kCbObject)); + CbValidateError ValidateResult = CbValidateError::None; + CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(Response.ResponsePayload), ValidateResult); + REQUIRE(ValidateResult == CbValidateError::None); + } + + std::vector<ChunkBlockDescription> BlockDescriptions; + { + CbObjectWriter Request; + + Request.BeginArray("blocks"sv); + { + Request.AddHash(IoHash::FromHexString("7c353ed782675a5e8f968e61e51fc797ecdc2882")); + } + Request.EndArray(); + + IoBuffer Payload = 
Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response BlockDescriptionsResponse = + HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + REQUIRE(BlockDescriptionsResponse.IsSuccess()); + + CbValidateError ValidateResult = CbValidateError::None; + CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(BlockDescriptionsResponse.ResponsePayload), ValidateResult); + REQUIRE(ValidateResult == CbValidateError::None); + + { + CbArrayView BlocksArray = Object["blocks"sv].AsArrayView(); + for (CbFieldView Block : BlocksArray) + { + ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView()); + BlockDescriptions.emplace_back(std::move(Description)); + } + } + } + + REQUIRE(!BlockDescriptions.empty()); + + const IoHash BlockHash = BlockDescriptions.back().BlockHash; + + const ChunkBlockDescription& BlockDescription = BlockDescriptions.front(); + REQUIRE(!BlockDescription.ChunkRawHashes.empty()); + REQUIRE(!BlockDescription.ChunkCompressedLengths.empty()); + + std::vector<std::pair<uint64_t, uint64_t>> ChunkOffsetAndSizes; + uint64_t Offset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + for (uint32_t ChunkCompressedSize : BlockDescription.ChunkCompressedLengths) + { + ChunkOffsetAndSizes.push_back(std::make_pair(Offset, ChunkCompressedSize)); + Offset += ChunkCompressedSize; + } + + ScopedTemporaryDirectory SourceFolder; + + auto Validate = [&](std::span<const uint32_t> ChunkIndexesToFetch) { + std::vector<std::pair<uint64_t, uint64_t>> Ranges; + for (uint32_t ChunkIndex : ChunkIndexesToFetch) + { + Ranges.push_back(ChunkOffsetAndSizes[ChunkIndex]); + } + + HttpClient::KeyValueMap Headers; + if (!Ranges.empty()) + { + ExtendableStringBuilder<512> SB; + for (const std::pair<uint64_t, uint64_t>& R : Ranges) 
+ { + if (SB.Size() > 0) + { + SB << ", "; + } + SB << R.first << "-" << R.first + R.second - 1; + } + Headers.Entries.insert({"Range", fmt::format("bytes={}", SB.ToView())}); + } + + HttpClient::Response GetBlobRangesResponse = HttpClient.Download( + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect=false", Namespace, Bucket, BuildId, BlockHash), + SourceFolder.Path(), + Headers); + + REQUIRE(GetBlobRangesResponse.IsSuccess()); + MemoryView RangesMemoryView = GetBlobRangesResponse.ResponsePayload.GetView(); + + std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges = GetBlobRangesResponse.GetRanges(Ranges); + if (PayloadRanges.empty()) + { + // We got the whole blob, use the ranges as is + PayloadRanges = Ranges; + } + + REQUIRE(PayloadRanges.size() == Ranges.size()); + + for (uint32_t RangeIndex = 0; RangeIndex < PayloadRanges.size(); RangeIndex++) + { + const std::pair<uint64_t, uint64_t>& PayloadRange = PayloadRanges[RangeIndex]; + + CHECK_EQ(PayloadRange.second, Ranges[RangeIndex].second); + + IoBuffer ChunkPayload(GetBlobRangesResponse.ResponsePayload, PayloadRange.first, PayloadRange.second); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(ChunkPayload), RawHash, RawSize); + CHECK(CompressedChunk); + CHECK_EQ(RawHash, BlockDescription.ChunkRawHashes[ChunkIndexesToFetch[RangeIndex]]); + CHECK_EQ(RawSize, BlockDescription.ChunkRawLengths[ChunkIndexesToFetch[RangeIndex]]); + } + }; + + { + // Single + std::vector<uint32_t> ChunkIndexesToFetch{uint32_t(BlockDescription.ChunkCompressedLengths.size() / 2)}; + Validate(ChunkIndexesToFetch); + } + { + // Many + std::vector<uint32_t> ChunkIndexesToFetch; + for (uint32_t Index = 0; Index < BlockDescription.ChunkCompressedLengths.size() / 16; Index++) + { + ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7)); + 
ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7 + 1)); + ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7 + 3)); + } + Validate(ChunkIndexesToFetch); + } + + { + // First and last + std::vector<uint32_t> ChunkIndexesToFetch{0, uint32_t(BlockDescription.ChunkCompressedLengths.size() - 1)}; + Validate(ChunkIndexesToFetch); + } +} TEST_SUITE_END(); void diff --git a/src/zenremotestore/builds/buildstorageutil.cpp b/src/zenremotestore/builds/buildstorageutil.cpp index b249d7d52..d65f18b9a 100644 --- a/src/zenremotestore/builds/buildstorageutil.cpp +++ b/src/zenremotestore/builds/buildstorageutil.cpp @@ -251,7 +251,6 @@ GetBlockDescriptions(OperationLogOutput& Output, BuildStorageBase& Storage, BuildStorageCache* OptionalCacheStorage, const Oid& BuildId, - const Oid& BuildPartId, std::span<const IoHash> BlockRawHashes, bool AttemptFallback, bool IsQuiet, @@ -259,13 +258,6 @@ GetBlockDescriptions(OperationLogOutput& Output, { using namespace std::literals; - if (!IsQuiet) - { - ZEN_OPERATION_LOG_INFO(Output, "Fetching metadata for {} blocks", BlockRawHashes.size()); - } - - Stopwatch GetBlockMetadataTimer; - std::vector<ChunkBlockDescription> UnorderedList; tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup; if (OptionalCacheStorage && !BlockRawHashes.empty()) @@ -355,15 +347,6 @@ GetBlockDescriptions(OperationLogOutput& Output, } } - if (!IsQuiet) - { - ZEN_OPERATION_LOG_INFO(Output, - "GetBlockMetadata for {} took {}. 
Found {} blocks", - BuildPartId, - NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), - Result.size()); - } - if (Result.size() != BlockRawHashes.size()) { std::string ErrorDescription = diff --git a/src/zenremotestore/builds/filebuildstorage.cpp b/src/zenremotestore/builds/filebuildstorage.cpp index 55e69de61..2f4904449 100644 --- a/src/zenremotestore/builds/filebuildstorage.cpp +++ b/src/zenremotestore/builds/filebuildstorage.cpp @@ -432,6 +432,45 @@ public: return IoBuffer{}; } + virtual BuildBlobRanges GetBuildBlobRanges(const Oid& BuildId, + const IoHash& RawHash, + std::span<const std::pair<uint64_t, uint64_t>> Ranges) override + { + ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlobRanges"); + ZEN_UNUSED(BuildId); + ZEN_ASSERT(!Ranges.empty()); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = Ranges.size() * 2 * 8; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + BuildBlobRanges Result; + + const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); + if (IsFile(BlockPath)) + { + BasicFile File(BlockPath, BasicFile::Mode::kRead); + + uint64_t RangeOffset = Ranges.front().first; + uint64_t RangeBytes = Ranges.back().first + Ranges.back().second - RangeOffset; + Result.PayloadBuffer = IoBufferBuilder::MakeFromFileHandle(File.Detach(), RangeOffset, RangeBytes); + + Result.Ranges.reserve(Ranges.size()); + + for (const std::pair<uint64_t, uint64_t>& Range : Ranges) + { + Result.Ranges.push_back(std::make_pair(Range.first - RangeOffset, Range.second)); + } + ReceivedBytes = Result.PayloadBuffer.GetSize(); + } + return Result; + } + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t ChunkSize, diff --git a/src/zenremotestore/builds/jupiterbuildstorage.cpp b/src/zenremotestore/builds/jupiterbuildstorage.cpp index 
23d0ddd4c..8e16da1a9 100644 --- a/src/zenremotestore/builds/jupiterbuildstorage.cpp +++ b/src/zenremotestore/builds/jupiterbuildstorage.cpp @@ -21,7 +21,7 @@ namespace zen { using namespace std::literals; namespace { - void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix) + [[noreturn]] void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix) { int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0; HttpResponseCode Status = @@ -295,6 +295,26 @@ public: return std::move(GetBuildBlobResult.Response); } + virtual BuildBlobRanges GetBuildBlobRanges(const Oid& BuildId, + const IoHash& RawHash, + std::span<const std::pair<uint64_t, uint64_t>> Ranges) override + { + ZEN_TRACE_CPU("Jupiter::GetBuildBlobRanges"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + CreateDirectories(m_TempFolderPath); + + BuildBlobRangesResult GetBuildBlobResult = + m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, Ranges); + AddStatistic(GetBuildBlobResult); + if (!GetBuildBlobResult.Success) + { + ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob ranges"sv); + } + return BuildBlobRanges{.PayloadBuffer = std::move(GetBuildBlobResult.Response), .Ranges = std::move(GetBuildBlobResult.Ranges)}; + } + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t ChunkSize, diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp index 3a4e6011d..9c3fe8a0b 100644 --- a/src/zenremotestore/chunking/chunkblock.cpp +++ b/src/zenremotestore/chunking/chunkblock.cpp @@ -608,40 +608,49 @@ ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span<const NeededBlock> if (PartialBlockDownloadMode != EPartialBlockDownloadMode::Exact && BlockRanges.size() > 1) { - // TODO: Once we have support in our http 
client to request multiple ranges in one request this - // logic would need to change as the per-request overhead would go away + const uint64_t MaxRangeCountPerRequest = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed + ? m_Options.HostHighSpeedMaxRangeCountPerRequest + : m_Options.HostMaxRangeCountPerRequest; - const double LatencySec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed - ? m_Options.HostHighSpeedLatencySec - : m_Options.HostLatencySec; - if (LatencySec > 0) + ZEN_ASSERT(MaxRangeCountPerRequest != 0); + + if (MaxRangeCountPerRequest != (uint64_t)-1) { - const uint64_t BytesPerSec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed - ? m_Options.HostHighSpeedBytesPerSec - : m_Options.HostSpeedBytesPerSec; + const uint64_t ExtraRequestCount = (BlockRanges.size() - 1) / MaxRangeCountPerRequest; // extra requests beyond the first one - const double ExtraRequestTimeSec = (BlockRanges.size() - 1) * LatencySec; - const uint64_t ExtraRequestTimeBytes = uint64_t(ExtraRequestTimeSec * BytesPerSec); + const double LatencySec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed + ? m_Options.HostHighSpeedLatencySec + : m_Options.HostLatencySec; + if (LatencySec > 0) + { + const uint64_t BytesPerSec = PartialBlockDownloadMode == EPartialBlockDownloadMode::MultiRangeHighSpeed + ? 
m_Options.HostHighSpeedBytesPerSec + : m_Options.HostSpeedBytesPerSec; - const uint64_t FullRangeSize = - BlockRanges.back().RangeStart + BlockRanges.back().RangeLength - BlockRanges.front().RangeStart; + const double ExtraRequestTimeSec = ExtraRequestCount * LatencySec; + const uint64_t ExtraRequestTimeBytes = uint64_t(ExtraRequestTimeSec * BytesPerSec); - if (ExtraRequestTimeBytes + RequestedSize >= FullRangeSize) - { - BlockRanges = std::vector<BlockRangeDescriptor>{MergeBlockRanges(BlockRanges)}; + const uint64_t FullRangeSize = + BlockRanges.back().RangeStart + BlockRanges.back().RangeLength - BlockRanges.front().RangeStart; - if (m_Options.IsVerbose) + if (ExtraRequestTimeBytes + RequestedSize >= FullRangeSize) { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Merging {} chunks ({}) from block {} ({}) to single request (extra bytes {})", - NeededBlock.ChunkIndexes.size(), - NiceBytes(RequestedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - NiceBytes(BlockRanges.front().RangeLength - RequestedSize)); + BlockRanges = std::vector<BlockRangeDescriptor>{MergeBlockRanges(BlockRanges)}; + + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO( + m_LogOutput, + "Merging {} chunks ({}) from block {} ({}) to single request (extra bytes {})", + NeededBlock.ChunkIndexes.size(), + NiceBytes(RequestedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + NiceBytes(BlockRanges.front().RangeLength - RequestedSize)); + } + + RequestedSize = BlockRanges.front().RangeLength; } - - RequestedSize = BlockRanges.front().RangeLength; } } } @@ -730,7 +739,7 @@ ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span<const NeededBlock> ZEN_OPERATION_LOG_INFO(m_LogOutput, "Analysis of partial block requests saves download of {} out of {}, {:.1f}% of possible {} using {} extra " - "requests. Completed in {}", + "ranges. 
Completed in {}", NiceBytes(ActualSkippedSize), NiceBytes(AllBlocksTotalBlocksSize), PercentOfIdealPartialSkippedSize, diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorage.h b/src/zenremotestore/include/zenremotestore/builds/buildstorage.h index 85dabc59f..ce3da41c1 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorage.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorage.h @@ -53,15 +53,26 @@ public: std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, std::function<void(uint64_t, bool)>&& OnSentBytes) = 0; - virtual IoBuffer GetBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - uint64_t RangeOffset = 0, - uint64_t RangeBytes = (uint64_t)-1) = 0; + virtual IoBuffer GetBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t RangeOffset = 0, + uint64_t RangeBytes = (uint64_t)-1) = 0; + + static constexpr size_t MaxRangeCountPerRequest = 128u; + + struct BuildBlobRanges + { + IoBuffer PayloadBuffer; + std::vector<std::pair<uint64_t, uint64_t>> Ranges; + }; + virtual BuildBlobRanges GetBuildBlobRanges(const Oid& BuildId, + const IoHash& RawHash, + std::span<const std::pair<uint64_t, uint64_t>> Ranges) = 0; virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t ChunkSize, std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, - std::function<void()>&& OnComplete) = 0; + std::function<void()>&& OnComplete) = 0; [[nodiscard]] virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0; diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h index f25ce5b5e..67c93480b 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h +++ 
b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h @@ -37,6 +37,14 @@ public: const IoHash& RawHash, uint64_t RangeOffset = 0, uint64_t RangeBytes = (uint64_t)-1) = 0; + struct BuildBlobRanges + { + IoBuffer PayloadBuffer; + std::vector<std::pair<uint64_t, uint64_t>> Ranges; + }; + virtual BuildBlobRanges GetBuildBlobRanges(const Oid& BuildId, + const IoHash& RawHash, + std::span<const std::pair<uint64_t, uint64_t>> Ranges) = 0; virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0; virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 31733569e..875b8593b 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -263,9 +263,14 @@ private: ParallelWork& Work, std::function<void(IoBuffer&& Payload)>&& OnDownloaded); - void DownloadPartialBlock(const ChunkBlockAnalyser::BlockRangeDescriptor BlockRange, - const BlobsExistsResult& ExistsResult, - std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded); + void DownloadPartialBlock(std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges, + size_t BlockRangeIndex, + size_t BlockRangeCount, + const BlobsExistsResult& ExistsResult, + std::function<void(IoBuffer&& InMemoryBuffer, + const std::filesystem::path& OnDiskPath, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded); std::vector<uint32_t> WriteLocalChunkToCache(CloneQueryInterface* CloneQuery, const CopyChunkData& CopyData, diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h 
b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h index 4b85d8f1e..764a24e61 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h @@ -45,7 +45,6 @@ std::vector<ChunkBlockDescription> GetBlockDescriptions(OperationLogOutput& Out BuildStorageBase& Storage, BuildStorageCache* OptionalCacheStorage, const Oid& BuildId, - const Oid& BuildPartId, std::span<const IoHash> BlockRawHashes, bool AttemptFallback, bool IsQuiet, diff --git a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h index 5a17ef79c..7aae1442e 100644 --- a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h +++ b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h @@ -82,12 +82,14 @@ class ChunkBlockAnalyser public: struct Options { - bool IsQuiet = false; - bool IsVerbose = false; - double HostLatencySec = -1.0; - double HostHighSpeedLatencySec = -1.0; - uint64_t HostSpeedBytesPerSec = (1u * 1024u * 1024u * 1024u) / 8u; // 1GBit - uint64_t HostHighSpeedBytesPerSec = (2u * 1024u * 1024u * 1024u) / 8u; // 2GBit + bool IsQuiet = false; + bool IsVerbose = false; + double HostLatencySec = -1.0; + double HostHighSpeedLatencySec = -1.0; + uint64_t HostSpeedBytesPerSec = (1u * 1024u * 1024u * 1024u) / 8u; // 1GBit + uint64_t HostHighSpeedBytesPerSec = (2u * 1024u * 1024u * 1024u) / 8u; // 2GBit + uint64_t HostMaxRangeCountPerRequest = (uint64_t)-1; + uint64_t HostHighSpeedMaxRangeCountPerRequest = (uint64_t)-1; // No limit }; ChunkBlockAnalyser(OperationLogOutput& LogOutput, std::span<const ChunkBlockDescription> BlockDescriptions, const Options& Options); @@ -137,14 +139,15 @@ private: static constexpr uint16_t FullBlockRangePercentLimit = 98; - static constexpr BlockRangeLimit ForceMergeLimits[] = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1}, - {.SizePercent = 90, 
.MaxRangeCount = 4}, - {.SizePercent = 85, .MaxRangeCount = 16}, - {.SizePercent = 80, .MaxRangeCount = 32}, - {.SizePercent = 75, .MaxRangeCount = 48}, - {.SizePercent = 70, .MaxRangeCount = 64}, - {.SizePercent = 4, .MaxRangeCount = 82}, - {.SizePercent = 0, .MaxRangeCount = 96}}; + static constexpr BlockRangeLimit ForceMergeLimits[] = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 8}, + {.SizePercent = 90, .MaxRangeCount = 16}, + {.SizePercent = 85, .MaxRangeCount = 32}, + {.SizePercent = 80, .MaxRangeCount = 48}, + {.SizePercent = 75, .MaxRangeCount = 64}, + {.SizePercent = 70, .MaxRangeCount = 92}, + {.SizePercent = 50, .MaxRangeCount = 128}, + {.SizePercent = 4, .MaxRangeCount = 192}, + {.SizePercent = 0, .MaxRangeCount = 256}}; BlockRangeDescriptor MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges); std::optional<std::vector<BlockRangeDescriptor>> MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h index eaf6962fd..8721bc37f 100644 --- a/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h +++ b/src/zenremotestore/include/zenremotestore/jupiter/jupitersession.h @@ -56,6 +56,11 @@ struct FinalizeBuildPartResult : JupiterResult std::vector<IoHash> Needs; }; +struct BuildBlobRangesResult : JupiterResult +{ + std::vector<std::pair<uint64_t, uint64_t>> Ranges; +}; + /** * Context for performing Jupiter operations * @@ -135,6 +140,13 @@ public: uint64_t Offset = 0, uint64_t Size = (uint64_t)-1); + BuildBlobRangesResult GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + std::filesystem::path TempFolderPath, + std::span<const std::pair<uint64_t, uint64_t>> Ranges); + JupiterResult PutMultipartBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, diff --git 
a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h index 152c02ee2..2cf10c664 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h @@ -84,6 +84,12 @@ public: std::vector<bool> HasBody; }; + struct LoadAttachmentRangesResult : public Result + { + IoBuffer Bytes; + std::vector<std::pair<uint64_t, uint64_t>> Ranges; + }; + struct RemoteStoreInfo { bool CreateBlocks; @@ -127,15 +133,21 @@ public: virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) = 0; virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) = 0; - struct AttachmentRange + enum ESourceMode { - uint64_t Offset = 0; - uint64_t Bytes = (uint64_t)-1; - - inline operator bool() const { return Offset != 0 || Bytes != (uint64_t)-1; } + kAny, + kCacheOnly, + kHostOnly }; - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) = 0; - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0; + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode = ESourceMode::kAny) = 0; + + static constexpr size_t MaxRangeCountPerRequest = 128u; + + virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, + std::span<const std::pair<uint64_t, uint64_t>> Ranges, + ESourceMode SourceMode = ESourceMode::kAny) = 0; + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode = ESourceMode::kAny) = 0; virtual void Flush() = 0; }; diff --git a/src/zenremotestore/jupiter/jupitersession.cpp b/src/zenremotestore/jupiter/jupitersession.cpp index 1bc6564ce..52f9eb678 100644 --- a/src/zenremotestore/jupiter/jupitersession.cpp +++ 
b/src/zenremotestore/jupiter/jupitersession.cpp @@ -852,6 +852,71 @@ JupiterSession::GetBuildBlob(std::string_view Namespace, return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); } +BuildBlobRangesResult +JupiterSession::GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + std::filesystem::path TempFolderPath, + std::span<const std::pair<uint64_t, uint64_t>> Ranges) +{ + HttpClient::KeyValueMap Headers; + if (!Ranges.empty()) + { + ExtendableStringBuilder<512> SB; + for (const std::pair<uint64_t, uint64_t>& R : Ranges) + { + if (SB.Size() > 0) + { + SB << ", "; + } + SB << R.first << "-" << R.first + R.second - 1; + } + Headers.Entries.insert({"Range", fmt::format("bytes={}", SB.ToView())}); + } + std::string Url = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv); + + HttpClient::Response Response = m_HttpClient.Download(Url, TempFolderPath, Headers); + if (Response.StatusCode == HttpResponseCode::RangeNotSatisfiable && Ranges.size() > 1) + { + // Requests to Jupiter that are not served via nginx (content not stored locally in the file system) cannot serve multi-range + // requests (asp.net limitation). This rejection is not implemented as of 2026-03-02; it is in the backlog (@joakim.lindqvist). + // If we encounter this error we fall back to a single range which covers all the requested ranges. + uint64_t RangeStart = Ranges.front().first; + uint64_t RangeEnd = Ranges.back().first + Ranges.back().second - 1; + Headers.Entries.insert_or_assign("Range", fmt::format("bytes={}-{}", RangeStart, RangeEnd)); + Response = m_HttpClient.Download(Url, TempFolderPath, Headers); + } + if (Response.IsSuccess()) + { + // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it + if (m_AllowRedirect && 
(Response.ResponsePayload.GetContentType() == HttpContentType::kBinary)) + { + IoHash ValidateRawHash; + uint64_t ValidateRawSize = 0; + if (!Headers.Entries.contains("Range")) + { + ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, + ValidateRawHash, + ValidateRawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)); + ZEN_ASSERT_SLOW(ValidateRawHash == Hash); + ZEN_ASSERT_SLOW(ValidateRawSize > 0); + ZEN_UNUSED(ValidateRawHash, ValidateRawSize); + Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary); + } + } + } + BuildBlobRangesResult Result = {detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv)}; + Result.Ranges = Response.GetRanges(Ranges); + return Result; +} + JupiterResult JupiterSession::PutBlockMetadata(std::string_view Namespace, std::string_view BucketId, diff --git a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp index c42373e4d..3400cdbf5 100644 --- a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp @@ -478,7 +478,6 @@ public: *m_BuildStorage, m_BuildCacheStorage.get(), m_BuildId, - m_OplogBuildPartId, BlockHashes, /*AttemptFallback*/ false, /*IsQuiet*/ false, @@ -549,7 +548,7 @@ public: return Result; } - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); @@ -559,25 +558,90 @@ public: try { - if (m_BuildCacheStorage) + if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly) { - IoBuffer CachedBlob = m_BuildCacheStorage->GetBuildBlob(m_BuildId, RawHash, Range.Offset, Range.Bytes); + IoBuffer CachedBlob = m_BuildCacheStorage->GetBuildBlob(m_BuildId, RawHash); if (CachedBlob) { Result.Bytes = std::move(CachedBlob); } } - 
if (!Result.Bytes) + if (!Result.Bytes && SourceMode != ESourceMode::kCacheOnly) { - Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash, Range.Offset, Range.Bytes); + Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash); if (m_BuildCacheStorage && Result.Bytes && m_PopulateCache) { - if (!Range) + m_BuildCacheStorage->PutBuildBlob(m_BuildId, + RawHash, + Result.Bytes.GetContentType(), + CompositeBuffer(SharedBuffer(Result.Bytes))); + } + } + } + catch (const HttpClientError& Ex) + { + Result.ErrorCode = MakeErrorCode(Ex); + Result.Reason = fmt::format("Failed getting blob {}/{}/{}/{}/{}. Reason: '{}'", + m_BuildStorageHttp.GetBaseUri(), + m_Namespace, + m_Bucket, + m_BuildId, + RawHash, + Ex.what()); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed getting blob {}/{}/{}/{}/{}. Reason: '{}'", + m_BuildStorageHttp.GetBaseUri(), + m_Namespace, + m_Bucket, + m_BuildId, + RawHash, + Ex.what()); + } + + return Result; + } + + virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, + std::span<const std::pair<uint64_t, uint64_t>> Ranges, + ESourceMode SourceMode) override + { + LoadAttachmentRangesResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); + + try + { + if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly) + { + BuildStorageCache::BuildBlobRanges BlobRanges = m_BuildCacheStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges); + if (BlobRanges.PayloadBuffer) + { + Result.Bytes = std::move(BlobRanges.PayloadBuffer); + Result.Ranges = std::move(BlobRanges.Ranges); + } + } + if (!Result.Bytes && SourceMode != ESourceMode::kCacheOnly) + { + BuildStorageBase::BuildBlobRanges BlobRanges = m_BuildStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges); + if (BlobRanges.PayloadBuffer) + { + Result.Bytes = 
std::move(BlobRanges.PayloadBuffer); + Result.Ranges = std::move(BlobRanges.Ranges); + + if (Result.Ranges.empty()) { - m_BuildCacheStorage->PutBuildBlob(m_BuildId, - RawHash, - Result.Bytes.GetContentType(), - CompositeBuffer(SharedBuffer(Result.Bytes))); + // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3/Replicated + // Upload to cache (if enabled) + if (m_BuildCacheStorage && Result.Bytes && m_PopulateCache) + { + m_BuildCacheStorage->PutBuildBlob(m_BuildId, + RawHash, + Result.Bytes.GetContentType(), + CompositeBuffer(SharedBuffer(Result.Bytes))); + } } } } @@ -585,28 +649,32 @@ public: catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed getting {} ranges for blob {}/{}/{}/{}/{}. Reason: '{}'", + Ranges.size(), m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, + RawHash, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed getting {} ranges for blob {}/{}/{}/{}/{}. 
Reason: '{}'", + Ranges.size(), m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, + RawHash, Ex.what()); } return Result; } - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override { LoadAttachmentsResult Result; Stopwatch Timer; @@ -614,7 +682,7 @@ public: std::vector<IoHash> AttachmentsLeftToFind = RawHashes; - if (m_BuildCacheStorage) + if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly) { std::vector<BuildStorageCache::BlobExistsResult> ExistCheck = m_BuildCacheStorage->BlobsExists(m_BuildId, RawHashes); if (ExistCheck.size() == RawHashes.size()) @@ -648,7 +716,7 @@ public: for (const IoHash& Hash : AttachmentsLeftToFind) { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {}); + LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode); if (ChunkResult.ErrorCode) { return LoadAttachmentsResult{ChunkResult}; diff --git a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp index ec7fb7bbc..f950fd46c 100644 --- a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp @@ -228,28 +228,62 @@ public: return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)}; } - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override { - Stopwatch Timer; - LoadAttachmentResult Result; - std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!IsFile(ChunkPath)) + Stopwatch Timer; + LoadAttachmentResult Result; + if (SourceMode != ESourceMode::kCacheOnly) { - Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); - Result.Reason = 
fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); + if (!IsFile(ChunkPath)) + { + Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); + Result.Reason = + fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + return Result; + } + { + BasicFile ChunkFile; + ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); + Result.Bytes = ChunkFile.ReadAll(); + } } + AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + return Result; + } + + virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, + std::span<const std::pair<uint64_t, uint64_t>> Ranges, + ESourceMode SourceMode) override + { + Stopwatch Timer; + LoadAttachmentRangesResult Result; + if (SourceMode != ESourceMode::kCacheOnly) { - BasicFile ChunkFile; - ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); - if (Range) + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); + if (!IsFile(ChunkPath)) { - Result.Bytes = ChunkFile.ReadRange(Range.Offset, Range.Bytes); + Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); + Result.Reason = + fmt::format("Failed loading oplog attachment from '{}'. 
Reason: 'The file does not exist'", ChunkPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + return Result; } - else { - Result.Bytes = ChunkFile.ReadAll(); + BasicFile ChunkFile; + ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); + + uint64_t Start = Ranges.front().first; + uint64_t Length = Ranges.back().first + Ranges.back().second - Ranges.front().first; + + Result.Bytes = ChunkFile.ReadRange(Start, Length); + Result.Ranges.reserve(Ranges.size()); + for (const std::pair<uint64_t, uint64_t>& Range : Ranges) + { + Result.Ranges.push_back(std::make_pair(Range.first - Start, Range.second)); + } } } AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000); @@ -257,13 +291,13 @@ public: return Result; } - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override { Stopwatch Timer; LoadAttachmentsResult Result; for (const IoHash& Hash : RawHashes) { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {}); + LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode); if (ChunkResult.ErrorCode) { ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp index f8179831c..514484f30 100644 --- a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp @@ -223,34 +223,62 @@ public: return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)}; } - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override { - JupiterSession Session(m_JupiterClient->Logger(), 
m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); - AddStats(GetResult); - - LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; - if (GetResult.ErrorCode) + LoadAttachmentResult Result; + if (SourceMode != ESourceMode::kCacheOnly) { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - RawHash, - Result.Reason); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); + AddStats(GetResult); + + Result = {ConvertResult(GetResult), std::move(GetResult.Response)}; + if (GetResult.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'", + m_JupiterClient->ServiceUrl(), + m_Namespace, + RawHash, + Result.Reason); + } } - if (!Result.ErrorCode && Range) + return Result; + } + + virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, + std::span<const std::pair<uint64_t, uint64_t>> Ranges, + ESourceMode SourceMode) override + { + LoadAttachmentRangesResult Result; + if (SourceMode != ESourceMode::kCacheOnly) { - Result.Bytes = IoBuffer(Result.Bytes, Range.Offset, Range.Bytes); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); + AddStats(GetResult); + + Result = LoadAttachmentRangesResult{ConvertResult(GetResult), std::move(GetResult.Response)}; + if (GetResult.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. 
Reason: '{}'", + m_JupiterClient->ServiceUrl(), + m_Namespace, + RawHash, + Result.Reason); + } + else + { + Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end()); + } } return Result; } - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override { LoadAttachmentsResult Result; for (const IoHash& Hash : RawHashes) { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {}); + LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode); if (ChunkResult.ErrorCode) { return LoadAttachmentsResult{ChunkResult}; diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 2a9da6f58..1882f599a 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -339,9 +339,10 @@ namespace remotestore_impl { uint64_t ChunkSize = It.second.GetCompressedSize(); Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); } - ZEN_INFO("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)))); if (RemoteResult.IsError()) { return; @@ -446,7 +447,7 @@ namespace remotestore_impl { { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash, {}); + RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); if (BlockResult.ErrorCode) { ReportMessage(OptionalContext, @@ -506,50 +507,100 @@ namespace remotestore_impl { 
IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize); + + std::string ErrorString; + if (!Compressed) { - if (RetriesLeft > 0) + ErrorString = + fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash); + } + else if (RawHash != BlockHash) + { + ErrorString = fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash); + } + else if (CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); !BlockPayload) + { + ErrorString = fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash); + } + else + { + uint64_t PotentialSize = 0; + uint64_t UsedSize = 0; + uint64_t BlockSize = BlockPayload.GetSize(); + + uint64_t BlockHeaderSize = 0; + + bool StoreChunksOK = IterateChunkBlock( + BlockPayload.Flatten(), + [&AllNeededPartialChunkHashesLookup, + &ChunkDownloadedFlags, + &WriteAttachmentBuffers, + &WriteRawHashes, + &Info, + &PotentialSize](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash); + if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) + { + bool Expected = false; + if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader( + WriteAttachmentBuffers.back(), + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + PotentialSize += WriteAttachmentBuffers.back().GetSize(); + } + } + }, + BlockHeaderSize); + + if (!StoreChunksOK) { - ReportMessage( - OptionalContext, - fmt::format( - "Block attachment {} is malformed, can't parse as compressed binary, retrying download", - 
BlockHash)); - return DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - AllNeededPartialChunkHashesLookup, - ChunkDownloadedFlags, - RetriesLeft - 1); + ErrorString = fmt::format("Invalid format for block {}", BlockHash); + } + else + { + if (!WriteAttachmentBuffers.empty()) + { + std::vector<CidStore::InsertResult> Results = + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const CidStore::InsertResult& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + UsedSize += WriteAttachmentBuffers[Index].GetSize(); + } + } + if (UsedSize < BlockSize) + { + ZEN_DEBUG("Used {} (skipping {}) out of {} for block {} ({} %) (use of matching {}%)", + NiceBytes(UsedSize), + NiceBytes(BlockSize - UsedSize), + NiceBytes(BlockSize), + BlockHash, + (100 * UsedSize) / BlockSize, + PotentialSize > 0 ? 
(UsedSize * 100) / PotentialSize : 0); + } + } } - ReportMessage( - OptionalContext, - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), - {}); - return; } - CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); - if (!BlockPayload) + + if (!ErrorString.empty()) { if (RetriesLeft > 0) { - ReportMessage( - OptionalContext, - fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download", - BlockHash)); + ReportMessage(OptionalContext, fmt::format("{}, retrying download", ErrorString)); + return DownloadAndSaveBlock(ChunkStore, RemoteStore, IgnoreMissingAttachments, @@ -567,94 +618,12 @@ namespace remotestore_impl { ChunkDownloadedFlags, RetriesLeft - 1); } - ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash), - {}); - return; - } - if (RawHash != BlockHash) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), - {}); - return; - } - - uint64_t PotentialSize = 0; - uint64_t UsedSize = 0; - uint64_t BlockSize = BlockPayload.GetSize(); - - uint64_t BlockHeaderSize = 0; - - bool StoreChunksOK = IterateChunkBlock( - BlockPayload.Flatten(), - [&AllNeededPartialChunkHashesLookup, - &ChunkDownloadedFlags, - &WriteAttachmentBuffers, - &WriteRawHashes, - &Info, - &PotentialSize](CompressedBuffer&& Chunk, const IoHash& 
AttachmentRawHash) { - auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash); - if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) - { - bool Expected = false; - if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true)) - { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT( - CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), - RawHash, - RawSize, - /*OutOptionalTotalCompressedSize*/ nullptr)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - PotentialSize += WriteAttachmentBuffers.back().GetSize(); - } - } - }, - BlockHeaderSize); - - if (!StoreChunksOK) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Invalid format for block {}", BlockHash), - {}); - return; - } - - if (!WriteAttachmentBuffers.empty()) - { - auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (size_t Index = 0; Index < Results.size(); Index++) + else { - const auto& Result = Results[Index]; - if (Result.New) - { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); - UsedSize += WriteAttachmentBuffers[Index].GetSize(); - } + ReportMessage(OptionalContext, ErrorString); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), ErrorString, {}); + return; } - ZEN_DEBUG("Used {} (matching {}) out of {} for block {} ({} %) (use of matching {}%)", - NiceBytes(UsedSize), - NiceBytes(PotentialSize), - NiceBytes(BlockSize), - BlockHash, - (100 * UsedSize) / BlockSize, - PotentialSize > 0 ? 
(UsedSize * 100) / PotentialSize : 0); } } catch (const std::exception& Ex) @@ -676,6 +645,119 @@ namespace remotestore_impl { WorkerThreadPool::EMode::EnableBacklog); }; + bool DownloadPartialBlock(RemoteProjectStore& RemoteStore, + bool IgnoreMissingAttachments, + JobContext* OptionalContext, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + double& DownloadTimeSeconds, + const ChunkBlockDescription& BlockDescription, + bool BlockExistsInCache, + std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors, + size_t BlockRangeIndexStart, + size_t BlockRangeCount, + std::function<void(IoBuffer&& Buffer, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded) + { + std::vector<std::pair<uint64_t, uint64_t>> Ranges; + Ranges.reserve(BlockRangeDescriptors.size()); + for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount; BlockRangeIndex++) + { + const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeIndex]; + Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength)); + } + + if (BlockExistsInCache) + { + RemoteProjectStore::LoadAttachmentRangesResult BlockResult = + RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, Ranges, RemoteProjectStore::ESourceMode::kCacheOnly); + DownloadTimeSeconds += BlockResult.ElapsedSeconds; + if (RemoteResult.IsError()) + { + return false; + } + if (!BlockResult.ErrorCode && BlockResult.Bytes) + { + if (BlockResult.Ranges.size() != Ranges.size()) + { + throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", + Ranges.size(), + BlockDescription.BlockHash, + BlockResult.Ranges.size())); + } + OnDownloaded(std::move(BlockResult.Bytes), BlockRangeIndexStart, BlockResult.Ranges); + return true; + } + } + + const size_t MaxRangesPerRequestToJupiter = RemoteProjectStore::MaxRangeCountPerRequest; + + 
size_t SubBlockRangeCount = BlockRangeCount; + size_t SubRangeCountComplete = 0; + std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges); + while (SubRangeCountComplete < SubBlockRangeCount) + { + if (RemoteResult.IsError()) + { + break; + } + size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, MaxRangesPerRequestToJupiter); + size_t SubRangeStartIndex = BlockRangeIndexStart + SubRangeCountComplete; + + auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); + + RemoteProjectStore::LoadAttachmentRangesResult BlockResult = + RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges, RemoteProjectStore::ESourceMode::kHostOnly); + DownloadTimeSeconds += BlockResult.ElapsedSeconds; + if (RemoteResult.IsError()) + { + return false; + } + if (BlockResult.ErrorCode || !BlockResult.Bytes) + { + ReportMessage(OptionalContext, + fmt::format("Failed to download {} ranges from block attachment '{}' ({}): {}", + SubRanges.size(), + BlockDescription.BlockHash, + BlockResult.ErrorCode, + BlockResult.Reason)); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + return false; + } + } + else + { + if (BlockResult.Ranges.empty()) + { + // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 + // Use the whole payload for the remaining ranges + SubRangeCount = Ranges.size() - SubRangeCountComplete; + OnDownloaded(std::move(BlockResult.Bytes), + SubRangeStartIndex, + RangesSpan.subspan(SubRangeCountComplete, SubRangeCount)); + } + else + { + if (BlockResult.Ranges.size() != SubRanges.size()) + { + throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", + SubRanges.size(), + BlockDescription.BlockHash, + BlockResult.Ranges.size())); + } + OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, BlockResult.Ranges); + } + } + + 
SubRangeCountComplete += SubRangeCount; + } + return true; + } + void DownloadAndSavePartialBlock(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, bool IgnoreMissingAttachments, @@ -689,6 +771,7 @@ namespace remotestore_impl { Stopwatch& LoadAttachmentsTimer, std::atomic_uint64_t& DownloadStartMS, const ChunkBlockDescription& BlockDescription, + bool BlockExistsInCache, std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors, size_t BlockRangeIndexStart, size_t BlockRangeCount, @@ -710,13 +793,14 @@ namespace remotestore_impl { &DownloadStartMS, IgnoreMissingAttachments, OptionalContext, - RetriesLeft, BlockDescription, + BlockExistsInCache, BlockRangeDescriptors, BlockRangeIndexStart, BlockRangeCount, &AllNeededPartialChunkHashesLookup, - ChunkDownloadedFlags]() { + ChunkDownloadedFlags, + RetriesLeft]() { ZEN_TRACE_CPU("DownloadBlockRanges"); auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); @@ -728,230 +812,240 @@ namespace remotestore_impl { double DownloadElapsedSeconds = 0; uint64_t DownloadedBytes = 0; - for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount; - BlockRangeIndex++) - { - if (RemoteResult.IsError()) - { - return; - } - - const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeIndex]; + bool Success = DownloadPartialBlock( + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + RemoteResult, + Info, + DownloadElapsedSeconds, + BlockDescription, + BlockExistsInCache, + BlockRangeDescriptors, + BlockRangeIndexStart, + BlockRangeCount, + [&](IoBuffer&& Buffer, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) { + uint64_t BlockPartSize = Buffer.GetSize(); + DownloadedBytes += BlockPartSize; + + Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize); + Info.AttachmentBlocksRangesDownloaded++; + + 
AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + &AttachmentsDownloadLatch, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + BlockDescription, + BlockExistsInCache, + BlockRangeDescriptors, + BlockRangeStartIndex, + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + RetriesLeft, + BlockPayload = std::move(Buffer), + OffsetAndLengths = + std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + try + { + ZEN_ASSERT(BlockPayload.Size() > 0); - RemoteProjectStore::LoadAttachmentResult BlockResult = - RemoteStore.LoadAttachment(BlockDescription.BlockHash, - {.Offset = BlockRange.RangeStart, .Bytes = BlockRange.RangeLength}); - if (BlockResult.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to download block attachment '{}' range {},{} ({}): {}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength, - BlockResult.ErrorCode, - BlockResult.Reason)); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - } - return; - } - if (RemoteResult.IsError()) - { - return; - } - uint64_t BlockPartSize = BlockResult.Bytes.GetSize(); - if (BlockPartSize != BlockRange.RangeLength) - { - std::string ErrorString = - fmt::format("Failed to download block attachment '{}' range {},{}, got {} bytes ({}): {}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength, - BlockPartSize, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - - ReportMessage(OptionalContext, ErrorString); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - 
RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), - "Mismatching block part range received", - ErrorString); - } - return; - } - Info.AttachmentBlocksRangesDownloaded.fetch_add(1); + size_t RangeCount = OffsetAndLengths.size(); + for (size_t RangeOffset = 0; RangeOffset < RangeCount; RangeOffset++) + { + if (RemoteResult.IsError()) + { + return; + } - DownloadElapsedSeconds += BlockResult.ElapsedSeconds; - DownloadedBytes += BlockPartSize; + const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = + BlockRangeDescriptors[BlockRangeStartIndex + RangeOffset]; + const std::pair<uint64_t, uint64_t>& OffsetAndLength = OffsetAndLengths[RangeOffset]; + IoBuffer BlockRangeBuffer(BlockPayload, OffsetAndLength.first, OffsetAndLength.second); - Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, - &RemoteResult, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, - RetriesLeft, - BlockDescription, - BlockRange, - &AllNeededPartialChunkHashesLookup, - ChunkDownloadedFlags, - BlockPayload = std::move(BlockResult.Bytes)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - ZEN_ASSERT(BlockPayload.Size() > 0); - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; + uint64_t PotentialSize = 0; + uint64_t UsedSize = 0; + uint64_t BlockPartSize = BlockRangeBuffer.GetSize(); - uint64_t PotentialSize = 0; - uint64_t UsedSize = 0; - uint64_t BlockPartSize = BlockPayload.GetSize(); + uint32_t OffsetInBlock = 0; + for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart; + ChunkBlockIndex < 
BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount; + ChunkBlockIndex++) + { + if (RemoteResult.IsError()) + { + break; + } - uint32_t OffsetInBlock = 0; - for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart; - ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount; - ChunkBlockIndex++) - { - const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; - const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + const uint32_t ChunkCompressedSize = + BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; - if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash); - ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) - { - bool Expected = false; - if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true)) - { - IoHash VerifyChunkHash; - uint64_t VerifyChunkSize; - CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed( - SharedBuffer(IoBuffer(BlockPayload, OffsetInBlock, ChunkCompressedSize)), - VerifyChunkHash, - VerifyChunkSize); - if (!CompressedChunk) + if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash); + ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) { - std::string ErrorString = fmt::format( - "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer", - OffsetInBlock, - ChunkCompressedSize, - BlockDescription.BlockHash); - ReportMessage(OptionalContext, ErrorString); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + if (!ChunkDownloadedFlags[ChunkIndexIt->second]) { - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), - "Malformed chunk block", - ErrorString); + IoHash VerifyChunkHash; + uint64_t VerifyChunkSize; + CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed( + 
SharedBuffer(IoBuffer(BlockRangeBuffer, OffsetInBlock, ChunkCompressedSize)), + VerifyChunkHash, + VerifyChunkSize); + + std::string ErrorString; + + if (!CompressedChunk) + { + ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash); + } + else if (VerifyChunkHash != ChunkHash) + { + ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' has mismatching hash, expected " + "{}, got {}", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash, + ChunkHash, + VerifyChunkHash); + } + else if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex]) + { + ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' has mismatching raw size, " + "expected {}, " + "got {}", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash, + BlockDescription.ChunkRawLengths[ChunkBlockIndex], + VerifyChunkSize); + } + + if (!ErrorString.empty()) + { + if (RetriesLeft > 0) + { + ReportMessage(OptionalContext, + fmt::format("{}, retrying download", ErrorString)); + return DownloadAndSavePartialBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockDescription, + BlockExistsInCache, + BlockRangeDescriptors, + BlockRangeStartIndex, + RangeCount, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + RetriesLeft - 1); + } + + ReportMessage(OptionalContext, ErrorString); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), + "Malformed chunk block", + ErrorString); + } + } + else + { + bool Expected = false; + if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, + true)) + { + 
WriteAttachmentBuffers.emplace_back( + CompressedChunk.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.emplace_back(ChunkHash); + PotentialSize += WriteAttachmentBuffers.back().GetSize(); + } + } } - continue; } - if (VerifyChunkHash != ChunkHash) + OffsetInBlock += ChunkCompressedSize; + } + + if (!WriteAttachmentBuffers.empty()) + { + std::vector<CidStore::InsertResult> Results = + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) { - std::string ErrorString = fmt::format( - "Chunk at {},{} in block attachment '{}' has mismatching hash, expected {}, got {}", - OffsetInBlock, - ChunkCompressedSize, - BlockDescription.BlockHash, - ChunkHash, - VerifyChunkHash); - ReportMessage(OptionalContext, ErrorString); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + const CidStore::InsertResult& Result = Results[Index]; + if (Result.New) { - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), - "Malformed chunk block", - ErrorString); + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + UsedSize += WriteAttachmentBuffers[Index].GetSize(); } - continue; } - if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex]) + if (UsedSize < BlockPartSize) { - std::string ErrorString = fmt::format( - "Chunk at {},{} in block attachment '{}' has mismatching raw size, expected {}, " - "got {}", - OffsetInBlock, - ChunkCompressedSize, + ZEN_DEBUG( + "Used {} (skipping {}) out of {} for block {} range {}, {} ({} %) (use of matching " + "{}%)", + NiceBytes(UsedSize), + NiceBytes(BlockPartSize - UsedSize), + NiceBytes(BlockPartSize), BlockDescription.BlockHash, - BlockDescription.ChunkRawLengths[ChunkBlockIndex], - VerifyChunkSize); - ReportMessage(OptionalContext, ErrorString); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - 
RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), - "Malformed chunk block", - ErrorString); - } - continue; + BlockRange.RangeStart, + BlockRange.RangeLength, + (100 * UsedSize) / BlockPartSize, + PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0); } - - WriteAttachmentBuffers.emplace_back(CompressedChunk.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.emplace_back(ChunkHash); - PotentialSize += WriteAttachmentBuffers.back().GetSize(); } } - OffsetInBlock += ChunkCompressedSize; } - - if (!WriteAttachmentBuffers.empty()) + catch (const std::exception& Ex) { - auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (size_t Index = 0; Index < Results.size(); Index++) - { - const auto& Result = Results[Index]; - if (Result.New) - { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); - UsedSize += WriteAttachmentBuffers[Index].GetSize(); - } - } - ZEN_DEBUG("Used {} (matching {}) out of {} for block {} range {}, {} ({} %) (use of matching {}%)", - NiceBytes(UsedSize), - NiceBytes(PotentialSize), - NiceBytes(BlockPartSize), - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength, - (100 * UsedSize) / BlockPartSize, - PotentialSize > 0 ? 
(UsedSize * 100) / PotentialSize : 0); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed saving {} ranges from block attachment {}", + OffsetAndLengths.size(), + BlockDescription.BlockHash), + Ex.what()); } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed save block attachment {} range {}, {}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); + }, + WorkerThreadPool::EMode::EnableBacklog); + }); + if (Success) + { + ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})", + BlockRangeCount, + BlockDescription.BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(DownloadElapsedSeconds * 1000)), + NiceBytes(DownloadedBytes)); } - - ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})", - BlockRangeCount, - BlockDescription.BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(DownloadElapsedSeconds * 1000)), - NiceBytes(DownloadedBytes)); } catch (const std::exception& Ex) { @@ -1002,7 +1096,7 @@ namespace remotestore_impl { { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash, {}); + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); if (AttachmentResult.ErrorCode) { ReportMessage(OptionalContext, @@ -3115,6 +3209,12 @@ ParseOplogContainer( std::unordered_set<IoHash, IoHash::Hasher> NeededAttachments; { CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView(); + + size_t OpCount = OpsArray.Num(); + size_t OpsCompleteCount = 0; + + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Scanning {} ops for attachments", OpCount)); + for (CbFieldView OpEntry : OpsArray) { 
OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); }); @@ -3124,6 +3224,16 @@ ParseOplogContainer( .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Reason = "Operation cancelled"}; } + OpsCompleteCount++; + if ((OpsCompleteCount & 4095) == 0) + { + remotestore_impl::ReportProgress( + OptionalContext, + "Scanning oplog"sv, + fmt::format("{} attachments found, {} ops remaining...", NeededAttachments.size(), OpCount - OpsCompleteCount), + OpCount, + OpCount - OpsCompleteCount); + } } } { @@ -3151,13 +3261,27 @@ ParseOplogContainer( { ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView); + size_t NeededChunkAttachmentCount = 0; + OnReferencedAttachments(Chunked.ChunkHashes); - NeededAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end()); + for (const IoHash& ChunkHash : Chunked.ChunkHashes) + { + if (!HasAttachment(ChunkHash)) + { + if (NeededAttachments.insert(ChunkHash).second) + { + NeededChunkAttachmentCount++; + } + } + } OnChunkedAttachment(Chunked); - ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - Chunked.ChunkHashes.size()); + + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Requesting chunked attachment '{}' ({}) built from {} chunks, need {} chunks", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + Chunked.ChunkHashes.size(), + NeededChunkAttachmentCount)); } } if (remotestore_impl::IsCancelled(OptionalContext)) @@ -3490,8 +3614,16 @@ LoadOplog(CidStore& ChunkStore, std::vector<bool> DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false); ChunkBlockAnalyser::BlockResult PartialBlocksResult; + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching descriptions for {} blocks", BlockHashes.size())); + RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = RemoteStore.GetBlockDescriptions(BlockHashes); - std::vector<IoHash> BlocksWithDescription; + 
+ remotestore_impl::ReportMessage(OptionalContext, + fmt::format("GetBlockDescriptions took {}. Found {} blocks", + NiceTimeSpanMs(uint64_t(BlockDescriptions.ElapsedSeconds * 1000)), + BlockDescriptions.Blocks.size())); + + std::vector<IoHash> BlocksWithDescription; BlocksWithDescription.reserve(BlockDescriptions.Blocks.size()); for (const ChunkBlockDescription& BlockDescription : BlockDescriptions.Blocks) { @@ -3547,6 +3679,7 @@ LoadOplog(CidStore& ChunkStore, if (!AllNeededChunkHashes.empty()) { std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes; + std::vector<bool> BlockExistsInCache; if (PartialBlockRequestMode == EPartialBlockRequestMode::Off) { @@ -3558,21 +3691,25 @@ LoadOplog(CidStore& ChunkStore, RemoteStore.AttachmentExistsInCache(BlocksWithDescription); if (CacheExistsResult.ErrorCode != 0 || CacheExistsResult.HasBody.size() != BlocksWithDescription.size()) { - CacheExistsResult.HasBody.resize(BlocksWithDescription.size(), false); + BlockExistsInCache.resize(BlocksWithDescription.size(), false); + } + else + { + BlockExistsInCache = std::move(CacheExistsResult.HasBody); } PartialBlockDownloadModes.reserve(BlocksWithDescription.size()); - for (bool ExistsInCache : CacheExistsResult.HasBody) + for (bool ExistsInCache : BlockExistsInCache) { if (PartialBlockRequestMode == EPartialBlockRequestMode::All) { - PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange); } else if (PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) { - PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + PartialBlockDownloadModes.push_back(ExistsInCache ? 
ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); } else if (PartialBlockRequestMode == EPartialBlockRequestMode::Mixed) @@ -3584,13 +3721,14 @@ LoadOplog(CidStore& ChunkStore, } ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size()); - - ChunkBlockAnalyser PartialAnalyser(*LogOutput, - BlockDescriptions.Blocks, - ChunkBlockAnalyser::Options{.IsQuiet = false, - .IsVerbose = false, - .HostLatencySec = HostLatencySec, - .HostHighSpeedLatencySec = CacheLatencySec}); + ChunkBlockAnalyser PartialAnalyser( + *LogOutput, + BlockDescriptions.Blocks, + ChunkBlockAnalyser::Options{.IsQuiet = false, + .IsVerbose = false, + .HostLatencySec = HostLatencySec, + .HostHighSpeedLatencySec = CacheLatencySec, + .HostMaxRangeCountPerRequest = RemoteProjectStore::MaxRangeCountPerRequest}); std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup, @@ -3641,12 +3779,13 @@ LoadOplog(CidStore& ChunkStore, LoadAttachmentsTimer, DownloadStartMS, BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex], + BlockExistsInCache[CurrentBlockRange.BlockIndex], PartialBlocksResult.BlockRanges, BlockRangeIndex, RangeCount, AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, - 3); + /* RetriesLeft*/ 3); BlockRangeIndex += RangeCount; } @@ -3668,12 +3807,23 @@ LoadOplog(CidStore& ChunkStore, { PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); } - remotestore_impl::ReportProgress( - OptionalContext, - "Loading attachments"sv, - fmt::format("{} remaining. 
{}", Remaining, remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), - AttachmentCount.load(), - Remaining); + + uint64_t AttachmentsDownloaded = + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); + uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + Info.AttachmentBlockRangeBytesDownloaded.load() + + Info.AttachmentBytesDownloaded.load(); + + remotestore_impl::ReportProgress(OptionalContext, + "Loading attachments"sv, + fmt::format("{} ({}) downloaded, {} ({}) stored, {} remaining. {}", + AttachmentsDownloaded, + NiceBytes(AttachmentBytesDownloaded), + Info.AttachmentsStored.load(), + NiceBytes(Info.AttachmentBytesStored.load()), + Remaining, + remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), + AttachmentCount.load(), + Remaining); } if (DownloadStartMS != (uint64_t)-1) { @@ -3700,11 +3850,12 @@ LoadOplog(CidStore& ChunkStore, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); } } - remotestore_impl::ReportProgress(OptionalContext, - "Writing attachments"sv, - fmt::format("{} remaining.", Remaining), - AttachmentCount.load(), - Remaining); + remotestore_impl::ReportProgress( + OptionalContext, + "Writing attachments"sv, + fmt::format("{} ({}), {} remaining.", Info.AttachmentsStored.load(), NiceBytes(Info.AttachmentBytesStored.load()), Remaining), + AttachmentCount.load(), + Remaining); } if (AttachmentCount.load() > 0) @@ -3867,18 +4018,20 @@ LoadOplog(CidStore& ChunkStore, TmpFile.Close(); TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); } + uint64_t TmpBufferSize = TmpBuffer.GetSize(); CidStore::InsertResult InsertResult = ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); if (InsertResult.New) { - Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); + 
Info.AttachmentBytesStored.fetch_add(TmpBufferSize); Info.AttachmentsStored.fetch_add(1); } - ZEN_INFO("Dechunked attachment {} ({}) in {}", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Dechunked attachment {} ({}) in {}", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); } catch (const std::exception& Ex) { diff --git a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp index b4c1156ac..ef82c45e0 100644 --- a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp @@ -157,55 +157,59 @@ public: return Result; } - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override { - std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); - - CbObject Request; + LoadAttachmentsResult Result; + if (SourceMode != ESourceMode::kCacheOnly) { - CbObjectWriter RequestWriter; - RequestWriter.AddString("method"sv, "getchunks"sv); - RequestWriter.BeginObject("Request"sv); + std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); + + CbObject Request; { - RequestWriter.BeginArray("Chunks"sv); + CbObjectWriter RequestWriter; + RequestWriter.AddString("method"sv, "getchunks"sv); + RequestWriter.BeginObject("Request"sv); { - for (const IoHash& RawHash : RawHashes) + RequestWriter.BeginArray("Chunks"sv); { - RequestWriter.BeginObject(); + for (const IoHash& RawHash : RawHashes) { - RequestWriter.AddHash("RawHash", RawHash); + RequestWriter.BeginObject(); + { + RequestWriter.AddHash("RawHash", RawHash); + } + RequestWriter.EndObject(); } - RequestWriter.EndObject(); } + RequestWriter.EndArray(); // 
"chunks" } - RequestWriter.EndArray(); // "chunks" + RequestWriter.EndObject(); + Request = RequestWriter.Save(); } - RequestWriter.EndObject(); - Request = RequestWriter.Save(); - } - HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage)); - AddStats(Response); + HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); - LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", - RawHashes.size(), - m_ProjectStoreUrl, - m_Project, - m_Oplog, - Result.Reason); - } - else - { - CbPackage Package = Response.AsPackage(); - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - Result.Chunks.reserve(Attachments.size()); - for (const CbAttachment& Attachment : Attachments) + Result = LoadAttachmentsResult{ConvertResult(Response)}; + if (Result.ErrorCode) { - Result.Chunks.emplace_back( - std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); + Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. 
Reason: '{}'", + RawHashes.size(), + m_ProjectStoreUrl, + m_Project, + m_Oplog, + Result.Reason); + } + else + { + CbPackage Package = Response.AsPackage(); + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + Result.Chunks.reserve(Attachments.size()); + for (const CbAttachment& Attachment : Attachments) + { + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); + } } } return Result; @@ -260,32 +264,59 @@ public: return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)}; } - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override { - std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); - HttpClient::Response Response = - m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); - AddStats(Response); - - LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; - if (Result.ErrorCode) + LoadAttachmentResult Result; + if (SourceMode != ESourceMode::kCacheOnly) { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", - m_ProjectStoreUrl, - m_Project, - m_Oplog, - RawHash, - Result.Reason); - } - if (!Result.ErrorCode && Range) - { - Result.Bytes = IoBuffer(Response.ResponsePayload, Range.Offset, Range.Bytes); + std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); + HttpClient::Response Response = + m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); + AddStats(Response); + + Result = LoadAttachmentResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. 
Reason: '{}'", + m_ProjectStoreUrl, + m_Project, + m_Oplog, + RawHash, + Result.Reason); + } + Result.Bytes = Response.ResponsePayload; + Result.Bytes.MakeOwned(); } - else + return Result; + } + + virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, + std::span<const std::pair<uint64_t, uint64_t>> Ranges, + ESourceMode SourceMode) override + { + LoadAttachmentRangesResult Result; + if (SourceMode != ESourceMode::kCacheOnly) { - Result.Bytes = Response.ResponsePayload; + std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); + HttpClient::Response Response = + m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); + AddStats(Response); + + Result = LoadAttachmentRangesResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Project, + m_Oplog, + RawHash, + Result.Reason); + } + else + { + Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end()); + } } - Result.Bytes.MakeOwned(); return Result; } diff --git a/src/zenserver/storage/buildstore/httpbuildstore.cpp b/src/zenserver/storage/buildstore/httpbuildstore.cpp index 6ada085a5..459e044eb 100644 --- a/src/zenserver/storage/buildstore/httpbuildstore.cpp +++ b/src/zenserver/storage/buildstore/httpbuildstore.cpp @@ -233,16 +233,21 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req) { const uint64_t MaxBlobSize = Range.first < BlobSize ? 
BlobSize - Range.first : 0; const uint64_t RangeSize = Min(Range.second, MaxBlobSize); - if (Range.first + RangeSize <= BlobSize) + Writer.BeginObject(); { - RangeBuffers.push_back(IoBuffer(Blob, Range.first, RangeSize)); - Writer.BeginObject(); + if (Range.first + RangeSize <= BlobSize) { + RangeBuffers.push_back(IoBuffer(Blob, Range.first, RangeSize)); Writer.AddInteger("offset"sv, Range.first); Writer.AddInteger("length"sv, RangeSize); } - Writer.EndObject(); + else + { + Writer.AddInteger("offset"sv, Range.first); + Writer.AddInteger("length"sv, 0); + } } + Writer.EndObject(); } Writer.EndArray(); @@ -262,7 +267,16 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req) } else { - ZEN_ASSERT(OffsetAndLengthPairs.size() == 1); + if (OffsetAndLengthPairs.size() != 1) + { + // Only a single http range is supported, we have limited support for http multirange responses + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Multiple ranges in blob request is only supported for {} accept type", ToString(HttpContentType::kCbPackage))); + } + const std::pair<uint64_t, uint64_t>& OffsetAndLength = OffsetAndLengthPairs.front(); const uint64_t BlobSize = Blob.GetSize(); const uint64_t MaxBlobSize = OffsetAndLength.first < BlobSize ? BlobSize - OffsetAndLength.first : 0; |