aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-03-25 16:28:42 +0100
committerGitHub Enterprise <[email protected]>2024-03-25 16:28:42 +0100
commita4fbd98e203345059c58012d3569d18a4f64b6bb (patch)
tree92ffbaeb42e93b9afcc1d31044b5d0c3bf3632e6 /src
parentuse batch request for checking existing blocks as Jupiter is now fixed (#20) (diff)
downloadzen-a4fbd98e203345059c58012d3569d18a4f64b6bb.tar.xz
zen-a4fbd98e203345059c58012d3569d18a4f64b6bb.zip
add a limit to the number of times we attempt to finalize (#22)
- Improvement: Add limit to the number of times we attempt to finalize and exported oplog - Improvement: Switch to large thread pool when executing oplog export/import - Improvement: Clean up reporting of missing attachments in oplog export/import - Improvement: Remove double-reporting of abort reason for oplog export/import
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp6
-rw-r--r--src/zencore/jobqueue.cpp1
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp214
3 files changed, 137 insertions, 84 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index dadab22ec..40ba48137 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -140,7 +140,9 @@ namespace {
{
double QueueTimeS = StatusObject["QueueTimeS"].AsDouble();
double RuntimeS = StatusObject["RunTimeS"].AsDouble();
- ZEN_CONSOLE("Completed: QueueTime: {:.3} s, RunTime: {:.3} s", QueueTimeS, RuntimeS);
+ ZEN_CONSOLE("Completed: QueueTime: {}, RunTime: {}",
+ NiceTimeSpanMs(static_cast<uint64_t>(QueueTimeS * 1000.0)),
+ NiceTimeSpanMs(static_cast<uint64_t>(RuntimeS * 1000.0)));
}
break;
}
@@ -160,7 +162,7 @@ namespace {
if (Status == "Queued")
{
double QueueTimeS = StatusObject["QueueTimeS"].AsDouble();
- ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS);
+ ZEN_CONSOLE("Queued, waited {}...", NiceTimeSpanMs(static_cast<uint64_t>(QueueTimeS * 1000.0)));
}
uint32_t InterruptCounter = SignalCounter[SIGINT].load();
uint32_t BreakCounter = 0;
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index e92817431..86c08cda9 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -363,7 +363,6 @@ public:
{
ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what());
QueueLock.WithExclusiveLock([&]() {
- CurrentJob->State.Messages.push_back(Ex.what());
CurrentJob->State.AbortReason = Ex.what();
CurrentJob->EndTick = JobClock::Now();
CurrentJob->WorkerThreadId = 0;
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index d8dfb215d..df560283c 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -359,20 +359,20 @@ BuildContainer(CidStore& ChunkStore,
std::filesystem::path FilePath = Project.RootDir / ServerPath;
if (!std::filesystem::is_regular_file(FilePath))
{
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(FilePath.string());
- Sb.Append("' for op: \n");
- Op.ToJson(Sb);
-
- ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", FilePath, Sb.ToView()));
+ ReportMessage(OptionalContext,
+ fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId()));
if (IgnoreMissingAttachments)
{
continue;
}
else
{
- throw std::runtime_error(fmt::format("failed to open file '{}'", FilePath));
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(FilePath.string());
+ Sb.Append("' for op: \n");
+ View.ToJson(Sb);
+ throw std::runtime_error(Sb.ToString());
}
}
@@ -886,18 +886,18 @@ BuildContainer(CidStore& ChunkStore,
ZEN_ASSERT(It != UploadAttachments.end());
std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key);
ZEN_ASSERT(Op.has_value());
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(AttachmentHash.ToHexString());
- Sb.Append("' for op: \n");
- Op.value().ToJson(Sb);
if (IgnoreMissingAttachments)
{
- ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
+ ReportMessage(OptionalContext, fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key));
}
else
{
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(AttachmentHash.ToHexString());
+ Sb.Append("' for op: \n");
+ Op.value().ToJson(Sb);
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
return {};
}
@@ -1362,7 +1362,7 @@ BuildContainer(CidStore& ChunkStore,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
{
- WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
+ WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
AsyncRemoteResult RemoteResult;
CbObject ContainerObject = BuildContainer(ChunkStore,
@@ -1710,7 +1710,8 @@ SaveOplog(CidStore& ChunkStore,
UploadInfo Info;
- WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
+ WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
+ WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool();
const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
@@ -1806,7 +1807,9 @@ SaveOplog(CidStore& ChunkStore,
}
else
{
- ReportMessage(OptionalContext, fmt::format("Loaded oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds));
+ ReportMessage(OptionalContext,
+ fmt::format("Loaded oplog base container in {}",
+ NiceTimeSpanMs(static_cast<uint64_t>(BaseContainerResult.ElapsedSeconds * 1000.0))));
CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView();
@@ -1823,10 +1826,11 @@ SaveOplog(CidStore& ChunkStore,
if (HasResult.ErrorCode == 0)
{
ReportMessage(OptionalContext,
- fmt::format("Checked the existance of {} block{} in remote store in {}",
+ fmt::format("Checked the existance of {} block{} in remote store in {}, found {} eligeble blocks",
BlockHashes.size(),
- BlockHashes.size() > 1 ? "S"sv : ""sv,
- NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000))));
+ BlockHashes.size() > 1 ? "s"sv : ""sv,
+ BlockHashes.size() - HasResult.Needs.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000.0))));
if (HasResult.Needs.size() < BlocksArray.Num())
{
KnownBlocks.reserve(BlocksArray.Num() - HasResult.Needs.size());
@@ -1917,10 +1921,10 @@ SaveOplog(CidStore& ChunkStore,
ReportMessage(OptionalContext,
fmt::format("Saved container '{}' in {}",
RemoteStoreInfo.ContainerName,
- NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000))));
+ NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0))));
}
- UploadAttachments(WorkerPool,
+ UploadAttachments(NetworkWorkerPool,
ChunkStore,
RemoteStore,
LargeAttachments,
@@ -1933,6 +1937,7 @@ SaveOplog(CidStore& ChunkStore,
RemoteResult,
OptionalContext);
+ uint32_t Try = 0;
while (!RemoteResult.IsError())
{
if (IsCancelled(OptionalContext))
@@ -1960,7 +1965,7 @@ SaveOplog(CidStore& ChunkStore,
ReportMessage(OptionalContext,
fmt::format("Finalized container '{}' in {}",
RemoteStoreInfo.ContainerName,
- NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))));
+ NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000.0))));
if (ContainerFinalizeResult.Needs.empty())
{
@@ -1975,23 +1980,46 @@ SaveOplog(CidStore& ChunkStore,
return Result;
}
- ReportMessage(OptionalContext,
- fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements.",
- RemoteStoreInfo.ContainerName,
- ContainerFinalizeResult.Needs.size()));
-
- UploadAttachments(WorkerPool,
- ChunkStore,
- RemoteStore,
- LargeAttachments,
- BlockChunks,
- CreatedBlocks,
- LooseLargeFiles,
- ContainerFinalizeResult.Needs,
- false,
- Info,
- RemoteResult,
- OptionalContext);
+ const uint32_t MaxTries = 8;
+ if (Try < MaxTries)
+ {
+ Try++;
+
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements. Try {}",
+ RemoteStoreInfo.ContainerName,
+ ContainerFinalizeResult.Needs.size(),
+ Try));
+
+ UploadAttachments(NetworkWorkerPool,
+ ChunkStore,
+ RemoteStore,
+ LargeAttachments,
+ BlockChunks,
+ CreatedBlocks,
+ LooseLargeFiles,
+ ContainerFinalizeResult.Needs,
+ false,
+ Info,
+ RemoteResult,
+ OptionalContext);
+ }
+ else
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ "Failed to save oplog container",
+ fmt::format("Giving up finalize oplog container {} after {} retries, still getting reports of missing attachments",
+ ContainerSaveResult.RawHash,
+ ContainerFinalizeResult.Needs.size()));
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to finalize oplog container container {} ({}): {}",
+ ContainerSaveResult.RawHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ break;
+ }
}
LooseLargeFiles.clear();
@@ -2110,6 +2138,10 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
Chunked.ChunkSequence.push_back(SequenceIndex);
}
OnChunkedAttachment(Chunked);
+ ZEN_INFO("Found chunked attachment '{}' ({}) built from {} chunks",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ Chunked.ChunkHashes.size());
}
ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
@@ -2206,7 +2238,8 @@ LoadOplog(CidStore& ChunkStore,
Stopwatch Timer;
- WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
+ WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
+ WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool();
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
@@ -2247,7 +2280,7 @@ LoadOplog(CidStore& ChunkStore,
};
auto OnNeedBlock = [&RemoteStore,
&ChunkStore,
- &WorkerPool,
+ &NetworkWorkerPool,
&AttachmentsWorkLatch,
&AttachmentCount,
&RemoteResult,
@@ -2262,14 +2295,14 @@ LoadOplog(CidStore& ChunkStore,
{
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- WorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &AttachmentsWorkLatch,
- &RemoteResult,
- Chunks = std::move(Chunks),
- &Info,
- IgnoreMissingAttachments,
- OptionalContext]() {
+ NetworkWorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &AttachmentsWorkLatch,
+ &RemoteResult,
+ Chunks = std::move(Chunks),
+ &Info,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -2283,12 +2316,9 @@ LoadOplog(CidStore& ChunkStore,
Chunks.size(),
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
+ Info.MissingAttachmentCount.fetch_add(1);
if (IgnoreMissingAttachments)
{
- Info.MissingAttachmentCount.fetch_add(1);
- }
- else
- {
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
return;
@@ -2318,15 +2348,15 @@ LoadOplog(CidStore& ChunkStore,
}
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- WorkerPool.ScheduleWork([&AttachmentsWorkLatch,
- &ChunkStore,
- &RemoteStore,
- BlockHash,
- &RemoteResult,
- Chunks = std::move(Chunks),
- &Info,
- IgnoreMissingAttachments,
- OptionalContext]() {
+ NetworkWorkerPool.ScheduleWork([&AttachmentsWorkLatch,
+ &ChunkStore,
+ &RemoteStore,
+ BlockHash,
+ &RemoteResult,
+ Chunks = std::move(Chunks),
+ &Info,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -2340,11 +2370,8 @@ LoadOplog(CidStore& ChunkStore,
BlockHash,
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
- if (IgnoreMissingAttachments)
- {
- Info.MissingAttachmentCount.fetch_add(1);
- }
- else
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
{
RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
}
@@ -2398,7 +2425,7 @@ LoadOplog(CidStore& ChunkStore,
auto OnNeedAttachment = [&RemoteStore,
&ChunkStore,
- &WorkerPool,
+ &NetworkWorkerPool,
&AttachmentsWorkLatch,
&RemoteResult,
&Attachments,
@@ -2417,7 +2444,7 @@ LoadOplog(CidStore& ChunkStore,
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- WorkerPool.ScheduleWork(
+ NetworkWorkerPool.ScheduleWork(
[&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, &Info, IgnoreMissingAttachments, OptionalContext]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
@@ -2432,11 +2459,8 @@ LoadOplog(CidStore& ChunkStore,
RawHash,
AttachmentResult.Reason,
AttachmentResult.ErrorCode));
- if (IgnoreMissingAttachments)
- {
- Info.MissingAttachmentCount.fetch_add(1);
- }
- else
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
{
RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
}
@@ -2516,7 +2540,14 @@ LoadOplog(CidStore& ChunkStore,
{
std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString();
DechunkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&ChunkStore, &DechunkLatch, TempFileName, &Chunked, &RemoteResult, OptionalContext]() {
+ WorkerPool.ScheduleWork([&ChunkStore,
+ &DechunkLatch,
+ TempFileName,
+ &Chunked,
+ &RemoteResult,
+ IgnoreMissingAttachments,
+ &Info,
+ OptionalContext]() {
auto _ = MakeGuard([&DechunkLatch] { DechunkLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -2538,12 +2569,26 @@ LoadOplog(CidStore& ChunkStore,
IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
if (!Chunk)
{
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::NotFound),
- "Missing chunk",
- fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
ReportMessage(OptionalContext,
fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+
+ // We only add 1 as the resulting missing count will be 1 for the dechunked file
+ Info.MissingAttachmentCount.fetch_add(1);
+ TmpFile.Close();
+ std::error_code Ec;
+ std::filesystem::remove(TempFileName, Ec);
+ if (Ec)
+ {
+ ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message());
+ }
+
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ }
return;
}
CompositeBuffer Decompressed =
@@ -2570,7 +2615,14 @@ LoadOplog(CidStore& ChunkStore,
ZEN_ASSERT(ValidateRawHash == Chunked.RawHash);
ZEN_ASSERT(ValidateRawSize == Chunked.RawSize);
}
- ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
+ CidStore::InsertResult InsertResult =
+ ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+
ZEN_INFO("Dechunked attachment {} ({}) in {}",
Chunked.RawHash,
NiceBytes(Chunked.RawSize),