// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #if ZEN_WITH_TESTS # include #endif // ZEN_WITH_TESTS ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #include #include #include namespace zen { namespace JobClock { using ClockSource = std::chrono::system_clock; using Tick = ClockSource::rep; Tick Never() { return ClockSource::time_point::min().time_since_epoch().count(); } Tick Always() { return ClockSource::time_point::max().time_since_epoch().count(); } Tick Now() { return ClockSource::now().time_since_epoch().count(); } ClockSource::time_point TimePointFromTick(const Tick TickCount) { return ClockSource::time_point{ClockSource::duration{TickCount}}; } } // namespace JobClock class JobQueueImpl; class JobQueueImpl : public JobQueue { public: struct Job : public RefCounted, public JobContext { JobQueueImpl* Queue; std::string Name; JobId Id; JobFunction Callback; std::atomic_bool CancelFlag; State State; JobClock::Tick CreateTick; JobClock::Tick StartTick; JobClock::Tick EndTick; int WorkerThreadId; virtual bool IsCancelled() const override { return CancelFlag.load(); } virtual void ReportMessage(std::string_view Message) override { Queue->ReportMessage(Id, Message); } virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount) override { Queue->ReportProgress(Id, CurrentOp, Details, TotalCount, RemainingCount); } }; JobQueueImpl(int WorkerCount, std::string_view QueueName) : WorkerPool(WorkerCount, QueueName), WorkerCounter(1) { InitializedFlag.store(true); } virtual ~JobQueueImpl() { try { if (InitializedFlag) { Stop(); } } catch (const std::exception& Ex) { ZEN_WARN("Failed shutting down jobqueue. Reason: '{}'", Ex.what()); } } virtual JobId QueueJob(std::string_view Name, JobFunction&& JobFunc) override { ZEN_ASSERT(InitializedFlag); uint64_t NewJobId = IdGenerator.fetch_add(1); if (NewJobId == 0) { IdGenerator.fetch_add(1); } RefPtr NewJob(new Job()); NewJob->Queue = this; NewJob->Name = Name; NewJob->Callback = std::move(JobFunc); NewJob->CancelFlag = false; NewJob->Id = JobId{.Id = NewJobId}; NewJob->CreateTick = JobClock::Now(); NewJob->StartTick = JobClock::Never(); NewJob->EndTick = JobClock::Never(); NewJob->WorkerThreadId = 0; ZEN_DEBUG("Scheduling background job {}:'{}'", NewJob->Id.Id, NewJob->Name); QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); }); WorkerCounter.AddCount(1); try { WorkerPool.ScheduleWork([&]() { auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); }); Worker(); }); return {.Id = NewJobId}; } catch (const std::exception& Ex) { WorkerCounter.CountDown(); QueueLock.WithExclusiveLock([&]() { if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [NewJobId](const RefPtr& Job) { return Job->Id.Id == NewJobId; }); It != QueuedJobs.end()) { QueuedJobs.erase(It); } }); ZEN_ERROR("Failed to schedule job {}:'{}' to job queue. Reason: ''", NewJob->Id.Id, NewJob->Name, Ex.what()); throw; } } virtual bool CancelJob(JobId Id) override { bool Result = false; QueueLock.WithExclusiveLock([&]() { if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end()) { ZEN_DEBUG("Cancelling running background job {}:'{}'", It->second->Id.Id, It->second->Name); It->second->CancelFlag.store(true); Result = true; return; } if (auto It = CompletedJobs.find(Id.Id); It != CompletedJobs.end()) { Result = true; return; } if (auto It = AbortedJobs.find(Id.Id); It != AbortedJobs.end()) { Result = true; return; } if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr& Job) { return Job->Id.Id == Id.Id; }); It != QueuedJobs.end()) { ZEN_DEBUG("Cancelling queued background job {}:'{}'", (*It)->Id.Id, (*It)->Name); QueuedJobs.erase(It); Result = true; return; } }); return Result; } virtual void Stop() override { ZEN_DEBUG("Stopping jobqueue"); if (!InitializedFlag) { return; } InitializedFlag.store(false); QueueLock.WithExclusiveLock([&]() { for (auto& Job : RunningJobs) { Job.second->CancelFlag.store(true); } QueuedJobs.clear(); }); WorkerCounter.CountDown(); while (true) { size_t RunningJobCount = 0; QueueLock.WithExclusiveLock([&]() { for (auto& Job : RunningJobs) { Job.second->CancelFlag.store(true); ZEN_INFO("Cancelling background job {}:'{}'", Job.second->Id.Id, Job.second->Name); RunningJobCount++; } QueuedJobs.clear(); }); if (RunningJobCount == 0) { WorkerCounter.Wait(); break; } ptrdiff_t Remaining = WorkerCounter.Remaining(); if (Remaining > 0) { ZEN_INFO("Waiting for {} background jobs to complete", Remaining); WorkerCounter.Wait(500); } } } virtual std::vector GetJobs() override { std::vector DeadJobs; auto IsStale = [](JobClock::Tick Time) { ZEN_ASSERT_SLOW(Time != JobClock::Never()); const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); std::chrono::system_clock::duration Age = Now - JobClock::TimePointFromTick(Time); return std::chrono::duration_cast(Age) > std::chrono::days(1); }; std::vector Jobs; QueueLock.WithSharedLock([&]() { for (auto It : RunningJobs) { Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Running}); } for (auto It : CompletedJobs) { if (IsStale(It.second->EndTick)) { DeadJobs.push_back(JobId{It.first}); continue; } Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Completed}); } for (auto It : AbortedJobs) { if (IsStale(It.second->EndTick)) { DeadJobs.push_back(JobId{It.first}); continue; } Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Aborted}); } for (auto It : QueuedJobs) { Jobs.push_back({.Id = It->Id, .Status = Status::Queued}); } }); if (!DeadJobs.empty()) { QueueLock.WithExclusiveLock([&]() { for (JobId Id : DeadJobs) { AbortedJobs.erase(Id.Id); CompletedJobs.erase(Id.Id); } }); } return Jobs; } // Will only respond once when Complete is true virtual std::optional Get(JobId Id) override { auto Convert = [](Status Status, Job& Job) -> JobDetails { return JobDetails{ .Name = Job.Name, .Status = Status, .State = {.CurrentOp = Job.State.CurrentOp, .CurrentOpDetails = Job.State.CurrentOpDetails, .TotalCount = Job.State.TotalCount, .RemainingCount = Job.State.RemainingCount, // .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete, .Messages = std::move(Job.State.Messages), .AbortReason = Job.State.AbortReason}, .CreateTime = JobClock::TimePointFromTick(Job.CreateTick), .StartTime = JobClock::TimePointFromTick(Job.StartTick), .EndTime = JobClock::TimePointFromTick(Job.EndTick), .WorkerThreadId = Job.WorkerThreadId}; }; std::optional Result; QueueLock.WithExclusiveLock([&]() { if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end()) { Result = Convert(Status::Running, *It->second); return; } if (auto It = CompletedJobs.find(Id.Id); It != CompletedJobs.end()) { Result = Convert(Status::Completed, *It->second); CompletedJobs.erase(It); return; } if (auto It = AbortedJobs.find(Id.Id); It != AbortedJobs.end()) { Result = Convert(Status::Aborted, *It->second); AbortedJobs.erase(It); return; } if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr& Job) { return Job->Id.Id == Id.Id; }); It != QueuedJobs.end()) { Result = Convert(Status::Queued, *(*It)); return; } }); return Result; } void ReportMessage(JobId Id, std::string_view Message) { QueueLock.WithExclusiveLock([&]() { auto It = RunningJobs.find(Id.Id); ZEN_ASSERT(It != RunningJobs.end()); It->second->State.Messages.push_back(std::string(Message)); }); } void ReportProgress(JobId Id, std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount) { QueueLock.WithExclusiveLock([&]() { auto It = RunningJobs.find(Id.Id); ZEN_ASSERT(It != RunningJobs.end()); It->second->State.CurrentOp = CurrentOp; It->second->State.CurrentOpDetails = Details; It->second->State.TotalCount = TotalCount; It->second->State.RemainingCount = RemainingCount; }); } std::atomic_uint64_t IdGenerator = 1; std::atomic_bool InitializedFlag = false; RwLock QueueLock; std::deque> QueuedJobs; std::unordered_map RunningJobs; std::unordered_map> CompletedJobs; std::unordered_map> AbortedJobs; WorkerThreadPool WorkerPool; Latch WorkerCounter; void Worker() { int CurrentThreadId = GetCurrentThreadId(); RefPtr CurrentJob; QueueLock.WithExclusiveLock([&]() { if (!QueuedJobs.empty()) { CurrentJob = std::move(QueuedJobs.front()); ZEN_ASSERT(CurrentJob); QueuedJobs.pop_front(); RunningJobs.insert_or_assign(CurrentJob->Id.Id, CurrentJob); CurrentJob->StartTick = JobClock::Now(); CurrentJob->WorkerThreadId = CurrentThreadId; } }); if (!CurrentJob) { return; } try { SetCurrentThreadName(fmt::format("BkgJob: {}", CurrentJob->Name)); ZEN_DEBUG("Executing background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name); CurrentJob->Callback(*CurrentJob); ZEN_DEBUG("Completed background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name); QueueLock.WithExclusiveLock([&]() { CurrentJob->EndTick = JobClock::Now(); CurrentJob->WorkerThreadId = 0; RunningJobs.erase(CurrentJob->Id.Id); CompletedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); }); } catch (const AssertException& Ex) { ZEN_DEBUG("Background job {}:'{}' asserted. Reason: {}", CurrentJob->Id.Id, CurrentJob->Name, Ex.FullDescription()); QueueLock.WithExclusiveLock([&]() { CurrentJob->State.AbortReason = Ex.FullDescription(); CurrentJob->EndTick = JobClock::Now(); CurrentJob->WorkerThreadId = 0; RunningJobs.erase(CurrentJob->Id.Id); AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); }); } catch (const std::exception& Ex) { ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what()); QueueLock.WithExclusiveLock([&]() { CurrentJob->State.AbortReason = Ex.what(); CurrentJob->EndTick = JobClock::Now(); CurrentJob->WorkerThreadId = 0; RunningJobs.erase(CurrentJob->Id.Id); AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); }); } SetCurrentThreadName(fmt::format("JobQueueImpl::Worker {}", GetCurrentThreadId())); } }; std::string_view JobQueue::ToString(Status Status) { using namespace std::literals; switch (Status) { case JobQueue::Status::Queued: return "Queued"sv; break; case JobQueue::Status::Running: return "Running"sv; break; case JobQueue::Status::Aborted: return "Aborted"sv; break; case JobQueue::Status::Completed: return "Completed"sv; break; default: ZEN_ASSERT(false); } return ""sv; } std::unique_ptr MakeJobQueue(int WorkerCount, std::string_view QueueName) { return std::make_unique(WorkerCount, QueueName); } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS void jobqueue_forcelink() { } TEST_CASE("JobQueue") { std::unique_ptr Queue(MakeJobQueue(2, "queue")); WorkerThreadPool Pool(4); Latch JobsLatch(1); for (uint32_t I = 0; I < 100; I++) { JobsLatch.AddCount(1); Pool.ScheduleWork([&Queue, &JobsLatch, I]() { auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); JobsLatch.AddCount(1); auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) { auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); if (Context.IsCancelled()) { return; } Context.ReportProgress("going to sleep", "", 100, 100); Sleep(10); if (Context.IsCancelled()) { return; } Context.ReportProgress("going to sleep again", "", 100, 50); if ((I & 0xFF) == 0x10) { zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); } Sleep(10); if (Context.IsCancelled()) { return; } Context.ReportProgress("done", "", 100, 0); }); }); } auto Join = [](std::span Strings, std::string_view Delimiter) -> std::string { ExtendableStringBuilder<128> SB; if (Strings.empty()) { return {}; } auto It = Strings.begin(); SB.Append(*It); It++; while (It != Strings.end()) { SB.Append(Delimiter); SB.Append(*It); It++; } return SB.ToString(); }; JobsLatch.CountDown(); while (true) { bool PendingQueue = JobsLatch.Remaining() > 0; size_t PendingCount = 0; std::vector RemainingJobs; std::vector Statuses = Queue->GetJobs(); RemainingJobs.reserve(Statuses.size()); for (const auto& It : Statuses) { JobQueue::Status Status = It.Status; JobId Id = It.Id; std::optional CurrentState; if (Status != JobQueue::Status::Queued) { CurrentState = Queue->Get(Id); CHECK(CurrentState.has_value()); } switch (Status) { case JobQueue::Status::Queued: PendingCount++; RemainingJobs.push_back(Id); break; case JobQueue::Status::Running: ZEN_DEBUG( "{} running. '{}{}' {}% '{}'", Id.Id, CurrentState->State.CurrentOp, CurrentState->State.CurrentOpDetails.empty() ? ""sv : fmt::format(", {}", CurrentState->State.CurrentOpDetails), CurrentState->State.TotalCount > 0 ? gsl::narrow((100 * (CurrentState->State.TotalCount - CurrentState->State.RemainingCount)) / CurrentState->State.TotalCount) : 0, Join(CurrentState->State.Messages, " "sv)); RemainingJobs.push_back(Id); break; case JobQueue::Status::Aborted: ZEN_DEBUG("{} aborted. Reason: '{}'", Id.Id, CurrentState->State.AbortReason); break; case JobQueue::Status::Completed: ZEN_DEBUG("{} completed. '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv)); break; default: CHECK(false); break; } } if (RemainingJobs.empty() && !PendingQueue) { break; } ZEN_INFO("{} jobs active, {} pending in queue, {} running", RemainingJobs.size(), PendingCount, RemainingJobs.size() - PendingCount); Sleep(100); } JobsLatch.Wait(); } #endif } // namespace zen