aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-19 06:04:10 -0400
committerGitHub <[email protected]>2023-09-19 12:04:10 +0200
commit46e17d04154be01d26b2f30afcda37a11ba290fc (patch)
tree39d56fc8e3d0bb289aebcd0155fc68b8f7d582bb /src
parenthandle errors in spdlog gracefully (#410) (diff)
downloadzen-46e17d04154be01d26b2f30afcda37a11ba290fc.tar.xz
zen-46e17d04154be01d26b2f30afcda37a11ba290fc.zip
Add retry if FinalizeRef responds with non-empty "Needs" attachments (#409)
* Add retry if FinalizeRef responds with non-empty "Needs" attachments * better logging/progress report * changelog
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.cpp2
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp14
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp611
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h7
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp4
5 files changed, 377 insertions, 261 deletions
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp
index eeb1f71c4..8029d02de 100644
--- a/src/zenserver/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp
@@ -135,7 +135,7 @@ public:
return Result;
}
- virtual Result FinalizeContainer(const IoHash&) override { return {}; }
+ virtual FinalizeResult FinalizeContainer(const IoHash&) override { return {}; }
virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Name); }
virtual LoadContainerResult LoadBaseContainer() override
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index e59bac6d6..cfe273eba 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -124,22 +124,22 @@ public:
return Result;
}
- virtual Result FinalizeContainer(const IoHash& RawHash) override
+ virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override
{
- const int32_t MaxAttempts = 3;
- CloudCacheResult FinalizeResult;
+ const int32_t MaxAttempts = 3;
+ FinalizeRefResult FinalizeRefResult;
{
CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !FinalizeResult.Success; Attempt++)
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !FinalizeRefResult.Success; Attempt++)
{
- FinalizeResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
- if (!FinalizeResult.Success)
+ FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
+ if (!FinalizeRefResult.Success)
{
Sleep(100 * (Attempt + 1));
}
}
}
- Result Result{ConvertResult(FinalizeResult)};
+ FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'",
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index aca9410a2..ea744eb35 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -175,6 +175,8 @@ CreateBlock(WorkerThreadPool& WorkerPool,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
AsyncRemoteResult& RemoteResult)
{
+ ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size());
+
OpSectionsLatch.AddCount(1);
WorkerPool.ScheduleWork(
[&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable {
@@ -586,7 +588,6 @@ BuildContainer(CidStore& ChunkStore,
if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp))
{
- ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size());
size_t BlockIndex = AddBlock(BlocksLock, Blocks);
if (BuildBlocks)
{
@@ -601,6 +602,7 @@ BuildContainer(CidStore& ChunkStore,
}
else
{
+ ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
OnBlockChunks(BlockAttachmentHashes);
}
{
@@ -619,7 +621,6 @@ BuildContainer(CidStore& ChunkStore,
}
if (BlockSize > 0)
{
- ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size());
size_t BlockIndex = AddBlock(BlocksLock, Blocks);
if (BuildBlocks)
{
@@ -634,6 +635,7 @@ BuildContainer(CidStore& ChunkStore,
}
else
{
+ ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
OnBlockChunks(BlockAttachmentHashes);
}
{
@@ -810,6 +812,317 @@ BuildContainer(CidStore& ChunkStore,
RemoteResult);
return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
}
+void
+UploadAttachments(WorkerThreadPool& WorkerPool,
+ CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
+ const std::vector<std::vector<IoHash>>& BlockChunks,
+ const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
+ const tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>& TempAttachments,
+ const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
+ bool ForceAll,
+ AsyncRemoteResult& RemoteResult,
+ JobContext* OptionalContext)
+{
+ using namespace std::literals;
+
+ if (Needs.empty() && !ForceAll)
+ {
+ return;
+ }
+
+ ReportMessage(OptionalContext, "Filtering needed attachments...");
+
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload;
+
+ size_t BlockAttachmentCountToUpload = 0;
+ size_t LargeAttachmentCountToUpload = 0;
+ std::atomic<ptrdiff_t> BulkAttachmentCountToUpload = 0;
+ AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size());
+
+ for (const auto& CreatedBlock : CreatedBlocks)
+ {
+ if (ForceAll || Needs.contains(CreatedBlock.first))
+ {
+ AttachmentsToUpload.insert(CreatedBlock.first);
+ BlockAttachmentCountToUpload++;
+ }
+ }
+ for (const IoHash& LargeAttachment : LargeAttachments)
+ {
+ if (ForceAll || Needs.contains(LargeAttachment))
+ {
+ AttachmentsToUpload.insert(LargeAttachment);
+ LargeAttachmentCountToUpload++;
+ }
+ }
+ for (const std::vector<IoHash>& BlockHashes : BlockChunks)
+ {
+ if (ForceAll)
+ {
+ AttachmentsToUpload.insert(BlockHashes.begin(), BlockHashes.end());
+ BulkAttachmentCountToUpload += BlockHashes.size();
+ continue;
+ }
+ for (const IoHash& Hash : BlockHashes)
+ {
+ if (Needs.contains(Hash))
+ {
+ AttachmentsToUpload.insert(Hash);
+ BulkAttachmentCountToUpload++;
+ }
+ }
+ }
+
+ for (const IoHash& Needed : Needs)
+ {
+ if (!AttachmentsToUpload.contains(Needed))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Invalid attachment",
+ fmt::format("Upload requested of unknown attachement '{}'", Needed));
+ ZEN_ERROR("Failed to upload attachment '{}'. ({}). Reason: '{}'",
+ Needed,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ }
+
+ if (AttachmentsToUpload.empty())
+ {
+ ReportMessage(OptionalContext, "No attachments needed");
+ return;
+ }
+
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ return;
+ }
+
+ ReportMessage(OptionalContext,
+ fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)",
+ AttachmentsToUpload.size(),
+ BlockAttachmentCountToUpload,
+ LargeAttachmentCountToUpload,
+ BulkAttachmentCountToUpload.load()));
+
+ ptrdiff_t AttachmentsToSave(0);
+ Latch SaveAttachmentsLatch(1);
+
+ for (const IoHash& RawHash : LargeAttachments)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ if (!AttachmentsToUpload.contains(RawHash))
+ {
+ continue;
+ }
+
+ IoBuffer Payload;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
+ {
+ Payload = BlockIt->second;
+ }
+ else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end())
+ {
+ Payload = LooseTmpFileIt->second;
+ }
+
+ SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
+ WorkerPool.ScheduleWork(
+ [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, TempPayload = std::move(Payload)]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash);
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_ERROR("Failed to save attachment '{}' ({}). Reason: '{}'",
+ RawHash,
+ RemoteResult.GetErrorReason(),
+ RemoteResult.GetError());
+ return;
+ }
+
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return;
+ });
+ }
+
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ return;
+ }
+
+ for (auto& It : CreatedBlocks)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ const IoHash& RawHash = It.first;
+ if (!AttachmentsToUpload.contains(RawHash))
+ {
+ continue;
+ }
+ IoBuffer Payload = It.second;
+ ZEN_ASSERT(Payload);
+ SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
+ WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+
+ RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return;
+ });
+ }
+
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ return;
+ }
+
+ for (const std::vector<IoHash>& Chunks : BlockChunks)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+
+ std::vector<IoHash> NeededChunks;
+ NeededChunks.reserve(Chunks.size());
+ for (const IoHash& Chunk : Chunks)
+ {
+ if (AttachmentsToUpload.contains(Chunk))
+ {
+ NeededChunks.push_back(Chunk);
+ }
+ }
+ if (NeededChunks.empty())
+ {
+ continue;
+ }
+
+ SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
+ WorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ &Chunks,
+ NeededChunks = std::move(NeededChunks),
+ &BulkAttachmentCountToUpload]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ std::vector<SharedBuffer> ChunkBuffers;
+ ChunkBuffers.reserve(NeededChunks.size());
+ for (const IoHash& Chunk : NeededChunks)
+ {
+ IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk);
+ if (!ChunkPayload)
+ {
+ RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
+ fmt::format("Missing chunk {}"sv, Chunk),
+ fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
+ ChunkBuffers.clear();
+ break;
+ }
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload)));
+ }
+ RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Saved {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ BulkAttachmentCountToUpload.fetch_sub(Chunks.size());
+ });
+ }
+
+ SaveAttachmentsLatch.CountDown();
+ while (!SaveAttachmentsLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ ReportProgress(
+ OptionalContext,
+ fmt::format("Saving attachments, {} remaining...", BlockChunks.empty() ? Remaining : BulkAttachmentCountToUpload.load()),
+ AttachmentsToSave,
+ Remaining);
+ }
+ if (AttachmentsToSave > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0);
+ }
+}
RemoteProjectStore::Result
SaveOplog(CidStore& ChunkStore,
@@ -949,6 +1262,7 @@ SaveOplog(CidStore& ChunkStore,
else
{
CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView();
+ KnownBlocks.reserve(BlocksArray.Num());
for (CbFieldView BlockField : BlocksArray)
{
CbObjectView BlockView = BlockField.AsObjectView();
@@ -988,7 +1302,6 @@ SaveOplog(CidStore& ChunkStore,
EmbedLooseFiles ? &TempAttachments : nullptr,
OptionalContext,
/* out */ RemoteResult);
-
if (!RemoteResult.IsError())
{
if (OptionalContext && OptionalContext->CancelFlag)
@@ -1014,7 +1327,19 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000)));
}
- if (!ContainerSaveResult.Needs.empty() || ForceUpload)
+ UploadAttachments(WorkerPool,
+ ChunkStore,
+ RemoteStore,
+ LargeAttachments,
+ BlockChunks,
+ CreatedBlocks,
+ TempAttachments,
+ ContainerSaveResult.Needs,
+ ForceUpload,
+ RemoteResult,
+ OptionalContext);
+
+ while (!RemoteResult.IsError())
{
if (OptionalContext && OptionalContext->CancelFlag)
{
@@ -1024,251 +1349,8 @@ SaveOplog(CidStore& ChunkStore,
return Result;
}
- ReportMessage(OptionalContext, "Filtering needed attachments...");
-
- std::vector<IoHash> NeededLargeAttachments;
- std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments;
- NeededLargeAttachments.reserve(LargeAttachments.size());
- NeededOtherAttachments.reserve(CreatedBlocks.size());
- if (ForceUpload)
- {
- // TODO: Check ForceUpload - it should add all attachments and blocks regardless if Needs is empty or not
- NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end());
- }
- else
- {
- for (const IoHash& RawHash : ContainerSaveResult.Needs)
- {
- if (LargeAttachments.contains(RawHash))
- {
- NeededLargeAttachments.push_back(RawHash);
- continue;
- }
- NeededOtherAttachments.insert(RawHash);
- }
- }
-
- ptrdiff_t AttachmentsToSave(0);
- Latch SaveAttachmentsLatch(1);
- if (!NeededLargeAttachments.empty())
- {
- if (OptionalContext && OptionalContext->CancelFlag)
- {
- RemoteProjectStore::Result Result = {.ErrorCode = 0,
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
- .Text = "Operation cancelled"};
- return Result;
- }
- ReportMessage(OptionalContext, "Saving large attachments...");
- for (const IoHash& RawHash : NeededLargeAttachments)
- {
- if (RemoteResult.IsError())
- {
- break;
- }
- IoBuffer Payload;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = std::move(BlockIt->second);
- }
- else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end())
- {
- Payload = LooseTmpFileIt->second;
- TempAttachments.erase(LooseTmpFileIt);
- }
-
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork([&ChunkStore,
- &RemoteStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- RawHash,
- &CreatedBlocks,
- TempPayload = std::move(Payload)]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash);
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_ERROR("Failed to build container ({}). Reason: '{}'",
- RemoteResult.GetErrorReason(),
- RemoteResult.GetError());
- return;
- }
-
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
- RawHash,
- NiceBytes(Payload.GetSize()),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
- ZEN_DEBUG("Saved attachment {}, {} in {}",
- RawHash,
- NiceBytes(Payload.GetSize()),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
- return;
- });
- }
- }
- TempAttachments.clear();
-
- if (!CreatedBlocks.empty())
- {
- ReportMessage(OptionalContext, "Saving created block attachments...");
- for (auto& It : CreatedBlocks)
- {
- if (RemoteResult.IsError())
- {
- break;
- }
- const IoHash& RawHash = It.first;
- if (ForceUpload || NeededOtherAttachments.contains(RawHash))
- {
- IoBuffer Payload = It.second;
- ZEN_ASSERT(Payload);
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork(
- [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
-
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
- RawHash,
- NiceBytes(Payload.GetSize()),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
-
- ZEN_DEBUG("Saved attachment {}, {} in {}",
- RawHash,
- NiceBytes(Payload.GetSize()),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
- return;
- });
- }
- It.second = {};
- }
- }
-
- if (!BlockChunks.empty())
- {
- ReportMessage(OptionalContext, "Saving chunk block attachments...");
- for (const std::vector<IoHash>& Chunks : BlockChunks)
- {
- if (RemoteResult.IsError())
- {
- break;
- }
- std::vector<IoHash> NeededChunks;
- if (ForceUpload)
- {
- NeededChunks = Chunks;
- }
- else
- {
- NeededChunks.reserve(Chunks.size());
- for (const IoHash& Chunk : Chunks)
- {
- if (NeededOtherAttachments.contains(Chunk))
- {
- NeededChunks.push_back(Chunk);
- }
- }
- if (NeededChunks.empty())
- {
- continue;
- }
- }
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- &Chunks,
- NeededChunks = std::move(NeededChunks),
- ForceUpload]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- std::vector<SharedBuffer> ChunkBuffers;
- ChunkBuffers.reserve(NeededChunks.size());
- for (const IoHash& Chunk : NeededChunks)
- {
- IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk);
- if (!ChunkPayload)
- {
- RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
- fmt::format("Missing chunk {}"sv, Chunk),
- fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
- ChunkBuffers.clear();
- break;
- }
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload)));
- }
- RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
- ZEN_DEBUG("Saved {} bulk attachments in {}",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
- });
- }
- }
- SaveAttachmentsLatch.CountDown();
- while (!SaveAttachmentsLatch.Wait(1000))
- {
- ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
- if (OptionalContext && OptionalContext->CancelFlag)
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- }
- }
- ReportProgress(OptionalContext,
- fmt::format("Saving attachments, {} remaining...", Remaining),
- AttachmentsToSave,
- Remaining);
- }
- if (AttachmentsToSave > 0)
- {
- ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0);
- }
- }
-
- if (!RemoteResult.IsError())
- {
ReportMessage(OptionalContext, "Finalizing oplog container...");
- RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
+ RemoteProjectStore::FinalizeResult ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
if (ContainerFinalizeResult.ErrorCode)
{
RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text);
@@ -1278,9 +1360,38 @@ SaveOplog(CidStore& ChunkStore,
RemoteResult.GetErrorReason());
}
ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000)));
+ if (ContainerFinalizeResult.Needs.empty())
+ {
+ break;
+ }
+
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteProjectStore::Result Result = {.ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Text = "Operation cancelled"};
+ return Result;
+ }
+
+ ReportMessage(OptionalContext,
+ fmt::format("Finalize reported {} missing attachments...", ContainerFinalizeResult.Needs.size()));
+
+ UploadAttachments(WorkerPool,
+ ChunkStore,
+ RemoteStore,
+ LargeAttachments,
+ BlockChunks,
+ CreatedBlocks,
+ TempAttachments,
+ ContainerFinalizeResult.Needs,
+ false,
+ RemoteResult,
+ OptionalContext);
}
- }
+ TempAttachments.clear();
+ CreatedBlocks.clear();
+ }
RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
ZEN_INFO("Saved oplog {} in {}",
@@ -1450,7 +1561,7 @@ LoadOplog(CidStore& ChunkStore,
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_ERROR("Failed to attachments with {} chunks ({}). Reason: '{}'",
+ ZEN_ERROR("Failed to load attachments with {} chunks ({}). Reason: '{}'",
Chunks.size(),
RemoteResult.GetError(),
RemoteResult.GetErrorReason());
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 501a5eeec..552b11380 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -29,6 +29,11 @@ public:
IoHash RawHash;
};
+ struct FinalizeResult : public Result
+ {
+ std::unordered_set<IoHash, IoHash::Hasher> Needs;
+ };
+
struct SaveAttachmentResult : public Result
{
};
@@ -66,7 +71,7 @@ public:
virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0;
virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0;
- virtual Result FinalizeContainer(const IoHash& RawHash) = 0;
+ virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0;
virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
virtual LoadContainerResult LoadContainer() = 0;
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index c25fd2388..57a09e929 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -234,13 +234,13 @@ public:
return Result;
};
- virtual Result FinalizeContainer(const IoHash&) override
+ virtual FinalizeResult FinalizeContainer(const IoHash&) override
{
Stopwatch Timer;
RwLock::ExclusiveLockScope _(SessionsLock);
Sessions.clear();
- return {.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500};
+ return FinalizeResult{Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}};
}
virtual LoadContainerResult LoadContainer() override