diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-13 16:13:30 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-13 22:13:30 +0200 |
| commit | b2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch) | |
| tree | e9085a92e9499bca55dfda9b63779be94218409f /src/zencore/jobqueue.cpp | |
| parent | scan oplog object for fields (#397) (diff) | |
| download | zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip | |
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status
- Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status
- GET will return a response detailing status, pending messages and progress status
- DELETE will mark the job for cancelling and return without waiting for completion
- If status returned is "Complete" or "Aborted" the jobid will be removed from the server and can not be queried again
- Feature: New zen command `jobs` to list, get info about and cancel background jobs
- If no options are given it will display a list of active background jobs
- `--jobid` accepts an id (returned from for example `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job
- `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job
- Feature: oplog import and export http rpc requests are now async operations that will run in the background
- Feature: `oplog-export` and `oplog-import` now reports progress to the console as work progress by default
- Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C
- Feature: `oplog-export` and `oplog-import` has a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src/zencore/jobqueue.cpp')
| -rw-r--r-- | src/zencore/jobqueue.cpp | 486 |
1 files changed, 486 insertions, 0 deletions
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp new file mode 100644 index 000000000..34530b3a2 --- /dev/null +++ b/src/zencore/jobqueue.cpp @@ -0,0 +1,486 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencore/jobqueue.h> + +#include <zencore/except.h> +#include <zencore/scopeguard.h> +#include <zencore/thread.h> +#include <zencore/workthreadpool.h> + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +#endif // ZEN_WITH_TESTS + +#include <deque> +#include <thread> +#include <unordered_map> + +namespace zen { + +namespace JobClock { + using ClockSource = std::chrono::system_clock; + + using Tick = ClockSource::rep; + + Tick Never() { return ClockSource::time_point::min().time_since_epoch().count(); } + Tick Always() { return ClockSource::time_point::max().time_since_epoch().count(); } + Tick Now() { return ClockSource::now().time_since_epoch().count(); } + + ClockSource::time_point TimePointFromTick(const Tick TickCount) { return ClockSource::time_point{ClockSource::duration{TickCount}}; } +} // namespace JobClock + +class JobQueueImpl : public JobQueue +{ +public: + struct Job : public RefCounted + { + JobId Id; + JobFunction Callback; + std::atomic_bool CancelFlag; + State State; + JobClock::Tick CreateTick; + JobClock::Tick StartTick; + JobClock::Tick EndTick; + }; + + JobQueueImpl(int WorkerCount, std::string_view QueueName) : WorkerPool(WorkerCount, QueueName), WorkerCounter(1) + { + InitializedFlag.store(true); + } + + virtual ~JobQueueImpl() + { + try + { + if (InitializedFlag) + { + Stop(); + } + } + catch (std::exception& Ex) + { + ZEN_WARN("Failed shutting down jobqueue. Reason: '{}'", Ex.what()); + } + } + + virtual JobId QueueJob(JobFunction&& JobFunc) override + { + ZEN_ASSERT(InitializedFlag); + + uint64_t NewJobId = IdGenerator.fetch_add(1); + if (NewJobId == 0) + { + IdGenerator.fetch_add(1); + } + RefPtr<Job> NewJob(new Job()); + NewJob->Callback = std::move(JobFunc); + NewJob->CancelFlag = false; + NewJob->Id = JobId{.Id = NewJobId}; + NewJob->CreateTick = JobClock::Now(); + NewJob->StartTick = JobClock::Never(); + NewJob->EndTick = JobClock::Never(); + + QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); }); + WorkerCounter.AddCount(1); + try + { + WorkerPool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); }); + Worker(); + }); + return {.Id = NewJobId}; + } + catch (std::exception& Ex) + { + WorkerCounter.CountDown(); + QueueLock.WithExclusiveLock([&]() { + if (auto It = std::find_if(QueuedJobs.begin(), + QueuedJobs.end(), + [NewJobId](const RefPtr<Job>& Job) { return Job->Id.Id == NewJobId; }); + It != QueuedJobs.end()) + { + QueuedJobs.erase(It); + } + }); + ZEN_ERROR("Failed to schedule job to job queue. Reason: ''", Ex.what()); + throw; + } + } + + virtual bool CancelJob(JobId Id) override + { + bool Result = false; + QueueLock.WithExclusiveLock([&]() { + if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end()) + { + It->second->CancelFlag.store(true); + Result = true; + return; + } + if (auto It = CompletedJobs.find(Id.Id); It != CompletedJobs.end()) + { + Result = true; + return; + } + if (auto It = AbortedJobs.find(Id.Id); It != AbortedJobs.end()) + { + Result = true; + return; + } + if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; }); + It != QueuedJobs.end()) + { + QueuedJobs.erase(It); + Result = true; + return; + } + }); + return Result; + } + + virtual void Stop() override + { + InitializedFlag.store(false); + QueueLock.WithExclusiveLock([&]() { + for (auto& Job : RunningJobs) + { + Job.second->CancelFlag.store(true); + } + QueuedJobs.clear(); + }); + WorkerCounter.CountDown(); + ptrdiff_t Remaining = WorkerCounter.Remaining(); + while (Remaining > 0) + { + ZEN_INFO("Waiting for {} background jobs to complete", Remaining); + WorkerCounter.Wait(500); + QueueLock.WithExclusiveLock([&]() { + for (auto& Job : RunningJobs) + { + Job.second->CancelFlag.store(true); + } + QueuedJobs.clear(); + }); + Remaining = WorkerCounter.Remaining(); + } + } + + virtual void ReportMessage(JobId Id, std::string_view Message) override + { + QueueLock.WithSharedLock([&]() { + auto It = RunningJobs.find(Id.Id); + ZEN_ASSERT(It != RunningJobs.end()); + It->second->State.Messages.push_back(std::string(Message)); + }); + } + + virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override + { + QueueLock.WithSharedLock([&]() { + auto It = RunningJobs.find(Id.Id); + ZEN_ASSERT(It != RunningJobs.end()); + It->second->State.CurrentOp = CurrentOp; + It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete; + }); + } + + virtual std::vector<JobInfo> GetJobs() override + { + std::vector<JobId> DeadJobs; + auto IsStale = [](JobClock::Tick Time) { + ZEN_ASSERT_SLOW(Time != JobClock::Never()); + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + std::chrono::system_clock::duration Age = Now - JobClock::TimePointFromTick(Time); + return std::chrono::duration_cast<std::chrono::days>(Age) > std::chrono::days(1); + }; + + std::vector<JobInfo> Jobs; + QueueLock.WithSharedLock([&]() { + for (auto It : RunningJobs) + { + Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Running}); + } + for (auto It : CompletedJobs) + { + if (IsStale(It.second->EndTick)) + { + DeadJobs.push_back(JobId{It.first}); + continue; + } + Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Completed}); + } + for (auto It : AbortedJobs) + { + if (IsStale(It.second->EndTick)) + { + DeadJobs.push_back(JobId{It.first}); + continue; + } + Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Aborted}); + } + for (auto It : QueuedJobs) + { + Jobs.push_back({.Id = It->Id, .Status = Status::Queued}); + } + }); + if (!DeadJobs.empty()) + { + QueueLock.WithExclusiveLock([&]() { + for (JobId Id : DeadJobs) + { + AbortedJobs.erase(Id.Id); + CompletedJobs.erase(Id.Id); + } + }); + } + return Jobs; + } + + // Will only respond once when Complete is true + virtual std::optional<JobDetails> Get(JobId Id) override + { + auto Convert = [](Status Status, Job& Job) -> JobDetails { + return JobDetails{.Status = Status, + .State = {.CurrentOp = Job.State.CurrentOp, + .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete, + .Messages = std::move(Job.State.Messages)}, + .CreateTime = JobClock::TimePointFromTick(Job.CreateTick), + .StartTime = JobClock::TimePointFromTick(Job.StartTick), + .EndTime = JobClock::TimePointFromTick(Job.EndTick)}; + }; + + std::optional<JobDetails> Result; + QueueLock.WithExclusiveLock([&]() { + if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end()) + { + Result = Convert(Status::Running, *It->second); + return; + } + if (auto It = CompletedJobs.find(Id.Id); It != CompletedJobs.end()) + { + Result = Convert(Status::Completed, *It->second); + CompletedJobs.erase(It); + return; + } + if (auto It = AbortedJobs.find(Id.Id); It != AbortedJobs.end()) + { + Result = Convert(Status::Aborted, *It->second); + AbortedJobs.erase(It); + return; + } + if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; }); + It != QueuedJobs.end()) + { + Result = Convert(Status::Queued, *(*It)); + return; + } + }); + return Result; + } + + std::atomic_uint64_t IdGenerator = 1; + + std::atomic_bool InitializedFlag = false; + RwLock QueueLock; + std::deque<RefPtr<Job>> QueuedJobs; + std::unordered_map<uint64_t, Job*> RunningJobs; + std::unordered_map<uint64_t, RefPtr<Job>> CompletedJobs; + std::unordered_map<uint64_t, RefPtr<Job>> AbortedJobs; + + WorkerThreadPool WorkerPool; + Latch WorkerCounter; + + void Worker() + { + RefPtr<Job> CurrentJob; + QueueLock.WithExclusiveLock([&]() { + if (!QueuedJobs.empty()) + { + CurrentJob = std::move(QueuedJobs.front()); + ZEN_ASSERT(CurrentJob); + QueuedJobs.pop_front(); + RunningJobs.insert_or_assign(CurrentJob->Id.Id, CurrentJob); + CurrentJob->StartTick = JobClock::Now(); + } + }); + if (!CurrentJob) + { + return; + } + + try + { + JobContext Context{.Queue = *this, .Id = CurrentJob->Id, .CancelFlag = CurrentJob->CancelFlag}; + CurrentJob->Callback(Context); + QueueLock.WithExclusiveLock([&]() { + CurrentJob->EndTick = JobClock::Now(); + RunningJobs.erase(CurrentJob->Id.Id); + CompletedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); + }); + } + catch (std::exception& Ex) + { + QueueLock.WithExclusiveLock([&]() { + CurrentJob->State.Messages.push_back(Ex.what()); + CurrentJob->EndTick = JobClock::Now(); + RunningJobs.erase(CurrentJob->Id.Id); + AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); + }); + } + } +}; + +std::string_view +JobQueue::ToString(Status Status) +{ + using namespace std::literals; + + switch (Status) + { + case JobQueue::Status::Queued: + return "Queued"sv; + break; + case JobQueue::Status::Running: + return "Running"sv; + break; + case JobQueue::Status::Aborted: + return "Aborted"sv; + break; + case JobQueue::Status::Completed: + return "Completed"sv; + break; + default: + ZEN_ASSERT(false); + } + return ""sv; +} + +std::unique_ptr<JobQueue> +MakeJobQueue(int WorkerCount, std::string_view QueueName) +{ + return std::make_unique<JobQueueImpl>(WorkerCount, QueueName); +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +void +jobqueue_forcelink() +{ +} + +TEST_CASE("JobQueue") +{ + std::unique_ptr<JobQueue> Queue(MakeJobQueue(2, "queue")); + WorkerThreadPool Pool(4); + Latch JobsLatch(1); + for (uint32_t I = 0; I < 100; I++) + { + JobsLatch.AddCount(1); + Pool.ScheduleWork([&Queue, &JobsLatch]() { + auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); + auto Id = Queue->QueueJob([&](JobContext& Context) { + if (Context.CancelFlag.load()) + { + return; + } + Context.Queue.ReportProgress(Context.Id, "going to sleep", 0); + Sleep(10); + if (Context.CancelFlag.load()) + { + return; + } + Context.Queue.ReportProgress(Context.Id, "going to sleep again", 50); + if ((Context.Id.Id & 0xFF) == 0x10) + { + zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", Context.Id.Id)); + } + Sleep(10); + if (Context.CancelFlag.load()) + { + return; + } + Context.Queue.ReportProgress(Context.Id, "done", 100); + }); + }); + } + + auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string { + ExtendableStringBuilder<128> SB; + if (Strings.empty()) + { + return {}; + } + auto It = Strings.begin(); + SB.Append(*It); + It++; + while (It != Strings.end()) + { + SB.Append(Delimiter); + SB.Append(*It); + It++; + } + return SB.ToString(); + }; + + JobsLatch.CountDown(); + while (true) + { + bool PendingQueue = JobsLatch.Remaining() > 0; + size_t PendingCount = 0; + std::vector<JobId> RemainingJobs; + std::vector<JobQueue::JobInfo> Statuses = Queue->GetJobs(); + RemainingJobs.reserve(Statuses.size()); + for (const auto& It : Statuses) + { + JobQueue::Status Status = It.Status; + + JobId Id = It.Id; + std::optional<JobQueue::JobDetails> CurrentState; + if (Status != JobQueue::Status::Queued) + { + CurrentState = Queue->Get(Id); + CHECK(CurrentState.has_value()); + } + switch (Status) + { + case JobQueue::Status::Queued: + PendingCount++; + RemainingJobs.push_back(Id); + break; + case JobQueue::Status::Running: + ZEN_DEBUG("{} running. '{}' {}% '{}'", + Id.Id, + CurrentState->State.CurrentOp, + CurrentState->State.CurrentOpPercentComplete, + Join(CurrentState->State.Messages, " "sv)); + RemainingJobs.push_back(Id); + break; + case JobQueue::Status::Aborted: + ZEN_DEBUG("{} aborted. Reason: '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv)); + break; + case JobQueue::Status::Completed: + ZEN_DEBUG("{} completed. '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv)); + break; + default: + CHECK(false); + break; + } + } + if (RemainingJobs.empty() && !PendingQueue) + { + break; + } + ZEN_INFO("{} jobs active, {} pending in queue, {} running", + RemainingJobs.size(), + PendingCount, + RemainingJobs.size() - PendingCount); + Sleep(100); + } + JobsLatch.Wait(); +} + +#endif + +} // namespace zen |