diff options
Diffstat (limited to 'src/zenremotestore/projectstore/remoteprojectstore.cpp')
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 946 |
1 files changed, 727 insertions, 219 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 8be8eb0df..2a9da6f58 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -14,6 +14,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> +#include <zenremotestore/chunking/chunkedcontent.h> #include <zenremotestore/chunking/chunkedfile.h> #include <zenremotestore/operationlogoutput.h> #include <zenstore/cidstore.h> @@ -229,29 +230,60 @@ namespace remotestore_impl { struct DownloadInfo { - uint64_t OplogSizeBytes = 0; - std::atomic<uint64_t> AttachmentsDownloaded = 0; - std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; - std::atomic<uint64_t> AttachmentBytesDownloaded = 0; - std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; - std::atomic<uint64_t> AttachmentsStored = 0; - std::atomic<uint64_t> AttachmentBytesStored = 0; - std::atomic_size_t MissingAttachmentCount = 0; + uint64_t OplogSizeBytes = 0; + std::atomic<uint64_t> AttachmentsDownloaded = 0; + std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; + std::atomic<uint64_t> AttachmentBlocksRangesDownloaded = 0; + std::atomic<uint64_t> AttachmentBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentBlockRangeBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentsStored = 0; + std::atomic<uint64_t> AttachmentBytesStored = 0; + std::atomic_size_t MissingAttachmentCount = 0; }; - void DownloadAndSaveBlockChunks(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, - DownloadInfo& Info, - Stopwatch& LoadAttachmentsTimer, - std::atomic_uint64_t& DownloadStartMS, - const std::vector<IoHash>& Chunks) + class JobContextLogOutput : public OperationLogOutput + { + public: + JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {} + virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) override + { + ZEN_UNUSED(LogLevel); + if (m_OptionalContext) + { + fmt::basic_memory_buffer<char, 250> MessageBuffer; + fmt::vformat_to(fmt::appender(MessageBuffer), Format, Args); + remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size())); + } + } + + virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); } + virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); } + virtual uint32_t GetProgressUpdateDelayMS() override { return 0; } + virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override + { + ZEN_UNUSED(InSubTask); + return nullptr; + } + + private: + JobContext* m_OptionalContext; + }; + + void DownloadAndSaveBlockChunks(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + bool IgnoreMissingAttachments, + JobContext* OptionalContext, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + ThinChunkBlockDescription&& ThinBlockDescription, + std::vector<uint32_t>&& NeededChunkIndexes) { AttachmentsDownloadLatch.AddCount(1); NetworkWorkerPool.ScheduleWork( @@ -261,7 +293,8 @@ namespace remotestore_impl { &AttachmentsDownloadLatch, &AttachmentsWriteLatch, &RemoteResult, - Chunks = Chunks, + ThinBlockDescription = std::move(ThinBlockDescription), + NeededChunkIndexes = std::move(NeededChunkIndexes), &Info, &LoadAttachmentsTimer, &DownloadStartMS, @@ -276,6 +309,13 @@ namespace remotestore_impl { } try { + std::vector<IoHash> Chunks; + Chunks.reserve(NeededChunkIndexes.size()); + for (uint32_t ChunkIndex : NeededChunkIndexes) + { + Chunks.push_back(ThinBlockDescription.ChunkRawHashes[ChunkIndex]); + } + uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); @@ -293,7 +333,12 @@ namespace remotestore_impl { } return; } - Info.AttachmentsDownloaded.fetch_add(Chunks.size()); + Info.AttachmentsDownloaded.fetch_add(Result.Chunks.size()); + for (const auto& It : Result.Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + } ZEN_INFO("Loaded {} bulk attachments in {}", Chunks.size(), NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); @@ -320,8 +365,6 @@ namespace remotestore_impl { for (const auto& It : Chunks) { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); WriteRawHashes.push_back(It.first); } @@ -350,28 +393,29 @@ namespace remotestore_impl { catch (const std::exception& Ex) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk load {} attachments", Chunks.size()), + fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }; - void DownloadAndSaveBlock(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, - DownloadInfo& Info, - Stopwatch& LoadAttachmentsTimer, - std::atomic_uint64_t& DownloadStartMS, - const IoHash& BlockHash, - const std::vector<IoHash>& Chunks, - uint32_t RetriesLeft) + void DownloadAndSaveBlock(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + bool IgnoreMissingAttachments, + JobContext* OptionalContext, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + const IoHash& BlockHash, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& AllNeededPartialChunkHashesLookup, + std::span<std::atomic<bool>> ChunkDownloadedFlags, + uint32_t RetriesLeft) { AttachmentsDownloadLatch.AddCount(1); NetworkWorkerPool.ScheduleWork( @@ -381,7 +425,6 @@ namespace remotestore_impl { &RemoteStore, &NetworkWorkerPool, &WorkerPool, - BlockHash, &RemoteResult, &Info, &LoadAttachmentsTimer, @@ -389,7 +432,9 @@ namespace remotestore_impl { IgnoreMissingAttachments, OptionalContext, RetriesLeft, - Chunks = std::vector<IoHash>(Chunks)]() { + BlockHash = IoHash(BlockHash), + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags]() { ZEN_TRACE_CPU("DownloadBlock"); auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); @@ -401,7 +446,7 @@ namespace remotestore_impl { { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); + RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash, {}); if (BlockResult.ErrorCode) { ReportMessage(OptionalContext, @@ -422,10 +467,10 @@ namespace remotestore_impl { } uint64_t BlockSize = BlockResult.Bytes.GetSize(); Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_INFO("Loaded block attachment '{}' in {} ({})", - BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), - NiceBytes(BlockSize)); + ZEN_DEBUG("Loaded block attachment '{}' in {} ({})", + BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), + NiceBytes(BlockSize)); Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); AttachmentsWriteLatch.AddCount(1); @@ -436,7 +481,6 @@ namespace remotestore_impl { &RemoteStore, &NetworkWorkerPool, &WorkerPool, - BlockHash, &RemoteResult, &Info, &LoadAttachmentsTimer, @@ -444,8 +488,10 @@ namespace remotestore_impl { IgnoreMissingAttachments, OptionalContext, RetriesLeft, - Chunks = std::move(Chunks), - Bytes = std::move(BlockResult.Bytes)]() { + BlockHash = IoHash(BlockHash), + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + Bytes = std::move(BlockResult.Bytes)]() { auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -454,9 +500,6 @@ namespace remotestore_impl { try { ZEN_ASSERT(Bytes.Size() > 0); - std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; - WantedChunks.reserve(Chunks.size()); - WantedChunks.insert(Chunks.begin(), Chunks.end()); std::vector<IoBuffer> WriteAttachmentBuffers; std::vector<IoHash> WriteRawHashes; @@ -485,7 +528,8 @@ namespace remotestore_impl { LoadAttachmentsTimer, DownloadStartMS, BlockHash, - std::move(Chunks), + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, RetriesLeft - 1); } ReportMessage( @@ -519,7 +563,8 @@ namespace remotestore_impl { LoadAttachmentsTimer, DownloadStartMS, BlockHash, - std::move(Chunks), + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, RetriesLeft - 1); } ReportMessage(OptionalContext, @@ -546,28 +591,36 @@ namespace remotestore_impl { uint64_t BlockSize = BlockPayload.GetSize(); uint64_t BlockHeaderSize = 0; - bool StoreChunksOK = IterateChunkBlock( - BlockPayload.Flatten(), - [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize]( - CompressedBuffer&& Chunk, - const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT( - CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), - RawHash, - RawSize, - /*OutOptionalTotalCompressedSize*/ nullptr)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - WantedChunks.erase(AttachmentRawHash); - PotentialSize += WriteAttachmentBuffers.back().GetSize(); - } - }, - BlockHeaderSize); + + bool StoreChunksOK = IterateChunkBlock( + BlockPayload.Flatten(), + [&AllNeededPartialChunkHashesLookup, + &ChunkDownloadedFlags, + &WriteAttachmentBuffers, + &WriteRawHashes, + &Info, + &PotentialSize](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash); + if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) + { + bool Expected = false; + if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT( + CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + PotentialSize += WriteAttachmentBuffers.back().GetSize(); + } + } + }, + BlockHeaderSize); if (!StoreChunksOK) { @@ -582,8 +635,6 @@ namespace remotestore_impl { return; } - ZEN_ASSERT(WantedChunks.empty()); - if (!WriteAttachmentBuffers.empty()) { auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); @@ -625,6 +676,293 @@ namespace remotestore_impl { WorkerThreadPool::EMode::EnableBacklog); }; + void DownloadAndSavePartialBlock(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + bool IgnoreMissingAttachments, + JobContext* OptionalContext, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + const ChunkBlockDescription& BlockDescription, + std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors, + size_t BlockRangeIndexStart, + size_t BlockRangeCount, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& AllNeededPartialChunkHashesLookup, + std::span<std::atomic<bool>> ChunkDownloadedFlags, + uint32_t RetriesLeft) + { + AttachmentsDownloadLatch.AddCount(1); + NetworkWorkerPool.ScheduleWork( + [&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + RetriesLeft, + BlockDescription, + BlockRangeDescriptors, + BlockRangeIndexStart, + BlockRangeCount, + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags]() { + ZEN_TRACE_CPU("DownloadBlockRanges"); + + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + + double DownloadElapsedSeconds = 0; + uint64_t DownloadedBytes = 0; + + for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount; + BlockRangeIndex++) + { + if (RemoteResult.IsError()) + { + return; + } + + const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeIndex]; + + RemoteProjectStore::LoadAttachmentResult BlockResult = + RemoteStore.LoadAttachment(BlockDescription.BlockHash, + {.Offset = BlockRange.RangeStart, .Bytes = BlockRange.RangeLength}); + if (BlockResult.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to download block attachment '{}' range {},{} ({}): {}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength, + BlockResult.ErrorCode, + BlockResult.Reason)); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + } + return; + } + if (RemoteResult.IsError()) + { + return; + } + uint64_t BlockPartSize = BlockResult.Bytes.GetSize(); + if (BlockPartSize != BlockRange.RangeLength) + { + std::string ErrorString = + fmt::format("Failed to download block attachment '{}' range {},{}, got {} bytes ({}): {}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength, + BlockPartSize, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + + ReportMessage(OptionalContext, ErrorString); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), + "Mismatching block part range received", + ErrorString); + } + return; + } + Info.AttachmentBlocksRangesDownloaded.fetch_add(1); + + DownloadElapsedSeconds += BlockResult.ElapsedSeconds; + DownloadedBytes += BlockPartSize; + + Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize); + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + RetriesLeft, + BlockDescription, + BlockRange, + &AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + BlockPayload = std::move(BlockResult.Bytes)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + ZEN_ASSERT(BlockPayload.Size() > 0); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + + uint64_t PotentialSize = 0; + uint64_t UsedSize = 0; + uint64_t BlockPartSize = BlockPayload.GetSize(); + + uint32_t OffsetInBlock = 0; + for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart; + ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount; + ChunkBlockIndex++) + { + const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + + if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash); + ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) + { + bool Expected = false; + if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true)) + { + IoHash VerifyChunkHash; + uint64_t VerifyChunkSize; + CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed( + SharedBuffer(IoBuffer(BlockPayload, OffsetInBlock, ChunkCompressedSize)), + VerifyChunkHash, + VerifyChunkSize); + if (!CompressedChunk) + { + std::string ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash); + ReportMessage(OptionalContext, ErrorString); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), + "Malformed chunk block", + ErrorString); + } + continue; + } + if (VerifyChunkHash != ChunkHash) + { + std::string ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' has mismatching hash, expected {}, got {}", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash, + ChunkHash, + VerifyChunkHash); + ReportMessage(OptionalContext, ErrorString); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), + "Malformed chunk block", + ErrorString); + } + continue; + } + if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex]) + { + std::string ErrorString = fmt::format( + "Chunk at {},{} in block attachment '{}' has mismatching raw size, expected {}, " + "got {}", + OffsetInBlock, + ChunkCompressedSize, + BlockDescription.BlockHash, + BlockDescription.ChunkRawLengths[ChunkBlockIndex], + VerifyChunkSize); + ReportMessage(OptionalContext, ErrorString); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), + "Malformed chunk block", + ErrorString); + } + continue; + } + + WriteAttachmentBuffers.emplace_back(CompressedChunk.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.emplace_back(ChunkHash); + PotentialSize += WriteAttachmentBuffers.back().GetSize(); + } + } + OffsetInBlock += ChunkCompressedSize; + } + + if (!WriteAttachmentBuffers.empty()) + { + auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const auto& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + UsedSize += WriteAttachmentBuffers[Index].GetSize(); + } + } + ZEN_DEBUG("Used {} (matching {}) out of {} for block {} range {}, {} ({} %) (use of matching {}%)", + NiceBytes(UsedSize), + NiceBytes(PotentialSize), + NiceBytes(BlockPartSize), + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength, + (100 * UsedSize) / BlockPartSize, + PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0); + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed save block attachment {} range {}, {}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + + ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})", + BlockRangeCount, + BlockDescription.BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(DownloadElapsedSeconds * 1000)), + NiceBytes(DownloadedBytes)); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + }; + void DownloadAndSaveAttachment(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, bool IgnoreMissingAttachments, @@ -664,7 +1002,7 @@ namespace remotestore_impl { { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash, {}); if (AttachmentResult.ErrorCode) { ReportMessage(OptionalContext, @@ -680,10 +1018,10 @@ namespace remotestore_impl { return; } uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); - ZEN_INFO("Loaded large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), - NiceBytes(AttachmentSize)); + ZEN_DEBUG("Loaded large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), + NiceBytes(AttachmentSize)); Info.AttachmentsDownloaded.fetch_add(1); if (RemoteResult.IsError()) { @@ -1224,35 +1562,7 @@ BuildContainer(CidStore& ChunkStore, { using namespace std::literals; - class JobContextLogOutput : public OperationLogOutput - { - public: - JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {} - virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) override - { - ZEN_UNUSED(LogLevel); - if (m_OptionalContext) - { - fmt::basic_memory_buffer<char, 250> MessageBuffer; - fmt::vformat_to(fmt::appender(MessageBuffer), Format, Args); - remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size())); - } - } - - virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); } - virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); } - virtual uint32_t GetProgressUpdateDelayMS() override { return 0; } - virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override - { - ZEN_UNUSED(InSubTask); - return nullptr; - } - - private: - JobContext* m_OptionalContext; - }; - - std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<JobContextLogOutput>(OptionalContext)); + std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext)); size_t OpCount = 0; @@ -2768,14 +3078,15 @@ SaveOplog(CidStore& ChunkStore, }; RemoteProjectStore::Result -ParseOplogContainer(const CbObject& ContainerObject, - const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, - const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, - CbObject& OutOplogSection, - JobContext* OptionalContext) +ParseOplogContainer( + const CbObject& ContainerObject, + const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, + CbObject& OutOplogSection, + JobContext* OptionalContext) { using namespace std::literals; @@ -2801,12 +3112,12 @@ ParseOplogContainer(const CbObject& ContainerObject, "Section has unexpected data type", "Failed to save oplog container"}; } - std::unordered_set<IoHash, IoHash::Hasher> OpsAttachments; + std::unordered_set<IoHash, IoHash::Hasher> NeededAttachments; { CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView(); for (CbFieldView OpEntry : OpsArray) { - OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); }); + OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); }); if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), @@ -2816,7 +3127,7 @@ ParseOplogContainer(const CbObject& ContainerObject, } } { - std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end()); + std::vector<IoHash> ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end()); OnReferencedAttachments(ReferencedAttachments); } @@ -2827,24 +3138,27 @@ ParseOplogContainer(const CbObject& ContainerObject, .Reason = "Operation cancelled"}; } - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", NeededAttachments.size())); CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); for (CbFieldView ChunkedFileField : ChunkedFilesArray) { CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView(); IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash(); - if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash))) + if (NeededAttachments.erase(RawHash) == 1) { - ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView); - - OnReferencedAttachments(Chunked.ChunkHashes); - OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end()); - OnChunkedAttachment(Chunked); - ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - Chunked.ChunkHashes.size()); + if (!HasAttachment(RawHash)) + { + ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView); + + OnReferencedAttachments(Chunked.ChunkHashes); + NeededAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end()); + OnChunkedAttachment(Chunked); + ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + Chunked.ChunkHashes.size()); + } } if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -2854,6 +3168,8 @@ ParseOplogContainer(const CbObject& ContainerObject, } } + std::vector<ThinChunkBlockDescription> ThinBlocksDescriptions; + size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) @@ -2863,45 +3179,38 @@ ParseOplogContainer(const CbObject& ContainerObject, CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); - std::vector<IoHash> NeededChunks; - NeededChunks.reserve(ChunksArray.Num()); - if (BlockHash == IoHash::Zero) + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkField : ChunksArray) { - for (CbFieldView ChunkField : ChunksArray) - { - IoHash ChunkHash = ChunkField.AsBinaryAttachment(); - if (OpsAttachments.erase(ChunkHash) == 1) - { - if (!HasAttachment(ChunkHash)) - { - NeededChunks.emplace_back(ChunkHash); - } - } - } + ChunkHashes.push_back(ChunkField.AsHash()); } - else + ThinBlocksDescriptions.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)}); + } + + for (ThinChunkBlockDescription& ThinBlockDescription : ThinBlocksDescriptions) + { + std::vector<uint32_t> NeededBlockChunkIndexes; + for (uint32_t ChunkIndex = 0; ChunkIndex < ThinBlockDescription.ChunkRawHashes.size(); ChunkIndex++) { - for (CbFieldView ChunkField : ChunksArray) + const IoHash& ChunkHash = ThinBlockDescription.ChunkRawHashes[ChunkIndex]; + if (NeededAttachments.erase(ChunkHash) == 1) { - const IoHash ChunkHash = ChunkField.AsHash(); - if (OpsAttachments.erase(ChunkHash) == 1) + if (!HasAttachment(ChunkHash)) { - if (!HasAttachment(ChunkHash)) - { - NeededChunks.emplace_back(ChunkHash); - } + NeededBlockChunkIndexes.push_back(ChunkIndex); } } } - - if (!NeededChunks.empty()) + if (!NeededBlockChunkIndexes.empty()) { - OnNeedBlock(BlockHash, std::move(NeededChunks)); - if (BlockHash != IoHash::Zero) + if (ThinBlockDescription.BlockHash != IoHash::Zero) { NeedBlockCount++; } + OnNeedBlock(std::move(ThinBlockDescription), std::move(NeededBlockChunkIndexes)); } + if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), @@ -2909,6 +3218,7 @@ ParseOplogContainer(const CbObject& ContainerObject, .Reason = "Operation cancelled"}; } } + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); @@ -2918,7 +3228,7 @@ ParseOplogContainer(const CbObject& ContainerObject, { IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); - if (OpsAttachments.erase(AttachmentHash) == 1) + if (NeededAttachments.erase(AttachmentHash) == 1) { if (!HasAttachment(AttachmentHash)) { @@ -2941,14 +3251,15 @@ ParseOplogContainer(const CbObject& ContainerObject, } RemoteProjectStore::Result -SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, - const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, - JobContext* OptionalContext) +SaveOplogContainer( + ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, + JobContext* OptionalContext) { using namespace std::literals; @@ -2972,18 +3283,23 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } RemoteProjectStore::Result -LoadOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - bool ForceDownload, - bool IgnoreMissingAttachments, - bool CleanOplog, - JobContext* OptionalContext) +LoadOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + bool ForceDownload, + bool IgnoreMissingAttachments, + bool CleanOplog, + EPartialBlockRequestMode PartialBlockRequestMode, + double HostLatencySec, + double CacheLatencySec, + JobContext* OptionalContext) { using namespace std::literals; + std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext)); + remotestore_impl::DownloadInfo Info; Stopwatch Timer; @@ -3035,6 +3351,14 @@ LoadOplog(CidStore& ChunkStore, return false; }; + struct NeededBlockDownload + { + ThinChunkBlockDescription ThinBlockDescription; + std::vector<uint32_t> NeededChunkIndexes; + }; + + std::vector<NeededBlockDownload> NeededBlockDownloads; + auto OnNeedBlock = [&RemoteStore, &ChunkStore, &NetworkWorkerPool, @@ -3047,8 +3371,9 @@ LoadOplog(CidStore& ChunkStore, &Info, &LoadAttachmentsTimer, &DownloadStartMS, + &NeededBlockDownloads, IgnoreMissingAttachments, - OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { + OptionalContext](ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes) { if (RemoteResult.IsError()) { return; @@ -3056,7 +3381,7 @@ LoadOplog(CidStore& ChunkStore, BlockCountToDownload++; AttachmentCount.fetch_add(1); - if (BlockHash == IoHash::Zero) + if (ThinBlockDescription.BlockHash == IoHash::Zero) { DownloadAndSaveBlockChunks(ChunkStore, RemoteStore, @@ -3070,25 +3395,13 @@ LoadOplog(CidStore& ChunkStore, Info, LoadAttachmentsTimer, DownloadStartMS, - Chunks); + std::move(ThinBlockDescription), + std::move(NeededChunkIndexes)); } else { - DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - Chunks, - 3); + NeededBlockDownloads.push_back(NeededBlockDownload{.ThinBlockDescription = std::move(ThinBlockDescription), + .NeededChunkIndexes = std::move(NeededChunkIndexes)}); } }; @@ -3132,12 +3445,7 @@ LoadOplog(CidStore& ChunkStore, }; std::vector<ChunkedInfo> FilesToDechunk; - auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) { - if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash)) - { - FilesToDechunk.push_back(Chunked); - } - }; + auto OnChunkedAttachment = [&FilesToDechunk](const ChunkedInfo& Chunked) { FilesToDechunk.push_back(Chunked); }; auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); }; @@ -3165,6 +3473,185 @@ LoadOplog(CidStore& ChunkStore, BlockCountToDownload, FilesToDechunk.size())); + std::vector<IoHash> BlockHashes; + std::vector<IoHash> AllNeededChunkHashes; + BlockHashes.reserve(NeededBlockDownloads.size()); + for (const NeededBlockDownload& BlockDownload : NeededBlockDownloads) + { + BlockHashes.push_back(BlockDownload.ThinBlockDescription.BlockHash); + for (uint32_t ChunkIndex : BlockDownload.NeededChunkIndexes) + { + AllNeededChunkHashes.push_back(BlockDownload.ThinBlockDescription.ChunkRawHashes[ChunkIndex]); + } + } + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllNeededPartialChunkHashesLookup = BuildHashLookup(AllNeededChunkHashes); + std::vector<std::atomic<bool>> ChunkDownloadedFlags(AllNeededChunkHashes.size()); + std::vector<bool> DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false); + ChunkBlockAnalyser::BlockResult PartialBlocksResult; + + RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = RemoteStore.GetBlockDescriptions(BlockHashes); + std::vector<IoHash> BlocksWithDescription; + BlocksWithDescription.reserve(BlockDescriptions.Blocks.size()); + for (const ChunkBlockDescription& BlockDescription : BlockDescriptions.Blocks) + { + BlocksWithDescription.push_back(BlockDescription.BlockHash); + } + { + auto WantIt = NeededBlockDownloads.begin(); + auto FindIt = BlockDescriptions.Blocks.begin(); + while (WantIt != NeededBlockDownloads.end()) + { + if (FindIt == BlockDescriptions.Blocks.end()) + { + // Fall back to full download as we can't get enough information about the block + DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + WantIt->ThinBlockDescription.BlockHash, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + 3); + for (uint32_t BlockChunkIndex : WantIt->NeededChunkIndexes) + { + const IoHash& ChunkHash = WantIt->ThinBlockDescription.ChunkRawHashes[BlockChunkIndex]; + auto It = AllNeededPartialChunkHashesLookup.find(ChunkHash); + ZEN_ASSERT(It != AllNeededPartialChunkHashesLookup.end()); + uint32_t ChunkIndex = It->second; + DownloadedViaLegacyChunkFlag[ChunkIndex] = true; + } + WantIt++; + } + else if (WantIt->ThinBlockDescription.BlockHash == FindIt->BlockHash) + { + // Found + FindIt++; + WantIt++; + } + else + { + // Not a requested block? + ZEN_ASSERT(false); + } + } + } + if (!AllNeededChunkHashes.empty()) + { + std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes; + + if (PartialBlockRequestMode == EPartialBlockRequestMode::Off) + { + PartialBlockDownloadModes.resize(BlocksWithDescription.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + } + else + { + RemoteProjectStore::AttachmentExistsInCacheResult CacheExistsResult = + RemoteStore.AttachmentExistsInCache(BlocksWithDescription); + if (CacheExistsResult.ErrorCode != 0 || CacheExistsResult.HasBody.size() != BlocksWithDescription.size()) + { + CacheExistsResult.HasBody.resize(BlocksWithDescription.size(), false); + } + + PartialBlockDownloadModes.reserve(BlocksWithDescription.size()); + + for (bool ExistsInCache : CacheExistsResult.HasBody) + { + if (PartialBlockRequestMode == EPartialBlockRequestMode::All) + { + PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange); + } + else if (PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) + { + PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + } + else if (PartialBlockRequestMode == EPartialBlockRequestMode::Mixed) + { + PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange); + } + } + } + + ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size()); + + ChunkBlockAnalyser PartialAnalyser(*LogOutput, + BlockDescriptions.Blocks, + ChunkBlockAnalyser::Options{.IsQuiet = false, + .IsVerbose = false, + .HostLatencySec = HostLatencySec, + .HostHighSpeedLatencySec = CacheLatencySec}); + + std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = + PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup, + [&](uint32_t ChunkIndex) { return !DownloadedViaLegacyChunkFlag[ChunkIndex]; }); + + PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes); + for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes) + { + DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockDescriptions.Blocks[FullBlockIndex].BlockHash, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + 3); + } + + for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();) + { + size_t RangeCount = 1; + size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex; + const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex]; + while (RangeCount < RangesLeft && + CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) + { + RangeCount++; + } + + DownloadAndSavePartialBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex], + PartialBlocksResult.BlockRanges, + BlockRangeIndex, + RangeCount, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + 3); + + BlockRangeIndex += RangeCount; + } + } + AttachmentsDownloadLatch.CountDown(); while (!AttachmentsDownloadLatch.Wait(1000)) { @@ -3478,21 +3965,30 @@ LoadOplog(CidStore& ChunkStore, } } - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}", - RemoteStoreInfo.ContainerName, - Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), - NiceBytes(Info.OplogSizeBytes), - Info.AttachmentBlocksDownloaded.load(), - NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), - Info.AttachmentsDownloaded.load(), - NiceBytes(Info.AttachmentBytesDownloaded.load()), - Info.AttachmentsStored.load(), - NiceBytes(Info.AttachmentBytesStored.load()), - Info.MissingAttachmentCount.load(), - remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); + uint64_t TotalDownloads = + 1 + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); + uint64_t TotalBytesDownloaded = Info.OplogSizeBytes + Info.AttachmentBlockBytesDownloaded.load() + + Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); + + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} " + "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}", + RemoteStoreInfo.ContainerName, + Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksDownloaded.load(), + NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), + Info.AttachmentBlocksRangesDownloaded.load(), + NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()), + Info.AttachmentsDownloaded.load(), + NiceBytes(Info.AttachmentBytesDownloaded.load()), + TotalDownloads, + NiceBytes(TotalBytesDownloaded), + Info.AttachmentsStored.load(), + NiceBytes(Info.AttachmentBytesStored.load()), + Info.MissingAttachmentCount.load(), + remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); return Result; } @@ -3697,6 +4193,9 @@ TEST_CASE_TEMPLATE("project.store.export", /*Force*/ false, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ false, + EPartialBlockRequestMode::Mixed, + /*HostLatencySec*/ -1.0, + /*CacheLatencySec*/ -1.0, nullptr); CHECK(ImportResult.ErrorCode == 0); @@ -3708,6 +4207,9 @@ TEST_CASE_TEMPLATE("project.store.export", /*Force*/ true, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ false, + EPartialBlockRequestMode::Mixed, + /*HostLatencySec*/ -1.0, + /*CacheLatencySec*/ -1.0, nullptr); CHECK(ImportForceResult.ErrorCode == 0); @@ -3719,6 +4221,9 @@ TEST_CASE_TEMPLATE("project.store.export", /*Force*/ false, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ true, + EPartialBlockRequestMode::Mixed, + /*HostLatencySec*/ -1.0, + /*CacheLatencySec*/ -1.0, nullptr); CHECK(ImportCleanResult.ErrorCode == 0); @@ -3730,6 +4235,9 @@ TEST_CASE_TEMPLATE("project.store.export", /*Force*/ true, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ true, + EPartialBlockRequestMode::Mixed, + /*HostLatencySec*/ -1.0, + /*CacheLatencySec*/ -1.0, nullptr); CHECK(ImportForceCleanResult.ErrorCode == 0); } |