aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/jobqueue.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-20 17:22:49 -0400
committerGitHub <[email protected]>2023-09-20 23:22:49 +0200
commit73b86bbbd15e22893f05261dee036b81bfff91cf (patch)
tree45c36ed9f8c7e340bd1ba292ec11cab5186b6965 /src/zencore/jobqueue.cpp
parentcontrolled zenserver shutdown (#413) (diff)
downloadzen-73b86bbbd15e22893f05261dee036b81bfff91cf.tar.xz
zen-73b86bbbd15e22893f05261dee036b81bfff91cf.zip
Improvement: Add names to background jobs for easier debugging (#412)
Improvement: Background jobs now temporarily sets thread name to background job name while executing Improvement: Background jobs tracks worker thread id used while executing
Diffstat (limited to 'src/zencore/jobqueue.cpp')
-rw-r--r--src/zencore/jobqueue.cpp155
1 files changed, 99 insertions, 56 deletions
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index 34530b3a2..1755b9fe9 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -29,11 +29,15 @@ namespace JobClock {
ClockSource::time_point TimePointFromTick(const Tick TickCount) { return ClockSource::time_point{ClockSource::duration{TickCount}}; }
} // namespace JobClock
+class JobQueueImpl;
+
class JobQueueImpl : public JobQueue
{
public:
- struct Job : public RefCounted
+ struct Job : public RefCounted, public JobContext
{
+ JobQueueImpl* Queue;
+ std::string Name;
JobId Id;
JobFunction Callback;
std::atomic_bool CancelFlag;
@@ -41,6 +45,14 @@ public:
JobClock::Tick CreateTick;
JobClock::Tick StartTick;
JobClock::Tick EndTick;
+ int WorkerThreadId;
+
+ virtual bool IsCancelled() const override { return CancelFlag.load(); }
+ virtual void ReportMessage(std::string_view Message) override { Queue->ReportMessage(Id, Message); }
+ virtual void ReportProgress(std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override
+ {
+ Queue->ReportProgress(Id, CurrentOp, CurrentOpPercentComplete);
+ }
};
JobQueueImpl(int WorkerCount, std::string_view QueueName) : WorkerPool(WorkerCount, QueueName), WorkerCounter(1)
@@ -63,7 +75,7 @@ public:
}
}
- virtual JobId QueueJob(JobFunction&& JobFunc) override
+ virtual JobId QueueJob(std::string_view Name, JobFunction&& JobFunc) override
{
ZEN_ASSERT(InitializedFlag);
@@ -73,13 +85,17 @@ public:
IdGenerator.fetch_add(1);
}
RefPtr<Job> NewJob(new Job());
- NewJob->Callback = std::move(JobFunc);
- NewJob->CancelFlag = false;
- NewJob->Id = JobId{.Id = NewJobId};
- NewJob->CreateTick = JobClock::Now();
- NewJob->StartTick = JobClock::Never();
- NewJob->EndTick = JobClock::Never();
-
+ NewJob->Queue = this;
+ NewJob->Name = Name;
+ NewJob->Callback = std::move(JobFunc);
+ NewJob->CancelFlag = false;
+ NewJob->Id = JobId{.Id = NewJobId};
+ NewJob->CreateTick = JobClock::Now();
+ NewJob->StartTick = JobClock::Never();
+ NewJob->EndTick = JobClock::Never();
+ NewJob->WorkerThreadId = 0;
+
+ ZEN_DEBUG("Scheduling background job {}:'{}'", NewJob->Id.Id, NewJob->Name);
QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); });
WorkerCounter.AddCount(1);
try
@@ -102,7 +118,7 @@ public:
QueuedJobs.erase(It);
}
});
- ZEN_ERROR("Failed to schedule job to job queue. Reason: ''", Ex.what());
+ ZEN_ERROR("Failed to schedule job {}:'{}' to job queue. Reason: ''", NewJob->Id.Id, NewJob->Name, Ex.what());
throw;
}
}
@@ -113,6 +129,7 @@ public:
QueueLock.WithExclusiveLock([&]() {
if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end())
{
+ ZEN_DEBUG("Cancelling running background job {}:'{}'", It->second->Id.Id, It->second->Name);
It->second->CancelFlag.store(true);
Result = true;
return;
@@ -130,6 +147,7 @@ public:
if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; });
It != QueuedJobs.end())
{
+ ZEN_DEBUG("Cancelling queued background job {}:'{}'", (*It)->Id.Id, (*It)->Name);
QueuedJobs.erase(It);
Result = true;
return;
@@ -140,6 +158,11 @@ public:
virtual void Stop() override
{
+ ZEN_DEBUG("Stopping jobqueue");
+ if (!InitializedFlag)
+ {
+ return;
+ }
InitializedFlag.store(false);
QueueLock.WithExclusiveLock([&]() {
for (auto& Job : RunningJobs)
@@ -149,41 +172,32 @@ public:
QueuedJobs.clear();
});
WorkerCounter.CountDown();
- ptrdiff_t Remaining = WorkerCounter.Remaining();
- while (Remaining > 0)
+ while (true)
{
- ZEN_INFO("Waiting for {} background jobs to complete", Remaining);
- WorkerCounter.Wait(500);
+ size_t RunningJobCount = 0;
QueueLock.WithExclusiveLock([&]() {
for (auto& Job : RunningJobs)
{
Job.second->CancelFlag.store(true);
+ ZEN_INFO("Cancelling background job {}:'{}'", Job.second->Id.Id, Job.second->Name);
+ RunningJobCount++;
}
QueuedJobs.clear();
});
- Remaining = WorkerCounter.Remaining();
+ if (RunningJobCount == 0)
+ {
+ WorkerCounter.Wait();
+ break;
+ }
+ ptrdiff_t Remaining = WorkerCounter.Remaining();
+ if (Remaining > 0)
+ {
+ ZEN_INFO("Waiting for {} background jobs to complete", Remaining);
+ WorkerCounter.Wait(500);
+ }
}
}
- virtual void ReportMessage(JobId Id, std::string_view Message) override
- {
- QueueLock.WithSharedLock([&]() {
- auto It = RunningJobs.find(Id.Id);
- ZEN_ASSERT(It != RunningJobs.end());
- It->second->State.Messages.push_back(std::string(Message));
- });
- }
-
- virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override
- {
- QueueLock.WithSharedLock([&]() {
- auto It = RunningJobs.find(Id.Id);
- ZEN_ASSERT(It != RunningJobs.end());
- It->second->State.CurrentOp = CurrentOp;
- It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete;
- });
- }
-
virtual std::vector<JobInfo> GetJobs() override
{
std::vector<JobId> DeadJobs;
@@ -240,13 +254,15 @@ public:
virtual std::optional<JobDetails> Get(JobId Id) override
{
auto Convert = [](Status Status, Job& Job) -> JobDetails {
- return JobDetails{.Status = Status,
- .State = {.CurrentOp = Job.State.CurrentOp,
- .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete,
- .Messages = std::move(Job.State.Messages)},
- .CreateTime = JobClock::TimePointFromTick(Job.CreateTick),
- .StartTime = JobClock::TimePointFromTick(Job.StartTick),
- .EndTime = JobClock::TimePointFromTick(Job.EndTick)};
+ return JobDetails{.Name = Job.Name,
+ .Status = Status,
+ .State = {.CurrentOp = Job.State.CurrentOp,
+ .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete,
+ .Messages = std::move(Job.State.Messages)},
+ .CreateTime = JobClock::TimePointFromTick(Job.CreateTick),
+ .StartTime = JobClock::TimePointFromTick(Job.StartTick),
+ .EndTime = JobClock::TimePointFromTick(Job.EndTick),
+ .WorkerThreadId = Job.WorkerThreadId};
};
std::optional<JobDetails> Result;
@@ -278,6 +294,25 @@ public:
return Result;
}
+ void ReportMessage(JobId Id, std::string_view Message)
+ {
+ QueueLock.WithSharedLock([&]() {
+ auto It = RunningJobs.find(Id.Id);
+ ZEN_ASSERT(It != RunningJobs.end());
+ It->second->State.Messages.push_back(std::string(Message));
+ });
+ }
+
+ void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete)
+ {
+ QueueLock.WithSharedLock([&]() {
+ auto It = RunningJobs.find(Id.Id);
+ ZEN_ASSERT(It != RunningJobs.end());
+ It->second->State.CurrentOp = CurrentOp;
+ It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete;
+ });
+ }
+
std::atomic_uint64_t IdGenerator = 1;
std::atomic_bool InitializedFlag = false;
@@ -292,6 +327,7 @@ public:
void Worker()
{
+ int CurrentThreadId = GetCurrentThreadId();
RefPtr<Job> CurrentJob;
QueueLock.WithExclusiveLock([&]() {
if (!QueuedJobs.empty())
@@ -300,7 +336,8 @@ public:
ZEN_ASSERT(CurrentJob);
QueuedJobs.pop_front();
RunningJobs.insert_or_assign(CurrentJob->Id.Id, CurrentJob);
- CurrentJob->StartTick = JobClock::Now();
+ CurrentJob->StartTick = JobClock::Now();
+ CurrentJob->WorkerThreadId = CurrentThreadId;
}
});
if (!CurrentJob)
@@ -310,23 +347,29 @@ public:
try
{
- JobContext Context{.Queue = *this, .Id = CurrentJob->Id, .CancelFlag = CurrentJob->CancelFlag};
- CurrentJob->Callback(Context);
+ SetCurrentThreadName(fmt::format("BkgJob: {}", CurrentJob->Name));
+ ZEN_DEBUG("Executing background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name);
+ CurrentJob->Callback(*CurrentJob);
+ ZEN_DEBUG("Completed background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name);
QueueLock.WithExclusiveLock([&]() {
- CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->WorkerThreadId = 0;
RunningJobs.erase(CurrentJob->Id.Id);
CompletedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
});
}
catch (std::exception& Ex)
{
+ ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what());
QueueLock.WithExclusiveLock([&]() {
CurrentJob->State.Messages.push_back(Ex.what());
- CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->WorkerThreadId = 0;
RunningJobs.erase(CurrentJob->Id.Id);
AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
});
}
+ SetCurrentThreadName(fmt::format("JobQueueImpl::Worker {}", GetCurrentThreadId()));
}
};
@@ -378,30 +421,30 @@ TEST_CASE("JobQueue")
for (uint32_t I = 0; I < 100; I++)
{
JobsLatch.AddCount(1);
- Pool.ScheduleWork([&Queue, &JobsLatch]() {
+ Pool.ScheduleWork([&Queue, &JobsLatch, I]() {
auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
- auto Id = Queue->QueueJob([&](JobContext& Context) {
- if (Context.CancelFlag.load())
+ auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&](JobContext& Context) {
+ if (Context.IsCancelled())
{
return;
}
- Context.Queue.ReportProgress(Context.Id, "going to sleep", 0);
+ Context.ReportProgress("going to sleep", 0);
Sleep(10);
- if (Context.CancelFlag.load())
+ if (Context.IsCancelled())
{
return;
}
- Context.Queue.ReportProgress(Context.Id, "going to sleep again", 50);
- if ((Context.Id.Id & 0xFF) == 0x10)
+ Context.ReportProgress("going to sleep again", 50);
+ if ((I & 0xFF) == 0x10)
{
- zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", Context.Id.Id));
+ zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
}
Sleep(10);
- if (Context.CancelFlag.load())
+ if (Context.IsCancelled())
{
return;
}
- Context.Queue.ReportProgress(Context.Id, "done", 100);
+ Context.ReportProgress("done", 100);
});
});
}