diff options
| author | Dan Engelbrecht <[email protected]> | 2024-03-25 16:28:42 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-03-25 16:28:42 +0100 |
| commit | a4fbd98e203345059c58012d3569d18a4f64b6bb (patch) | |
| tree | 92ffbaeb42e93b9afcc1d31044b5d0c3bf3632e6 /src | |
| parent | use batch request for checking existing blocks as Jupiter is now fixed (#20) (diff) | |
| download | zen-a4fbd98e203345059c58012d3569d18a4f64b6bb.tar.xz zen-a4fbd98e203345059c58012d3569d18a4f64b6bb.zip | |
add a limit to the number of times we attempt to finalize (#22)
- Improvement: Add limit to the number of times we attempt to finalize and exported oplog
- Improvement: Switch to large thread pool when executing oplog export/import
- Improvement: Clean up reporting of missing attachments in oplog export/import
- Improvement: Remove double-reporting of abort reason for oplog export/import
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 6 | ||||
| -rw-r--r-- | src/zencore/jobqueue.cpp | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 214 |
3 files changed, 137 insertions, 84 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index dadab22ec..40ba48137 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -140,7 +140,9 @@ namespace { { double QueueTimeS = StatusObject["QueueTimeS"].AsDouble(); double RuntimeS = StatusObject["RunTimeS"].AsDouble(); - ZEN_CONSOLE("Completed: QueueTime: {:.3} s, RunTime: {:.3} s", QueueTimeS, RuntimeS); + ZEN_CONSOLE("Completed: QueueTime: {}, RunTime: {}", + NiceTimeSpanMs(static_cast<uint64_t>(QueueTimeS * 1000.0)), + NiceTimeSpanMs(static_cast<uint64_t>(RuntimeS * 1000.0))); } break; } @@ -160,7 +162,7 @@ namespace { if (Status == "Queued") { double QueueTimeS = StatusObject["QueueTimeS"].AsDouble(); - ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS); + ZEN_CONSOLE("Queued, waited {}...", NiceTimeSpanMs(static_cast<uint64_t>(QueueTimeS * 1000.0))); } uint32_t InterruptCounter = SignalCounter[SIGINT].load(); uint32_t BreakCounter = 0; diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index e92817431..86c08cda9 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -363,7 +363,6 @@ public: { ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what()); QueueLock.WithExclusiveLock([&]() { - CurrentJob->State.Messages.push_back(Ex.what()); CurrentJob->State.AbortReason = Ex.what(); CurrentJob->EndTick = JobClock::Now(); CurrentJob->WorkerThreadId = 0; diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index d8dfb215d..df560283c 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -359,20 +359,20 @@ BuildContainer(CidStore& ChunkStore, std::filesystem::path FilePath = Project.RootDir / ServerPath; if (!std::filesystem::is_regular_file(FilePath)) { - ExtendableStringBuilder<1024> Sb; - Sb.Append("Failed to find attachment '"); - Sb.Append(FilePath.string()); - Sb.Append("' for op: \n"); - Op.ToJson(Sb); - - ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", FilePath, Sb.ToView())); + ReportMessage(OptionalContext, + fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId())); if (IgnoreMissingAttachments) { continue; } else { - throw std::runtime_error(fmt::format("failed to open file '{}'", FilePath)); + ExtendableStringBuilder<1024> Sb; + Sb.Append("Failed to find attachment '"); + Sb.Append(FilePath.string()); + Sb.Append("' for op: \n"); + View.ToJson(Sb); + throw std::runtime_error(Sb.ToString()); } } @@ -886,18 +886,18 @@ BuildContainer(CidStore& ChunkStore, ZEN_ASSERT(It != UploadAttachments.end()); std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key); ZEN_ASSERT(Op.has_value()); - ExtendableStringBuilder<1024> Sb; - Sb.Append("Failed to find attachment '"); - Sb.Append(AttachmentHash.ToHexString()); - Sb.Append("' for op: \n"); - Op.value().ToJson(Sb); if (IgnoreMissingAttachments) { - ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView())); + ReportMessage(OptionalContext, fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key)); } else { + ExtendableStringBuilder<1024> Sb; + Sb.Append("Failed to find attachment '"); + Sb.Append(AttachmentHash.ToHexString()); + Sb.Append("' for op: \n"); + Op.value().ToJson(Sb); RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); return {}; } @@ -1362,7 +1362,7 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) { - WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); + WorkerThreadPool& WorkerPool = GetLargeWorkerPool(); AsyncRemoteResult RemoteResult; CbObject ContainerObject = BuildContainer(ChunkStore, @@ -1710,7 +1710,8 @@ SaveOplog(CidStore& ChunkStore, UploadInfo Info; - WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); + WorkerThreadPool& WorkerPool = GetLargeWorkerPool(); + WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(); const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); @@ -1806,7 +1807,9 @@ SaveOplog(CidStore& ChunkStore, } else { - ReportMessage(OptionalContext, fmt::format("Loaded oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds)); + ReportMessage(OptionalContext, + fmt::format("Loaded oplog base container in {}", + NiceTimeSpanMs(static_cast<uint64_t>(BaseContainerResult.ElapsedSeconds * 1000.0)))); CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView(); @@ -1823,10 +1826,11 @@ SaveOplog(CidStore& ChunkStore, if (HasResult.ErrorCode == 0) { ReportMessage(OptionalContext, - fmt::format("Checked the existance of {} block{} in remote store in {}", + fmt::format("Checked the existance of {} block{} in remote store in {}, found {} eligeble blocks", BlockHashes.size(), - BlockHashes.size() > 1 ? "S"sv : ""sv, - NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000)))); + BlockHashes.size() > 1 ? "s"sv : ""sv, + BlockHashes.size() - HasResult.Needs.size(), + NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000.0)))); if (HasResult.Needs.size() < BlocksArray.Num()) { KnownBlocks.reserve(BlocksArray.Num() - HasResult.Needs.size()); @@ -1917,10 +1921,10 @@ SaveOplog(CidStore& ChunkStore, ReportMessage(OptionalContext, fmt::format("Saved container '{}' in {}", RemoteStoreInfo.ContainerName, - NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000)))); + NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0)))); } - UploadAttachments(WorkerPool, + UploadAttachments(NetworkWorkerPool, ChunkStore, RemoteStore, LargeAttachments, @@ -1933,6 +1937,7 @@ SaveOplog(CidStore& ChunkStore, RemoteResult, OptionalContext); + uint32_t Try = 0; while (!RemoteResult.IsError()) { if (IsCancelled(OptionalContext)) @@ -1960,7 +1965,7 @@ SaveOplog(CidStore& ChunkStore, ReportMessage(OptionalContext, fmt::format("Finalized container '{}' in {}", RemoteStoreInfo.ContainerName, - NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000)))); + NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000.0)))); if (ContainerFinalizeResult.Needs.empty()) { @@ -1975,23 +1980,46 @@ SaveOplog(CidStore& ChunkStore, return Result; } - ReportMessage(OptionalContext, - fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements.", - RemoteStoreInfo.ContainerName, - ContainerFinalizeResult.Needs.size())); - - UploadAttachments(WorkerPool, - ChunkStore, - RemoteStore, - LargeAttachments, - BlockChunks, - CreatedBlocks, - LooseLargeFiles, - ContainerFinalizeResult.Needs, - false, - Info, - RemoteResult, - OptionalContext); + const uint32_t MaxTries = 8; + if (Try < MaxTries) + { + Try++; + + ReportMessage( + OptionalContext, + fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements. Try {}", + RemoteStoreInfo.ContainerName, + ContainerFinalizeResult.Needs.size(), + Try)); + + UploadAttachments(NetworkWorkerPool, + ChunkStore, + RemoteStore, + LargeAttachments, + BlockChunks, + CreatedBlocks, + LooseLargeFiles, + ContainerFinalizeResult.Needs, + false, + Info, + RemoteResult, + OptionalContext); + } + else + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::InternalServerError), + "Failed to save oplog container", + fmt::format("Giving up finalize oplog container {} after {} retries, still getting reports of missing attachments", + ContainerSaveResult.RawHash, + ContainerFinalizeResult.Needs.size())); + ReportMessage(OptionalContext, + fmt::format("Failed to finalize oplog container container {} ({}): {}", + ContainerSaveResult.RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + break; + } } LooseLargeFiles.clear(); @@ -2110,6 +2138,10 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, Chunked.ChunkSequence.push_back(SequenceIndex); } OnChunkedAttachment(Chunked); + ZEN_INFO("Found chunked attachment '{}' ({}) built from {} chunks", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + Chunked.ChunkHashes.size()); } ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); @@ -2206,7 +2238,8 @@ LoadOplog(CidStore& ChunkStore, Stopwatch Timer; - WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); + WorkerThreadPool& WorkerPool = GetLargeWorkerPool(); + WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(); std::unordered_set<IoHash, IoHash::Hasher> Attachments; @@ -2247,7 +2280,7 @@ LoadOplog(CidStore& ChunkStore, }; auto OnNeedBlock = [&RemoteStore, &ChunkStore, - &WorkerPool, + &NetworkWorkerPool, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult, @@ -2262,14 +2295,14 @@ LoadOplog(CidStore& ChunkStore, { AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &AttachmentsWorkLatch, - &RemoteResult, - Chunks = std::move(Chunks), - &Info, - IgnoreMissingAttachments, - OptionalContext]() { + NetworkWorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &AttachmentsWorkLatch, + &RemoteResult, + Chunks = std::move(Chunks), + &Info, + IgnoreMissingAttachments, + OptionalContext]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -2283,12 +2316,9 @@ LoadOplog(CidStore& ChunkStore, Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason())); + Info.MissingAttachmentCount.fetch_add(1); if (IgnoreMissingAttachments) { - Info.MissingAttachmentCount.fetch_add(1); - } - else - { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } return; @@ -2318,15 +2348,15 @@ LoadOplog(CidStore& ChunkStore, } AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&AttachmentsWorkLatch, - &ChunkStore, - &RemoteStore, - BlockHash, - &RemoteResult, - Chunks = std::move(Chunks), - &Info, - IgnoreMissingAttachments, - OptionalContext]() { + NetworkWorkerPool.ScheduleWork([&AttachmentsWorkLatch, + &ChunkStore, + &RemoteStore, + BlockHash, + &RemoteResult, + Chunks = std::move(Chunks), + &Info, + IgnoreMissingAttachments, + OptionalContext]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -2340,11 +2370,8 @@ LoadOplog(CidStore& ChunkStore, BlockHash, RemoteResult.GetError(), RemoteResult.GetErrorReason())); - if (IgnoreMissingAttachments) - { - Info.MissingAttachmentCount.fetch_add(1); - } - else + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) { RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); } @@ -2398,7 +2425,7 @@ LoadOplog(CidStore& ChunkStore, auto OnNeedAttachment = [&RemoteStore, &ChunkStore, - &WorkerPool, + &NetworkWorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, @@ -2417,7 +2444,7 @@ LoadOplog(CidStore& ChunkStore, AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork( + NetworkWorkerPool.ScheduleWork( [&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, &Info, IgnoreMissingAttachments, OptionalContext]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) @@ -2432,11 +2459,8 @@ LoadOplog(CidStore& ChunkStore, RawHash, AttachmentResult.Reason, AttachmentResult.ErrorCode)); - if (IgnoreMissingAttachments) - { - Info.MissingAttachmentCount.fetch_add(1); - } - else + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) { RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); } @@ -2516,7 +2540,14 @@ LoadOplog(CidStore& ChunkStore, { std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); DechunkLatch.AddCount(1); - WorkerPool.ScheduleWork([&ChunkStore, &DechunkLatch, TempFileName, &Chunked, &RemoteResult, OptionalContext]() { + WorkerPool.ScheduleWork([&ChunkStore, + &DechunkLatch, + TempFileName, + &Chunked, + &RemoteResult, + IgnoreMissingAttachments, + &Info, + OptionalContext]() { auto _ = MakeGuard([&DechunkLatch] { DechunkLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -2538,12 +2569,26 @@ LoadOplog(CidStore& ChunkStore, IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); if (!Chunk) { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::NotFound), - "Missing chunk", - fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); ReportMessage(OptionalContext, fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + TmpFile.Close(); + std::error_code Ec; + std::filesystem::remove(TempFileName, Ec); + if (Ec) + { + ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); + } + + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + } return; } CompositeBuffer Decompressed = @@ -2570,7 +2615,14 @@ LoadOplog(CidStore& ChunkStore, ZEN_ASSERT(ValidateRawHash == Chunked.RawHash); ZEN_ASSERT(ValidateRawSize == Chunked.RawSize); } - ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); + CidStore::InsertResult InsertResult = + ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + ZEN_INFO("Dechunked attachment {} ({}) in {}", Chunked.RawHash, NiceBytes(Chunked.RawSize), |