diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-20 17:22:49 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-20 23:22:49 +0200 |
| commit | 73b86bbbd15e22893f05261dee036b81bfff91cf (patch) | |
| tree | 45c36ed9f8c7e340bd1ba292ec11cab5186b6965 /src/zenserver | |
| parent | controlled zenserver shutdown (#413) (diff) | |
| download | zen-73b86bbbd15e22893f05261dee036b81bfff91cf.tar.xz zen-73b86bbbd15e22893f05261dee036b81bfff91cf.zip | |
Improvement: Add names to background jobs for easier debugging (#412)
Improvement: Background jobs now temporarily sets thread name to background job name while executing
Improvement: Background jobs tracks worker thread id used while executing
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/admin/admin.cpp | 6 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 60 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 44 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 2 |
4 files changed, 66 insertions, 46 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 74131e624..cef2ba403 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -99,24 +99,29 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJ case JobQueue::Status::Queued: { CbObjectWriter Obj; + Obj.AddString("Name"sv, CurrentState->Name); Obj.AddString("Status"sv, "Queued"sv); Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, Now)); + Obj.AddInteger("WorkerThread", CurrentState->WorkerThreadId); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); } break; case JobQueue::Status::Running: { CbObjectWriter Obj; + Obj.AddString("Name"sv, CurrentState->Name); Obj.AddString("Status"sv, "Running"sv); WriteState(Obj, CurrentState->State); Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, Now)); + Obj.AddInteger("WorkerThread", CurrentState->WorkerThreadId); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); } break; case JobQueue::Status::Aborted: { CbObjectWriter Obj; + Obj.AddString("Name"sv, CurrentState->Name); Obj.AddString("Status"sv, "Aborted"sv); WriteState(Obj, CurrentState->State); Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); @@ -128,6 +133,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJ case JobQueue::Status::Completed: { CbObjectWriter Obj; + Obj.AddString("Name"sv, CurrentState->Name); Obj.AddString("Status"sv, "Complete"sv); WriteState(Obj, CurrentState->State); Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 1ad4403f4..70dc678a9 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -2881,34 +2881,37 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize)); - JobId JobId = m_JobQueue.QueueJob([this, - ActualRemoteStore = std::move(RemoteStore), - Project, - OplogPtr = &Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - CreateBlocks = StoreInfo.CreateBlocks, - UseTempBlockFiles = StoreInfo.UseTempBlockFiles, - Force](JobContext& Context) { - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *ActualRemoteStore, - *Project.Get(), - *OplogPtr, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - CreateBlocks, - UseTempBlockFiles, - Force, - &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw std::runtime_error(fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); - } - }); + JobId JobId = + m_JobQueue.QueueJob(fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description), + [this, + ActualRemoteStore = std::move(RemoteStore), + Project, + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks = StoreInfo.CreateBlocks, + UseTempBlockFiles = StoreInfo.UseTempBlockFiles, + Force](JobContext& Context) { + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project.Get(), + *OplogPtr, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks, + UseTempBlockFiles, + Force, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error( + fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); + } + }); return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } @@ -2935,6 +2938,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); JobId JobId = m_JobQueue.QueueJob( + fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) { RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context); auto Response = ConvertResult(Result); diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index ea744eb35..ee3456ecc 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -77,7 +77,7 @@ ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_ if (OptionalContext) { ZEN_ASSERT(Total > 0); - OptionalContext->Queue.ReportProgress(OptionalContext->Id, CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total)); + OptionalContext->ReportProgress(CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total)); } ZEN_INFO("{}", CurrentOp); } @@ -87,12 +87,22 @@ ReportMessage(JobContext* OptionalContext, std::string_view Message) { if (OptionalContext) { - OptionalContext->Queue.ReportMessage(OptionalContext->Id, Message); + OptionalContext->ReportMessage(Message); } ZEN_INFO("{}", Message); } bool +IsCancelled(JobContext* OptionalContext) +{ + if (!OptionalContext) + { + return false; + } + return OptionalContext->IsCancelled(); +} + +bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) { IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer(); @@ -408,13 +418,13 @@ BuildContainer(CidStore& ChunkStore, SectionOpsWriter << Op; } OpCount++; - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); } }); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); return {}; @@ -493,7 +503,7 @@ BuildContainer(CidStore& ChunkStore, return LhsKeyIt->second < RhsKeyIt->second; }); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); return {}; @@ -522,7 +532,7 @@ BuildContainer(CidStore& ChunkStore, Latch BlockCreateLatch(1); for (const IoHash& AttachmentHash : SortedAttachments) { - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); BlockCreateLatch.CountDown(); @@ -652,7 +662,7 @@ BuildContainer(CidStore& ChunkStore, } SectionOpsWriter.EndArray(); // "ops" - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); BlockCreateLatch.CountDown(); @@ -670,7 +680,7 @@ BuildContainer(CidStore& ChunkStore, LargeAttachmentCount)); CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); BlockCreateLatch.CountDown(); @@ -694,7 +704,7 @@ BuildContainer(CidStore& ChunkStore, while (!BlockCreateLatch.Wait(1000)) { ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); while (!BlockCreateLatch.Wait(1000)) @@ -896,7 +906,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, return; } - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { @@ -978,7 +988,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, }); } - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { @@ -1029,7 +1039,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, }); } - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { @@ -1105,7 +1115,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, while (!SaveAttachmentsLatch.Wait(1000)) { ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { @@ -1304,7 +1314,7 @@ SaveOplog(CidStore& ChunkStore, /* out */ RemoteResult); if (!RemoteResult.IsError()) { - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, @@ -1341,7 +1351,7 @@ SaveOplog(CidStore& ChunkStore, while (!RemoteResult.IsError()) { - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, @@ -1365,7 +1375,7 @@ SaveOplog(CidStore& ChunkStore, break; } - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, @@ -1654,7 +1664,7 @@ LoadOplog(CidStore& ChunkStore, while (!AttachmentsWorkLatch.Wait(1000)) { ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining(); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 552b11380..be086084c 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -99,7 +99,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer( tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files" -struct JobContext; +class JobContext; RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, |