diff options
| author | Dan Engelbrecht <[email protected]> | 2024-12-10 09:09:51 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-12-10 09:09:51 +0100 |
| commit | f714751342e996697e136dbf027cd12393fad493 (patch) | |
| tree | 7c43858db1d2f36013960647460ae9237fa0c3a6 /src | |
| parent | auth fixes (#260) (diff) | |
| download | zen-f714751342e996697e136dbf027cd12393fad493.tar.xz zen-f714751342e996697e136dbf027cd12393fad493.zip | |
improved payload validation in HttpClient (#259)
* improved payload validation in HttpClient
* separate error messages for FromCompressed and Decompress
* refactor so we can do retry if decompression of block fails
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 78 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 2691 |
2 files changed, 1510 insertions, 1259 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 9af909fcf..d8ce25304 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -375,37 +375,75 @@ ShouldRetry(const cpr::Response& Response) static bool ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile>& PayloadFile) { - if (IsHttpSuccessCode(Response.status_code)) + IoBuffer ResponseBuffer = (Response.text.empty() && PayloadFile) ? PayloadFile->BorrowIoBuffer() + : IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()); + + if (auto ContentLength = Response.header.find("Content-Length"); ContentLength != Response.header.end()) + { + std::optional<uint64_t> ExpectedContentSize = ParseInt<uint64_t>(ContentLength->second); + if (!ExpectedContentSize.has_value()) + { + Response.error = + cpr::Error(/*CURLE_READ_ERROR*/ 26, fmt::format("Can not parse Content-Length header. Value: '{}'", ContentLength->second)); + return false; + } + if (ExpectedContentSize.value() != ResponseBuffer.GetSize()) + { + Response.error = cpr::Error( + /*CURLE_READ_ERROR*/ 26, + fmt::format("Payload size {} does not match Content-Length {}", ResponseBuffer.GetSize(), ContentLength->second)); + return false; + } + } + + if (auto JupiterHash = Response.header.find("X-Jupiter-IoHash"); JupiterHash != Response.header.end()) { - if (auto ContentType = Response.header.find("Content-Type"); - ContentType != Response.header.end() && ContentType->second == "application/x-ue-comp") + IoHash ExpectedPayloadHash; + if (IoHash::TryParse(JupiterHash->second, ExpectedPayloadHash)) + { + IoHash PayloadHash = IoHash::HashBuffer(ResponseBuffer); + if (PayloadHash != ExpectedPayloadHash) + { + Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, + fmt::format("Payload hash {} does not match X-Jupiter-IoHash {}", + PayloadHash.ToHexString(), + ExpectedPayloadHash.ToHexString())); + return false; + } + } + } + + if (auto ContentType = Response.header.find("Content-Type"); ContentType != Response.header.end()) + { + if (ContentType->second == "application/x-ue-comp") { - IoBuffer ResponseBuffer = (Response.text.empty() && PayloadFile) - ? PayloadFile->BorrowIoBuffer() - : IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()); IoHash RawHash; uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize)) + if (CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize)) + { + return true; + } + else { Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validation"); return false; } } - else if (auto JupiterHash = Response.header.find("X-Jupiter-IoHash"); JupiterHash != Response.header.end()) + if (ContentType->second == "application/x-ue-cb") { - IoHash ExpectedPayloadHash; - if (IoHash::TryParse(JupiterHash->second, ExpectedPayloadHash)) + if (CbValidateError Error = ValidateCompactBinary(ResponseBuffer.GetView(), CbValidateMode::Default); + Error == CbValidateError::None) { - IoBuffer ResponseBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()); - IoHash PayloadHash = IoHash::HashBuffer(ResponseBuffer); - if (PayloadHash != ExpectedPayloadHash) - { - Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validation"); - return false; - } + return true; + } + else + { + Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, fmt::format("Compact binary failed validation: {}", ToString(Error))); + return false; } } } + return true; } @@ -433,7 +471,11 @@ DoWithRetry(std::function<cpr::Response()>&& Func, std::unique_ptr<detail::TempP { if (!ShouldRetry(Result)) { - if (Result.error || ValidatePayload(Result, PayloadFile)) + if (Result.error || !IsHttpSuccessCode(Result.status_code)) + { + break; + } + if (ValidatePayload(Result, PayloadFile)) { break; } diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 137db255c..49403d39c 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -54,94 +54,1086 @@ namespace zen { uint8_t[chunksize])[ChunkCount] } */ +namespace remotestore_impl { + ////////////////////////////// AsyncRemoteResult -////////////////////////////// AsyncRemoteResult + struct AsyncRemoteResult + { + void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText) + { + int32_t Expected = 0; + if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1)) + { + m_ErrorReason = ErrorReason; + m_ErrorText = ErrorText; + } + } + bool IsError() const { return m_ErrorCode.load() != 0; } + int GetError() const { return m_ErrorCode.load(); }; + const std::string& GetErrorReason() const { return m_ErrorReason; }; + const std::string& GetErrorText() const { return m_ErrorText; }; + RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const + { + return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText}; + } -struct AsyncRemoteResult -{ - void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText) + private: + std::atomic<int32_t> m_ErrorCode = 0; + std::string m_ErrorReason; + std::string m_ErrorText; + }; + + void ReportProgress(JobContext* OptionalContext, + std::string_view CurrentOp, + std::string_view Details, + ptrdiff_t Total, + ptrdiff_t Remaining) { - int32_t Expected = 0; - if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1)) + if (OptionalContext) { - m_ErrorReason = ErrorReason; - m_ErrorText = ErrorText; + ZEN_ASSERT(Total > 0); + OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining); } } - bool IsError() const { return m_ErrorCode.load() != 0; } - int GetError() const { return m_ErrorCode.load(); }; - const std::string& GetErrorReason() const { return m_ErrorReason; }; - const std::string& GetErrorText() const { return m_ErrorText; }; - RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const + + void ReportMessage(JobContext* OptionalContext, std::string_view Message) { - return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText}; + if (OptionalContext) + { + OptionalContext->ReportMessage(Message); + } + ZEN_INFO("{}", Message); } -private: - std::atomic<int32_t> m_ErrorCode = 0; - std::string m_ErrorReason; - std::string m_ErrorText; -}; + bool IsCancelled(JobContext* OptionalContext) + { + if (!OptionalContext) + { + return false; + } + return OptionalContext->IsCancelled(); + } -void -ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, std::string_view Details, ptrdiff_t Total, ptrdiff_t Remaining) -{ - if (OptionalContext) + std::string GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS) { - ZEN_ASSERT(Total > 0); - OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining); + return fmt::format( + "Sent: {} ({}/s) Recv: {} ({}/s)", + NiceBytes(Stats.m_SentBytes), + NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u), + NiceBytes(Stats.m_ReceivedBytes), + NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u)); } -} -void -ReportMessage(JobContext* OptionalContext, std::string_view Message) -{ - if (OptionalContext) + void LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats) { - OptionalContext->ReportMessage(Message); + ZEN_INFO("Oplog request count: {}. Average request size: {}. Average request time: {}, Peak request speed: {}", + Stats.m_RequestCount, + NiceBytes(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u), + NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_RequestTimeNS / Stats.m_RequestCount) : 0u), + NiceBytes(Stats.m_PeakBytesPerSec)); + ZEN_INFO( + "Oplog sent request avg: {} ({}/s). Peak: {}", + NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_SentBytes) / Stats.m_RequestCount) : 0u), + NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000000000) / Stats.m_RequestTimeNS) : 0u), + NiceBytes(Stats.m_PeakSentBytes)); + ZEN_INFO("Oplog recv request avg: {} ({}/s). Peak: {}", + NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes) / Stats.m_RequestCount) : 0u), + NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000000000) / Stats.m_RequestTimeNS) + : 0u), + NiceBytes(Stats.m_PeakReceivedBytes)); } - ZEN_INFO("{}", Message); -} -bool -IsCancelled(JobContext* OptionalContext) -{ - if (!OptionalContext) + struct Block { - return false; + IoHash BlockHash; + std::vector<IoHash> ChunksInBlock; + }; + + size_t AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks) + { + size_t BlockIndex; + { + RwLock::ExclusiveLockScope _(BlocksLock); + BlockIndex = Blocks.size(); + Blocks.resize(BlockIndex + 1); + } + return BlockIndex; } - return OptionalContext->IsCancelled(); -} -std::string -GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS) -{ - return fmt::format("Sent: {} ({}/s) Recv: {} ({}/s)", - NiceBytes(Stats.m_SentBytes), - NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u), - NiceBytes(Stats.m_ReceivedBytes), - NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u)); -} + IoBuffer WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path) + { + if (std::filesystem::is_regular_file(Path)) + { + IoBuffer ExistingTempFile = IoBuffer(IoBufferBuilder::MakeFromFile(Path)); + if (ExistingTempFile && ExistingTempFile.GetSize() == CompressedBuffer.GetCompressedSize()) + { + ExistingTempFile.SetDeleteOnClose(true); + return ExistingTempFile; + } + } + IoBuffer BlockBuffer; + BasicFile BlockFile; + uint32_t RetriesLeft = 3; + BlockFile.Open(Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) { + if (RetriesLeft == 0) + { + return false; + } + ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", Path, Ec.message(), RetriesLeft); + Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms + RetriesLeft--; + return true; + }); + uint64_t Offset = 0; + { + CompositeBuffer Compressed = std::move(CompressedBuffer).GetCompressed(); + for (const SharedBuffer& Segment : Compressed.GetSegments()) + { + size_t SegmentSize = Segment.GetSize(); + static const uint64_t BufferingSize = 256u * 1024u; -void -LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats) -{ - ZEN_INFO("Oplog request count: {}. Average request size: {}. Average request time: {}, Peak request speed: {}", - Stats.m_RequestCount, - NiceBytes(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u), - NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_RequestTimeNS / Stats.m_RequestCount) : 0u), - NiceBytes(Stats.m_PeakBytesPerSec)); - ZEN_INFO("Oplog sent request avg: {} ({}/s). Peak: {}", - NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_SentBytes) / Stats.m_RequestCount) : 0u), - NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000000000) / Stats.m_RequestTimeNS) : 0u), - NiceBytes(Stats.m_PeakSentBytes)); - ZEN_INFO( - "Oplog recv request avg: {} ({}/s). Peak: {}", - NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes) / Stats.m_RequestCount) : 0u), - NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000000000) / Stats.m_RequestTimeNS) : 0u), - NiceBytes(Stats.m_PeakReceivedBytes)); -} + IoBufferFileReference FileRef; + if (SegmentSize >= (BufferingSize + BufferingSize / 2) && Segment.GetFileReference(FileRef)) + { + ScanFile(FileRef.FileHandle, + FileRef.FileChunkOffset, + FileRef.FileChunkSize, + BufferingSize, + [&BlockFile, &Offset](const void* Data, size_t Size) { + BlockFile.Write(Data, Size, Offset); + Offset += Size; + }); + } + else + { + BlockFile.Write(Segment.GetData(), SegmentSize, Offset); + Offset += SegmentSize; + } + } + } + void* FileHandle = BlockFile.Detach(); + BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); + BlockBuffer.SetDeleteOnClose(true); + return BlockBuffer; + } + + RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext) + { + using namespace std::literals; + + Stopwatch Timer; + + CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + const uint64_t OpCount = OpsArray.Num(); + + ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount)); + + const size_t OpsBatchSize = 8192; + std::vector<uint8_t> OpsData; + std::vector<size_t> OpDataOffsets; + size_t OpsCompleteCount = 0; + + OpsData.reserve(OpsBatchSize); + + auto AppendBatch = [&]() { + std::vector<CbObjectView> Ops; + Ops.reserve(OpDataOffsets.size()); + for (size_t OpDataOffset : OpDataOffsets) + { + Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset])); + } + std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops); + OpsCompleteCount += OpLsns.size(); + OpsData.clear(); + OpDataOffsets.clear(); + ReportProgress(OptionalContext, + "Writing oplog"sv, + fmt::format("{} remaining...", OpCount - OpsCompleteCount), + OpCount, + OpCount - OpsCompleteCount); + }; + + BinaryWriter Writer; + for (CbFieldView OpEntry : OpsArray) + { + CbObjectView Op = OpEntry.AsObjectView(); + Op.CopyTo(Writer); + OpDataOffsets.push_back(OpsData.size()); + OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize()); + Writer.Reset(); + + if (OpDataOffsets.size() == OpsBatchSize) + { + AppendBatch(); + } + } + if (!OpDataOffsets.empty()) + { + AppendBatch(); + } + + ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0); + + return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; + } + + struct DownloadInfo + { + uint64_t OplogSizeBytes = 0; + std::atomic<uint64_t> AttachmentsDownloaded = 0; + std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; + std::atomic<uint64_t> AttachmentBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentsStored = 0; + std::atomic<uint64_t> AttachmentBytesStored = 0; + std::atomic_size_t MissingAttachmentCount = 0; + }; + + void DownloadAndSaveBlockChunks(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + bool IgnoreMissingAttachments, + JobContext* OptionalContext, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + const std::vector<IoHash>& Chunks) + { + AttachmentsDownloadLatch.AddCount(1); + NetworkWorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &WorkerPool, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &RemoteResult, + Chunks = Chunks, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to load attachments with {} chunks ({}): {}", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + Info.MissingAttachmentCount.fetch_add(1); + if (IgnoreMissingAttachments) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + } + return; + } + Info.AttachmentsDownloaded.fetch_add(Chunks.size()); + ZEN_INFO("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + if (RemoteResult.IsError()) + { + return; + } + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + if (!Chunks.empty()) + { + try + { + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + WriteAttachmentBuffers.reserve(Chunks.size()); + WriteRawHashes.reserve(Chunks.size()); + + for (const auto& It : Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(It.first); + } + std::vector<CidStore::InsertResult> InsertResults = + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); + + for (size_t Index = 0; Index < InsertResults.size(); Index++) + { + if (InsertResults[Index].New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to bulk save {} attachments", Chunks.size()), + Ex.what()); + } + } + }); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to bulk load {} attachments", Chunks.size()), + Ex.what()); + } + }); + }; + + void DownloadAndSaveBlock(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + bool IgnoreMissingAttachments, + JobContext* OptionalContext, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + const IoHash& BlockHash, + const std::vector<IoHash>& Chunks, + uint32_t RetriesLeft) + { + AttachmentsDownloadLatch.AddCount(1); + NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + BlockHash, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + RetriesLeft, + Chunks = Chunks]() { + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to download block attachment {} ({}): {}", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + } + return; + } + if (RemoteResult.IsError()) + { + return; + } + uint64_t BlockSize = BlockResult.Bytes.GetSize(); + Info.AttachmentBlocksDownloaded.fetch_add(1); + ZEN_INFO("Loaded block attachment '{}' in {} ({})", + BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), + NiceBytes(BlockSize)); + Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + BlockHash, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + RetriesLeft, + Chunks = Chunks, + Bytes = std::move(BlockResult.Bytes)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + ZEN_ASSERT(Bytes.Size() > 0); + std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; + WantedChunks.reserve(Chunks.size()); + WantedChunks.insert(Chunks.begin(), Chunks.end()); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize); + if (!Compressed) + { + if (RetriesLeft > 0) + { + ReportMessage( + OptionalContext, + fmt::format("Block attachment {} is malformed, can't parse as compressed binary, retrying download", + BlockHash)); + return DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockHash, + std::move(Chunks), + RetriesLeft - 1); + } + ReportMessage(OptionalContext, + fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); + RemoteResult.SetError( + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), + {}); + return; + } + SharedBuffer BlockPayload = Compressed.Decompress(); + if (!BlockPayload) + { + if (RetriesLeft > 0) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download", + BlockHash)); + return DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockHash, + std::move(Chunks), + RetriesLeft - 1); + } + ReportMessage(OptionalContext, + fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash)); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash), + {}); + return; + } + if (RawHash != BlockHash) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), + {}); + return; + } + + bool StoreChunksOK = IterateBlock( + BlockPayload, + [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, + const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + WantedChunks.erase(AttachmentRawHash); + } + }); + + if (!StoreChunksOK) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has invalid format ({}): {}", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Invalid format for block {}", BlockHash), + {}); + return; + } + + ZEN_ASSERT(WantedChunks.empty()); + + if (!WriteAttachmentBuffers.empty()) + { + auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const auto& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + } + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed save block attachment {}", BlockHash), + Ex.what()); + } + }); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to block attachment {}", BlockHash), + Ex.what()); + } + }); + }; + + void DownloadAndSaveAttachment(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + bool IgnoreMissingAttachments, + JobContext* OptionalContext, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + Latch& AttachmentsDownloadLatch, + Latch& AttachmentsWriteLatch, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + Stopwatch& LoadAttachmentsTimer, + std::atomic_uint64_t& DownloadStartMS, + const IoHash& RawHash) + { + AttachmentsDownloadLatch.AddCount(1); + NetworkWorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &WorkerPool, + &RemoteResult, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + RawHash, + &LoadAttachmentsTimer, + &DownloadStartMS, + &Info, + IgnoreMissingAttachments, + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to download large attachment {}: '{}', error code : {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode)); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + } + return; + } + uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); + ZEN_INFO("Loaded large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), + NiceBytes(AttachmentSize)); + Info.AttachmentsDownloaded.fetch_add(1); + if (RemoteResult.IsError()) + { + return; + } + Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWriteLatch, + &RemoteResult, + &Info, + &ChunkStore, + RawHash, + AttachmentSize, + Bytes = std::move(AttachmentResult.Bytes), + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(AttachmentSize); + Info.AttachmentsStored.fetch_add(1); + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Saving attachment {} failed", RawHash), + Ex.what()); + } + }); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Loading attachment {} failed", RawHash), + Ex.what()); + } + }); + }; + + void CreateBlock(WorkerThreadPool& WorkerPool, + Latch& OpSectionsLatch, + std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, + RwLock& SectionsLock, + std::vector<Block>& Blocks, + size_t BlockIndex, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + AsyncRemoteResult& RemoteResult) + { + OpSectionsLatch.AddCount(1); + WorkerPool.ScheduleWork([&Blocks, + &SectionsLock, + &OpSectionsLatch, + BlockIndex, + Chunks = std::move(ChunksInBlock), + &AsyncOnBlock, + &RemoteResult]() mutable { + auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + size_t ChunkCount = Chunks.size(); + try + { + ZEN_ASSERT(ChunkCount > 0); + Stopwatch Timer; + CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex].BlockHash = BlockHash; + } + uint64_t BlockSize = CompressedBlock.GetCompressedSize(); + AsyncOnBlock(std::move(CompressedBlock), BlockHash); + ZEN_INFO("Generated block with {} attachments in {} ({})", + ChunkCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + NiceBytes(BlockSize)); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount), + Ex.what()); + } + }); + } + + struct UploadInfo + { + uint64_t OplogSizeBytes = 0; + std::atomic<uint64_t> AttachmentsUploaded = 0; + std::atomic<uint64_t> AttachmentBlocksUploaded = 0; + std::atomic<uint64_t> AttachmentBytesUploaded = 0; + std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0; + }; + + void UploadAttachments(WorkerThreadPool& WorkerPool, + CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments, + const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks, + const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks, + const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments, + const std::unordered_set<IoHash, IoHash::Hasher>& Needs, + bool ForceAll, + UploadInfo& Info, + AsyncRemoteResult& RemoteResult, + JobContext* OptionalContext) + { + using namespace std::literals; + + if (Needs.empty() && !ForceAll) + { + return; + } + + ReportMessage(OptionalContext, "Filtering needed attachments for upload..."); + + std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload; + std::unordered_map<IoHash, FetchChunkFunc, IoHash::Hasher> BulkBlockAttachmentsToUpload; + + size_t BlockAttachmentCountToUpload = 0; + size_t LargeAttachmentCountToUpload = 0; + size_t BulkAttachmentCountToUpload = 0; + AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size()); + + std::unordered_set<IoHash, IoHash::Hasher> UnknownAttachments(Needs); + + for (const auto& CreatedBlock : CreatedBlocks) + { + if (ForceAll || Needs.contains(CreatedBlock.first)) + { + AttachmentsToUpload.insert(CreatedBlock.first); + BlockAttachmentCountToUpload++; + UnknownAttachments.erase(CreatedBlock.first); + } + } + for (const IoHash& LargeAttachment : LargeAttachments) + { + if (ForceAll || Needs.contains(LargeAttachment)) + { + AttachmentsToUpload.insert(LargeAttachment); + LargeAttachmentCountToUpload++; + UnknownAttachments.erase(LargeAttachment); + } + } + for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& BlockHashes : BlockChunks) + { + for (const std::pair<IoHash, FetchChunkFunc>& Chunk : BlockHashes) + { + if (ForceAll || Needs.contains(Chunk.first)) + { + BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second)); + BulkAttachmentCountToUpload++; + UnknownAttachments.erase(Chunk.first); + } + } + } + + if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty()) + { + ReportMessage(OptionalContext, "No attachments needed"); + return; + } + + if (!UnknownAttachments.empty()) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Upload requested of {} missing attachments, the base container referenced blocks that are no longer available", + UnknownAttachments.size()), + ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return; + } + + if (IsCancelled(OptionalContext)) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + } + return; + } + + ReportMessage(OptionalContext, + fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)", + AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), + BlockAttachmentCountToUpload, + LargeAttachmentCountToUpload, + BulkAttachmentCountToUpload)); + + Stopwatch Timer; + + ptrdiff_t AttachmentsToSave(0); + Latch SaveAttachmentsLatch(1); + + for (const IoHash& RawHash : AttachmentsToUpload) + { + if (RemoteResult.IsError()) + { + break; + } + + SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; + WorkerPool.ScheduleWork([&ChunkStore, + &RemoteStore, + &SaveAttachmentsLatch, + &RemoteResult, + RawHash, + &CreatedBlocks, + &LooseFileAttachments, + &Info, + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + bool IsBlock = false; + IoBuffer Payload; + if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) + { + Payload = BlockIt->second; + IsBlock = true; + } + else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) + { + Payload = LooseTmpFileIt->second(RawHash); + } + else + { + Payload = ChunkStore.FindChunkByCid(RawHash); + } + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_WARN("Failed to save attachment '{}' ({}): {}", + RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + size_t PayloadSize = Payload.GetSize(); + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): {}", + RawHash, + NiceBytes(PayloadSize), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; + } + if (IsBlock) + { + Info.AttachmentBlocksUploaded.fetch_add(1); + Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved block attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); + } + else + { + Info.AttachmentsUploaded.fetch_add(1); + Info.AttachmentBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("To upload attachment {}", RawHash), + Ex.what()); + } + }); + } + + if (IsCancelled(OptionalContext)) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + } + return; + } + + if (!BulkBlockAttachmentsToUpload.empty()) + { + for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks) + { + if (RemoteResult.IsError()) + { + break; + } + + std::vector<IoHash> NeededChunks; + NeededChunks.reserve(Chunks.size()); + for (const std::pair<IoHash, FetchChunkFunc>& Chunk : Chunks) + { + const IoHash& ChunkHash = Chunk.first; + if (BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash)) + { + NeededChunks.push_back(Chunk.first); + } + } + if (NeededChunks.empty()) + { + continue; + } + + SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; + WorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &SaveAttachmentsLatch, + &RemoteResult, + NeededChunks = std::move(NeededChunks), + &BulkBlockAttachmentsToUpload, + &Info, + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + size_t ChunksSize = 0; + std::vector<SharedBuffer> ChunkBuffers; + ChunkBuffers.reserve(NeededChunks.size()); + for (const IoHash& Chunk : NeededChunks) + { + auto It = BulkBlockAttachmentsToUpload.find(Chunk); + ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); + CompositeBuffer ChunkPayload = It->second(It->first); + if (!ChunkPayload) + { + RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), + fmt::format("Missing chunk {}"sv, Chunk), + fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); + ChunkBuffers.clear(); + break; + } + ChunksSize += ChunkPayload.GetSize(); + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer())); + } + RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachments with {} chunks ({}): {}", + NeededChunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; + } + Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); + Info.AttachmentBytesUploaded.fetch_add(ChunksSize); + + ZEN_INFO("Saved {} bulk attachments in {} ({})", + NeededChunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(ChunksSize)); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to buck upload {} attachments", NeededChunks.size()), + Ex.what()); + } + }); + } + } + + SaveAttachmentsLatch.CountDown(); + while (!SaveAttachmentsLatch.Wait(1000)) + { + ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); + if (IsCancelled(OptionalContext)) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + } + } + uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs(); + ReportProgress(OptionalContext, + "Saving attachments"sv, + fmt::format("{} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), + AttachmentsToSave, + Remaining); + } + uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs(); + if (AttachmentsToSave > 0) + { + ReportProgress(OptionalContext, + "Saving attachments"sv, + fmt::format("{}", GetStats(RemoteStore.GetStats(), ElapsedTimeMS)), + AttachmentsToSave, + 0); + } + ReportMessage(OptionalContext, + fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}", + AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), + BlockAttachmentCountToUpload, + LargeAttachmentCountToUpload, + BulkAttachmentCountToUpload, + NiceTimeSpanMs(ElapsedTimeMS), + GetStats(RemoteStore.GetStats(), ElapsedTimeMS))); + } + +} // namespace remotestore_impl bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) { @@ -219,128 +1211,6 @@ GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks) return CompressedBlock; } -struct Block -{ - IoHash BlockHash; - std::vector<IoHash> ChunksInBlock; -}; - -void -CreateBlock(WorkerThreadPool& WorkerPool, - Latch& OpSectionsLatch, - std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, - RwLock& SectionsLock, - std::vector<Block>& Blocks, - size_t BlockIndex, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, - AsyncRemoteResult& RemoteResult) -{ - OpSectionsLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable { - auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - size_t ChunkCount = Chunks.size(); - try - { - ZEN_ASSERT(ChunkCount > 0); - Stopwatch Timer; - CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex].BlockHash = BlockHash; - } - uint64_t BlockSize = CompressedBlock.GetCompressedSize(); - AsyncOnBlock(std::move(CompressedBlock), BlockHash); - ZEN_INFO("Generated block with {} attachments in {} ({})", - ChunkCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(BlockSize)); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount), - Ex.what()); - } - }); -} - -static size_t -AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks) -{ - size_t BlockIndex; - { - RwLock::ExclusiveLockScope _(BlocksLock); - BlockIndex = Blocks.size(); - Blocks.resize(BlockIndex + 1); - } - return BlockIndex; -} - -static IoBuffer -WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path) -{ - if (std::filesystem::is_regular_file(Path)) - { - IoBuffer ExistingTempFile = IoBuffer(IoBufferBuilder::MakeFromFile(Path)); - if (ExistingTempFile && ExistingTempFile.GetSize() == CompressedBuffer.GetCompressedSize()) - { - ExistingTempFile.SetDeleteOnClose(true); - return ExistingTempFile; - } - } - IoBuffer BlockBuffer; - BasicFile BlockFile; - uint32_t RetriesLeft = 3; - BlockFile.Open(Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) { - if (RetriesLeft == 0) - { - return false; - } - ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", Path, Ec.message(), RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms - RetriesLeft--; - return true; - }); - uint64_t Offset = 0; - { - CompositeBuffer Compressed = std::move(CompressedBuffer).GetCompressed(); - for (const SharedBuffer& Segment : Compressed.GetSegments()) - { - size_t SegmentSize = Segment.GetSize(); - static const uint64_t BufferingSize = 256u * 1024u; - - IoBufferFileReference FileRef; - if (SegmentSize >= (BufferingSize + BufferingSize / 2) && Segment.GetFileReference(FileRef)) - { - ScanFile(FileRef.FileHandle, - FileRef.FileChunkOffset, - FileRef.FileChunkSize, - BufferingSize, - [&BlockFile, &Offset](const void* Data, size_t Size) { - BlockFile.Write(Data, Size, Offset); - Offset += Size; - }); - } - else - { - BlockFile.Write(Segment.GetData(), SegmentSize, Offset); - Offset += SegmentSize; - } - } - } - void* FileHandle = BlockFile.Detach(); - BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); - BlockBuffer.SetDeleteOnClose(true); - return BlockBuffer; -} - CbObject BuildContainer(CidStore& ChunkStore, ProjectStore::Project& Project, @@ -351,14 +1221,14 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::vector<Block>& KnownBlocks, + const std::vector<remotestore_impl::Block>& KnownBlocks, WorkerThreadPool& WorkerPool, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles, JobContext* OptionalContext, - AsyncRemoteResult& RemoteResult) + remotestore_impl::AsyncRemoteResult& RemoteResult) { using namespace std::literals; @@ -375,9 +1245,9 @@ BuildContainer(CidStore& ChunkStore, std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments; - RwLock BlocksLock; - std::vector<Block> Blocks; - CompressedBuffer OpsBuffer; + RwLock BlocksLock; + std::vector<remotestore_impl::Block> Blocks; + CompressedBuffer OpsBuffer; std::filesystem::path AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); @@ -397,11 +1267,12 @@ BuildContainer(CidStore& ChunkStore, for (CbFieldView& Field : Files) { - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); CB(Op); return; } @@ -418,8 +1289,9 @@ BuildContainer(CidStore& ChunkStore, std::filesystem::path FilePath = Project.RootDir / ServerPath; if (!std::filesystem::is_regular_file(FilePath)) { - ReportMessage(OptionalContext, - fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId())); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId())); if (IgnoreMissingAttachments) { continue; @@ -497,7 +1369,7 @@ BuildContainer(CidStore& ChunkStore, CB(RewrittenOp); }; - ReportMessage(OptionalContext, "Building exported oplog and collecting attachments"); + remotestore_impl::ReportMessage(OptionalContext, "Building exported oplog and collecting attachments"); Stopwatch Timer; @@ -525,80 +1397,87 @@ BuildContainer(CidStore& ChunkStore, SectionOpsWriter << Op; } OpCount++; - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } if (OpCount % 1000 == 0) { - ReportProgress(OptionalContext, - "Building oplog"sv, - fmt::format("{} ops processed", OpCount), - TotalOpCount, - TotalOpCount - OpCount); + remotestore_impl::ReportProgress(OptionalContext, + "Building oplog"sv, + fmt::format("{} ops processed", OpCount), + TotalOpCount, + TotalOpCount - OpCount); } }); if (RemoteResult.IsError()) { return {}; } - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } if (TotalOpCount > 0) { - ReportProgress(OptionalContext, "Building oplog"sv, fmt::format("{} ops processed", OpCount), TotalOpCount, 0); + remotestore_impl::ReportProgress(OptionalContext, + "Building oplog"sv, + fmt::format("{} ops processed", OpCount), + TotalOpCount, + 0); } } SectionOpsWriter.EndArray(); // "ops" - ReportMessage(OptionalContext, - fmt::format("Rewrote {} ops to new oplog in {}", - OpCount, - NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs())))); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Rewrote {} ops to new oplog in {}", + OpCount, + NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs())))); { Stopwatch CompressOpsTimer; CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast); - ReportMessage(OptionalContext, - fmt::format("Compressed oplog section {} ({} -> {}) in {}", - CompressedOpsSection.DecodeRawHash(), - NiceBytes(CompressedOpsSection.DecodeRawSize()), - NiceBytes(CompressedOpsSection.GetCompressedSize()), - NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs())))); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Compressed oplog section {} ({} -> {}) in {}", + CompressedOpsSection.DecodeRawHash(), + NiceBytes(CompressedOpsSection.DecodeRawSize()), + NiceBytes(CompressedOpsSection.GetCompressedSize()), + NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs())))); } } - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } - auto FindReuseBlocks = [](const std::vector<Block>& KnownBlocks, + auto FindReuseBlocks = [](const std::vector<remotestore_impl::Block>& KnownBlocks, const std::unordered_set<IoHash, IoHash::Hasher>& Attachments, JobContext* OptionalContext) -> std::vector<size_t> { std::vector<size_t> ReuseBlockIndexes; if (!Attachments.empty() && !KnownBlocks.empty()) { - ReportMessage( + remotestore_impl::ReportMessage( OptionalContext, fmt::format("Checking {} Attachments against {} known blocks for reuse", Attachments.size(), KnownBlocks.size())); Stopwatch ReuseTimer; for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) { - const Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); + const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); if (BlockAttachmentCount == 0) { continue; @@ -645,7 +1524,7 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { if (UploadAttachments.erase(KnownHash) == 1) @@ -698,15 +1577,17 @@ BuildContainer(CidStore& ChunkStore, std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> LooseUploadAttachments; std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; - ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); Latch ResolveAttachmentsLatch(1); for (auto& It : UploadAttachments) { - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } @@ -732,7 +1613,7 @@ BuildContainer(CidStore& ChunkStore, &RemoteResult, OptionalContext]() { auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { return; } @@ -775,7 +1656,8 @@ BuildContainer(CidStore& ChunkStore, std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); - IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath); + IoBuffer TempAttachmentBuffer = + remotestore_impl::WriteToTempFile(std::move(Compressed), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), @@ -794,7 +1676,7 @@ BuildContainer(CidStore& ChunkStore, std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); - IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath); + IoBuffer TempAttachmentBuffer = remotestore_impl::WriteToTempFile(std::move(Compressed), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), @@ -910,37 +1792,39 @@ BuildContainer(CidStore& ChunkStore, while (!ResolveAttachmentsLatch.Wait(1000)) { ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining(); - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); while (!ResolveAttachmentsLatch.Wait(1000)) { Remaining = ResolveAttachmentsLatch.Remaining(); - ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("Aborting, {} attachments remaining...", Remaining), - UploadAttachments.size(), - Remaining); + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("Aborting, {} attachments remaining...", Remaining), + UploadAttachments.size(), + Remaining); } - ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0); + remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0); return {}; } - ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("{} remaining...", Remaining), - UploadAttachments.size(), - Remaining); + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("{} remaining...", Remaining), + UploadAttachments.size(), + Remaining); } if (UploadAttachments.size() > 0) { - ReportProgress(OptionalContext, "Resolving attachments"sv, ""sv, UploadAttachments.size(), 0); + remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, ""sv, UploadAttachments.size(), 0); } - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } @@ -953,7 +1837,8 @@ BuildContainer(CidStore& ChunkStore, if (IgnoreMissingAttachments) { - ReportMessage(OptionalContext, fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key)); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key)); } else { @@ -980,7 +1865,7 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { if (ChunkedHashes.erase(KnownHash) == 1) @@ -1001,7 +1886,8 @@ BuildContainer(CidStore& ChunkStore, { Blocks.push_back(KnownBlocks[*It]); } - ReportMessage(OptionalContext, fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount)); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount)); } std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments; @@ -1011,14 +1897,16 @@ BuildContainer(CidStore& ChunkStore, SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key)); } - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } - ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount)); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount)); // Sort attachments so we get predictable blocks for the same oplog upload std::sort(SortedUploadAttachments.begin(), @@ -1046,22 +1934,25 @@ BuildContainer(CidStore& ChunkStore, // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded // ChunkedHashes contains all chunked up chunks to be composed into blocks - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } - ReportMessage(OptionalContext, - fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments", - SortedUploadAttachments.size(), - ChunkedHashes.size(), - TotalOpCount)); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments", + SortedUploadAttachments.size(), + ChunkedHashes.size(), + TotalOpCount)); - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } @@ -1070,7 +1961,8 @@ BuildContainer(CidStore& ChunkStore, size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size(); size_t ChunksAssembled = 0; - ReportMessage(OptionalContext, fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount)); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount)); Latch BlockCreateLatch(1); size_t GeneratedBlockCount = 0; @@ -1090,14 +1982,14 @@ BuildContainer(CidStore& ChunkStore, size_t ChunkCount = ChunksInBlock.size(); if (BuildBlocks) { - CreateBlock(WorkerPool, - BlockCreateLatch, - std::move(ChunksInBlock), - BlocksLock, - Blocks, - BlockIndex, - AsyncOnBlock, - RemoteResult); + remotestore_impl::CreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); ComposedBlocks++; } else @@ -1127,20 +2019,22 @@ BuildContainer(CidStore& ChunkStore, for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++) { - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); break; } if (ChunksAssembled % 1000 == 0) { - ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - ChunkAssembleCount - ChunksAssembled); + remotestore_impl::ReportProgress( + OptionalContext, + "Assembling blocks"sv, + fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), + ChunkAssembleCount, + ChunkAssembleCount - ChunksAssembled); } const IoHash& RawHash(HashIt->first); const Oid CurrentOpKey = HashIt->second; @@ -1190,20 +2084,22 @@ BuildContainer(CidStore& ChunkStore, size_t ChunkCount = Chunked.Info.ChunkHashes.size(); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++) { - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); break; } if (ChunksAssembled % 1000 == 0) { - ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - ChunkAssembleCount - ChunksAssembled); + remotestore_impl::ReportProgress( + OptionalContext, + "Assembling blocks"sv, + fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), + ChunkAssembleCount, + ChunkAssembleCount - ChunksAssembled); } const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex]; if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end()) @@ -1237,7 +2133,7 @@ BuildContainer(CidStore& ChunkStore, if (BlockSize > 0 && !RemoteResult.IsError()) { - if (!IsCancelled(OptionalContext)) + if (!remotestore_impl::IsCancelled(OptionalContext)) { NewBlock(); } @@ -1245,41 +2141,43 @@ BuildContainer(CidStore& ChunkStore, if (ChunkAssembleCount > 0) { - ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - 0); + remotestore_impl::ReportProgress( + OptionalContext, + "Assembling blocks"sv, + fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), + ChunkAssembleCount, + 0); } - ReportMessage(OptionalContext, - fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}", - ChunkAssembleCount, - TotalOpCount, - GeneratedBlockCount, - NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}", + ChunkAssembleCount, + TotalOpCount, + GeneratedBlockCount, + NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("Aborting, {} blocks remaining...", Remaining), - GeneratedBlockCount, - Remaining); + remotestore_impl::ReportProgress(OptionalContext, + "Assembling blocks"sv, + fmt::format("Aborting, {} blocks remaining...", Remaining), + GeneratedBlockCount, + Remaining); } if (GeneratedBlockCount > 0) { - ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("Aborting, {} blocks remaining...", 0), - GeneratedBlockCount, - 0); + remotestore_impl::ReportProgress(OptionalContext, + "Assembling blocks"sv, + fmt::format("Aborting, {} blocks remaining...", 0), + GeneratedBlockCount, + 0); } return {}; } @@ -1298,31 +2196,37 @@ BuildContainer(CidStore& ChunkStore, while (!BlockCreateLatch.Wait(1000)) { ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); while (!BlockCreateLatch.Wait(1000)) { Remaining = BlockCreateLatch.Remaining(); - ReportProgress(OptionalContext, - "Creating blocks"sv, - fmt::format("Aborting, {} blocks remaining...", Remaining), - GeneratedBlockCount, - Remaining); + remotestore_impl::ReportProgress(OptionalContext, + "Creating blocks"sv, + fmt::format("Aborting, {} blocks remaining...", Remaining), + GeneratedBlockCount, + Remaining); } - ReportProgress(OptionalContext, "Creating blocks"sv, "Aborted"sv, GeneratedBlockCount, 0); + remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, "Aborted"sv, GeneratedBlockCount, 0); return {}; } - ReportProgress(OptionalContext, "Creating blocks"sv, fmt::format("{} remaining...", Remaining), GeneratedBlockCount, Remaining); + remotestore_impl::ReportProgress(OptionalContext, + "Creating blocks"sv, + fmt::format("{} remaining...", Remaining), + GeneratedBlockCount, + Remaining); } if (GeneratedBlockCount > 0) { uint64_t NowMS = Timer.GetElapsedTimeMs(); - ReportProgress(OptionalContext, "Creating blocks"sv, ""sv, GeneratedBlockCount, 0); - ReportMessage(OptionalContext, - fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS))); + remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, ""sv, GeneratedBlockCount, 0); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS))); } if (!RemoteResult.IsError()) @@ -1333,7 +2237,7 @@ BuildContainer(CidStore& ChunkStore, OplogContinerWriter.BeginArray("blocks"sv); { - for (const Block& B : Blocks) + for (const remotestore_impl::Block& B : Blocks) { ZEN_ASSERT(!B.ChunksInBlock.empty()); if (BuildBlocks) @@ -1434,366 +2338,27 @@ BuildContainer(CidStore& ChunkStore, { WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - AsyncRemoteResult RemoteResult; - CbObject ContainerObject = BuildContainer(ChunkStore, - Project, - Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - BuildBlocks, - IgnoreMissingAttachments, - AllowChunking, - {}, - WorkerPool, - AsyncOnBlock, - OnLargeAttachment, - OnBlockChunks, - EmbedLooseFiles, - nullptr, - RemoteResult); + remotestore_impl::AsyncRemoteResult RemoteResult; + CbObject ContainerObject = BuildContainer(ChunkStore, + Project, + Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + BuildBlocks, + IgnoreMissingAttachments, + AllowChunking, + {}, + WorkerPool, + AsyncOnBlock, + OnLargeAttachment, + OnBlockChunks, + EmbedLooseFiles, + nullptr, + RemoteResult); return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } -struct UploadInfo -{ - uint64_t OplogSizeBytes = 0; - std::atomic<uint64_t> AttachmentsUploaded = 0; - std::atomic<uint64_t> AttachmentBlocksUploaded = 0; - std::atomic<uint64_t> AttachmentBytesUploaded = 0; - std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0; -}; - -void -UploadAttachments(WorkerThreadPool& WorkerPool, - CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments, - const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks, - const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks, - const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments, - const std::unordered_set<IoHash, IoHash::Hasher>& Needs, - bool ForceAll, - UploadInfo& Info, - AsyncRemoteResult& RemoteResult, - JobContext* OptionalContext) -{ - using namespace std::literals; - - if (Needs.empty() && !ForceAll) - { - return; - } - - ReportMessage(OptionalContext, "Filtering needed attachments for upload..."); - - std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload; - std::unordered_map<IoHash, FetchChunkFunc, IoHash::Hasher> BulkBlockAttachmentsToUpload; - - size_t BlockAttachmentCountToUpload = 0; - size_t LargeAttachmentCountToUpload = 0; - size_t BulkAttachmentCountToUpload = 0; - AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size()); - - std::unordered_set<IoHash, IoHash::Hasher> UnknownAttachments(Needs); - - for (const auto& CreatedBlock : CreatedBlocks) - { - if (ForceAll || Needs.contains(CreatedBlock.first)) - { - AttachmentsToUpload.insert(CreatedBlock.first); - BlockAttachmentCountToUpload++; - UnknownAttachments.erase(CreatedBlock.first); - } - } - for (const IoHash& LargeAttachment : LargeAttachments) - { - if (ForceAll || Needs.contains(LargeAttachment)) - { - AttachmentsToUpload.insert(LargeAttachment); - LargeAttachmentCountToUpload++; - UnknownAttachments.erase(LargeAttachment); - } - } - for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& BlockHashes : BlockChunks) - { - for (const std::pair<IoHash, FetchChunkFunc>& Chunk : BlockHashes) - { - if (ForceAll || Needs.contains(Chunk.first)) - { - BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second)); - BulkAttachmentCountToUpload++; - UnknownAttachments.erase(Chunk.first); - } - } - } - - if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty()) - { - ReportMessage(OptionalContext, "No attachments needed"); - return; - } - - if (!UnknownAttachments.empty()) - { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Upload requested of {} missing attachments, the base container referenced blocks that are no longer available", - UnknownAttachments.size()), - ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return; - } - - if (IsCancelled(OptionalContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - } - return; - } - - ReportMessage(OptionalContext, - fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)", - AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), - BlockAttachmentCountToUpload, - LargeAttachmentCountToUpload, - BulkAttachmentCountToUpload)); - - Stopwatch Timer; - - ptrdiff_t AttachmentsToSave(0); - Latch SaveAttachmentsLatch(1); - - for (const IoHash& RawHash : AttachmentsToUpload) - { - if (RemoteResult.IsError()) - { - break; - } - - SaveAttachmentsLatch.AddCount(1); - AttachmentsToSave++; - WorkerPool.ScheduleWork([&ChunkStore, - &RemoteStore, - &SaveAttachmentsLatch, - &RemoteResult, - RawHash, - &CreatedBlocks, - &LooseFileAttachments, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - bool IsBlock = false; - IoBuffer Payload; - if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) - { - Payload = BlockIt->second; - IsBlock = true; - } - else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) - { - Payload = LooseTmpFileIt->second(RawHash); - } - else - { - Payload = ChunkStore.FindChunkByCid(RawHash); - } - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); - return; - } - size_t PayloadSize = Payload.GetSize(); - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): {}", - RawHash, - NiceBytes(PayloadSize), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; - } - if (IsBlock) - { - Info.AttachmentBlocksUploaded.fetch_add(1); - Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved block attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); - } - else - { - Info.AttachmentsUploaded.fetch_add(1); - Info.AttachmentBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("To upload attachment {}", RawHash), - Ex.what()); - } - }); - } - - if (IsCancelled(OptionalContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - } - return; - } - - if (!BulkBlockAttachmentsToUpload.empty()) - { - for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks) - { - if (RemoteResult.IsError()) - { - break; - } - - std::vector<IoHash> NeededChunks; - NeededChunks.reserve(Chunks.size()); - for (const std::pair<IoHash, FetchChunkFunc>& Chunk : Chunks) - { - const IoHash& ChunkHash = Chunk.first; - if (BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash)) - { - NeededChunks.push_back(Chunk.first); - } - } - if (NeededChunks.empty()) - { - continue; - } - - SaveAttachmentsLatch.AddCount(1); - AttachmentsToSave++; - WorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &SaveAttachmentsLatch, - &RemoteResult, - NeededChunks = std::move(NeededChunks), - &BulkBlockAttachmentsToUpload, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - size_t ChunksSize = 0; - std::vector<SharedBuffer> ChunkBuffers; - ChunkBuffers.reserve(NeededChunks.size()); - for (const IoHash& Chunk : NeededChunks) - { - auto It = BulkBlockAttachmentsToUpload.find(Chunk); - ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); - CompositeBuffer ChunkPayload = It->second(It->first); - if (!ChunkPayload) - { - RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), - fmt::format("Missing chunk {}"sv, Chunk), - fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); - ChunkBuffers.clear(); - break; - } - ChunksSize += ChunkPayload.GetSize(); - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer())); - } - RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachments with {} chunks ({}): {}", - NeededChunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; - } - Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); - Info.AttachmentBytesUploaded.fetch_add(ChunksSize); - - ZEN_INFO("Saved {} bulk attachments in {} ({})", - NeededChunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(ChunksSize)); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to buck upload {} attachments", NeededChunks.size()), - Ex.what()); - } - }); - } - } - - SaveAttachmentsLatch.CountDown(); - while (!SaveAttachmentsLatch.Wait(1000)) - { - ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); - if (IsCancelled(OptionalContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - } - } - uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs(); - ReportProgress(OptionalContext, - "Saving attachments"sv, - fmt::format("{} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), - AttachmentsToSave, - Remaining); - } - uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs(); - if (AttachmentsToSave > 0) - { - ReportProgress(OptionalContext, - "Saving attachments"sv, - fmt::format("{}", GetStats(RemoteStore.GetStats(), ElapsedTimeMS)), - AttachmentsToSave, - 0); - } - ReportMessage(OptionalContext, - fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}", - AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), - BlockAttachmentCountToUpload, - LargeAttachmentCountToUpload, - BulkAttachmentCountToUpload, - NiceTimeSpanMs(ElapsedTimeMS), - GetStats(RemoteStore.GetStats(), ElapsedTimeMS))); -} - RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, @@ -1811,7 +2376,7 @@ SaveOplog(CidStore& ChunkStore, Stopwatch Timer; - UploadInfo Info; + remotestore_impl::UploadInfo Info; WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background); @@ -1826,7 +2391,7 @@ SaveOplog(CidStore& ChunkStore, CreateDirectories(AttachmentTempPath); } - AsyncRemoteResult RemoteResult; + remotestore_impl::AsyncRemoteResult RemoteResult; RwLock AttachmentsLock; std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks; @@ -1838,7 +2403,7 @@ SaveOplog(CidStore& ChunkStore, BlockPath.append(BlockHash.ToHexString()); try { - IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock), BlockPath); + IoBuffer BlockBuffer = remotestore_impl::WriteToTempFile(std::move(CompressedBlock), BlockPath); RwLock::ExclusiveLockScope __(AttachmentsLock); CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize())); @@ -1857,8 +2422,9 @@ SaveOplog(CidStore& ChunkStore, if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } Info.AttachmentBlocksUploaded.fetch_add(1); @@ -1892,13 +2458,14 @@ SaveOplog(CidStore& ChunkStore, OnBlock = UploadBlock; } - std::vector<Block> KnownBlocks; + std::vector<remotestore_impl::Block> KnownBlocks; uint64_t TransferWallTimeMS = 0; if (RemoteStoreInfo.CreateBlocks && !RemoteStoreInfo.BaseContainerName.empty()) { - ReportMessage(OptionalContext, fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName)); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName)); Stopwatch LoadBaseContainerTimer; RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer(); TransferWallTimeMS += LoadBaseContainerTimer.GetElapsedTimeMs(); @@ -1907,17 +2474,18 @@ SaveOplog(CidStore& ChunkStore, { if (BaseContainerResult.ErrorCode) { - ReportMessage(OptionalContext, - fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments", - RemoteStoreInfo.BaseContainerName, - BaseContainerResult.ErrorCode, - BaseContainerResult.Reason)); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments", + RemoteStoreInfo.BaseContainerName, + BaseContainerResult.ErrorCode, + BaseContainerResult.Reason)); } else { - ReportMessage(OptionalContext, - fmt::format("Loaded oplog base container in {}", - NiceTimeSpanMs(static_cast<uint64_t>(BaseContainerResult.ElapsedSeconds * 1000.0)))); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Loaded oplog base container in {}", + NiceTimeSpanMs(static_cast<uint64_t>(BaseContainerResult.ElapsedSeconds * 1000.0)))); CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView(); @@ -1933,12 +2501,13 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore::HasAttachmentsResult HasResult = RemoteStore.HasAttachments(BlockHashes); if (HasResult.ErrorCode == 0) { - ReportMessage(OptionalContext, - fmt::format("Checked the existance of {} block{} in remote store, found {} existing blocks in {}", - BlockHashes.size(), - BlockHashes.size() > 1 ? "s"sv : ""sv, - BlockHashes.size() - HasResult.Needs.size(), - NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000.0)))); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Checked the existance of {} block{} in remote store, found {} existing blocks in {}", + BlockHashes.size(), + BlockHashes.size() > 1 ? "s"sv : ""sv, + BlockHashes.size() - HasResult.Needs.size(), + NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000.0)))); if (HasResult.Needs.size() < BlocksArray.Num()) { KnownBlocks.reserve(BlocksArray.Num() - HasResult.Needs.size()); @@ -1970,11 +2539,12 @@ SaveOplog(CidStore& ChunkStore, } else { - ReportMessage(OptionalContext, - fmt::format("Unable to determine which blocks in base container exist in remote store, assuming none " - "does: '{}', error code : {}", - HasResult.Reason, - HasResult.ErrorCode)); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Unable to determine which blocks in base container exist in remote store, assuming none " + "does: '{}', error code : {}", + HasResult.Reason, + HasResult.ErrorCode)); } } } @@ -2001,37 +2571,40 @@ SaveOplog(CidStore& ChunkStore, { Info.OplogSizeBytes = OplogContainerObject.GetSize(); - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Text = "Operation cancelled"}; - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return Result; } uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); - ReportMessage(OptionalContext, - fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...", - RemoteStoreInfo.ContainerName, - ChunkCount, - BlockCount)); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...", + RemoteStoreInfo.ContainerName, + ChunkCount, + BlockCount)); Stopwatch SaveContainerTimer; RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs(); if (ContainerSaveResult.ErrorCode) { RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); - ReportMessage(OptionalContext, - fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } else { - ReportMessage(OptionalContext, - fmt::format("Saved container '{}' in {}", - RemoteStoreInfo.ContainerName, - NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0)))); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Saved container '{}' in {}", + RemoteStoreInfo.ContainerName, + NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0)))); } { @@ -2054,39 +2627,40 @@ SaveOplog(CidStore& ChunkStore, uint32_t Try = 0; while (!RemoteResult.IsError()) { - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Text = "Operation cancelled"}; - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text)); + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text)); return Result; } - ReportMessage(OptionalContext, "Finalizing oplog container..."); + remotestore_impl::ReportMessage(OptionalContext, "Finalizing oplog container..."); RemoteProjectStore::FinalizeResult ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); if (ContainerFinalizeResult.ErrorCode) { RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to finalize oplog container {} ({}): {}", - ContainerSaveResult.RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Failed to finalize oplog container {} ({}): {}", + ContainerSaveResult.RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); return Result; } - ReportMessage(OptionalContext, - fmt::format("Finalized container '{}' in {}", - RemoteStoreInfo.ContainerName, - NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000.0)))); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Finalized container '{}' in {}", + RemoteStoreInfo.ContainerName, + NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000.0)))); if (ContainerFinalizeResult.Needs.empty()) { break; } - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, @@ -2099,7 +2673,7 @@ SaveOplog(CidStore& ChunkStore, { Try++; - ReportMessage( + remotestore_impl::ReportMessage( OptionalContext, fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements. Try {}", RemoteStoreInfo.ContainerName, @@ -2129,11 +2703,11 @@ SaveOplog(CidStore& ChunkStore, fmt::format("Giving up finalize oplog container {} after {} retries, still getting reports of missing attachments", ContainerSaveResult.RawHash, ContainerFinalizeResult.Needs.size())); - ReportMessage(OptionalContext, - fmt::format("Failed to finalize oplog container container {} ({}): {}", - ContainerSaveResult.RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Failed to finalize oplog container container {} ({}): {}", + ContainerSaveResult.RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); break; } } @@ -2144,19 +2718,19 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - LogRemoteStoreStatsDetails(RemoteStore.GetStats()); + remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats()); - ReportMessage(OptionalContext, - fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", - RemoteStoreInfo.ContainerName, - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), - NiceBytes(Info.OplogSizeBytes), - Info.AttachmentBlocksUploaded.load(), - NiceBytes(Info.AttachmentBlockBytesUploaded.load()), - Info.AttachmentsUploaded.load(), - NiceBytes(Info.AttachmentBytesUploaded.load()), - GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", + RemoteStoreInfo.ContainerName, + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksUploaded.load(), + NiceBytes(Info.AttachmentBlockBytesUploaded.load()), + Info.AttachmentsUploaded.load(), + NiceBytes(Info.AttachmentBytesUploaded.load()), + remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); return Result; }; @@ -2183,7 +2757,8 @@ ParseOplogContainer(const CbObject& ContainerObject, CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); if (!SectionObject) { - ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.0, "Section has unexpected data type", @@ -2203,7 +2778,7 @@ ParseOplogContainer(const CbObject& ContainerObject, std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end()); OnReferencedAttachments(ReferencedAttachments); } - ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); for (CbFieldView ChunkedFileField : ChunkedFilesArray) @@ -2283,7 +2858,8 @@ ParseOplogContainer(const CbObject& ContainerObject, } } } - ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); size_t NeedAttachmentCount = 0; CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); @@ -2296,68 +2872,8 @@ ParseOplogContainer(const CbObject& ContainerObject, NeedAttachmentCount++; } }; - ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); - - return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; -} - -static RemoteProjectStore::Result -WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext) -{ - using namespace std::literals; - - Stopwatch Timer; - - CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); - const uint64_t OpCount = OpsArray.Num(); - - ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount)); - - const size_t OpsBatchSize = 8192; - std::vector<uint8_t> OpsData; - std::vector<size_t> OpDataOffsets; - size_t OpsCompleteCount = 0; - - OpsData.reserve(OpsBatchSize); - - auto AppendBatch = [&]() { - std::vector<CbObjectView> Ops; - Ops.reserve(OpDataOffsets.size()); - for (size_t OpDataOffset : OpDataOffsets) - { - Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset])); - } - std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops); - OpsCompleteCount += OpLsns.size(); - OpsData.clear(); - OpDataOffsets.clear(); - ReportProgress(OptionalContext, - "Writing oplog"sv, - fmt::format("{} remaining...", OpCount - OpsCompleteCount), - OpCount, - OpCount - OpsCompleteCount); - }; - - BinaryWriter Writer; - for (CbFieldView OpEntry : OpsArray) - { - CbObjectView Op = OpEntry.AsObjectView(); - Op.CopyTo(Writer); - OpDataOffsets.push_back(OpsData.size()); - OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize()); - Writer.Reset(); - - if (OpDataOffsets.size() == OpsBatchSize) - { - AppendBatch(); - } - } - if (!OpDataOffsets.empty()) - { - AppendBatch(); - } - - ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; } @@ -2388,7 +2904,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, { return Result; } - Result = WriteOplogSection(Oplog, OplogSection, OptionalContext); + Result = remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } @@ -2404,19 +2920,7 @@ LoadOplog(CidStore& ChunkStore, { using namespace std::literals; - struct DownloadInfo - { - uint64_t OplogSizeBytes = 0; - std::atomic<uint64_t> AttachmentsDownloaded = 0; - std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; - std::atomic<uint64_t> AttachmentBytesDownloaded = 0; - std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; - std::atomic<uint64_t> AttachmentsStored = 0; - std::atomic<uint64_t> AttachmentBytesStored = 0; - std::atomic_size_t MissingAttachmentCount = 0; - }; - - DownloadInfo Info; + remotestore_impl::DownloadInfo Info; Stopwatch Timer; @@ -2427,7 +2931,7 @@ LoadOplog(CidStore& ChunkStore, uint64_t BlockCountToDownload = 0; RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); - ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName)); + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName)); uint64_t TransferWallTimeMS = 0; @@ -2436,7 +2940,7 @@ LoadOplog(CidStore& ChunkStore, TransferWallTimeMS += LoadContainerTimer.GetElapsedTimeMs(); if (LoadContainerResult.ErrorCode) { - ReportMessage( + remotestore_impl::ReportMessage( OptionalContext, fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode)); return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, @@ -2444,16 +2948,16 @@ LoadOplog(CidStore& ChunkStore, .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text}; } - ReportMessage(OptionalContext, - fmt::format("Loaded container in {} ({})", - NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)), - NiceBytes(LoadContainerResult.ContainerObject.GetSize()))); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Loaded container in {} ({})", + NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)), + NiceBytes(LoadContainerResult.ContainerObject.GetSize()))); Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize(); - AsyncRemoteResult RemoteResult; - Latch AttachmentsDownloadLatch(1); - Latch AttachmentsWriteLatch(1); - std::atomic_size_t AttachmentCount = 0; + remotestore_impl::AsyncRemoteResult RemoteResult; + Latch AttachmentsDownloadLatch(1); + Latch AttachmentsWriteLatch(1); + std::atomic_size_t AttachmentCount = 0; Stopwatch LoadAttachmentsTimer; std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1; @@ -2469,6 +2973,7 @@ LoadOplog(CidStore& ChunkStore, } return false; }; + auto OnNeedBlock = [&RemoteStore, &ChunkStore, &NetworkWorkerPool, @@ -2489,265 +2994,41 @@ LoadOplog(CidStore& ChunkStore, } BlockCountToDownload++; + AttachmentCount.fetch_add(1); if (BlockHash == IoHash::Zero) { - AttachmentsDownloadLatch.AddCount(1); - AttachmentCount.fetch_add(1); - NetworkWorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &WorkerPool, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &RemoteResult, - Chunks = std::move(Chunks), - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); - if (Result.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to load attachments with {} chunks ({}): {}", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (IgnoreMissingAttachments) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - } - return; - } - Info.AttachmentsDownloaded.fetch_add(Chunks.size()); - ZEN_INFO("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - if (RemoteResult.IsError()) - { - return; - } - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - if (!Chunks.empty()) - { - try - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - WriteAttachmentBuffers.reserve(Chunks.size()); - WriteRawHashes.reserve(Chunks.size()); - - for (const auto& It : Chunks) - { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); - WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(It.first); - } - std::vector<CidStore::InsertResult> InsertResults = - ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); - - for (size_t Index = 0; Index < InsertResults.size(); Index++) - { - if (InsertResults[Index].New) - { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); - } - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk save {} attachments", Chunks.size()), - Ex.what()); - } - } - }); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk load {} attachments", Chunks.size()), - Ex.what()); - } - }); - return; + DownloadAndSaveBlockChunks(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + Chunks); + } + else + { + DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockHash, + Chunks, + 3); } - AttachmentsDownloadLatch.AddCount(1); - AttachmentCount.fetch_add(1); - NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &WorkerPool, - BlockHash, - &RemoteResult, - Chunks = std::move(Chunks), - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); - if (BlockResult.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to download block attachment {} ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - } - return; - } - if (RemoteResult.IsError()) - { - return; - } - uint64_t BlockSize = BlockResult.Bytes.GetSize(); - Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_INFO("Loaded block attachment '{}' in {} ({})", - BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), - NiceBytes(BlockSize)); - Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); - - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsWriteLatch, - &RemoteResult, - &Info, - &ChunkStore, - BlockHash, - Chunks = std::move(Chunks), - Bytes = std::move(BlockResult.Bytes), - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - ZEN_ASSERT(Bytes.Size() > 0); - std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; - WantedChunks.reserve(Chunks.size()); - WantedChunks.insert(Chunks.begin(), Chunks.end()); - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - - IoHash RawHash; - uint64_t RawSize; - SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress(); - if (!BlockPayload) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), - {}); - return; - } - if (RawHash != BlockHash) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), - {}); - return; - } - - bool StoreChunksOK = IterateBlock( - BlockPayload, - [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, - const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - WantedChunks.erase(AttachmentRawHash); - } - }); - - if (!StoreChunksOK) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Invalid format for block {}", BlockHash), - {}); - return; - } - - ZEN_ASSERT(WantedChunks.empty()); - - if (!WriteAttachmentBuffers.empty()) - { - auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (size_t Index = 0; Index < Results.size(); Index++) - { - const auto& Result = Results[Index]; - if (Result.New) - { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); - } - } - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed save block attachment {}", BlockHash), - Ex.what()); - } - }); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to block attachment {}", BlockHash), - Ex.what()); - } - }); }; auto OnNeedAttachment = [&RemoteStore, @@ -2773,95 +3054,20 @@ LoadOplog(CidStore& ChunkStore, { return; } - - AttachmentsDownloadLatch.AddCount(1); AttachmentCount.fetch_add(1); - NetworkWorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &WorkerPool, - &RemoteResult, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - RawHash, - &LoadAttachmentsTimer, - &DownloadStartMS, - &Info, - IgnoreMissingAttachments, - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to download large attachment {}: '{}', error code : {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode)); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); - } - return; - } - uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); - ZEN_INFO("Loaded large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), - NiceBytes(AttachmentSize)); - Info.AttachmentsDownloaded.fetch_add(1); - if (RemoteResult.IsError()) - { - return; - } - Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); - - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsWriteLatch, - &RemoteResult, - &Info, - &ChunkStore, - RawHash, - AttachmentSize, - Bytes = std::move(AttachmentResult.Bytes), - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(AttachmentSize); - Info.AttachmentsStored.fetch_add(1); - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Saving attachment {} failed", RawHash), - Ex.what()); - } - }); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Loading attachment {} failed", RawHash), - Ex.what()); - } - }); + DownloadAndSaveAttachment(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + RawHash); }; std::vector<ChunkedInfo> FilesToDechunk; @@ -2891,18 +3097,18 @@ LoadOplog(CidStore& ChunkStore, { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } - ReportMessage(OptionalContext, - fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), - Attachments.size(), - BlockCountToDownload, - FilesToDechunk.size())); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + Attachments.size(), + BlockCountToDownload, + FilesToDechunk.size())); AttachmentsDownloadLatch.CountDown(); while (!AttachmentsDownloadLatch.Wait(1000)) { ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining(); - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { @@ -2914,11 +3120,12 @@ LoadOplog(CidStore& ChunkStore, { PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); } - ReportProgress(OptionalContext, - "Loading attachments"sv, - fmt::format("{} remaining. {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), - AttachmentCount.load(), - Remaining); + remotestore_impl::ReportProgress( + OptionalContext, + "Loading attachments"sv, + fmt::format("{} remaining. {}", Remaining, remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), + AttachmentCount.load(), + Remaining); } if (DownloadStartMS != (uint64_t)-1) { @@ -2927,41 +3134,41 @@ LoadOplog(CidStore& ChunkStore, if (AttachmentCount.load() > 0) { - ReportProgress(OptionalContext, - "Loading attachments"sv, - fmt::format("{}", GetStats(RemoteStore.GetStats(), TransferWallTimeMS)), - AttachmentCount.load(), - 0); + remotestore_impl::ReportProgress(OptionalContext, + "Loading attachments"sv, + fmt::format("{}", remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)), + AttachmentCount.load(), + 0); } AttachmentsWriteLatch.CountDown(); while (!AttachmentsWriteLatch.Wait(1000)) { ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining(); - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); } } - ReportProgress(OptionalContext, - "Writing attachments"sv, - fmt::format("{} remaining.", Remaining), - AttachmentCount.load(), - Remaining); + remotestore_impl::ReportProgress(OptionalContext, + "Writing attachments"sv, + fmt::format("{} remaining.", Remaining), + AttachmentCount.load(), + Remaining); } if (AttachmentCount.load() > 0) { - ReportProgress(OptionalContext, "Writing attachments", ""sv, AttachmentCount.load(), 0); + remotestore_impl::ReportProgress(OptionalContext, "Writing attachments", ""sv, AttachmentCount.load(), 0); } if (Result.ErrorCode == 0) { if (!FilesToDechunk.empty()) { - ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); Latch DechunkLatch(1); std::filesystem::path TempFilePath = Oplog.TempPath(); @@ -3011,7 +3218,7 @@ LoadOplog(CidStore& ChunkStore, IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); if (!Chunk) { - ReportMessage( + remotestore_impl::ReportMessage( OptionalContext, fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); @@ -3070,22 +3277,23 @@ LoadOplog(CidStore& ChunkStore, while (!DechunkLatch.Wait(1000)) { ptrdiff_t Remaining = DechunkLatch.Remaining(); - if (IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } } - ReportProgress(OptionalContext, - "Dechunking attachments"sv, - fmt::format("{} remaining...", Remaining), - FilesToDechunk.size(), - Remaining); + remotestore_impl::ReportProgress(OptionalContext, + "Dechunking attachments"sv, + fmt::format("{} remaining...", Remaining), + FilesToDechunk.size(), + Remaining); } - ReportProgress(OptionalContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0); + remotestore_impl::ReportProgress(OptionalContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0); } Result = RemoteResult.ConvertResult(); } @@ -3099,33 +3307,34 @@ LoadOplog(CidStore& ChunkStore, Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())}; - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); } } if (Result.ErrorCode == 0) { - WriteOplogSection(Oplog, OplogSection, OptionalContext); + remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext); } } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - LogRemoteStoreStatsDetails(RemoteStore.GetStats()); - - ReportMessage(OptionalContext, - fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}", - RemoteStoreInfo.ContainerName, - Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), - NiceBytes(Info.OplogSizeBytes), - Info.AttachmentBlocksDownloaded.load(), - NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), - Info.AttachmentsDownloaded.load(), - NiceBytes(Info.AttachmentBytesDownloaded.load()), - Info.AttachmentsStored.load(), - NiceBytes(Info.AttachmentBytesStored.load()), - Info.MissingAttachmentCount.load(), - GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); + remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats()); + + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}", + RemoteStoreInfo.ContainerName, + Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksDownloaded.load(), + NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), + Info.AttachmentsDownloaded.load(), + NiceBytes(Info.AttachmentBytesDownloaded.load()), + Info.AttachmentsStored.load(), + NiceBytes(Info.AttachmentBytesStored.load()), + Info.MissingAttachmentCount.load(), + remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); return Result; } |