diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-20 17:22:49 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-20 23:22:49 +0200 |
| commit | 73b86bbbd15e22893f05261dee036b81bfff91cf (patch) | |
| tree | 45c36ed9f8c7e340bd1ba292ec11cab5186b6965 /src/zencore/jobqueue.cpp | |
| parent | controlled zenserver shutdown (#413) (diff) | |
| download | zen-73b86bbbd15e22893f05261dee036b81bfff91cf.tar.xz zen-73b86bbbd15e22893f05261dee036b81bfff91cf.zip | |
Improvement: Add names to background jobs for easier debugging (#412)
Improvement: Background jobs now temporarily sets thread name to background job name while executing
Improvement: Background jobs tracks worker thread id used while executing
Diffstat (limited to 'src/zencore/jobqueue.cpp')
| -rw-r--r-- | src/zencore/jobqueue.cpp | 155 |
1 files changed, 99 insertions, 56 deletions
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index 34530b3a2..1755b9fe9 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -29,11 +29,15 @@ namespace JobClock { ClockSource::time_point TimePointFromTick(const Tick TickCount) { return ClockSource::time_point{ClockSource::duration{TickCount}}; } } // namespace JobClock +class JobQueueImpl; + class JobQueueImpl : public JobQueue { public: - struct Job : public RefCounted + struct Job : public RefCounted, public JobContext { + JobQueueImpl* Queue; + std::string Name; JobId Id; JobFunction Callback; std::atomic_bool CancelFlag; @@ -41,6 +45,14 @@ public: JobClock::Tick CreateTick; JobClock::Tick StartTick; JobClock::Tick EndTick; + int WorkerThreadId; + + virtual bool IsCancelled() const override { return CancelFlag.load(); } + virtual void ReportMessage(std::string_view Message) override { Queue->ReportMessage(Id, Message); } + virtual void ReportProgress(std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override + { + Queue->ReportProgress(Id, CurrentOp, CurrentOpPercentComplete); + } }; JobQueueImpl(int WorkerCount, std::string_view QueueName) : WorkerPool(WorkerCount, QueueName), WorkerCounter(1) @@ -63,7 +75,7 @@ public: } } - virtual JobId QueueJob(JobFunction&& JobFunc) override + virtual JobId QueueJob(std::string_view Name, JobFunction&& JobFunc) override { ZEN_ASSERT(InitializedFlag); @@ -73,13 +85,17 @@ public: IdGenerator.fetch_add(1); } RefPtr<Job> NewJob(new Job()); - NewJob->Callback = std::move(JobFunc); - NewJob->CancelFlag = false; - NewJob->Id = JobId{.Id = NewJobId}; - NewJob->CreateTick = JobClock::Now(); - NewJob->StartTick = JobClock::Never(); - NewJob->EndTick = JobClock::Never(); - + NewJob->Queue = this; + NewJob->Name = Name; + NewJob->Callback = std::move(JobFunc); + NewJob->CancelFlag = false; + NewJob->Id = JobId{.Id = NewJobId}; + NewJob->CreateTick = JobClock::Now(); + NewJob->StartTick = JobClock::Never(); + NewJob->EndTick = JobClock::Never(); + NewJob->WorkerThreadId = 0; + + ZEN_DEBUG("Scheduling background job {}:'{}'", NewJob->Id.Id, NewJob->Name); QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); }); WorkerCounter.AddCount(1); try @@ -102,7 +118,7 @@ public: QueuedJobs.erase(It); } }); - ZEN_ERROR("Failed to schedule job to job queue. Reason: ''", Ex.what()); + ZEN_ERROR("Failed to schedule job {}:'{}' to job queue. Reason: ''", NewJob->Id.Id, NewJob->Name, Ex.what()); throw; } } @@ -113,6 +129,7 @@ public: QueueLock.WithExclusiveLock([&]() { if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end()) { + ZEN_DEBUG("Cancelling running background job {}:'{}'", It->second->Id.Id, It->second->Name); It->second->CancelFlag.store(true); Result = true; return; @@ -130,6 +147,7 @@ public: if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; }); It != QueuedJobs.end()) { + ZEN_DEBUG("Cancelling queued background job {}:'{}'", (*It)->Id.Id, (*It)->Name); QueuedJobs.erase(It); Result = true; return; @@ -140,6 +158,11 @@ public: virtual void Stop() override { + ZEN_DEBUG("Stopping jobqueue"); + if (!InitializedFlag) + { + return; + } InitializedFlag.store(false); QueueLock.WithExclusiveLock([&]() { for (auto& Job : RunningJobs) @@ -149,41 +172,32 @@ public: QueuedJobs.clear(); }); WorkerCounter.CountDown(); - ptrdiff_t Remaining = WorkerCounter.Remaining(); - while (Remaining > 0) + while (true) { - ZEN_INFO("Waiting for {} background jobs to complete", Remaining); - WorkerCounter.Wait(500); + size_t RunningJobCount = 0; QueueLock.WithExclusiveLock([&]() { for (auto& Job : RunningJobs) { Job.second->CancelFlag.store(true); + ZEN_INFO("Cancelling background job {}:'{}'", Job.second->Id.Id, Job.second->Name); + RunningJobCount++; } QueuedJobs.clear(); }); - Remaining = WorkerCounter.Remaining(); + if (RunningJobCount == 0) + { + WorkerCounter.Wait(); + break; + } + ptrdiff_t Remaining = WorkerCounter.Remaining(); + if (Remaining > 0) + { + ZEN_INFO("Waiting for {} background jobs to complete", Remaining); + WorkerCounter.Wait(500); + } } } - virtual void ReportMessage(JobId Id, std::string_view Message) override - { - QueueLock.WithSharedLock([&]() { - auto It = RunningJobs.find(Id.Id); - ZEN_ASSERT(It != RunningJobs.end()); - It->second->State.Messages.push_back(std::string(Message)); - }); - } - - virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override - { - QueueLock.WithSharedLock([&]() { - auto It = RunningJobs.find(Id.Id); - ZEN_ASSERT(It != RunningJobs.end()); - It->second->State.CurrentOp = CurrentOp; - It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete; - }); - } - virtual std::vector<JobInfo> GetJobs() override { std::vector<JobId> DeadJobs; @@ -240,13 +254,15 @@ public: virtual std::optional<JobDetails> Get(JobId Id) override { auto Convert = [](Status Status, Job& Job) -> JobDetails { - return JobDetails{.Status = Status, - .State = {.CurrentOp = Job.State.CurrentOp, - .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete, - .Messages = std::move(Job.State.Messages)}, - .CreateTime = JobClock::TimePointFromTick(Job.CreateTick), - .StartTime = JobClock::TimePointFromTick(Job.StartTick), - .EndTime = JobClock::TimePointFromTick(Job.EndTick)}; + return JobDetails{.Name = Job.Name, + .Status = Status, + .State = {.CurrentOp = Job.State.CurrentOp, + .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete, + .Messages = std::move(Job.State.Messages)}, + .CreateTime = JobClock::TimePointFromTick(Job.CreateTick), + .StartTime = JobClock::TimePointFromTick(Job.StartTick), + .EndTime = JobClock::TimePointFromTick(Job.EndTick), + .WorkerThreadId = Job.WorkerThreadId}; }; std::optional<JobDetails> Result; @@ -278,6 +294,25 @@ public: return Result; } + void ReportMessage(JobId Id, std::string_view Message) + { + QueueLock.WithSharedLock([&]() { + auto It = RunningJobs.find(Id.Id); + ZEN_ASSERT(It != RunningJobs.end()); + It->second->State.Messages.push_back(std::string(Message)); + }); + } + + void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) + { + QueueLock.WithSharedLock([&]() { + auto It = RunningJobs.find(Id.Id); + ZEN_ASSERT(It != RunningJobs.end()); + It->second->State.CurrentOp = CurrentOp; + It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete; + }); + } + std::atomic_uint64_t IdGenerator = 1; std::atomic_bool InitializedFlag = false; @@ -292,6 +327,7 @@ public: void Worker() { + int CurrentThreadId = GetCurrentThreadId(); RefPtr<Job> CurrentJob; QueueLock.WithExclusiveLock([&]() { if (!QueuedJobs.empty()) @@ -300,7 +336,8 @@ public: ZEN_ASSERT(CurrentJob); QueuedJobs.pop_front(); RunningJobs.insert_or_assign(CurrentJob->Id.Id, CurrentJob); - CurrentJob->StartTick = JobClock::Now(); + CurrentJob->StartTick = JobClock::Now(); + CurrentJob->WorkerThreadId = CurrentThreadId; } }); if (!CurrentJob) @@ -310,23 +347,29 @@ public: try { - JobContext Context{.Queue = *this, .Id = CurrentJob->Id, .CancelFlag = CurrentJob->CancelFlag}; - CurrentJob->Callback(Context); + SetCurrentThreadName(fmt::format("BkgJob: {}", CurrentJob->Name)); + ZEN_DEBUG("Executing background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name); + CurrentJob->Callback(*CurrentJob); + ZEN_DEBUG("Completed background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name); QueueLock.WithExclusiveLock([&]() { - CurrentJob->EndTick = JobClock::Now(); + CurrentJob->EndTick = JobClock::Now(); + CurrentJob->WorkerThreadId = 0; RunningJobs.erase(CurrentJob->Id.Id); CompletedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); }); } catch (std::exception& Ex) { + ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what()); QueueLock.WithExclusiveLock([&]() { CurrentJob->State.Messages.push_back(Ex.what()); - CurrentJob->EndTick = JobClock::Now(); + CurrentJob->EndTick = JobClock::Now(); + CurrentJob->WorkerThreadId = 0; RunningJobs.erase(CurrentJob->Id.Id); AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); }); } + SetCurrentThreadName(fmt::format("JobQueueImpl::Worker {}", GetCurrentThreadId())); } }; @@ -378,30 +421,30 @@ TEST_CASE("JobQueue") for (uint32_t I = 0; I < 100; I++) { JobsLatch.AddCount(1); - Pool.ScheduleWork([&Queue, &JobsLatch]() { + Pool.ScheduleWork([&Queue, &JobsLatch, I]() { auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); - auto Id = Queue->QueueJob([&](JobContext& Context) { - if (Context.CancelFlag.load()) + auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&](JobContext& Context) { + if (Context.IsCancelled()) { return; } - Context.Queue.ReportProgress(Context.Id, "going to sleep", 0); + Context.ReportProgress("going to sleep", 0); Sleep(10); - if (Context.CancelFlag.load()) + if (Context.IsCancelled()) { return; } - Context.Queue.ReportProgress(Context.Id, "going to sleep again", 50); - if ((Context.Id.Id & 0xFF) == 0x10) + Context.ReportProgress("going to sleep again", 50); + if ((I & 0xFF) == 0x10) { - zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", Context.Id.Id)); + zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); } Sleep(10); - if (Context.CancelFlag.load()) + if (Context.IsCancelled()) { return; } - Context.Queue.ReportProgress(Context.Id, "done", 100); + Context.ReportProgress("done", 100); }); }); } |