diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-19 06:04:10 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-19 12:04:10 +0200 |
| commit | 46e17d04154be01d26b2f30afcda37a11ba290fc (patch) | |
| tree | 39d56fc8e3d0bb289aebcd0155fc68b8f7d582bb /src | |
| parent | handle errors in spdlog gracefully (#410) (diff) | |
| download | zen-46e17d04154be01d26b2f30afcda37a11ba290fc.tar.xz zen-46e17d04154be01d26b2f30afcda37a11ba290fc.zip | |
Add retry if FinalizeRef responds with non-empty "Needs" attachments (#409)
* Add retry if FinalizeRef responds with non-empty "Needs" attachments
* better logging/progress report
* changelog
Diffstat (limited to 'src')
5 files changed, 377 insertions, 261 deletions
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index eeb1f71c4..8029d02de 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -135,7 +135,7 @@ public: return Result; } - virtual Result FinalizeContainer(const IoHash&) override { return {}; } + virtual FinalizeResult FinalizeContainer(const IoHash&) override { return {}; } virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Name); } virtual LoadContainerResult LoadBaseContainer() override diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index e59bac6d6..cfe273eba 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -124,22 +124,22 @@ public: return Result; } - virtual Result FinalizeContainer(const IoHash& RawHash) override + virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override { - const int32_t MaxAttempts = 3; - CloudCacheResult FinalizeResult; + const int32_t MaxAttempts = 3; + FinalizeRefResult FinalizeRefResult; { CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !FinalizeResult.Success; Attempt++) + for (int32_t Attempt = 0; Attempt < MaxAttempts && !FinalizeRefResult.Success; Attempt++) { - FinalizeResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); - if (!FinalizeResult.Success) + FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); + if (!FinalizeRefResult.Success) { Sleep(100 * (Attempt + 1)); } } } - Result Result{ConvertResult(FinalizeResult)}; + FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'", diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index aca9410a2..ea744eb35 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -175,6 +175,8 @@ CreateBlock(WorkerThreadPool& WorkerPool, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, AsyncRemoteResult& RemoteResult) { + ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size()); + OpSectionsLatch.AddCount(1); WorkerPool.ScheduleWork( [&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable { @@ -586,7 +588,6 @@ BuildContainer(CidStore& ChunkStore, if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp)) { - ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size()); size_t BlockIndex = AddBlock(BlocksLock, Blocks); if (BuildBlocks) { @@ -601,6 +602,7 @@ BuildContainer(CidStore& ChunkStore, } else { + ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); OnBlockChunks(BlockAttachmentHashes); } { @@ -619,7 +621,6 @@ BuildContainer(CidStore& ChunkStore, } if (BlockSize > 0) { - ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size()); size_t BlockIndex = AddBlock(BlocksLock, Blocks); if (BuildBlocks) { @@ -634,6 +635,7 @@ BuildContainer(CidStore& ChunkStore, } else { + ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); OnBlockChunks(BlockAttachmentHashes); } { @@ -810,6 +812,317 @@ BuildContainer(CidStore& ChunkStore, RemoteResult); return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } +void +UploadAttachments(WorkerThreadPool& WorkerPool, + CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments, + const std::vector<std::vector<IoHash>>& BlockChunks, + const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks, + const tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>& TempAttachments, + const std::unordered_set<IoHash, IoHash::Hasher>& Needs, + bool ForceAll, + AsyncRemoteResult& RemoteResult, + JobContext* OptionalContext) +{ + using namespace std::literals; + + if (Needs.empty() && !ForceAll) + { + return; + } + + ReportMessage(OptionalContext, "Filtering needed attachments..."); + + std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload; + + size_t BlockAttachmentCountToUpload = 0; + size_t LargeAttachmentCountToUpload = 0; + std::atomic<ptrdiff_t> BulkAttachmentCountToUpload = 0; + AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size()); + + for (const auto& CreatedBlock : CreatedBlocks) + { + if (ForceAll || Needs.contains(CreatedBlock.first)) + { + AttachmentsToUpload.insert(CreatedBlock.first); + BlockAttachmentCountToUpload++; + } + } + for (const IoHash& LargeAttachment : LargeAttachments) + { + if (ForceAll || Needs.contains(LargeAttachment)) + { + AttachmentsToUpload.insert(LargeAttachment); + LargeAttachmentCountToUpload++; + } + } + for (const std::vector<IoHash>& BlockHashes : BlockChunks) + { + if (ForceAll) + { + AttachmentsToUpload.insert(BlockHashes.begin(), BlockHashes.end()); + BulkAttachmentCountToUpload += BlockHashes.size(); + continue; + } + for (const IoHash& Hash : BlockHashes) + { + if (Needs.contains(Hash)) + { + AttachmentsToUpload.insert(Hash); + BulkAttachmentCountToUpload++; + } + } + } + + for (const IoHash& Needed : Needs) + { + if (!AttachmentsToUpload.contains(Needed)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + "Invalid attachment", + fmt::format("Upload requested of unknown attachement '{}'", Needed)); + ZEN_ERROR("Failed to upload attachment '{}'. ({}). Reason: '{}'", + Needed, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + } + + if (AttachmentsToUpload.empty()) + { + ReportMessage(OptionalContext, "No attachments needed"); + return; + } + + if (OptionalContext && OptionalContext->CancelFlag) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + return; + } + + ReportMessage(OptionalContext, + fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)", + AttachmentsToUpload.size(), + BlockAttachmentCountToUpload, + LargeAttachmentCountToUpload, + BulkAttachmentCountToUpload.load())); + + ptrdiff_t AttachmentsToSave(0); + Latch SaveAttachmentsLatch(1); + + for (const IoHash& RawHash : LargeAttachments) + { + if (RemoteResult.IsError()) + { + break; + } + if (!AttachmentsToUpload.contains(RawHash)) + { + continue; + } + + IoBuffer Payload; + if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) + { + Payload = BlockIt->second; + } + else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end()) + { + Payload = LooseTmpFileIt->second; + } + + SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; + WorkerPool.ScheduleWork( + [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, TempPayload = std::move(Payload)]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_ERROR("Failed to save attachment '{}' ({}). Reason: '{}'", + RawHash, + RemoteResult.GetErrorReason(), + RemoteResult.GetError()); + return; + } + + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); + } + + if (OptionalContext && OptionalContext->CancelFlag) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + return; + } + + for (auto& It : CreatedBlocks) + { + if (RemoteResult.IsError()) + { + break; + } + const IoHash& RawHash = It.first; + if (!AttachmentsToUpload.contains(RawHash)) + { + continue; + } + IoBuffer Payload = It.second; + ZEN_ASSERT(Payload); + SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; + WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + + RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); + } + + if (OptionalContext && OptionalContext->CancelFlag) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + return; + } + + for (const std::vector<IoHash>& Chunks : BlockChunks) + { + if (RemoteResult.IsError()) + { + break; + } + + std::vector<IoHash> NeededChunks; + NeededChunks.reserve(Chunks.size()); + for (const IoHash& Chunk : Chunks) + { + if (AttachmentsToUpload.contains(Chunk)) + { + NeededChunks.push_back(Chunk); + } + } + if (NeededChunks.empty()) + { + continue; + } + + SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; + WorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &SaveAttachmentsLatch, + &RemoteResult, + &Chunks, + NeededChunks = std::move(NeededChunks), + &BulkAttachmentCountToUpload]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + std::vector<SharedBuffer> ChunkBuffers; + ChunkBuffers.reserve(NeededChunks.size()); + for (const IoHash& Chunk : NeededChunks) + { + IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk); + if (!ChunkPayload) + { + RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), + fmt::format("Missing chunk {}"sv, Chunk), + fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); + ChunkBuffers.clear(); + break; + } + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload))); + } + RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Saved {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + BulkAttachmentCountToUpload.fetch_sub(Chunks.size()); + }); + } + + SaveAttachmentsLatch.CountDown(); + while (!SaveAttachmentsLatch.Wait(1000)) + { + ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); + if (OptionalContext && OptionalContext->CancelFlag) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + ReportProgress( + OptionalContext, + fmt::format("Saving attachments, {} remaining...", BlockChunks.empty() ? Remaining : BulkAttachmentCountToUpload.load()), + AttachmentsToSave, + Remaining); + } + if (AttachmentsToSave > 0) + { + ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0); + } +} RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, @@ -949,6 +1262,7 @@ SaveOplog(CidStore& ChunkStore, else { CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView(); + KnownBlocks.reserve(BlocksArray.Num()); for (CbFieldView BlockField : BlocksArray) { CbObjectView BlockView = BlockField.AsObjectView(); @@ -988,7 +1302,6 @@ SaveOplog(CidStore& ChunkStore, EmbedLooseFiles ? &TempAttachments : nullptr, OptionalContext, /* out */ RemoteResult); - if (!RemoteResult.IsError()) { if (OptionalContext && OptionalContext->CancelFlag) @@ -1014,7 +1327,19 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000))); } - if (!ContainerSaveResult.Needs.empty() || ForceUpload) + UploadAttachments(WorkerPool, + ChunkStore, + RemoteStore, + LargeAttachments, + BlockChunks, + CreatedBlocks, + TempAttachments, + ContainerSaveResult.Needs, + ForceUpload, + RemoteResult, + OptionalContext); + + while (!RemoteResult.IsError()) { if (OptionalContext && OptionalContext->CancelFlag) { @@ -1024,251 +1349,8 @@ SaveOplog(CidStore& ChunkStore, return Result; } - ReportMessage(OptionalContext, "Filtering needed attachments..."); - - std::vector<IoHash> NeededLargeAttachments; - std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments; - NeededLargeAttachments.reserve(LargeAttachments.size()); - NeededOtherAttachments.reserve(CreatedBlocks.size()); - if (ForceUpload) - { - // TODO: Check ForceUpload - it should add all attachments and blocks regardless if Needs is empty or not - NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end()); - } - else - { - for (const IoHash& RawHash : ContainerSaveResult.Needs) - { - if (LargeAttachments.contains(RawHash)) - { - NeededLargeAttachments.push_back(RawHash); - continue; - } - NeededOtherAttachments.insert(RawHash); - } - } - - ptrdiff_t AttachmentsToSave(0); - Latch SaveAttachmentsLatch(1); - if (!NeededLargeAttachments.empty()) - { - if (OptionalContext && OptionalContext->CancelFlag) - { - RemoteProjectStore::Result Result = {.ErrorCode = 0, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, - .Text = "Operation cancelled"}; - return Result; - } - ReportMessage(OptionalContext, "Saving large attachments..."); - for (const IoHash& RawHash : NeededLargeAttachments) - { - if (RemoteResult.IsError()) - { - break; - } - IoBuffer Payload; - if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) - { - Payload = std::move(BlockIt->second); - } - else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end()) - { - Payload = LooseTmpFileIt->second; - TempAttachments.erase(LooseTmpFileIt); - } - - SaveAttachmentsLatch.AddCount(1); - AttachmentsToSave++; - WorkerPool.ScheduleWork([&ChunkStore, - &RemoteStore, - &SaveAttachmentsLatch, - &RemoteResult, - RawHash, - &CreatedBlocks, - TempPayload = std::move(Payload)]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_ERROR("Failed to build container ({}). Reason: '{}'", - RemoteResult.GetErrorReason(), - RemoteResult.GetError()); - return; - } - - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - return; - }); - } - } - TempAttachments.clear(); - - if (!CreatedBlocks.empty()) - { - ReportMessage(OptionalContext, "Saving created block attachments..."); - for (auto& It : CreatedBlocks) - { - if (RemoteResult.IsError()) - { - break; - } - const IoHash& RawHash = It.first; - if (ForceUpload || NeededOtherAttachments.contains(RawHash)) - { - IoBuffer Payload = It.second; - ZEN_ASSERT(Payload); - SaveAttachmentsLatch.AddCount(1); - AttachmentsToSave++; - WorkerPool.ScheduleWork( - [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - return; - }); - } - It.second = {}; - } - } - - if (!BlockChunks.empty()) - { - ReportMessage(OptionalContext, "Saving chunk block attachments..."); - for (const std::vector<IoHash>& Chunks : BlockChunks) - { - if (RemoteResult.IsError()) - { - break; - } - std::vector<IoHash> NeededChunks; - if (ForceUpload) - { - NeededChunks = Chunks; - } - else - { - NeededChunks.reserve(Chunks.size()); - for (const IoHash& Chunk : Chunks) - { - if (NeededOtherAttachments.contains(Chunk)) - { - NeededChunks.push_back(Chunk); - } - } - if (NeededChunks.empty()) - { - continue; - } - } - SaveAttachmentsLatch.AddCount(1); - AttachmentsToSave++; - WorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &SaveAttachmentsLatch, - &RemoteResult, - &Chunks, - NeededChunks = std::move(NeededChunks), - ForceUpload]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - std::vector<SharedBuffer> ChunkBuffers; - ChunkBuffers.reserve(NeededChunks.size()); - for (const IoHash& Chunk : NeededChunks) - { - IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk); - if (!ChunkPayload) - { - RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), - fmt::format("Missing chunk {}"sv, Chunk), - fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); - ChunkBuffers.clear(); - break; - } - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload))); - } - RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Saved {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - }); - } - } - SaveAttachmentsLatch.CountDown(); - while (!SaveAttachmentsLatch.Wait(1000)) - { - ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); - if (OptionalContext && OptionalContext->CancelFlag) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - } - } - ReportProgress(OptionalContext, - fmt::format("Saving attachments, {} remaining...", Remaining), - AttachmentsToSave, - Remaining); - } - if (AttachmentsToSave > 0) - { - ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0); - } - } - - if (!RemoteResult.IsError()) - { ReportMessage(OptionalContext, "Finalizing oplog container..."); - RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); + RemoteProjectStore::FinalizeResult ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); if (ContainerFinalizeResult.ErrorCode) { RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); @@ -1278,9 +1360,38 @@ SaveOplog(CidStore& ChunkStore, RemoteResult.GetErrorReason()); } ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))); + if (ContainerFinalizeResult.Needs.empty()) + { + break; + } + + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteProjectStore::Result Result = {.ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Text = "Operation cancelled"}; + return Result; + } + + ReportMessage(OptionalContext, + fmt::format("Finalize reported {} missing attachments...", ContainerFinalizeResult.Needs.size())); + + UploadAttachments(WorkerPool, + ChunkStore, + RemoteStore, + LargeAttachments, + BlockChunks, + CreatedBlocks, + TempAttachments, + ContainerFinalizeResult.Needs, + false, + RemoteResult, + OptionalContext); } - } + TempAttachments.clear(); + CreatedBlocks.clear(); + } RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; ZEN_INFO("Saved oplog {} in {}", @@ -1450,7 +1561,7 @@ LoadOplog(CidStore& ChunkStore, if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to attachments with {} chunks ({}). Reason: '{}'", + ZEN_ERROR("Failed to load attachments with {} chunks ({}). Reason: '{}'", Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason()); diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 501a5eeec..552b11380 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -29,6 +29,11 @@ public: IoHash RawHash; }; + struct FinalizeResult : public Result + { + std::unordered_set<IoHash, IoHash::Hasher> Needs; + }; + struct SaveAttachmentResult : public Result { }; @@ -66,7 +71,7 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0; - virtual Result FinalizeContainer(const IoHash& RawHash) = 0; + virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; virtual LoadContainerResult LoadContainer() = 0; diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index c25fd2388..57a09e929 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -234,13 +234,13 @@ public: return Result; }; - virtual Result FinalizeContainer(const IoHash&) override + virtual FinalizeResult FinalizeContainer(const IoHash&) override { Stopwatch Timer; RwLock::ExclusiveLockScope _(SessionsLock); Sessions.clear(); - return {.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; + return FinalizeResult{Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}}; } virtual LoadContainerResult LoadContainer() override |