aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/jobqueue.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-13 16:13:30 -0400
committerGitHub <[email protected]>2023-09-13 22:13:30 +0200
commitb2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch)
treee9085a92e9499bca55dfda9b63779be94218409f /src/zencore/jobqueue.cpp
parentscan oplog object for fields (#397) (diff)
downloadzen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz
zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip
job queue and async oplog-import/export (#395)
- Feature: New HTTP endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status - Feature: New HTTP endpoint for background job information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status - GET will return a response detailing status, pending messages and progress status - DELETE will mark the job for cancellation and return without waiting for completion - If the status returned is "Complete" or "Aborted" the jobid will be removed from the server and cannot be queried again - Feature: New zen command `jobs` to list, get info about and cancel background jobs - If no options are given it will display a list of active background jobs - `--jobid` accepts an id (returned from, for example, `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job - `--cancel` can be added when `--jobid` is given, which will request zenserver to cancel the background job - Feature: oplog import and export HTTP RPC requests are now async operations that will run in the background - Feature: `oplog-export` and `oplog-import` now report progress to the console as work progresses by default - Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C - Feature: `oplog-export` and `oplog-import` have a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src/zencore/jobqueue.cpp')
-rw-r--r--src/zencore/jobqueue.cpp486
1 files changed, 486 insertions, 0 deletions
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
new file mode 100644
index 000000000..34530b3a2
--- /dev/null
+++ b/src/zencore/jobqueue.cpp
@@ -0,0 +1,486 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/jobqueue.h>
+
+#include <zencore/except.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/workthreadpool.h>
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+#endif // ZEN_WITH_TESTS
+
+#include <deque>
+#include <thread>
+#include <unordered_map>
+
+namespace zen {
+
+// Tick-based helpers around the wall clock used to timestamp jobs.
+namespace JobClock {
+    using ClockSource = std::chrono::system_clock;
+    using Tick        = ClockSource::rep;
+
+    // Tick count of the earliest representable time point. QueueJob uses it as the
+    // initial value for start/end ticks that have not been set yet.
+    Tick Never()
+    {
+        const ClockSource::time_point Earliest = ClockSource::time_point::min();
+        return Earliest.time_since_epoch().count();
+    }
+
+    // Tick count of the latest representable time point.
+    Tick Always()
+    {
+        const ClockSource::time_point Latest = ClockSource::time_point::max();
+        return Latest.time_since_epoch().count();
+    }
+
+    // Tick count of the current time.
+    Tick Now()
+    {
+        const ClockSource::time_point Current = ClockSource::now();
+        return Current.time_since_epoch().count();
+    }
+
+    // Converts a tick count produced by the functions above back into a time point.
+    ClockSource::time_point TimePointFromTick(const Tick TickCount)
+    {
+        const ClockSource::duration SinceEpoch{TickCount};
+        return ClockSource::time_point{SinceEpoch};
+    }
+}  // namespace JobClock
+
+class JobQueueImpl : public JobQueue
+{
+public:
+ // Book-keeping record for a single job; lifetime is managed through RefCounted/RefPtr.
+ struct Job : public RefCounted
+ {
+ JobId Id; // Identifier assigned in QueueJob and returned to the caller
+ JobFunction Callback; // The work itself; invoked by Worker with a JobContext
+ std::atomic_bool CancelFlag; // Set by CancelJob/Stop; handed to the callback through JobContext
+ State State; // Current operation, percent complete and pending messages reported for the job
+ JobClock::Tick CreateTick; // Set to JobClock::Now() when the job is queued
+ JobClock::Tick StartTick; // JobClock::Never() until Worker picks the job up
+ JobClock::Tick EndTick; // JobClock::Never() until the job has completed or aborted
+ };
+
+ // Creates the worker pool from WorkerCount/QueueName and marks the queue as initialized.
+ // The latch starts at one; Stop() counts that initial unit down before waiting on the rest.
+ JobQueueImpl(int WorkerCount, std::string_view QueueName)
+ : WorkerPool(WorkerCount, QueueName)
+ , WorkerCounter(1)
+ {
+     InitializedFlag = true;
+ }
+
+ // Stops the queue if it is still marked as initialized. A std::exception thrown by
+ // Stop() is logged as a warning instead of leaving the destructor.
+ virtual ~JobQueueImpl()
+ {
+     if (!InitializedFlag)
+     {
+         return;
+     }
+     try
+     {
+         Stop();
+     }
+     catch (const std::exception& Error)
+     {
+         ZEN_WARN("Failed shutting down jobqueue. Reason: '{}'", Error.what());
+     }
+ }
+
+ // Queues JobFunc for background execution and returns the id assigned to it.
+ //
+ // The job is appended to QueuedJobs and one Worker() invocation is scheduled on the
+ // worker pool for it. WorkerCounter is raised for every scheduled invocation and
+ // lowered again when that invocation has finished.
+ //
+ // If scheduling throws a std::exception the job is removed from the queue again, the
+ // counter is restored, the failure is logged and the exception is rethrown.
+ virtual JobId QueueJob(JobFunction&& JobFunc) override
+ {
+     ZEN_ASSERT(InitializedFlag);
+
+     // Id 0 is never handed out (NOTE(review): presumably reserved as "no job" - confirm).
+     // If the generator wraps around to 0, draw again so the id we use is non-zero.
+     uint64_t NewJobId = IdGenerator.fetch_add(1);
+     while (NewJobId == 0)
+     {
+         NewJobId = IdGenerator.fetch_add(1);
+     }
+
+     RefPtr<Job> NewJob(new Job());
+     NewJob->Callback   = std::move(JobFunc);
+     NewJob->CancelFlag = false;
+     NewJob->Id         = JobId{.Id = NewJobId};
+     NewJob->CreateTick = JobClock::Now();
+     NewJob->StartTick  = JobClock::Never();
+     NewJob->EndTick    = JobClock::Never();
+
+     QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); });
+     WorkerCounter.AddCount(1);
+     try
+     {
+         // The scheduled work outlives this call and only touches members, so capture
+         // `this` explicitly rather than everything by reference.
+         WorkerPool.ScheduleWork([this]() {
+             auto _ = MakeGuard([this]() { WorkerCounter.CountDown(); });
+             Worker();
+         });
+         return {.Id = NewJobId};
+     }
+     catch (const std::exception& Ex)
+     {
+         // No worker was scheduled for this job: undo the counter bump and take the job
+         // back out of the queue (it may already be gone if another worker picked it up).
+         WorkerCounter.CountDown();
+         QueueLock.WithExclusiveLock([&]() {
+             if (auto It = std::find_if(QueuedJobs.begin(),
+                                        QueuedJobs.end(),
+                                        [NewJobId](const RefPtr<Job>& Job) { return Job->Id.Id == NewJobId; });
+                 It != QueuedJobs.end())
+             {
+                 QueuedJobs.erase(It);
+             }
+         });
+         ZEN_ERROR("Failed to schedule job to job queue. Reason: '{}'", Ex.what());
+         throw;
+     }
+ }
+
+ // Requests cancellation of the job identified by Id.
+ //
+ // - running job: its cancel flag is raised
+ // - completed or aborted job: nothing is changed
+ // - queued job: it is removed from the queue
+ //
+ // Returns true if the id was found in any of those states, false otherwise.
+ virtual bool CancelJob(JobId Id) override
+ {
+     bool Found = false;
+     QueueLock.WithExclusiveLock([&]() {
+         if (const auto Running = RunningJobs.find(Id.Id); Running != RunningJobs.end())
+         {
+             Running->second->CancelFlag.store(true);
+             Found = true;
+         }
+         else if (CompletedJobs.find(Id.Id) != CompletedJobs.end())
+         {
+             Found = true;
+         }
+         else if (AbortedJobs.find(Id.Id) != AbortedJobs.end())
+         {
+             Found = true;
+         }
+         else
+         {
+             const auto Queued = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Candidate) {
+                 return Candidate->Id.Id == Id.Id;
+             });
+             if (Queued != QueuedJobs.end())
+             {
+                 QueuedJobs.erase(Queued);
+                 Found = true;
+             }
+         }
+     });
+     return Found;
+ }
+
+ // Shuts the queue down: flags every running job for cancellation, drops all queued
+ // jobs and waits until every scheduled worker invocation has finished.
+ //
+ // Only the first call does the work. Later calls return immediately, so the latch is
+ // never counted down more than once for the initial unit taken in the constructor.
+ virtual void Stop() override
+ {
+     if (!InitializedFlag.exchange(false))
+     {
+         return;
+     }
+
+     // Raises the cancel flag on everything currently running and empties the queue.
+     // Workers scheduled for a dropped job find the queue empty and return right away.
+     const auto CancelAllJobs = [this]() {
+         QueueLock.WithExclusiveLock([this]() {
+             for (auto& Entry : RunningJobs)
+             {
+                 Entry.second->CancelFlag.store(true);
+             }
+             QueuedJobs.clear();
+         });
+     };
+
+     CancelAllJobs();
+
+     // Release the unit the latch was constructed with; what remains are workers in flight
+     WorkerCounter.CountDown();
+     for (ptrdiff_t Remaining = WorkerCounter.Remaining(); Remaining > 0; Remaining = WorkerCounter.Remaining())
+     {
+         ZEN_INFO("Waiting for {} background jobs to complete", Remaining);
+         WorkerCounter.Wait(500);  // NOTE(review): timeout presumably in milliseconds - confirm against Latch
+         // Jobs may have moved from queued to running since the last pass, so cancel again
+         CancelAllJobs();
+     }
+ }
+
+ // Appends Message to the pending messages of the running job identified by Id.
+ // Asserts that the job is currently running.
+ virtual void ReportMessage(JobId Id, std::string_view Message) override
+ {
+     // Copy the text before taking the lock to keep the critical section short
+     std::string Text(Message);
+
+     // The message list is modified here, so the exclusive lock is required: a shared
+     // lock would allow two reporters to push to the same vector at the same time.
+     QueueLock.WithExclusiveLock([&]() {
+         auto It = RunningJobs.find(Id.Id);
+         ZEN_ASSERT(It != RunningJobs.end());
+         It->second->State.Messages.push_back(std::move(Text));
+     });
+ }
+
+ // Records the current operation and its completion percentage for the running job
+ // identified by Id. Asserts that the job is currently running.
+ virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override
+ {
+     // Job state is written here, so the exclusive lock is required: a shared lock
+     // would allow two reporters to write the same fields at the same time.
+     QueueLock.WithExclusiveLock([&]() {
+         auto It = RunningJobs.find(Id.Id);
+         ZEN_ASSERT(It != RunningJobs.end());
+         It->second->State.CurrentOp                = CurrentOp;
+         It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete;
+     });
+ }
+
+ // Returns id and status for every running, completed, aborted and queued job.
+ //
+ // Completed and aborted jobs that ended more than one day ago are left out of the
+ // result and removed from the queue.
+ virtual std::vector<JobInfo> GetJobs() override
+ {
+     // Compare the full-resolution age against the limit. Casting the age to whole
+     // days first would truncate and only report jobs as stale after two days.
+     const auto IsStale = [](JobClock::Tick Time) {
+         ZEN_ASSERT_SLOW(Time != JobClock::Never());
+         const JobClock::ClockSource::duration Age = JobClock::ClockSource::now() - JobClock::TimePointFromTick(Time);
+         return Age > std::chrono::days(1);
+     };
+
+     std::vector<JobId>   DeadJobs;
+     std::vector<JobInfo> Jobs;
+
+     // Lists the finished jobs of one map, diverting stale entries to DeadJobs
+     const auto CollectFinished = [&](const std::unordered_map<uint64_t, RefPtr<Job>>& Finished, Status FinishedStatus) {
+         for (const auto& Entry : Finished)
+         {
+             if (IsStale(Entry.second->EndTick))
+             {
+                 DeadJobs.push_back(JobId{Entry.first});
+                 continue;
+             }
+             Jobs.push_back({.Id = JobId{Entry.first}, .Status = FinishedStatus});
+         }
+     };
+
+     QueueLock.WithSharedLock([&]() {
+         // Iterate by reference: copying the entries would bump the ref count of every job
+         for (const auto& Entry : RunningJobs)
+         {
+             Jobs.push_back({.Id = JobId{Entry.first}, .Status = Status::Running});
+         }
+         CollectFinished(CompletedJobs, Status::Completed);
+         CollectFinished(AbortedJobs, Status::Aborted);
+         for (const RefPtr<Job>& Queued : QueuedJobs)
+         {
+             Jobs.push_back({.Id = Queued->Id, .Status = Status::Queued});
+         }
+     });
+
+     // Removal needs the exclusive lock, so it is done in a second step and only when needed
+     if (!DeadJobs.empty())
+     {
+         QueueLock.WithExclusiveLock([&]() {
+             for (const JobId& Id : DeadJobs)
+             {
+                 AbortedJobs.erase(Id.Id);
+                 CompletedJobs.erase(Id.Id);
+             }
+         });
+     }
+     return Jobs;
+ }
+
+ // Will only respond once when Complete is true
+ // Returns the details of the job identified by Id, or an empty optional if the id is unknown.
+ //
+ // The pending messages are moved out of the job into the result. A completed or
+ // aborted job is removed from the queue by this call, so it is reported only once.
+ virtual std::optional<JobDetails> Get(JobId Id) override
+ {
+     // Builds the caller-facing snapshot; takes the messages out of the job
+     const auto MakeDetails = [](Status JobStatus, Job& Source) -> JobDetails {
+         return JobDetails{.Status     = JobStatus,
+                           .State      = {.CurrentOp                = Source.State.CurrentOp,
+                                          .CurrentOpPercentComplete = Source.State.CurrentOpPercentComplete,
+                                          .Messages                 = std::move(Source.State.Messages)},
+                           .CreateTime = JobClock::TimePointFromTick(Source.CreateTick),
+                           .StartTime  = JobClock::TimePointFromTick(Source.StartTick),
+                           .EndTime    = JobClock::TimePointFromTick(Source.EndTick)};
+     };
+
+     std::optional<JobDetails> Details;
+     QueueLock.WithExclusiveLock([&]() {
+         if (auto Running = RunningJobs.find(Id.Id); Running != RunningJobs.end())
+         {
+             Details = MakeDetails(Status::Running, *Running->second);
+         }
+         else if (auto Completed = CompletedJobs.find(Id.Id); Completed != CompletedJobs.end())
+         {
+             Details = MakeDetails(Status::Completed, *Completed->second);
+             CompletedJobs.erase(Completed);
+         }
+         else if (auto Aborted = AbortedJobs.find(Id.Id); Aborted != AbortedJobs.end())
+         {
+             Details = MakeDetails(Status::Aborted, *Aborted->second);
+             AbortedJobs.erase(Aborted);
+         }
+         else
+         {
+             const auto Queued = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Candidate) {
+                 return Candidate->Id.Id == Id.Id;
+             });
+             if (Queued != QueuedJobs.end())
+             {
+                 Details = MakeDetails(Status::Queued, *(*Queued));
+             }
+         }
+     });
+     return Details;
+ }
+
+ // Source of job ids; starts at 1 and is advanced by QueueJob
+ std::atomic_uint64_t IdGenerator = 1;
+
+ // Set in the constructor, cleared by Stop(); QueueJob asserts on it
+ std::atomic_bool InitializedFlag = false;
+ // Lock taken around every access to the four job containers below
+ RwLock QueueLock;
+ // Jobs waiting for a worker, oldest first (Worker takes from the front)
+ std::deque<RefPtr<Job>> QueuedJobs;
+ // Jobs currently executing, by id. Holds raw pointers: the Worker running the job owns a RefPtr to it
+ std::unordered_map<uint64_t, Job*> RunningJobs;
+ // Jobs whose callback returned normally, by id; removed by Get() or when stale in GetJobs()
+ std::unordered_map<uint64_t, RefPtr<Job>> CompletedJobs;
+ // Jobs whose callback threw a std::exception, by id; removed by Get() or when stale in GetJobs()
+ std::unordered_map<uint64_t, RefPtr<Job>> AbortedJobs;
+
+ // Pool on which QueueJob schedules one Worker() invocation per job
+ WorkerThreadPool WorkerPool;
+ // Initial unit from the constructor plus one per scheduled Worker() invocation; Stop() waits on it
+ Latch WorkerCounter;
+
+ // Executes one queued job on the calling thread.
+ //
+ // Takes the job at the front of the queue (returning immediately if the queue is
+ // empty), registers it as running, invokes its callback and finally files it under
+ // CompletedJobs, or under AbortedJobs if the callback threw. For std::exception the
+ // exception text is stored as a job message and the exception is absorbed. Any other
+ // exception is recorded as aborted too and then rethrown, so it propagates as before
+ // but no longer leaves a stale entry in RunningJobs.
+ void Worker()
+ {
+     RefPtr<Job> CurrentJob;
+     QueueLock.WithExclusiveLock([&]() {
+         if (!QueuedJobs.empty())
+         {
+             CurrentJob = std::move(QueuedJobs.front());
+             ZEN_ASSERT(CurrentJob);
+             QueuedJobs.pop_front();
+             RunningJobs.insert_or_assign(CurrentJob->Id.Id, CurrentJob);
+             CurrentJob->StartTick = JobClock::Now();
+         }
+     });
+     if (!CurrentJob)
+     {
+         return;
+     }
+
+     // Moves the job from RunningJobs into Destination, stamping the end time and
+     // optionally recording why it failed. RunningJobs only holds a raw pointer, so the
+     // entry must be gone before CurrentJob gives up its reference.
+     const auto Finish = [&](std::unordered_map<uint64_t, RefPtr<Job>>& Destination, const char* FailureReason) {
+         QueueLock.WithExclusiveLock([&]() {
+             if (FailureReason != nullptr)
+             {
+                 CurrentJob->State.Messages.push_back(FailureReason);
+             }
+             CurrentJob->EndTick       = JobClock::Now();
+             const uint64_t FinishedId = CurrentJob->Id.Id;
+             RunningJobs.erase(FinishedId);
+             Destination.insert_or_assign(FinishedId, std::move(CurrentJob));
+         });
+     };
+
+     try
+     {
+         JobContext Context{.Queue = *this, .Id = CurrentJob->Id, .CancelFlag = CurrentJob->CancelFlag};
+         CurrentJob->Callback(Context);
+         Finish(CompletedJobs, nullptr);
+     }
+     catch (const std::exception& Ex)
+     {
+         Finish(AbortedJobs, Ex.what());
+     }
+     catch (...)
+     {
+         // Clean up the bookkeeping, then let the exception continue as it did before
+         Finish(AbortedJobs, "Job failed with an unknown exception");
+         throw;
+     }
+ }
+};
+
+// Returns the display name of a job status. An unknown value trips ZEN_ASSERT; if
+// execution continues past the assert an empty string view is returned.
+std::string_view
+JobQueue::ToString(Status Status)
+{
+    using namespace std::literals;
+
+    std::string_view Name = ""sv;
+    switch (Status)
+    {
+        case JobQueue::Status::Queued:
+            Name = "Queued"sv;
+            break;
+        case JobQueue::Status::Running:
+            Name = "Running"sv;
+            break;
+        case JobQueue::Status::Aborted:
+            Name = "Aborted"sv;
+            break;
+        case JobQueue::Status::Completed:
+            Name = "Completed"sv;
+            break;
+        default:
+            ZEN_ASSERT(false);
+            break;
+    }
+    return Name;
+}
+
+// Creates a job queue backed by JobQueueImpl, forwarding the worker count and queue name.
+std::unique_ptr<JobQueue>
+MakeJobQueue(int WorkerCount, std::string_view QueueName)
+{
+    std::unique_ptr<JobQueue> Queue = std::make_unique<JobQueueImpl>(WorkerCount, QueueName);
+    return Queue;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+// Intentionally empty.
+// NOTE(review): presumably referenced from elsewhere so that this translation unit and its
+// test cases get linked into the test binary - confirm against the caller.
+void
+jobqueue_forcelink()
+{
+}
+
+// Queues 100 jobs from a separate thread pool and polls the queue until every job has
+// been observed as completed or aborted.
+TEST_CASE("JobQueue")
+{
+ std::unique_ptr<JobQueue> Queue(MakeJobQueue(2, "queue"));
+ WorkerThreadPool Pool(4);
+ // Starts at one so the latch cannot reach zero before all producers have been scheduled
+ Latch JobsLatch(1);
+ for (uint32_t I = 0; I < 100; I++)
+ {
+ JobsLatch.AddCount(1);
+ Pool.ScheduleWork([&Queue, &JobsLatch]() {
+ auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
+ // Each job reports progress in three steps, sleeping in between and bailing out if cancelled
+ auto Id = Queue->QueueJob([&](JobContext& Context) {
+ if (Context.CancelFlag.load())
+ {
+ return;
+ }
+ Context.Queue.ReportProgress(Context.Id, "going to sleep", 0);
+ Sleep(10);
+ if (Context.CancelFlag.load())
+ {
+ return;
+ }
+ Context.Queue.ReportProgress(Context.Id, "going to sleep again", 50);
+ // Jobs whose id has 0x10 in the low byte throw, which exercises the aborted path
+ if ((Context.Id.Id & 0xFF) == 0x10)
+ {
+ zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", Context.Id.Id));
+ }
+ Sleep(10);
+ if (Context.CancelFlag.load())
+ {
+ return;
+ }
+ Context.Queue.ReportProgress(Context.Id, "done", 100);
+ });
+ });
+ }
+
+ // Concatenates the strings with Delimiter between consecutive elements
+ auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string {
+ ExtendableStringBuilder<128> SB;
+ if (Strings.empty())
+ {
+ return {};
+ }
+ auto It = Strings.begin();
+ SB.Append(*It);
+ It++;
+ while (It != Strings.end())
+ {
+ SB.Append(Delimiter);
+ SB.Append(*It);
+ It++;
+ }
+ return SB.ToString();
+ };
+
+ // Release the initial latch unit; from here on the latch only tracks the producers
+ JobsLatch.CountDown();
+ // Poll until no queued or running job is left and every producer has finished queueing
+ while (true)
+ {
+ bool PendingQueue = JobsLatch.Remaining() > 0;
+ size_t PendingCount = 0;
+ std::vector<JobId> RemainingJobs;
+ std::vector<JobQueue::JobInfo> Statuses = Queue->GetJobs();
+ RemainingJobs.reserve(Statuses.size());
+ for (const auto& It : Statuses)
+ {
+ JobQueue::Status Status = It.Status;
+
+ JobId Id = It.Id;
+ std::optional<JobQueue::JobDetails> CurrentState;
+ // Get() removes completed and aborted jobs from the queue, so each finished job is seen once
+ if (Status != JobQueue::Status::Queued)
+ {
+ CurrentState = Queue->Get(Id);
+ CHECK(CurrentState.has_value());
+ }
+ switch (Status)
+ {
+ case JobQueue::Status::Queued:
+ PendingCount++;
+ RemainingJobs.push_back(Id);
+ break;
+ case JobQueue::Status::Running:
+ ZEN_DEBUG("{} running. '{}' {}% '{}'",
+ Id.Id,
+ CurrentState->State.CurrentOp,
+ CurrentState->State.CurrentOpPercentComplete,
+ Join(CurrentState->State.Messages, " "sv));
+ RemainingJobs.push_back(Id);
+ break;
+ case JobQueue::Status::Aborted:
+ ZEN_DEBUG("{} aborted. Reason: '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv));
+ break;
+ case JobQueue::Status::Completed:
+ ZEN_DEBUG("{} completed. '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv));
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ }
+ if (RemainingJobs.empty() && !PendingQueue)
+ {
+ break;
+ }
+ ZEN_INFO("{} jobs active, {} pending in queue, {} running",
+ RemainingJobs.size(),
+ PendingCount,
+ RemainingJobs.size() - PendingCount);
+ Sleep(100);
+ }
+ JobsLatch.Wait();
+}
+
+#endif
+
+} // namespace zen