diff options
Diffstat (limited to 'src/zenremotestore/projectstore/remoteprojectstore.cpp')
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 2522 |
1 files changed, 1943 insertions, 579 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 8be8eb0df..247bd6cb9 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -14,6 +14,8 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> +#include <zenremotestore/builds/buildstoragecache.h> +#include <zenremotestore/chunking/chunkedcontent.h> #include <zenremotestore/chunking/chunkedfile.h> #include <zenremotestore/operationlogoutput.h> #include <zenstore/cidstore.h> @@ -123,14 +125,17 @@ namespace remotestore_impl { return OptionalContext->IsCancelled(); } - std::string GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS) + std::string GetStats(const RemoteProjectStore::Stats& Stats, + const BuildStorageCache::Statistics* OptionalCacheStats, + uint64_t ElapsedWallTimeMS) { - return fmt::format( - "Sent: {} ({}bits/s) Recv: {} ({}bits/s)", - NiceBytes(Stats.m_SentBytes), - NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u), - NiceBytes(Stats.m_ReceivedBytes), - NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u)); + uint64_t SentBytes = Stats.m_SentBytes + (OptionalCacheStats ? OptionalCacheStats->TotalBytesWritten.load() : 0); + uint64_t ReceivedBytes = Stats.m_ReceivedBytes + (OptionalCacheStats ? OptionalCacheStats->TotalBytesRead.load() : 0); + return fmt::format("Sent: {} ({}bits/s) Recv: {} ({}bits/s)", + NiceBytes(SentBytes), + NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u), + NiceBytes(ReceivedBytes), + NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u)); } void LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats) @@ -229,44 +234,66 @@ namespace remotestore_impl { struct DownloadInfo { - uint64_t OplogSizeBytes = 0; - std::atomic<uint64_t> AttachmentsDownloaded = 0; - std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; - std::atomic<uint64_t> AttachmentBytesDownloaded = 0; - std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; - std::atomic<uint64_t> AttachmentsStored = 0; - std::atomic<uint64_t> AttachmentBytesStored = 0; - std::atomic_size_t MissingAttachmentCount = 0; + uint64_t OplogSizeBytes = 0; + std::atomic<uint64_t> AttachmentsDownloaded = 0; + std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; + std::atomic<uint64_t> AttachmentBlocksRangesDownloaded = 0; + std::atomic<uint64_t> AttachmentBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentBlockRangeBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentsStored = 0; + std::atomic<uint64_t> AttachmentBytesStored = 0; + std::atomic_size_t MissingAttachmentCount = 0; }; - void DownloadAndSaveBlockChunks(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, - DownloadInfo& Info, - Stopwatch& LoadAttachmentsTimer, - std::atomic_uint64_t& DownloadStartMS, - const std::vector<IoHash>& Chunks) + class JobContextLogOutput : public OperationLogOutput + { + public: + JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {} + virtual void EmitLogMessage(const logging::LogPoint& Point, fmt::format_args Args) override + { + if (m_OptionalContext) + { + fmt::basic_memory_buffer<char, 250> MessageBuffer; + fmt::vformat_to(fmt::appender(MessageBuffer), Point.FormatString, Args); + remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size())); + } + } + + virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); } + virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); } + virtual uint32_t GetProgressUpdateDelayMS() override { return 0; } + virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override + { + ZEN_UNUSED(InSubTask); + return nullptr; + } + + private: + JobContext* m_OptionalContext; + }; + + void DownloadAndSaveBlockChunks(LoadOplogContext& Context, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + ThinChunkBlockDescription&& ThinBlockDescription, + std::vector<uint32_t>&& NeededChunkIndexes) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork( - [&RemoteStore, - &ChunkStore, - &WorkerPool, + Context.NetworkWorkerPool.ScheduleWork( + [&Context, &AttachmentsDownloadLatch, &AttachmentsWriteLatch, &RemoteResult, - Chunks = Chunks, + ThinBlockDescription = std::move(ThinBlockDescription), + NeededChunkIndexes = std::move(NeededChunkIndexes), &Info, &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext]() { + &DownloadStartMS]() { ZEN_TRACE_CPU("DownloadBlockChunks"); auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); @@ -276,34 +303,47 @@ namespace remotestore_impl { } try { + std::vector<IoHash> Chunks; + Chunks.reserve(NeededChunkIndexes.size()); + for (uint32_t ChunkIndex : NeededChunkIndexes) + { + Chunks.push_back(ThinBlockDescription.ChunkRawHashes[ChunkIndex]); + } + uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + RemoteProjectStore::LoadAttachmentsResult Result = Context.RemoteStore.LoadAttachments(Chunks); if (Result.ErrorCode) { - ReportMessage(OptionalContext, + ReportMessage(Context.OptionalJobContext, fmt::format("Failed to load attachments with {} chunks ({}): {}", Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason())); Info.MissingAttachmentCount.fetch_add(1); - if (IgnoreMissingAttachments) + if (Context.IgnoreMissingAttachments) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } return; } - Info.AttachmentsDownloaded.fetch_add(Chunks.size()); - ZEN_INFO("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + Info.AttachmentsDownloaded.fetch_add(Result.Chunks.size()); + for (const auto& It : Result.Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + } + remotestore_impl::ReportMessage(Context.OptionalJobContext, + fmt::format("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)))); if (RemoteResult.IsError()) { return; } AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { + Context.WorkerPool.ScheduleWork( + [&AttachmentsWriteLatch, &RemoteResult, &Info, &Context, Chunks = std::move(Result.Chunks)]() { auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -320,13 +360,13 @@ namespace remotestore_impl { for (const auto& It : Chunks) { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); WriteRawHashes.push_back(It.first); } std::vector<CidStore::InsertResult> InsertResults = - ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); + Context.ChunkStore.AddChunks(WriteAttachmentBuffers, + WriteRawHashes, + CidStore::InsertMode::kCopyOnly); for (size_t Index = 0; Index < InsertResults.size(); Index++) { @@ -350,46 +390,38 @@ namespace remotestore_impl { catch (const std::exception& Ex) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk load {} attachments", Chunks.size()), + fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }; - void DownloadAndSaveBlock(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, - DownloadInfo& Info, - Stopwatch& LoadAttachmentsTimer, - std::atomic_uint64_t& DownloadStartMS, - const IoHash& BlockHash, - const std::vector<IoHash>& Chunks, - uint32_t RetriesLeft) + void DownloadAndSaveBlock(LoadOplogContext& Context, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + const IoHash& BlockHash, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& AllNeededPartialChunkHashesLookup, + std::span<std::atomic<bool>> ChunkDownloadedFlags, + uint32_t RetriesLeft) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork( + Context.NetworkWorkerPool.ScheduleWork( [&AttachmentsDownloadLatch, &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, - BlockHash, + &Context, &RemoteResult, &Info, &LoadAttachmentsTimer, &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, RetriesLeft, - Chunks = std::vector<IoHash>(Chunks)]() { + BlockHash = IoHash(BlockHash), + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags]() { ZEN_TRACE_CPU("DownloadBlock"); auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); @@ -401,51 +433,65 @@ namespace remotestore_impl { { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); - if (BlockResult.ErrorCode) + + IoBuffer BlobBuffer; + if (Context.OptionalCache) { - ReportMessage(OptionalContext, - fmt::format("Failed to download block attachment {} ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - } - return; + BlobBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, BlockHash); } - if (RemoteResult.IsError()) + + if (!BlobBuffer) { - return; + RemoteProjectStore::LoadAttachmentResult BlockResult = Context.RemoteStore.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode) + { + ReportMessage(Context.OptionalJobContext, + fmt::format("Failed to download block attachment {} ({}): {}", + BlockHash, + BlockResult.Reason, + BlockResult.Text)); + Info.MissingAttachmentCount.fetch_add(1); + if (!Context.IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + } + return; + } + if (RemoteResult.IsError()) + { + return; + } + BlobBuffer = std::move(BlockResult.Bytes); + ZEN_DEBUG("Loaded block attachment '{}' in {} ({})", + BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), + NiceBytes(BlobBuffer.Size())); + if (Context.OptionalCache && Context.PopulateCache) + { + Context.OptionalCache->PutBuildBlob(Context.CacheBuildId, + BlockHash, + BlobBuffer.GetContentType(), + CompositeBuffer(SharedBuffer(BlobBuffer))); + } } - uint64_t BlockSize = BlockResult.Bytes.GetSize(); + uint64_t BlockSize = BlobBuffer.GetSize(); Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_INFO("Loaded block attachment '{}' in {} ({})", - BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), - NiceBytes(BlockSize)); Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( + Context.WorkerPool.ScheduleWork( [&AttachmentsDownloadLatch, &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, - BlockHash, + &Context, &RemoteResult, &Info, &LoadAttachmentsTimer, &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, RetriesLeft, - Chunks = std::move(Chunks), - Bytes = std::move(BlockResult.Bytes)]() { + BlockHash = IoHash(BlockHash), + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + Bytes = std::move(BlobBuffer)]() { auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -454,64 +500,107 @@ namespace remotestore_impl { try { ZEN_ASSERT(Bytes.Size() > 0); - std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; - WantedChunks.reserve(Chunks.size()); - WantedChunks.insert(Chunks.begin(), Chunks.end()); std::vector<IoBuffer> WriteAttachmentBuffers; std::vector<IoHash> WriteRawHashes; IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize); + + std::string ErrorString; + if (!Compressed) { - if (RetriesLeft > 0) + ErrorString = + fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash); + } + else if (RawHash != BlockHash) + { + ErrorString = fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash); + } + else if (CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); !BlockPayload) + { + ErrorString = fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash); + } + else + { + uint64_t PotentialSize = 0; + uint64_t UsedSize = 0; + uint64_t BlockSize = BlockPayload.GetSize(); + + uint64_t BlockHeaderSize = 0; + + bool StoreChunksOK = IterateChunkBlock( + BlockPayload.Flatten(), + [&AllNeededPartialChunkHashesLookup, + &ChunkDownloadedFlags, + &WriteAttachmentBuffers, + &WriteRawHashes, + &Info, + &PotentialSize](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash); + if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) + { + bool Expected = false; + if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader( + WriteAttachmentBuffers.back(), + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + PotentialSize += WriteAttachmentBuffers.back().GetSize(); + } + } + }, + BlockHeaderSize); + + if (!StoreChunksOK) { - ReportMessage( - OptionalContext, - fmt::format( - "Block attachment {} is malformed, can't parse as compressed binary, retrying download", - BlockHash)); - return DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - std::move(Chunks), - RetriesLeft - 1); + ErrorString = fmt::format("Invalid format for block {}", BlockHash); + } + else + { + if (!WriteAttachmentBuffers.empty()) + { + std::vector<CidStore::InsertResult> Results = + Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const CidStore::InsertResult& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + UsedSize += WriteAttachmentBuffers[Index].GetSize(); + } + } + if (UsedSize < BlockSize) + { + ZEN_DEBUG("Used {} (skipping {}) out of {} for block {} ({} %) (use of matching {}%)", + NiceBytes(UsedSize), + NiceBytes(BlockSize - UsedSize), + NiceBytes(BlockSize), + BlockHash, + (100 * UsedSize) / BlockSize, + PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0); + } + } } - ReportMessage( - OptionalContext, - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), - {}); - return; } - CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); - if (!BlockPayload) + + if (!ErrorString.empty()) { if (RetriesLeft > 0) { - ReportMessage( - OptionalContext, - fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download", - BlockHash)); - return DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, + ReportMessage(Context.OptionalJobContext, fmt::format("{}, retrying download", ErrorString)); + + return DownloadAndSaveBlock(Context, AttachmentsDownloadLatch, AttachmentsWriteLatch, RemoteResult, @@ -519,91 +608,16 @@ namespace remotestore_impl { LoadAttachmentsTimer, DownloadStartMS, BlockHash, - std::move(Chunks), + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, RetriesLeft - 1); } - ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash), - {}); - return; - } - if (RawHash != BlockHash) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), - {}); - return; - } - - uint64_t PotentialSize = 0; - uint64_t UsedSize = 0; - uint64_t BlockSize = BlockPayload.GetSize(); - - uint64_t BlockHeaderSize = 0; - bool StoreChunksOK = IterateChunkBlock( - BlockPayload.Flatten(), - [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize]( - CompressedBuffer&& Chunk, - const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT( - CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), - RawHash, - RawSize, - /*OutOptionalTotalCompressedSize*/ nullptr)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - WantedChunks.erase(AttachmentRawHash); - PotentialSize += WriteAttachmentBuffers.back().GetSize(); - } - }, - BlockHeaderSize); - - if (!StoreChunksOK) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Invalid format for block {}", BlockHash), - {}); - return; - } - - ZEN_ASSERT(WantedChunks.empty()); - - if (!WriteAttachmentBuffers.empty()) - { - auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (size_t Index = 0; Index < Results.size(); Index++) + else { - const auto& Result = Results[Index]; - if (Result.New) - { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); - UsedSize += WriteAttachmentBuffers[Index].GetSize(); - } + ReportMessage(Context.OptionalJobContext, ErrorString); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), ErrorString, {}); + return; } - ZEN_DEBUG("Used {} (matching {}) out of {} for block {} ({} %) (use of matching {}%)", - NiceBytes(UsedSize), - NiceBytes(PotentialSize), - NiceBytes(BlockSize), - BlockHash, - (100 * UsedSize) / BlockSize, - PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0); } } catch (const std::exception& Ex) @@ -618,19 +632,458 @@ namespace remotestore_impl { catch (const std::exception& Ex) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to block attachment {}", BlockHash), + fmt::format("Failed to download block attachment {}", BlockHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + }; + + void DownloadPartialBlock(LoadOplogContext& Context, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + double& DownloadTimeSeconds, + const ChunkBlockDescription& BlockDescription, + bool BlockExistsInCache, + std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors, + size_t BlockRangeIndexStart, + size_t BlockRangeCount, + std::function<void(IoBuffer&& Buffer, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded) + { + ZEN_ASSERT(Context.StoreMaxRangeCountPerRequest != 0); + ZEN_ASSERT(BlockExistsInCache == false || Context.CacheMaxRangeCountPerRequest != 0); + + std::vector<std::pair<uint64_t, uint64_t>> Ranges; + Ranges.reserve(BlockRangeDescriptors.size()); + for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount; BlockRangeIndex++) + { + const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeIndex]; + Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength)); + } + + size_t SubBlockRangeCount = BlockRangeCount; + size_t SubRangeCountComplete = 0; + std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges); + + while (SubRangeCountComplete < SubBlockRangeCount) + { + if (RemoteResult.IsError()) + { + break; + } + + size_t SubRangeStartIndex = BlockRangeIndexStart + SubRangeCountComplete; + if (BlockExistsInCache) + { + ZEN_ASSERT(Context.OptionalCache); + size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, Context.CacheMaxRangeCountPerRequest); + + if (SubRangeCount == 1) + { + // Legacy single-range path, prefer that for max compatibility + + const std::pair<uint64_t, uint64_t> SubRange = RangesSpan[SubRangeCountComplete]; + Stopwatch CacheTimer; + IoBuffer PayloadBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, + BlockDescription.BlockHash, + SubRange.first, + SubRange.second); + DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0; + if (RemoteResult.IsError()) + { + break; + } + if (PayloadBuffer) + { + OnDownloaded(std::move(PayloadBuffer), + SubRangeStartIndex, + std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)}); + SubRangeCountComplete += SubRangeCount; + continue; + } + } + else + { + auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); + + Stopwatch CacheTimer; + BuildStorageCache::BuildBlobRanges RangeBuffers = + Context.OptionalCache->GetBuildBlobRanges(Context.CacheBuildId, BlockDescription.BlockHash, SubRanges); + DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0; + if (RemoteResult.IsError()) + { + break; + } + if (RangeBuffers.PayloadBuffer) + { + if (RangeBuffers.Ranges.empty()) + { + SubRangeCount = Ranges.size() - SubRangeCountComplete; + OnDownloaded(std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangesSpan.subspan(SubRangeCountComplete, SubRangeCount)); + SubRangeCountComplete += SubRangeCount; + continue; + } + else if (RangeBuffers.Ranges.size() == SubRangeCount) + { + OnDownloaded(std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangeBuffers.Ranges); + SubRangeCountComplete += SubRangeCount; + continue; + } + } + } + } + + size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, Context.StoreMaxRangeCountPerRequest); + + auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); + + RemoteProjectStore::LoadAttachmentRangesResult BlockResult = + Context.RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges); + DownloadTimeSeconds += BlockResult.ElapsedSeconds; + if (RemoteResult.IsError()) + { + break; + } + if (BlockResult.ErrorCode || !BlockResult.Bytes) + { + ReportMessage(Context.OptionalJobContext, + fmt::format("Failed to download {} ranges from block attachment '{}' ({}): {}", + SubRanges.size(), + BlockDescription.BlockHash, + BlockResult.ErrorCode, + BlockResult.Reason)); + Info.MissingAttachmentCount.fetch_add(1); + if (!Context.IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + break; + } + } + else + { + if (BlockResult.Ranges.empty()) + { + // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 + // Use the whole payload for the remaining ranges + + if (Context.OptionalCache && Context.PopulateCache) + { + Context.OptionalCache->PutBuildBlob(Context.CacheBuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(std::vector<IoBuffer>{BlockResult.Bytes})); + if (RemoteResult.IsError()) + { + break; + } + } + SubRangeCount = Ranges.size() - SubRangeCountComplete; + OnDownloaded(std::move(BlockResult.Bytes), + SubRangeStartIndex, + RangesSpan.subspan(SubRangeCountComplete, SubRangeCount)); + } + else + { + if (BlockResult.Ranges.size() != SubRanges.size()) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Range response for block {} contains {} ranges, expected {} ranges", + BlockDescription.BlockHash, + BlockResult.Ranges.size(), + SubRanges.size()), + ""); + break; + } + OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, BlockResult.Ranges); + } + } + + SubRangeCountComplete += SubRangeCount; + } + } + + void DownloadAndSavePartialBlock(LoadOplogContext& Context, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + const ChunkBlockDescription& BlockDescription, + bool BlockExistsInCache, + std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors, + size_t BlockRangeIndexStart, + size_t BlockRangeCount, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& AllNeededPartialChunkHashesLookup, + std::span<std::atomic<bool>> ChunkDownloadedFlags, + uint32_t RetriesLeft) + { + AttachmentsDownloadLatch.AddCount(1); + Context.NetworkWorkerPool.ScheduleWork( + [&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &Context, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + BlockDescription, + BlockExistsInCache, + BlockRangeDescriptors, + BlockRangeIndexStart, + BlockRangeCount, + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + RetriesLeft]() { + ZEN_TRACE_CPU("DownloadBlockRanges"); + + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + + double DownloadElapsedSeconds = 0; + uint64_t DownloadedBytes = 0; + + DownloadPartialBlock( + Context, + RemoteResult, + Info, + DownloadElapsedSeconds, + BlockDescription, + BlockExistsInCache, + BlockRangeDescriptors, + BlockRangeIndexStart, + BlockRangeCount, + [&](IoBuffer&& Buffer, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) { + uint64_t BlockPartSize = Buffer.GetSize(); + DownloadedBytes += BlockPartSize; + + Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize); + Info.AttachmentBlocksRangesDownloaded++; + + AttachmentsWriteLatch.AddCount(1); + Context.WorkerPool.ScheduleWork( + [&AttachmentsWriteLatch, + &Context, + &AttachmentsDownloadLatch, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + BlockDescription, + BlockExistsInCache, + BlockRangeDescriptors, + BlockRangeStartIndex, + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + RetriesLeft, + BlockPayload = std::move(Buffer), + OffsetAndLengths = + std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + try + { + ZEN_ASSERT(BlockPayload.Size() > 0); + + size_t RangeCount = OffsetAndLengths.size(); + for (size_t RangeOffset = 0; RangeOffset < RangeCount; RangeOffset++) + { + if (RemoteResult.IsError()) + { + return; + } + + const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = + BlockRangeDescriptors[BlockRangeStartIndex + RangeOffset]; + const std::pair<uint64_t, uint64_t>& OffsetAndLength = OffsetAndLengths[RangeOffset]; + IoBuffer BlockRangeBuffer(BlockPayload, OffsetAndLength.first, OffsetAndLength.second); + + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + + uint64_t PotentialSize = 0; + uint64_t UsedSize = 0; + uint64_t BlockPartSize = BlockRangeBuffer.GetSize(); + + uint32_t OffsetInBlock = 0; + for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart; + ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount; + ChunkBlockIndex++) + { + if (RemoteResult.IsError()) + { + break; + } + + const uint32_t ChunkCompressedSize = + BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + + if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash); + ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) + { + if (!ChunkDownloadedFlags[ChunkIndexIt->second]) + { + IoHash VerifyChunkHash; + uint64_t VerifyChunkSize; + CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed( + SharedBuffer(IoBuffer(BlockRangeBuffer, OffsetInBlock, ChunkCompressedSize)), + VerifyChunkHash, + VerifyChunkSize); + + std::string ErrorString; + + if (!CompressedChunk) + { + ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash); + } + else if (VerifyChunkHash != ChunkHash) + { + ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' has mismatching hash, expected " + "{}, got {}", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash, + ChunkHash, + VerifyChunkHash); + } + else if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex]) + { + ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' has mismatching raw size, " + "expected {}, " + "got {}", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash, + BlockDescription.ChunkRawLengths[ChunkBlockIndex], + VerifyChunkSize); + } + + if (!ErrorString.empty()) + { + if (RetriesLeft > 0) + { + ReportMessage(Context.OptionalJobContext, + fmt::format("{}, retrying download", ErrorString)); + return DownloadAndSavePartialBlock(Context, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockDescription, + BlockExistsInCache, + BlockRangeDescriptors, + BlockRangeStartIndex, + RangeCount, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + RetriesLeft - 1); + } + + ReportMessage(Context.OptionalJobContext, ErrorString); + Info.MissingAttachmentCount.fetch_add(1); + if (!Context.IgnoreMissingAttachments) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), + "Malformed chunk block", + ErrorString); + } + } + else + { + bool Expected = false; + if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, + true)) + { + WriteAttachmentBuffers.emplace_back( + CompressedChunk.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.emplace_back(ChunkHash); + PotentialSize += WriteAttachmentBuffers.back().GetSize(); + } + } + } + } + OffsetInBlock += ChunkCompressedSize; + } + + if (!WriteAttachmentBuffers.empty()) + { + std::vector<CidStore::InsertResult> Results = + Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const CidStore::InsertResult& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + UsedSize += WriteAttachmentBuffers[Index].GetSize(); + } + } + if (UsedSize < BlockPartSize) + { + ZEN_DEBUG( + "Used {} (skipping {}) out of {} for block {} range {}, {} ({} %) (use of matching " + "{}%)", + NiceBytes(UsedSize), + NiceBytes(BlockPartSize - UsedSize), + NiceBytes(BlockPartSize), + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength, + (100 * UsedSize) / BlockPartSize, + PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0); + } + } + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed saving {} ranges from block attachment {}", + OffsetAndLengths.size(), + BlockDescription.BlockHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + }); + if (!RemoteResult.IsError()) + { + ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})", + BlockRangeCount, + BlockDescription.BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(DownloadElapsedSeconds * 1000)), + NiceBytes(DownloadedBytes)); + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }; - void DownloadAndSaveAttachment(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, + void DownloadAndSaveAttachment(LoadOplogContext& Context, Latch& AttachmentsDownloadLatch, Latch& AttachmentsWriteLatch, AsyncRemoteResult& RemoteResult, @@ -640,19 +1093,15 @@ namespace remotestore_impl { const IoHash& RawHash) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork( - [&RemoteStore, - &ChunkStore, - &WorkerPool, + Context.NetworkWorkerPool.ScheduleWork( + [&Context, &RemoteResult, &AttachmentsDownloadLatch, &AttachmentsWriteLatch, RawHash, &LoadAttachmentsTimer, &DownloadStartMS, - &Info, - IgnoreMissingAttachments, - OptionalContext]() { + &Info]() { ZEN_TRACE_CPU("DownloadAttachment"); auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); @@ -664,43 +1113,52 @@ namespace remotestore_impl { { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) + IoBuffer BlobBuffer; + if (Context.OptionalCache) { - ReportMessage(OptionalContext, - fmt::format("Failed to download large attachment {}: '{}', error code : {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode)); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + BlobBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, RawHash); + } + if (!BlobBuffer) + { + RemoteProjectStore::LoadAttachmentResult AttachmentResult = Context.RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + ReportMessage(Context.OptionalJobContext, + fmt::format("Failed to download large attachment {}: '{}', error code : {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode)); + Info.MissingAttachmentCount.fetch_add(1); + if (!Context.IgnoreMissingAttachments) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + } + return; + } + BlobBuffer = std::move(AttachmentResult.Bytes); + ZEN_DEBUG("Loaded large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), + NiceBytes(BlobBuffer.GetSize())); + if (Context.OptionalCache && Context.PopulateCache) + { + Context.OptionalCache->PutBuildBlob(Context.CacheBuildId, + RawHash, + BlobBuffer.GetContentType(), + CompositeBuffer(SharedBuffer(BlobBuffer))); } - return; } - uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); - ZEN_INFO("Loaded large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), - NiceBytes(AttachmentSize)); - Info.AttachmentsDownloaded.fetch_add(1); if (RemoteResult.IsError()) { return; } + uint64_t AttachmentSize = BlobBuffer.GetSize(); + Info.AttachmentsDownloaded.fetch_add(1); Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&AttachmentsWriteLatch, - &RemoteResult, - &Info, - &ChunkStore, - RawHash, - AttachmentSize, - Bytes = std::move(AttachmentResult.Bytes), - OptionalContext]() { + Context.WorkerPool.ScheduleWork( + [&Context, &AttachmentsWriteLatch, &RemoteResult, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)]() { ZEN_TRACE_CPU("WriteAttachment"); auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); @@ -710,7 +1168,7 @@ namespace remotestore_impl { } try { - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); + CidStore::InsertResult InsertResult = Context.ChunkStore.AddChunk(Bytes, RawHash); if (InsertResult.New) { Info.AttachmentBytesStored.fetch_add(AttachmentSize); @@ -1126,7 +1584,9 @@ namespace remotestore_impl { uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs(); ReportProgress(OptionalContext, "Saving attachments"sv, - fmt::format("{} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), + fmt::format("{} remaining... {}", + Remaining, + GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, PartialTransferWallTimeMS)), AttachmentsToSave, Remaining); } @@ -1135,7 +1595,7 @@ namespace remotestore_impl { { ReportProgress(OptionalContext, "Saving attachments"sv, - fmt::format("{}", GetStats(RemoteStore.GetStats(), ElapsedTimeMS)), + fmt::format("{}", GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS)), AttachmentsToSave, 0); } @@ -1146,7 +1606,7 @@ namespace remotestore_impl { LargeAttachmentCountToUpload, BulkAttachmentCountToUpload, NiceTimeSpanMs(ElapsedTimeMS), - GetStats(RemoteStore.GetStats(), ElapsedTimeMS))); + GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS))); } } // namespace remotestore_impl @@ -1224,35 +1684,7 @@ BuildContainer(CidStore& ChunkStore, { using namespace std::literals; - class JobContextLogOutput : public OperationLogOutput - { - public: - JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {} - virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) override - { - ZEN_UNUSED(LogLevel); - if (m_OptionalContext) - { - fmt::basic_memory_buffer<char, 250> MessageBuffer; - fmt::vformat_to(fmt::appender(MessageBuffer), Format, Args); - remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size())); - } - } - - virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); } - virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); } - virtual uint32_t GetProgressUpdateDelayMS() override { return 0; } - virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override - { - ZEN_UNUSED(InSubTask); - return nullptr; - } - - private: - JobContext* m_OptionalContext; - }; - - std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<JobContextLogOutput>(OptionalContext)); + std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext)); size_t OpCount = 0; @@ -1783,31 +2215,36 @@ BuildContainer(CidStore& ChunkStore, } ResolveAttachmentsLatch.CountDown(); - while (!ResolveAttachmentsLatch.Wait(1000)) { - ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + ptrdiff_t AttachmentCountToUseForProgress = ResolveAttachmentsLatch.Remaining(); + while (!ResolveAttachmentsLatch.Wait(1000)) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - while (!ResolveAttachmentsLatch.Wait(1000)) + ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining(); + if (remotestore_impl::IsCancelled(OptionalContext)) { - Remaining = ResolveAttachmentsLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("Aborting, {} attachments remaining...", Remaining), - UploadAttachments.size(), - Remaining); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + while (!ResolveAttachmentsLatch.Wait(1000)) + { + Remaining = ResolveAttachmentsLatch.Remaining(); + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("Aborting, {} attachments remaining...", Remaining), + UploadAttachments.size(), + Remaining); + } + remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0); + return {}; } - remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0); - return {}; + AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress); + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("{} remaining...", Remaining), + AttachmentCountToUseForProgress, + Remaining); } - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("{} remaining...", Remaining), - UploadAttachments.size(), - Remaining); } if (UploadAttachments.size() > 0) { @@ -2010,14 +2447,13 @@ BuildContainer(CidStore& ChunkStore, AsyncOnBlock, RemoteResult); ComposedBlocks++; + // Worker will set Blocks[BlockIndex] = Block (including ChunkRawHashes) under shared lock } else { ZEN_INFO("Bulk group {} attachments", ChunkCount); OnBlockChunks(std::move(ChunksInBlock)); - } - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + // We can share the lock as we are not resizing the vector and only touch our own index RwLock::SharedLockScope _(BlocksLock); Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes); } @@ -2195,12 +2631,14 @@ BuildContainer(CidStore& ChunkStore, 0); } - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}", - ChunkAssembleCount, - TotalOpCount, - GeneratedBlockCount, - NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}", + ChunkAssembleCount, + TotalOpCount, + GeneratedBlockCount, + LargeChunkHashes.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -2752,30 +3190,32 @@ SaveOplog(CidStore& ChunkStore, remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats()); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", - RemoteStoreInfo.ContainerName, - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), - NiceBytes(Info.OplogSizeBytes), - Info.AttachmentBlocksUploaded.load(), - NiceBytes(Info.AttachmentBlockBytesUploaded.load()), - Info.AttachmentsUploaded.load(), - NiceBytes(Info.AttachmentBytesUploaded.load()), - remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", + RemoteStoreInfo.ContainerName, + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksUploaded.load(), + NiceBytes(Info.AttachmentBlockBytesUploaded.load()), + Info.AttachmentsUploaded.load(), + NiceBytes(Info.AttachmentBytesUploaded.load()), + remotestore_impl::GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, TransferWallTimeMS))); return Result; }; RemoteProjectStore::Result -ParseOplogContainer(const CbObject& ContainerObject, - const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, - const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, - CbObject& OutOplogSection, - JobContext* OptionalContext) +ParseOplogContainer( + const CbObject& ContainerObject, + const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, + CbObject& OutOplogSection, + JobContext* OptionalContext) { using namespace std::literals; @@ -2801,22 +3241,43 @@ ParseOplogContainer(const CbObject& ContainerObject, "Section has unexpected data type", "Failed to save oplog container"}; } - std::unordered_set<IoHash, IoHash::Hasher> OpsAttachments; + std::unordered_set<IoHash, IoHash::Hasher> NeededAttachments; { CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView(); + + size_t OpCount = OpsArray.Num(); + size_t OpsCompleteCount = 0; + + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Scanning {} ops for attachments", OpCount)); + for (CbFieldView OpEntry : OpsArray) { - OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); }); + OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); }); if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Reason = "Operation cancelled"}; } + OpsCompleteCount++; + if ((OpsCompleteCount & 4095) == 0) + { + remotestore_impl::ReportProgress( + OptionalContext, + "Scanning oplog"sv, + fmt::format("{} attachments found, {} ops remaining...", NeededAttachments.size(), OpCount - OpsCompleteCount), + OpCount, + OpCount - OpsCompleteCount); + } } + remotestore_impl::ReportProgress(OptionalContext, + "Scanning oplog"sv, + fmt::format("{} attachments found", NeededAttachments.size()), + OpCount, + OpCount - OpsCompleteCount); } { - std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end()); + std::vector<IoHash> ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end()); OnReferencedAttachments(ReferencedAttachments); } @@ -2827,24 +3288,41 @@ ParseOplogContainer(const CbObject& ContainerObject, .Reason = "Operation cancelled"}; } - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", NeededAttachments.size())); CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); for (CbFieldView ChunkedFileField : ChunkedFilesArray) { CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView(); IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash(); - if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash))) + if (NeededAttachments.erase(RawHash) == 1) { - ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView); + if (!HasAttachment(RawHash)) + { + ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView); + + size_t NeededChunkAttachmentCount = 0; - OnReferencedAttachments(Chunked.ChunkHashes); - OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end()); - OnChunkedAttachment(Chunked); - ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - Chunked.ChunkHashes.size()); + OnReferencedAttachments(Chunked.ChunkHashes); + for (const IoHash& ChunkHash : Chunked.ChunkHashes) + { + if (!HasAttachment(ChunkHash)) + { + if (NeededAttachments.insert(ChunkHash).second) + { + NeededChunkAttachmentCount++; + } + } + } + OnChunkedAttachment(Chunked); + + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Requesting chunked attachment '{}' ({}) built from {} chunks, need {} chunks", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + Chunked.ChunkHashes.size(), + NeededChunkAttachmentCount)); + } } if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -2854,6 +3332,8 @@ ParseOplogContainer(const CbObject& ContainerObject, } } + std::vector<ThinChunkBlockDescription> ThinBlocksDescriptions; + size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) @@ -2863,45 +3343,38 @@ ParseOplogContainer(const CbObject& ContainerObject, CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); - std::vector<IoHash> NeededChunks; - NeededChunks.reserve(ChunksArray.Num()); - if (BlockHash == IoHash::Zero) + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkField : ChunksArray) { - for (CbFieldView ChunkField : ChunksArray) - { - IoHash ChunkHash = ChunkField.AsBinaryAttachment(); - if (OpsAttachments.erase(ChunkHash) == 1) - { - if (!HasAttachment(ChunkHash)) - { - NeededChunks.emplace_back(ChunkHash); - } - } - } + ChunkHashes.push_back(ChunkField.AsHash()); } - else + ThinBlocksDescriptions.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)}); + } + + for (ThinChunkBlockDescription& ThinBlockDescription : ThinBlocksDescriptions) + { + std::vector<uint32_t> NeededBlockChunkIndexes; + for (uint32_t ChunkIndex = 0; ChunkIndex < ThinBlockDescription.ChunkRawHashes.size(); ChunkIndex++) { - for (CbFieldView ChunkField : ChunksArray) + const IoHash& ChunkHash = ThinBlockDescription.ChunkRawHashes[ChunkIndex]; + if (NeededAttachments.erase(ChunkHash) == 1) { - const IoHash ChunkHash = ChunkField.AsHash(); - if (OpsAttachments.erase(ChunkHash) == 1) + if (!HasAttachment(ChunkHash)) { - if (!HasAttachment(ChunkHash)) - { - NeededChunks.emplace_back(ChunkHash); - } + NeededBlockChunkIndexes.push_back(ChunkIndex); } } } - - if (!NeededChunks.empty()) + if (!NeededBlockChunkIndexes.empty()) { - OnNeedBlock(BlockHash, std::move(NeededChunks)); - if (BlockHash != IoHash::Zero) + if (ThinBlockDescription.BlockHash != IoHash::Zero) { NeedBlockCount++; } + OnNeedBlock(std::move(ThinBlockDescription), std::move(NeededBlockChunkIndexes)); } + if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), @@ -2909,6 +3382,7 @@ ParseOplogContainer(const CbObject& ContainerObject, .Reason = "Operation cancelled"}; } } + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); @@ -2918,7 +3392,7 @@ ParseOplogContainer(const CbObject& ContainerObject, { IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); - if (OpsAttachments.erase(AttachmentHash) == 1) + if (NeededAttachments.erase(AttachmentHash) == 1) { if (!HasAttachment(AttachmentHash)) { @@ -2941,14 +3415,15 @@ ParseOplogContainer(const CbObject& ContainerObject, } RemoteProjectStore::Result -SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, - const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, - JobContext* OptionalContext) +SaveOplogContainer( + ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, + JobContext* OptionalContext) { using namespace std::literals; @@ -2972,18 +3447,12 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } RemoteProjectStore::Result -LoadOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - bool ForceDownload, - bool IgnoreMissingAttachments, - bool CleanOplog, - JobContext* OptionalContext) +LoadOplog(LoadOplogContext&& Context) { using namespace std::literals; + std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(Context.OptionalJobContext)); + remotestore_impl::DownloadInfo Info; Stopwatch Timer; @@ -2991,25 +3460,25 @@ LoadOplog(CidStore& ChunkStore, std::unordered_set<IoHash, IoHash::Hasher> Attachments; uint64_t BlockCountToDownload = 0; - RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName)); + RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = Context.RemoteStore.GetInfo(); + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName)); uint64_t TransferWallTimeMS = 0; Stopwatch LoadContainerTimer; - RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); + RemoteProjectStore::LoadContainerResult LoadContainerResult = Context.RemoteStore.LoadContainer(); TransferWallTimeMS += LoadContainerTimer.GetElapsedTimeMs(); if (LoadContainerResult.ErrorCode) { remotestore_impl::ReportMessage( - OptionalContext, + Context.OptionalJobContext, fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode)); return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text}; } - remotestore_impl::ReportMessage(OptionalContext, + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loaded container in {} ({})", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)), NiceBytes(LoadContainerResult.ContainerObject.GetSize()))); @@ -3023,22 +3492,27 @@ LoadOplog(CidStore& ChunkStore, Stopwatch LoadAttachmentsTimer; std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1; - auto HasAttachment = [&Oplog, &ChunkStore, ForceDownload](const IoHash& RawHash) { - if (ForceDownload) + auto HasAttachment = [&Context](const IoHash& RawHash) { + if (Context.ForceDownload) { return false; } - if (ChunkStore.ContainsChunk(RawHash)) + if (Context.ChunkStore.ContainsChunk(RawHash)) { return true; } return false; }; - auto OnNeedBlock = [&RemoteStore, - &ChunkStore, - &NetworkWorkerPool, - &WorkerPool, + struct NeededBlockDownload + { + ThinChunkBlockDescription ThinBlockDescription; + std::vector<uint32_t> NeededChunkIndexes; + }; + + std::vector<NeededBlockDownload> NeededBlockDownloads; + + auto OnNeedBlock = [&Context, &AttachmentsDownloadLatch, &AttachmentsWriteLatch, &AttachmentCount, @@ -3047,8 +3521,8 @@ LoadOplog(CidStore& ChunkStore, &Info, &LoadAttachmentsTimer, &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { + &NeededBlockDownloads](ThinChunkBlockDescription&& ThinBlockDescription, + std::vector<uint32_t>&& NeededChunkIndexes) { if (RemoteResult.IsError()) { return; @@ -3056,47 +3530,26 @@ LoadOplog(CidStore& ChunkStore, BlockCountToDownload++; AttachmentCount.fetch_add(1); - if (BlockHash == IoHash::Zero) - { - DownloadAndSaveBlockChunks(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, + if (ThinBlockDescription.BlockHash == IoHash::Zero) + { + DownloadAndSaveBlockChunks(Context, AttachmentsDownloadLatch, AttachmentsWriteLatch, RemoteResult, Info, LoadAttachmentsTimer, DownloadStartMS, - Chunks); + std::move(ThinBlockDescription), + std::move(NeededChunkIndexes)); } else { - DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - Chunks, - 3); + NeededBlockDownloads.push_back(NeededBlockDownload{.ThinBlockDescription = std::move(ThinBlockDescription), + .NeededChunkIndexes = std::move(NeededChunkIndexes)}); } }; - auto OnNeedAttachment = [&RemoteStore, - &Oplog, - &ChunkStore, - &NetworkWorkerPool, - &WorkerPool, + auto OnNeedAttachment = [&Context, &AttachmentsDownloadLatch, &AttachmentsWriteLatch, &RemoteResult, @@ -3104,9 +3557,7 @@ LoadOplog(CidStore& ChunkStore, &AttachmentCount, &LoadAttachmentsTimer, &DownloadStartMS, - &Info, - IgnoreMissingAttachments, - OptionalContext](const IoHash& RawHash) { + &Info](const IoHash& RawHash) { if (!Attachments.insert(RawHash).second) { return; @@ -3116,12 +3567,7 @@ LoadOplog(CidStore& ChunkStore, return; } AttachmentCount.fetch_add(1); - DownloadAndSaveAttachment(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, + DownloadAndSaveAttachment(Context, AttachmentsDownloadLatch, AttachmentsWriteLatch, RemoteResult, @@ -3132,18 +3578,13 @@ LoadOplog(CidStore& ChunkStore, }; std::vector<ChunkedInfo> FilesToDechunk; - auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) { - if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash)) - { - FilesToDechunk.push_back(Chunked); - } - }; + auto OnChunkedAttachment = [&FilesToDechunk](const ChunkedInfo& Chunked) { FilesToDechunk.push_back(Chunked); }; - auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); }; + auto OnReferencedAttachments = [&Context](std::span<IoHash> RawHashes) { Context.Oplog.CaptureAddedAttachments(RawHashes); }; // Make sure we retain any attachments we download before writing the oplog - Oplog.EnableUpdateCapture(); - auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); }); + Context.Oplog.EnableUpdateCapture(); + auto _ = MakeGuard([&Context]() { Context.Oplog.DisableUpdateCapture(); }); CbObject OplogSection; RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject, @@ -3153,40 +3594,268 @@ LoadOplog(CidStore& ChunkStore, OnNeedAttachment, OnChunkedAttachment, OplogSection, - OptionalContext); + Context.OptionalJobContext); if (Result.ErrorCode != 0) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } - remotestore_impl::ReportMessage(OptionalContext, + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), Attachments.size(), BlockCountToDownload, FilesToDechunk.size())); - AttachmentsDownloadLatch.CountDown(); - while (!AttachmentsDownloadLatch.Wait(1000)) + std::vector<IoHash> BlockHashes; + std::vector<IoHash> AllNeededChunkHashes; + BlockHashes.reserve(NeededBlockDownloads.size()); + for (const NeededBlockDownload& BlockDownload : NeededBlockDownloads) { - ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + BlockHashes.push_back(BlockDownload.ThinBlockDescription.BlockHash); + for (uint32_t ChunkIndex : BlockDownload.NeededChunkIndexes) { - if (!RemoteResult.IsError()) + AllNeededChunkHashes.push_back(BlockDownload.ThinBlockDescription.ChunkRawHashes[ChunkIndex]); + } + } + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllNeededPartialChunkHashesLookup = BuildHashLookup(AllNeededChunkHashes); + std::vector<std::atomic<bool>> ChunkDownloadedFlags(AllNeededChunkHashes.size()); + std::vector<bool> DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false); + ChunkBlockAnalyser::BlockResult PartialBlocksResult; + + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Fetching descriptions for {} blocks", BlockHashes.size())); + + RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = + Context.RemoteStore.GetBlockDescriptions(BlockHashes, Context.OptionalCache, Context.CacheBuildId); + + remotestore_impl::ReportMessage(Context.OptionalJobContext, + fmt::format("GetBlockDescriptions took {}. Found {} blocks", + NiceTimeSpanMs(uint64_t(BlockDescriptions.ElapsedSeconds * 1000)), + BlockDescriptions.Blocks.size())); + + std::vector<IoHash> BlocksWithDescription; + BlocksWithDescription.reserve(BlockDescriptions.Blocks.size()); + for (const ChunkBlockDescription& BlockDescription : BlockDescriptions.Blocks) + { + BlocksWithDescription.push_back(BlockDescription.BlockHash); + } + { + auto WantIt = NeededBlockDownloads.begin(); + auto FindIt = BlockDescriptions.Blocks.begin(); + while (WantIt != NeededBlockDownloads.end()) + { + if (FindIt == BlockDescriptions.Blocks.end()) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + // Fall back to full download as we can't get enough information about the block + DownloadAndSaveBlock(Context, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + WantIt->ThinBlockDescription.BlockHash, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + 3); + for (uint32_t BlockChunkIndex : WantIt->NeededChunkIndexes) + { + const IoHash& ChunkHash = WantIt->ThinBlockDescription.ChunkRawHashes[BlockChunkIndex]; + auto It = AllNeededPartialChunkHashesLookup.find(ChunkHash); + ZEN_ASSERT(It != AllNeededPartialChunkHashesLookup.end()); + uint32_t ChunkIndex = It->second; + DownloadedViaLegacyChunkFlag[ChunkIndex] = true; + } + WantIt++; + } + else if (WantIt->ThinBlockDescription.BlockHash == FindIt->BlockHash) + { + // Found + FindIt++; + WantIt++; + } + else + { + // Not a requested block? + ZEN_ASSERT(false); } } - uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; - if (DownloadStartMS != (uint64_t)-1) + } + if (!AllNeededChunkHashes.empty()) + { + std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes; + std::vector<bool> BlockExistsInCache(BlocksWithDescription.size(), false); + + if (Context.PartialBlockRequestMode == EPartialBlockRequestMode::Off) { - PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); + PartialBlockDownloadModes.resize(BlocksWithDescription.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + } + else + { + if (Context.OptionalCache) + { + std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = + Context.OptionalCache->BlobsExists(Context.CacheBuildId, BlocksWithDescription); + if (CacheExistsResult.size() == BlocksWithDescription.size()) + { + for (size_t BlobIndex = 0; BlobIndex < CacheExistsResult.size(); BlobIndex++) + { + BlockExistsInCache[BlobIndex] = CacheExistsResult[BlobIndex].HasBody; + } + } + uint64_t FoundBlocks = + std::accumulate(BlockExistsInCache.begin(), + BlockExistsInCache.end(), + uint64_t(0u), + [](uint64_t Current, bool Exists) -> uint64_t { return Current + (Exists ? 1 : 0); }); + if (FoundBlocks > 0) + { + remotestore_impl::ReportMessage( + Context.OptionalJobContext, + fmt::format("Found {} out of {} blocks in cache", FoundBlocks, BlockExistsInCache.size())); + } + } + + ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + + switch (Context.PartialBlockRequestMode) + { + case EPartialBlockRequestMode::Off: + break; + case EPartialBlockRequestMode::ZenCacheOnly: + CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; + CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + break; + case EPartialBlockRequestMode::Mixed: + CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; + CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; + break; + case EPartialBlockRequestMode::All: + CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; + CloudPartialDownloadMode = Context.StoreMaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange + : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; + break; + } + + PartialBlockDownloadModes.reserve(BlocksWithDescription.size()); + for (uint32_t BlockIndex = 0; BlockIndex < BlocksWithDescription.size(); BlockIndex++) + { + const bool BlockExistInCache = BlockExistsInCache[BlockIndex]; + PartialBlockDownloadModes.push_back(BlockExistInCache ? CachePartialDownloadMode : CloudPartialDownloadMode); + } + } + + ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size()); + + ChunkBlockAnalyser PartialAnalyser( + *LogOutput, + BlockDescriptions.Blocks, + ChunkBlockAnalyser::Options{.IsQuiet = false, + .IsVerbose = false, + .HostLatencySec = Context.StoreLatencySec, + .HostHighSpeedLatencySec = Context.CacheLatencySec, + .HostMaxRangeCountPerRequest = Context.StoreMaxRangeCountPerRequest, + .HostHighSpeedMaxRangeCountPerRequest = Context.CacheMaxRangeCountPerRequest}); + + std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = + PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup, + [&](uint32_t ChunkIndex) { return !DownloadedViaLegacyChunkFlag[ChunkIndex]; }); + + PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes); + + for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes) + { + DownloadAndSaveBlock(Context, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockDescriptions.Blocks[FullBlockIndex].BlockHash, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + 3); + } + + for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();) + { + size_t RangeCount = 1; + size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex; + const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex]; + while (RangeCount < RangesLeft && + CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) + { + RangeCount++; + } + + DownloadAndSavePartialBlock(Context, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex], + BlockExistsInCache[CurrentBlockRange.BlockIndex], + PartialBlocksResult.BlockRanges, + BlockRangeIndex, + RangeCount, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + /* RetriesLeft*/ 3); + + BlockRangeIndex += RangeCount; + } + } + + AttachmentsDownloadLatch.CountDown(); + { + ptrdiff_t AttachmentCountToUseForProgress = AttachmentsDownloadLatch.Remaining(); + while (!AttachmentsDownloadLatch.Wait(1000)) + { + ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining(); + if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; + if (DownloadStartMS != (uint64_t)-1) + { + PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); + } + + uint64_t AttachmentsDownloaded = + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); + uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + + Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); + + AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress); + remotestore_impl::ReportProgress( + Context.OptionalJobContext, + "Loading attachments"sv, + fmt::format( + "{} ({}) downloaded, {} ({}) stored, {} remaining. {}", + AttachmentsDownloaded, + NiceBytes(AttachmentBytesDownloaded), + Info.AttachmentsStored.load(), + NiceBytes(Info.AttachmentBytesStored.load()), + Remaining, + remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, PartialTransferWallTimeMS)), + AttachmentCountToUseForProgress, + Remaining); } - remotestore_impl::ReportProgress( - OptionalContext, - "Loading attachments"sv, - fmt::format("{} remaining. {}", Remaining, remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), - AttachmentCount.load(), - Remaining); } if (DownloadStartMS != (uint64_t)-1) { @@ -3195,57 +3864,58 @@ LoadOplog(CidStore& ChunkStore, if (AttachmentCount.load() > 0) { - remotestore_impl::ReportProgress(OptionalContext, - "Loading attachments"sv, - fmt::format("{}", remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)), - AttachmentCount.load(), - 0); + remotestore_impl::ReportProgress( + Context.OptionalJobContext, + "Loading attachments"sv, + fmt::format("{}", remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS)), + AttachmentCount.load(), + 0); } AttachmentsWriteLatch.CountDown(); - while (!AttachmentsWriteLatch.Wait(1000)) { - ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + ptrdiff_t AttachmentCountToUseForProgress = AttachmentsWriteLatch.Remaining(); + while (!AttachmentsWriteLatch.Wait(1000)) { - if (!RemoteResult.IsError()) + ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining(); + if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } } + AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress); + remotestore_impl::ReportProgress(Context.OptionalJobContext, + "Writing attachments"sv, + fmt::format("{} ({}), {} remaining.", + Info.AttachmentsStored.load(), + NiceBytes(Info.AttachmentBytesStored.load()), + Remaining), + AttachmentCountToUseForProgress, + Remaining); } - remotestore_impl::ReportProgress(OptionalContext, - "Writing attachments"sv, - fmt::format("{} remaining.", Remaining), - AttachmentCount.load(), - Remaining); } if (AttachmentCount.load() > 0) { - remotestore_impl::ReportProgress(OptionalContext, "Writing attachments", ""sv, AttachmentCount.load(), 0); + remotestore_impl::ReportProgress(Context.OptionalJobContext, "Writing attachments", ""sv, AttachmentCount.load(), 0); } if (Result.ErrorCode == 0) { if (!FilesToDechunk.empty()) { - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); Latch DechunkLatch(1); - std::filesystem::path TempFilePath = Oplog.TempPath(); + std::filesystem::path TempFilePath = Context.Oplog.TempPath(); for (const ChunkedInfo& Chunked : FilesToDechunk) { std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); DechunkLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&ChunkStore, - &DechunkLatch, - TempFileName, - &Chunked, - &RemoteResult, - IgnoreMissingAttachments, - &Info, - OptionalContext]() { + Context.WorkerPool.ScheduleWork( + [&Context, &DechunkLatch, TempFileName, &Chunked, &RemoteResult, &Info]() { ZEN_TRACE_CPU("DechunkAttachment"); auto _ = MakeGuard([&DechunkLatch, &TempFileName] { @@ -3279,16 +3949,16 @@ LoadOplog(CidStore& ChunkStore, for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) { const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; - IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); + IoBuffer Chunk = Context.ChunkStore.FindChunkByCid(ChunkHash); if (!Chunk) { remotestore_impl::ReportMessage( - OptionalContext, + Context.OptionalJobContext, fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + if (!Context.IgnoreMissingAttachments) { RemoteResult.SetError( gsl::narrow<int>(HttpResponseCode::NotFound), @@ -3306,7 +3976,7 @@ LoadOplog(CidStore& ChunkStore, if (RawHash != ChunkHash) { remotestore_impl::ReportMessage( - OptionalContext, + Context.OptionalJobContext, fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}", RawHash, ChunkHash, @@ -3314,7 +3984,7 @@ LoadOplog(CidStore& ChunkStore, // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + if (!Context.IgnoreMissingAttachments) { RemoteResult.SetError( gsl::narrow<int>(HttpResponseCode::NotFound), @@ -3351,14 +4021,14 @@ LoadOplog(CidStore& ChunkStore, })) { remotestore_impl::ReportMessage( - OptionalContext, + Context.OptionalJobContext, fmt::format("Failed to decompress chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + if (!Context.IgnoreMissingAttachments) { RemoteResult.SetError( gsl::narrow<int>(HttpResponseCode::NotFound), @@ -3380,11 +4050,12 @@ LoadOplog(CidStore& ChunkStore, TmpFile.Close(); TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); } + uint64_t TmpBufferSize = TmpBuffer.GetSize(); CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); + Context.ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); if (InsertResult.New) { - Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); + Info.AttachmentBytesStored.fetch_add(TmpBufferSize); Info.AttachmentsStored.fetch_add(1); } @@ -3407,54 +4078,58 @@ LoadOplog(CidStore& ChunkStore, while (!DechunkLatch.Wait(1000)) { ptrdiff_t Remaining = DechunkLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); remotestore_impl::ReportMessage( - OptionalContext, + Context.OptionalJobContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } } - remotestore_impl::ReportProgress(OptionalContext, + remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, fmt::format("{} remaining...", Remaining), FilesToDechunk.size(), Remaining); } - remotestore_impl::ReportProgress(OptionalContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0); + remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0); } Result = RemoteResult.ConvertResult(); } if (Result.ErrorCode == 0) { - if (CleanOplog) + if (Context.CleanOplog) { - RemoteStore.Flush(); - if (!Oplog.Reset()) + if (Context.OptionalCache) + { + Context.OptionalCache->Flush(100, [](intptr_t) { return /*DontWaitForPendingOperation*/ false; }); + } + if (!Context.Oplog.Reset()) { Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())}; - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); + .Reason = fmt::format("Failed to clean existing oplog '{}'", Context.Oplog.OplogId())}; + remotestore_impl::ReportMessage(Context.OptionalJobContext, + fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); } } if (Result.ErrorCode == 0) { - remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext); + remotestore_impl::WriteOplogSection(Context.Oplog, OplogSection, Context.OptionalJobContext); } } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats()); + remotestore_impl::LogRemoteStoreStatsDetails(Context.RemoteStore.GetStats()); { std::string DownloadDetails; RemoteProjectStore::ExtendedStats ExtendedStats; - if (RemoteStore.GetExtendedStats(ExtendedStats)) + if (Context.RemoteStore.GetExtendedStats(ExtendedStats)) { if (!ExtendedStats.m_ReceivedBytesPerSource.empty()) { @@ -3473,26 +4148,37 @@ LoadOplog(CidStore& ChunkStore, Total += It.second; } - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Downloaded {} ({})", NiceBytes(Total), SB.ToView())); + remotestore_impl::ReportMessage(Context.OptionalJobContext, + fmt::format("Downloaded {} ({})", NiceBytes(Total), SB.ToView())); } } } + uint64_t TotalDownloads = + 1 + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); + uint64_t TotalBytesDownloaded = Info.OplogSizeBytes + Info.AttachmentBlockBytesDownloaded.load() + + Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); + remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}", + Context.OptionalJobContext, + fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} " + "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}", RemoteStoreInfo.ContainerName, Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), NiceBytes(Info.OplogSizeBytes), Info.AttachmentBlocksDownloaded.load(), NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), + Info.AttachmentBlocksRangesDownloaded.load(), + NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()), Info.AttachmentsDownloaded.load(), NiceBytes(Info.AttachmentBytesDownloaded.load()), + TotalDownloads, + NiceBytes(TotalBytesDownloaded), Info.AttachmentsStored.load(), NiceBytes(Info.AttachmentBytesStored.load()), Info.MissingAttachmentCount.load(), - remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); + remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS))); return Result; } @@ -3537,7 +4223,7 @@ RemoteProjectStore::~RemoteProjectStore() #if ZEN_WITH_TESTS -namespace testutils { +namespace projectstore_testutils { using namespace std::literals; static std::string OidAsString(const Oid& Id) @@ -3589,7 +4275,29 @@ namespace testutils { return Result; } -} // namespace testutils + class TestJobContext : public JobContext + { + public: + explicit TestJobContext(int& OpIndex) : m_OpIndex(OpIndex) {} + virtual bool IsCancelled() const { return false; } + virtual void ReportMessage(std::string_view Message) { ZEN_INFO("Job {}: {}", m_OpIndex, Message); } + virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount) + { + ZEN_INFO("Job {}: Op '{}'{} {}/{}", + m_OpIndex, + CurrentOp, + Details.empty() ? "" : fmt::format(" {}", Details), + TotalCount - RemainingCount, + TotalCount); + } + + private: + int& m_OpIndex; + }; + +} // namespace projectstore_testutils + +TEST_SUITE_BEGIN("remotestore.projectstore"); struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse { @@ -3616,7 +4324,7 @@ TEST_CASE_TEMPLATE("project.store.export", ExportForceDisableBlocksFalse_ForceTempBlocksTrue) { using namespace std::literals; - using namespace testutils; + using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; @@ -3684,56 +4392,712 @@ TEST_CASE_TEMPLATE("project.store.export", false, nullptr); - CHECK(ExportResult.ErrorCode == 0); + REQUIRE(ExportResult.ErrorCode == 0); Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {}); CHECK(OplogImport); - RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - NetworkPool, - WorkerPool, - /*Force*/ false, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ false, - nullptr); + int OpJobIndex = 0; + TestJobContext OpJobContext(OpJobIndex); + + RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); CHECK(ImportResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - NetworkPool, - WorkerPool, - /*Force*/ true, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ false, - nullptr); + OpJobIndex++; + + RemoteProjectStore::Result ImportForceResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); CHECK(ImportForceResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - NetworkPool, - WorkerPool, - /*Force*/ false, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ true, - nullptr); + OpJobIndex++; + + RemoteProjectStore::Result ImportCleanResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); CHECK(ImportCleanResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - NetworkPool, - WorkerPool, - /*Force*/ true, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ true, - nullptr); + OpJobIndex++; + + RemoteProjectStore::Result ImportForceCleanResult = + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .CleanOplog = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); CHECK(ImportForceCleanResult.ErrorCode == 0); + OpJobIndex++; } +// Common oplog setup used by the two tests below. +// Returns a FileRemoteStore backed by ExportDir that has been populated with a SaveOplog call. +// Keeps the test data identical to project.store.export so the two test suites exercise the same blocks/attachments. +static RemoteProjectStore::Result +SetupExportStore(CidStore& CidStore, + ProjectStore::Project& Project, + WorkerThreadPool& NetworkPool, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& ExportDir, + std::shared_ptr<RemoteProjectStore>& OutRemoteStore) +{ + using namespace projectstore_testutils; + using namespace std::literals; + + Ref<ProjectStore::Oplog> Oplog = Project.NewOplog("oplog_export", {}); + if (!Oplog) + { + return RemoteProjectStore::Result{.ErrorCode = -1}; + } + + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( + Oid::NewOid(), + CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); + + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024, + .MaxChunksPerBlock = 1000, + .MaxChunkEmbedSize = 32 * 1024u, + .ChunkFileSizeLimit = 64u * 1024u}, + /*.FolderPath =*/ExportDir, + /*.Name =*/std::string("oplog_export"), + /*.OptionalBaseName =*/std::string(), + /*.ForceDisableBlocks =*/false, + /*.ForceEnableTempBlocks =*/false}; + + OutRemoteStore = CreateFileRemoteStore(Log(), Options); + return SaveOplog(CidStore, + *OutRemoteStore, + Project, + *Oplog, + NetworkPool, + WorkerPool, + Options.MaxBlockSize, + Options.MaxChunksPerBlock, + Options.MaxChunkEmbedSize, + Options.ChunkFileSizeLimit, + /*EmbedLooseFiles*/ true, + /*ForceUpload*/ false, + /*IgnoreMissingAttachments*/ false, + /*OptionalContext*/ nullptr); +} + +// Creates an export store with a single oplog entry that packs six 512 KB chunks into one +// ~3 MB block (MaxBlockSize = 8 MB). The resulting block slack (~1.5 MB) far exceeds the +// 512 KB threshold that ChunkBlockAnalyser requires before it will consider partial-block +// downloads instead of full-block downloads. +// +// This function is self-contained: it creates its own GcManager, CidStore, ProjectStore and +// Project internally so that each call is independent of any outer test context. After +// SaveOplog returns, all persistent data lives on disk inside ExportDir and the caller can +// freely query OutRemoteStore without holding any references to the internal context. +static RemoteProjectStore::Result +SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& ExportDir, + std::shared_ptr<RemoteProjectStore>& OutRemoteStore) +{ + using namespace projectstore_testutils; + using namespace std::literals; + + // Self-contained CAS and project store. Subdirectories of ExportDir keep everything + // together without relying on the outer TEST_CASE's ExportCidStore / ExportProject. + GcManager LocalGc; + CidStore LocalCidStore(LocalGc); + CidStoreConfiguration LocalCidConfig = {.RootDirectory = ExportDir / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + LocalCidStore.Initialize(LocalCidConfig); + + std::filesystem::path LocalProjectBasePath = ExportDir / "proj"; + ProjectStore LocalProjectStore(LocalCidStore, LocalProjectBasePath, LocalGc, ProjectStore::Configuration{}); + Ref<ProjectStore::Project> LocalProject(LocalProjectStore.NewProject(LocalProjectBasePath / "p"sv, + "p"sv, + (ExportDir / "root").string(), + (ExportDir / "engine").string(), + (ExportDir / "game").string(), + (ExportDir / "game" / "game.uproject").string())); + + Ref<ProjectStore::Oplog> Oplog = LocalProject->NewOplog("oplog_partial_block", {}); + if (!Oplog) + { + return RemoteProjectStore::Result{.ErrorCode = -1}; + } + + // Six 512 KB chunks with OodleCompressionLevel::None so the compressed size stays large + // and the block genuinely exceeds the 512 KB slack threshold. + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( + Oid::NewOid(), + CreateAttachments(std::initializer_list<size_t>{512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u}, + OodleCompressionLevel::None))); + + // MaxChunkEmbedSize must be larger than the compressed size of each 512 KB chunk + // (OodleCompressionLevel::None → compressed ≈ raw ≈ 512 KB). With the legacy + // 32 KB limit all six chunks would become loose large attachments and no block would + // be created, so we use the production default of 1.5 MB instead. + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 8u * 1024u * 1024u, + .MaxChunksPerBlock = 1000, + .MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize, + .ChunkFileSizeLimit = 64u * 1024u * 1024u}, + /*.FolderPath =*/ExportDir, + /*.Name =*/std::string("oplog_partial_block"), + /*.OptionalBaseName =*/std::string(), + /*.ForceDisableBlocks =*/false, + /*.ForceEnableTempBlocks =*/false}; + OutRemoteStore = CreateFileRemoteStore(Log(), Options); + return SaveOplog(LocalCidStore, + *OutRemoteStore, + *LocalProject, + *Oplog, + NetworkPool, + WorkerPool, + Options.MaxBlockSize, + Options.MaxChunksPerBlock, + Options.MaxChunkEmbedSize, + Options.ChunkFileSizeLimit, + /*EmbedLooseFiles*/ true, + /*ForceUpload*/ false, + /*IgnoreMissingAttachments*/ false, + /*OptionalContext*/ nullptr); +} + +// Returns the first block hash that has at least MinChunkCount chunks, or a zero IoHash +// if no qualifying block exists in Store. +static IoHash +FindBlockWithMultipleChunks(RemoteProjectStore& Store, size_t MinChunkCount) +{ + RemoteProjectStore::LoadContainerResult ContainerResult = Store.LoadContainer(); + if (ContainerResult.ErrorCode != 0) + { + return {}; + } + std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(ContainerResult.ContainerObject); + if (BlockHashes.empty()) + { + return {}; + } + RemoteProjectStore::GetBlockDescriptionsResult Descriptions = Store.GetBlockDescriptions(BlockHashes, nullptr, Oid{}); + if (Descriptions.ErrorCode != 0) + { + return {}; + } + for (const ChunkBlockDescription& Desc : Descriptions.Blocks) + { + if (Desc.ChunkRawHashes.size() >= MinChunkCount) + { + return Desc.BlockHash; + } + } + return {}; +} + +// Loads BlockHash from Source and inserts every even-indexed chunk (0, 2, 4, …) into +// TargetCidStore. Odd-indexed chunks are left absent so that when an import is run +// against the same block, HasAttachment returns false for three non-adjacent positions +// — the minimum needed to exercise the multi-range partial-block download paths. +static void +SeedCidStoreWithAlternateChunks(CidStore& TargetCidStore, RemoteProjectStore& Source, const IoHash& BlockHash) +{ + RemoteProjectStore::LoadAttachmentResult BlockResult = Source.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode != 0 || !BlockResult.Bytes) + { + return; + } + + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(BlockResult.Bytes), RawHash, RawSize); + if (!Compressed) + { + return; + } + CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); + if (!BlockPayload) + { + return; + } + + uint32_t ChunkIndex = 0; + uint64_t HeaderSize = 0; + IterateChunkBlock( + BlockPayload.Flatten(), + [&TargetCidStore, &ChunkIndex](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) { + if (ChunkIndex % 2 == 0) + { + IoBuffer ChunkData = Chunk.GetCompressed().Flatten().AsIoBuffer(); + TargetCidStore.AddChunk(ChunkData, AttachmentHash); + } + ++ChunkIndex; + }, + HeaderSize); +} + +TEST_CASE("project.store.import.context_settings") +{ + using namespace std::literals; + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + ScopedTemporaryDirectory ExportDir; + + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; + + // Export-side CAS and project store: used only by SetupExportStore to build the remote store + // payload. Kept separate from the import side so the two CAS instances are disjoint. + GcManager ExportGc; + CidStore ExportCidStore(ExportGc); + CidStoreConfiguration ExportCidConfig = {.RootDirectory = TempDir.Path() / "export_cas", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + ExportCidStore.Initialize(ExportCidConfig); + + std::filesystem::path ExportBasePath = TempDir.Path() / "export_projectstore"; + ProjectStore ExportProjectStore(ExportCidStore, ExportBasePath, ExportGc, ProjectStore::Configuration{}); + Ref<ProjectStore::Project> ExportProject(ExportProjectStore.NewProject(ExportBasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + + uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); + uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; + WorkerThreadPool WorkerPool(WorkerCount); + WorkerThreadPool NetworkPool(NetworkWorkerCount); + + std::shared_ptr<RemoteProjectStore> RemoteStore; + RemoteProjectStore::Result ExportResult = + SetupExportStore(ExportCidStore, *ExportProject, NetworkPool, WorkerPool, ExportDir.Path(), RemoteStore); + REQUIRE(ExportResult.ErrorCode == 0); + + // Import-side CAS and project store: starts empty, mirroring a fresh machine that has never + // downloaded the data. HasAttachment() therefore returns false for every chunk, so the import + // genuinely contacts the remote store without needing ForceDownload on the populate pass. + GcManager ImportGc; + CidStore ImportCidStore(ImportGc); + CidStoreConfiguration ImportCidConfig = {.RootDirectory = TempDir.Path() / "import_cas", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + ImportCidStore.Initialize(ImportCidConfig); + + std::filesystem::path ImportBasePath = TempDir.Path() / "import_projectstore"; + ProjectStore ImportProjectStore(ImportCidStore, ImportBasePath, ImportGc, ProjectStore::Configuration{}); + Ref<ProjectStore::Project> ImportProject(ImportProjectStore.NewProject(ImportBasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + + const Oid CacheBuildId = Oid::NewOid(); + BuildStorageCache::Statistics CacheStats; + std::unique_ptr<BuildStorageCache> Cache = CreateInMemoryBuildStorageCache(256u, CacheStats); + auto ResetCacheStats = [&]() { + CacheStats.TotalBytesRead = 0; + CacheStats.TotalBytesWritten = 0; + CacheStats.TotalRequestCount = 0; + CacheStats.TotalRequestTimeUs = 0; + CacheStats.TotalExecutionTimeUs = 0; + CacheStats.PeakSentBytes = 0; + CacheStats.PeakReceivedBytes = 0; + CacheStats.PeakBytesPerSec = 0; + CacheStats.PutBlobCount = 0; + CacheStats.PutBlobByteCount = 0; + }; + + int OpJobIndex = 0; + + TestJobContext OpJobContext(OpJobIndex); + + // Helper: run a LoadOplog against the import-side CAS/project with the given context knobs. + // Each call creates a fresh oplog so repeated calls within one SUBCASE don't short-circuit on + // already-present data. + auto DoImport = [&](BuildStorageCache* OptCache, + EPartialBlockRequestMode Mode, + double StoreLatency, + uint64_t StoreRanges, + double CacheLatency, + uint64_t CacheRanges, + bool PopulateCache, + bool ForceDownload) -> RemoteProjectStore::Result { + Ref<ProjectStore::Oplog> ImportOplog = ImportProject->NewOplog(fmt::format("import_{}", OpJobIndex++), {}); + return LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = OptCache, + .CacheBuildId = CacheBuildId, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = ForceDownload, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = Mode, + .PopulateCache = PopulateCache, + .StoreLatencySec = StoreLatency, + .StoreMaxRangeCountPerRequest = StoreRanges, + .CacheLatencySec = CacheLatency, + .CacheMaxRangeCountPerRequest = CacheRanges, + .OptionalJobContext = &OpJobContext}); + }; + + // Shorthand: Mode=All, low latency, 128 ranges for both store and cache. + auto ImportAll = [&](BuildStorageCache* OptCache, bool Populate, bool Force) { + return DoImport(OptCache, EPartialBlockRequestMode::All, 0.001, 128u, 0.001, 128u, Populate, Force); + }; + + SUBCASE("mode_off_no_cache") + { + // Baseline: no partial block requests, no cache. + RemoteProjectStore::Result R = + DoImport(nullptr, EPartialBlockRequestMode::Off, -1.0, (uint64_t)-1, -1.0, (uint64_t)-1, false, false); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("mode_all_multirange_cloud_no_cache") + { + // StoreMaxRangeCountPerRequest > 1 → MultiRange cloud path. + RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 128u, -1.0, 0u, false, false); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("mode_all_singlerange_cloud_no_cache") + { + // StoreMaxRangeCountPerRequest == 1 → SingleRange cloud path. + RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 1u, -1.0, 0u, false, false); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("mode_mixed_high_latency_no_cache") + { + // High store latency encourages range merging; Mixed uses SingleRange for cloud, Off for cache. + RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::Mixed, 0.1, 128u, -1.0, 0u, false, false); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("cache_populate_and_hit") + { + // First import: ImportCidStore is empty so all blocks are downloaded from the remote store + // and written to the cache. + RemoteProjectStore::Result PopulateResult = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); + CHECK(PopulateResult.ErrorCode == 0); + CHECK(CacheStats.PutBlobCount > 0); + + // Re-import with ForceDownload=true: all chunks are now in ImportCidStore but Force overrides + // HasAttachment() so the download logic re-runs and serves blocks from the cache instead of + // the remote store. + ResetCacheStats(); + RemoteProjectStore::Result HitResult = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true); + CHECK(HitResult.ErrorCode == 0); + CHECK(CacheStats.PutBlobCount == 0); + // TotalRequestCount covers both full-blob cache hits and partial-range cache hits. + CHECK(CacheStats.TotalRequestCount > 0); + } + + SUBCASE("cache_no_populate_flag") + { + // Cache is provided but PopulateCache=false: blocks are downloaded to ImportCidStore but + // nothing should be written to the cache. + RemoteProjectStore::Result R = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/false); + CHECK(R.ErrorCode == 0); + CHECK(CacheStats.PutBlobCount == 0); + } + + SUBCASE("mode_zencacheonly_cache_multirange") + { + // Pre-populate the cache via a plain import, then re-import with ZenCacheOnly + + // CacheMaxRangeCountPerRequest=128. With 100% of chunks needed, all blocks go to + // FullBlockIndexes and GetBuildBlob (full blob) is called from the cache. + // CacheMaxRangeCountPerRequest > 1 would route partial downloads through GetBuildBlobRanges + // if the analyser ever emits BlockRanges entries. + RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); + CHECK(Populate.ErrorCode == 0); + ResetCacheStats(); + + RemoteProjectStore::Result R = DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 128u, false, true); + CHECK(R.ErrorCode == 0); + CHECK(CacheStats.TotalRequestCount > 0); + } + + SUBCASE("mode_zencacheonly_cache_singlerange") + { + // Pre-populate the cache, then re-import with ZenCacheOnly + CacheMaxRangeCountPerRequest=1. + // With 100% of chunks needed the analyser sends all blocks to FullBlockIndexes (full-block + // download path), which calls GetBuildBlob with no range offset — a full-blob cache hit. + // The single-range vs multi-range distinction only matters for the partial-block (BlockRanges) + // path, which is not reached when all chunks are needed. + RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); + CHECK(Populate.ErrorCode == 0); + ResetCacheStats(); + + RemoteProjectStore::Result R = DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 1u, false, true); + CHECK(R.ErrorCode == 0); + CHECK(CacheStats.TotalRequestCount > 0); + } + + SUBCASE("mode_all_cache_and_cloud_multirange") + { + // Pre-populate cache; All mode uses multi-range for both the cache and cloud paths. + RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); + CHECK(Populate.ErrorCode == 0); + ResetCacheStats(); + + RemoteProjectStore::Result R = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true); + CHECK(R.ErrorCode == 0); + CHECK(CacheStats.TotalRequestCount > 0); + } + + SUBCASE("partial_block_cloud_multirange") + { + // Export store with 6 × 512 KB chunks packed into one ~3 MB block. + ScopedTemporaryDirectory PartialExportDir; + std::shared_ptr<RemoteProjectStore> PartialRemoteStore; + RemoteProjectStore::Result ExportR = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); + REQUIRE(ExportR.ErrorCode == 0); + + // Seeding even-indexed chunks (0, 2, 4) leaves odd ones (1, 3, 5) absent in + // ImportCidStore. Three non-adjacent needed positions → three BlockRangeDescriptors. + IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); + CHECK(BlockHash != IoHash::Zero); + SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash); + + // StoreMaxRangeCountPerRequest=128 → all three ranges sent in one LoadAttachmentRanges call. + Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_multi_{}", OpJobIndex++), {}); + RemoteProjectStore::Result R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = CacheBuildId, + .Oplog = *PartialOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = -1.0, + .CacheMaxRangeCountPerRequest = 0u, + .OptionalJobContext = &OpJobContext}); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("partial_block_cloud_singlerange") + { + // Same block layout as partial_block_cloud_multirange but StoreMaxRangeCountPerRequest=1. + // DownloadPartialBlock issues one LoadAttachmentRanges call per range. + ScopedTemporaryDirectory PartialExportDir; + std::shared_ptr<RemoteProjectStore> PartialRemoteStore; + RemoteProjectStore::Result ExportR = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); + REQUIRE(ExportR.ErrorCode == 0); + + IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); + CHECK(BlockHash != IoHash::Zero); + SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash); + + Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_single_{}", OpJobIndex++), {}); + RemoteProjectStore::Result R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = CacheBuildId, + .Oplog = *PartialOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 1u, + .CacheLatencySec = -1.0, + .CacheMaxRangeCountPerRequest = 0u, + .OptionalJobContext = &OpJobContext}); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("partial_block_cache_multirange") + { + ScopedTemporaryDirectory PartialExportDir; + std::shared_ptr<RemoteProjectStore> PartialRemoteStore; + RemoteProjectStore::Result ExportR = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); + REQUIRE(ExportR.ErrorCode == 0); + + IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); + CHECK(BlockHash != IoHash::Zero); + + // Phase 1: ImportCidStore starts empty → full block download from remote → PutBuildBlob + // populates the cache. + { + Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p1_{}", OpJobIndex++), {}); + RemoteProjectStore::Result Phase1R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase1Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = true, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 128u, + .OptionalJobContext = &OpJobContext}); + CHECK(Phase1R.ErrorCode == 0); + CHECK(CacheStats.PutBlobCount > 0); + } + ResetCacheStats(); + + // Phase 2: fresh CidStore with only even-indexed chunks seeded. + // HasAttachment returns false for odd chunks (1, 3, 5) → three BlockRangeDescriptors. + // Block is in cache from Phase 1 → cache partial path. + // CacheMaxRangeCountPerRequest=128 → SubRangeCount=3 > 1 → GetBuildBlobRanges. + GcManager Phase2Gc; + CidStore Phase2CidStore(Phase2Gc); + CidStoreConfiguration Phase2CidConfig = {.RootDirectory = TempDir.Path() / "partial_cas", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + Phase2CidStore.Initialize(Phase2CidConfig); + SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash); + + Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p2_{}", OpJobIndex++), {}); + RemoteProjectStore::Result Phase2R = LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase2Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 128u, + .OptionalJobContext = &OpJobContext}); + CHECK(Phase2R.ErrorCode == 0); + CHECK(CacheStats.TotalRequestCount > 0); + } + + SUBCASE("partial_block_cache_singlerange") + { + ScopedTemporaryDirectory PartialExportDir; + std::shared_ptr<RemoteProjectStore> PartialRemoteStore; + RemoteProjectStore::Result ExportR = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); + REQUIRE(ExportR.ErrorCode == 0); + + IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); + CHECK(BlockHash != IoHash::Zero); + + // Phase 1: full block download from remote into cache. + { + Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p1_{}", OpJobIndex++), {}); + RemoteProjectStore::Result Phase1R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase1Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = true, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 128u, + .OptionalJobContext = &OpJobContext}); + CHECK(Phase1R.ErrorCode == 0); + CHECK(CacheStats.PutBlobCount > 0); + } + ResetCacheStats(); + + // Phase 2: fresh CidStore with only even-indexed chunks seeded. + // CacheMaxRangeCountPerRequest=1 → SubRangeCount=Min(3,1)=1 → GetBuildBlob with range + // offset (single-range legacy cache path), called once per needed chunk range. + GcManager Phase2Gc; + CidStore Phase2CidStore(Phase2Gc); + CidStoreConfiguration Phase2CidConfig = {.RootDirectory = TempDir.Path() / "partial_cas_single", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + Phase2CidStore.Initialize(Phase2CidConfig); + SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash); + + Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p2_{}", OpJobIndex++), {}); + RemoteProjectStore::Result Phase2R = LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase2Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 1u, + .OptionalJobContext = &OpJobContext}); + CHECK(Phase2R.ErrorCode == 0); + CHECK(CacheStats.TotalRequestCount > 0); + } +} + +TEST_SUITE_END(); + #endif // ZEN_WITH_TESTS void |