aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-20 17:22:49 -0400
committerGitHub <[email protected]>2023-09-20 23:22:49 +0200
commit73b86bbbd15e22893f05261dee036b81bfff91cf (patch)
tree45c36ed9f8c7e340bd1ba292ec11cab5186b6965 /src/zenserver
parentcontrolled zenserver shutdown (#413) (diff)
downloadzen-73b86bbbd15e22893f05261dee036b81bfff91cf.tar.xz
zen-73b86bbbd15e22893f05261dee036b81bfff91cf.zip
Improvement: Add names to background jobs for easier debugging (#412)
Improvement: Background jobs now temporarily sets thread name to background job name while executing Improvement: Background jobs tracks worker thread id used while executing
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/admin/admin.cpp6
-rw-r--r--src/zenserver/projectstore/projectstore.cpp60
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp44
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h2
4 files changed, 66 insertions, 46 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 74131e624..cef2ba403 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -99,24 +99,29 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJ
case JobQueue::Status::Queued:
{
CbObjectWriter Obj;
+ Obj.AddString("Name"sv, CurrentState->Name);
Obj.AddString("Status"sv, "Queued"sv);
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, Now));
+ Obj.AddInteger("WorkerThread", CurrentState->WorkerThreadId);
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
}
break;
case JobQueue::Status::Running:
{
CbObjectWriter Obj;
+ Obj.AddString("Name"sv, CurrentState->Name);
Obj.AddString("Status"sv, "Running"sv);
WriteState(Obj, CurrentState->State);
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, Now));
+ Obj.AddInteger("WorkerThread", CurrentState->WorkerThreadId);
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
}
break;
case JobQueue::Status::Aborted:
{
CbObjectWriter Obj;
+ Obj.AddString("Name"sv, CurrentState->Name);
Obj.AddString("Status"sv, "Aborted"sv);
WriteState(Obj, CurrentState->State);
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
@@ -128,6 +133,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJ
case JobQueue::Status::Completed:
{
CbObjectWriter Obj;
+ Obj.AddString("Name"sv, CurrentState->Name);
Obj.AddString("Status"sv, "Complete"sv);
WriteState(Obj, CurrentState->State);
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 1ad4403f4..70dc678a9 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -2881,34 +2881,37 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
NiceBytes(MaxBlockSize),
NiceBytes(MaxChunkEmbedSize));
- JobId JobId = m_JobQueue.QueueJob([this,
- ActualRemoteStore = std::move(RemoteStore),
- Project,
- OplogPtr = &Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- EmbedLooseFile,
- CreateBlocks = StoreInfo.CreateBlocks,
- UseTempBlockFiles = StoreInfo.UseTempBlockFiles,
- Force](JobContext& Context) {
- RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
- *ActualRemoteStore,
- *Project.Get(),
- *OplogPtr,
- MaxBlockSize,
- MaxChunkEmbedSize,
- EmbedLooseFile,
- CreateBlocks,
- UseTempBlockFiles,
- Force,
- &Context);
- auto Response = ConvertResult(Result);
- ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
- if (!IsHttpSuccessCode(Response.first))
- {
- throw std::runtime_error(fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
- }
- });
+ JobId JobId =
+ m_JobQueue.QueueJob(fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description),
+ [this,
+ ActualRemoteStore = std::move(RemoteStore),
+ Project,
+ OplogPtr = &Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks = StoreInfo.CreateBlocks,
+ UseTempBlockFiles = StoreInfo.UseTempBlockFiles,
+ Force](JobContext& Context) {
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *ActualRemoteStore,
+ *Project.Get(),
+ *OplogPtr,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks,
+ UseTempBlockFiles,
+ Force,
+ &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(
+ fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
+ }
+ });
return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
@@ -2935,6 +2938,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
JobId JobId = m_JobQueue.QueueJob(
+ fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description),
[this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) {
RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context);
auto Response = ConvertResult(Result);
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index ea744eb35..ee3456ecc 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -77,7 +77,7 @@ ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_
if (OptionalContext)
{
ZEN_ASSERT(Total > 0);
- OptionalContext->Queue.ReportProgress(OptionalContext->Id, CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total));
+ OptionalContext->ReportProgress(CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total));
}
ZEN_INFO("{}", CurrentOp);
}
@@ -87,12 +87,22 @@ ReportMessage(JobContext* OptionalContext, std::string_view Message)
{
if (OptionalContext)
{
- OptionalContext->Queue.ReportMessage(OptionalContext->Id, Message);
+ OptionalContext->ReportMessage(Message);
}
ZEN_INFO("{}", Message);
}
bool
+IsCancelled(JobContext* OptionalContext)
+{
+ if (!OptionalContext)
+ {
+ return false;
+ }
+ return OptionalContext->IsCancelled();
+}
+
+bool
IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer();
@@ -408,13 +418,13 @@ BuildContainer(CidStore& ChunkStore,
SectionOpsWriter << Op;
}
OpCount++;
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
}
});
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
return {};
@@ -493,7 +503,7 @@ BuildContainer(CidStore& ChunkStore,
return LhsKeyIt->second < RhsKeyIt->second;
});
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
return {};
@@ -522,7 +532,7 @@ BuildContainer(CidStore& ChunkStore,
Latch BlockCreateLatch(1);
for (const IoHash& AttachmentHash : SortedAttachments)
{
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
BlockCreateLatch.CountDown();
@@ -652,7 +662,7 @@ BuildContainer(CidStore& ChunkStore,
}
SectionOpsWriter.EndArray(); // "ops"
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
BlockCreateLatch.CountDown();
@@ -670,7 +680,7 @@ BuildContainer(CidStore& ChunkStore,
LargeAttachmentCount));
CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
BlockCreateLatch.CountDown();
@@ -694,7 +704,7 @@ BuildContainer(CidStore& ChunkStore,
while (!BlockCreateLatch.Wait(1000))
{
ptrdiff_t Remaining = BlockCreateLatch.Remaining();
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
while (!BlockCreateLatch.Wait(1000))
@@ -896,7 +906,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
return;
}
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
@@ -978,7 +988,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
});
}
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
@@ -1029,7 +1039,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
});
}
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
@@ -1105,7 +1115,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
while (!SaveAttachmentsLatch.Wait(1000))
{
ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
@@ -1304,7 +1314,7 @@ SaveOplog(CidStore& ChunkStore,
/* out */ RemoteResult);
if (!RemoteResult.IsError())
{
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
@@ -1341,7 +1351,7 @@ SaveOplog(CidStore& ChunkStore,
while (!RemoteResult.IsError())
{
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
@@ -1365,7 +1375,7 @@ SaveOplog(CidStore& ChunkStore,
break;
}
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
@@ -1654,7 +1664,7 @@ LoadOplog(CidStore& ChunkStore,
while (!AttachmentsWorkLatch.Wait(1000))
{
ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining();
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 552b11380..be086084c 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -99,7 +99,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer(
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>*
OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files"
-struct JobContext;
+class JobContext;
RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
const CbObject& ContainerObject,