diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 345 |
1 files changed, 117 insertions, 228 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 55f40d223..13b2e7b1e 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -69,34 +69,6 @@ namespace zen { namespace remotestore_impl { using namespace std::literals; - ////////////////////////////// AsyncRemoteResult - - struct AsyncRemoteResult - { - void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText) - { - int32_t Expected = 0; - if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1)) - { - m_ErrorReason = ErrorReason; - m_ErrorText = ErrorText; - } - } - bool IsError() const { return m_ErrorCode.load() != 0; } - int GetError() const { return m_ErrorCode.load(); }; - const std::string& GetErrorReason() const { return m_ErrorReason; }; - const std::string& GetErrorText() const { return m_ErrorText; }; - RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const - { - return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText}; - } - - private: - std::atomic<int32_t> m_ErrorCode = 0; - std::string m_ErrorReason; - std::string m_ErrorText; - }; - void ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, std::string_view Details, @@ -462,9 +434,7 @@ namespace remotestore_impl { } } IoBuffer TempAttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath); - ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); CompressedFile.Close(); - ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); TempAttachmentBuffer.SetDeleteOnClose(true); ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); return TempAttachmentBuffer; @@ -1032,7 +1002,7 @@ namespace remotestore_impl { if (CompressedSize > MaxChunkEmbedSize) { - TGetAttachmentBufferFunc FetchFunc = [Data = std::move(TempAttachmentBuffer)](const IoHash&) { + TGetAttachmentBufferFunc FetchFunc = [Data = std::move(TempAttachmentBuffer)](const IoHash&) mutable { return CompositeBuffer(SharedBuffer(std::move(Data))); }; @@ -2156,7 +2126,6 @@ namespace remotestore_impl { const std::unordered_set<IoHash, IoHash::Hasher>& Needs, bool ForceAll, UploadInfo& Info, - AsyncRemoteResult& RemoteResult, JobContext* OptionalContext) { using namespace std::literals; @@ -2217,22 +2186,15 @@ namespace remotestore_impl { if (!UnknownAttachments.empty()) { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::NotFound), + throw RemoteStoreError( fmt::format("Upload requested of {} missing attachments, the base container referenced blocks that are no longer available", UnknownAttachments.size()), + gsl::narrow<int>(HttpResponseCode::NotFound), ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return; } if (IsCancelled(OptionalContext)) { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - } return; } @@ -2245,121 +2207,91 @@ namespace remotestore_impl { Stopwatch Timer; - ptrdiff_t AttachmentsToSave(0); - Latch SaveAttachmentsLatch(1); + std::atomic<bool> AbortFlag(false); + std::atomic<bool> PauseFlag(false); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + ptrdiff_t AttachmentsToSave(0); for (const IoHash& RawHash : AttachmentsToUpload) { - if (RemoteResult.IsError()) + if (AbortFlag.load()) { break; } - SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork( - [&ChunkStore, - &RemoteStore, - &SaveAttachmentsLatch, - &RemoteResult, - RawHash, - &CreatedBlocks, - &LooseFileAttachments, - &Info, - OptionalContext]() { + Work.ScheduleWork( + WorkerPool, + [&ChunkStore, &RemoteStore, RawHash, &CreatedBlocks, &LooseFileAttachments, &Info, OptionalContext]( + std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("UploadAttachment"); - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) + if (AbortFlag.load()) { return; } - try + CompositeBuffer Payload; + ChunkBlockDescription Block; + if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) { - CompositeBuffer Payload; - ChunkBlockDescription Block; - if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) - { - Payload = BlockIt->second.Payload; - Block = BlockIt->second.Block; - } - else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) - { - Payload = LooseTmpFileIt->second(RawHash); - } - else - { - Payload = CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash))); - } - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_WARN("Failed to save attachment '{}' ({}): {}", - RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - const bool IsBlock = Block.BlockHash == RawHash; - size_t PayloadSize = Payload.GetSize(); - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(std::move(Payload), RawHash, std::move(Block)); - if (Result.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): {}", - RawHash, - NiceBytes(PayloadSize), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; - } - if (IsBlock) - { - Info.AttachmentBlocksUploaded.fetch_add(1); - Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved block attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); - } - else - { - Info.AttachmentsUploaded.fetch_add(1); - Info.AttachmentBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); - } + Payload = BlockIt->second.Payload; + Block = BlockIt->second.Block; } - catch (const std::exception& Ex) + else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to upload attachment {}", RawHash), - Ex.what()); + Payload = LooseTmpFileIt->second(RawHash); } - }, - WorkerThreadPool::EMode::EnableBacklog); + else + { + Payload = CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash))); + } + if (!Payload) + { + throw RemoteStoreError(fmt::format("Failed to find attachment {}", RawHash), + gsl::narrow<int>(HttpResponseCode::NotFound), + {}); + } + const bool IsBlock = Block.BlockHash == RawHash; + size_t PayloadSize = Payload.GetSize(); + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(std::move(Payload), RawHash, std::move(Block)); + if (Result.ErrorCode) + { + throw RemoteStoreError(fmt::format("Failed to save attachment '{}', {}", RawHash, NiceBytes(PayloadSize)), + Result.ErrorCode, + Result.Text); + } + if (IsBlock) + { + Info.AttachmentBlocksUploaded.fetch_add(1); + Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved block attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); + } + else + { + Info.AttachmentsUploaded.fetch_add(1); + Info.AttachmentBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); + } + }); } if (IsCancelled(OptionalContext)) { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - } - return; + AbortFlag = true; } if (!BulkBlockAttachmentsToUpload.empty()) { for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks) { - if (RemoteResult.IsError()) + if (AbortFlag.load()) { break; } @@ -2379,87 +2311,62 @@ namespace remotestore_impl { continue; } - SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork( + Work.ScheduleWork( + WorkerPool, [&RemoteStore, &ChunkStore, - &SaveAttachmentsLatch, - &RemoteResult, NeededChunks = std::move(NeededChunks), &BulkBlockAttachmentsToUpload, &Info, - OptionalContext]() { + OptionalContext](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("UploadChunk"); - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) + if (AbortFlag.load()) { return; } - try + size_t ChunksSize = 0; + std::vector<SharedBuffer> ChunkBuffers; + ChunkBuffers.reserve(NeededChunks.size()); + for (const IoHash& Chunk : NeededChunks) { - size_t ChunksSize = 0; - std::vector<SharedBuffer> ChunkBuffers; - ChunkBuffers.reserve(NeededChunks.size()); - for (const IoHash& Chunk : NeededChunks) + auto It = BulkBlockAttachmentsToUpload.find(Chunk); + ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); + CompositeBuffer ChunkPayload = It->second(It->first).second; + if (!ChunkPayload) { - auto It = BulkBlockAttachmentsToUpload.find(Chunk); - ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); - CompositeBuffer ChunkPayload = It->second(It->first).second; - if (!ChunkPayload) - { - RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), - fmt::format("Missing chunk {}"sv, Chunk), - fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); - ChunkBuffers.clear(); - break; - } - ChunksSize += ChunkPayload.GetSize(); - ChunkBuffers.emplace_back(SharedBuffer(ChunkPayload.Flatten().AsIoBuffer())); - } - RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); - if (Result.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to save attachments with {} chunks ({}): {}", - NeededChunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; + throw RemoteStoreError(fmt::format("Missing chunk {}"sv, Chunk), + static_cast<int32_t>(HttpResponseCode::NotFound), + fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); } - Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); - Info.AttachmentBytesUploaded.fetch_add(ChunksSize); - - ZEN_INFO("Saved {} bulk attachments in {} ({})", - NeededChunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(ChunksSize)); + ChunksSize += ChunkPayload.GetSize(); + ChunkBuffers.emplace_back(SharedBuffer(ChunkPayload.Flatten().AsIoBuffer())); } - catch (const std::exception& Ex) + RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); + if (Result.ErrorCode) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk upload {} attachments", NeededChunks.size()), - Ex.what()); + throw RemoteStoreError(fmt::format("Failed to save attachments with {} chunks", NeededChunks.size()), + Result.ErrorCode, + Result.Text); } - }, - WorkerThreadPool::EMode::EnableBacklog); + Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); + Info.AttachmentBytesUploaded.fetch_add(ChunksSize); + + ZEN_INFO("Saved {} bulk attachments in {} ({})", + NeededChunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(ChunksSize)); + }); } } Stopwatch SaveAttachmentsProgressTimer; - SaveAttachmentsLatch.CountDown(); - while (!SaveAttachmentsLatch.Wait(1000)) - { - ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); - if (IsCancelled(OptionalContext)) + Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t Remaining) { + ZEN_UNUSED(IsAborted, IsPaused); + if (IsCancelled(OptionalContext) && !AbortFlag.load()) { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - } + AbortFlag = true; } uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs(); ReportProgress(OptionalContext, @@ -2470,7 +2377,7 @@ namespace remotestore_impl { AttachmentsToSave, Remaining, SaveAttachmentsProgressTimer.GetElapsedTimeMs()); - } + }); uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs(); if (AttachmentsToSave > 0) { @@ -2732,12 +2639,13 @@ BuildContainer(CidStore& ChunkStore, } else { + const size_t TotalAttachmentCount = UploadAttachments.size() + ReusedAttachmentCount; remotestore_impl::ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops ({} ({:.1f}%) found in existing blocks)", UploadAttachments.size(), TotalOpCount, ReusedAttachmentCount, - (100.f * ReusedAttachmentCount) / UploadAttachments.size())); + (100.f * ReusedAttachmentCount) / TotalAttachmentCount)); ResolveAttachments(ChunkStore, WorkerPool, @@ -2926,11 +2834,12 @@ BuildContainer(CidStore& ChunkStore, if (auto It = LooseUploadAttachments.find(AttachmentHash); It != LooseUploadAttachments.end()) { uint64_t RawSize = It->second.first; - IoBuffer Payload = It->second.second; - return [RawSize = RawSize, Payload = Payload](const IoHash& ChunkHash) -> std::pair<uint64_t, CompositeBuffer> { - ZEN_UNUSED(ChunkHash); - return {RawSize, CompositeBuffer(SharedBuffer(std::move(Payload)))}; - }; + IoBuffer Payload = std::move(It->second.second); + return + [RawSize, Payload = std::move(Payload)](const IoHash& ChunkHash) mutable -> std::pair<uint64_t, CompositeBuffer> { + ZEN_UNUSED(ChunkHash); + return {RawSize, CompositeBuffer(SharedBuffer(std::move(Payload)))}; + }; } else { @@ -3433,8 +3342,7 @@ SaveOplog(CidStore& ChunkStore, } { - Stopwatch UploadAttachmentsTimer; - remotestore_impl::AsyncRemoteResult RemoteResult; + Stopwatch UploadAttachmentsTimer; UploadAttachments(NetworkWorkerPool, ChunkStore, RemoteStore, @@ -3445,17 +3353,8 @@ SaveOplog(CidStore& ChunkStore, ContainerSaveResult.Needs, ForceUpload, Info, - RemoteResult, OptionalContext); TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs(); - if (RemoteResult.IsError()) - { - throw RemoteStoreError(fmt::format("Failed to upload attachments for oplog '{}': {}", - RemoteStoreInfo.ContainerName, - RemoteResult.GetErrorReason()), - RemoteResult.GetError(), - RemoteResult.GetErrorText()); - } const uint32_t MaxTries = 8; uint32_t Try = 0; @@ -3520,17 +3419,8 @@ SaveOplog(CidStore& ChunkStore, ContainerFinalizeResult.Needs, false, Info, - RemoteResult, OptionalContext); TransferWallTimeMS += RetryUploadAttachmentsTimer.GetElapsedTimeMs(); - if (RemoteResult.IsError()) - { - throw RemoteStoreError(fmt::format("Failed to upload attachments for oplog '{}': {}", - RemoteStoreInfo.ContainerName, - RemoteResult.GetErrorReason()), - RemoteResult.GetError(), - RemoteResult.GetErrorText()); - } } } @@ -3922,7 +3812,6 @@ LoadOplog(LoadOplogContext&& Context) Context.Oplog.EnableUpdateCapture(); auto _ = MakeGuard([&Context]() { Context.Oplog.DisableUpdateCapture(); }); - RemoteProjectStore::Result LoadResult; CbObject OplogSection; RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject, OnReferencedAttachments, @@ -3934,8 +3823,9 @@ LoadOplog(LoadOplogContext&& Context) Context.OptionalJobContext); if (Result.ErrorCode != 0) { - AbortFlag = true; - LoadResult = {.ErrorCode = Result.ErrorCode, .Reason = Result.Reason, .Text = Result.Text}; + AbortFlag = true; + AttachmentWork.Wait(); + throw RemoteStoreError(Result.Reason, Result.ErrorCode, Result.Text); } remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download", @@ -4191,11 +4081,6 @@ LoadOplog(LoadOplogContext&& Context) AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); }); - if (LoadResult.ErrorCode) - { - throw RemoteStoreError(LoadResult.Reason, LoadResult.ErrorCode, LoadResult.Text); - } - if (DownloadStartMS != (uint64_t)-1) { TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); @@ -4382,10 +4267,6 @@ LoadOplog(LoadOplogContext&& Context) NiceBytes(Chunked.RawSize), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } - catch (const RemoteStoreError&) - { - throw; - } catch (const std::exception& Ex) { throw RemoteStoreError(fmt::format("Failed to dechunk file {}", Chunked.RawHash), @@ -4694,17 +4575,25 @@ namespace projectstore_testutils { struct CapturingJobContext : public JobContext { bool IsCancelled() const override { return false; } - void ReportMessage(std::string_view Message) override { Messages.emplace_back(Message); } + void ReportMessage(std::string_view Message) override + { + RwLock::ExclusiveLockScope _(m_Lock); + Messages.emplace_back(Message); + } void ReportProgress(std::string_view, std::string_view, ptrdiff_t, ptrdiff_t, uint64_t) override {} bool HasMessage(std::string_view Substr) const { + RwLock::SharedLockScope _(m_Lock); return std::any_of(Messages.begin(), Messages.end(), [Substr](const std::string& M) { return M.find(Substr) != std::string::npos; }); } std::vector<std::string> Messages; + + private: + mutable RwLock m_Lock; }; // Create a test IoHash with a unique value based on a small index. |