diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-13 16:13:30 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-13 22:13:30 +0200 |
| commit | b2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch) | |
| tree | e9085a92e9499bca55dfda9b63779be94218409f /src | |
| parent | scan oplog object for fields (#397) (diff) | |
| download | zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip | |
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status
- Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status
- GET will return a response detailing status, pending messages and progress status
- DELETE will mark the job for cancelling and return without waiting for completion
- If status returned is "Complete" or "Aborted" the jobid will be removed from the server and can not be queried again
- Feature: New zen command `jobs` to list, get info about and cancel background jobs
- If no options are given it will display a list of active background jobs
- `--jobid` accepts an id (returned from for example `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job
- `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job
- Feature: oplog import and export http rpc requests are now async operations that will run in the background
- Feature: `oplog-export` and `oplog-import` now reports progress to the console as work progress by default
- Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C
- Feature: `oplog-export` and `oplog-import` has a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src')
22 files changed, 1369 insertions, 153 deletions
diff --git a/src/zen/cmds/jobs.cpp b/src/zen/cmds/jobs.cpp new file mode 100644 index 000000000..137c321af --- /dev/null +++ b/src/zen/cmds/jobs.cpp @@ -0,0 +1,82 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "jobs.h" + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/string.h> +#include <zencore/uid.h> +#include <zenhttp/formatters.h> +#include <zenhttp/httpclient.h> + +namespace zen { + +//////////////////////////////////////////// + +JobCommand::JobCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "j", "jobid", "Job id", cxxopts::value(m_JobId), "<jobid>"); + m_Options.add_option("", "c", "cancel", "Cancel job id", cxxopts::value(m_Cancel), "<cancel>"); +} + +JobCommand::~JobCommand() = default; + +int +JobCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + using namespace std::literals; + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + HttpClient Http(m_HostName); + + if (m_Cancel) + { + if (m_JobId == 0) + { + ZEN_ERROR("Job id must be given"); + return 1; + } + } + std::string Url = m_JobId != 0 ? fmt::format("/admin/jobs/{}", m_JobId) : "/admin/jobs"; + + if (m_Cancel) + { + if (HttpClient::Response Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kJSON))) + { + ZEN_CONSOLE("{}", Result); + } + else + { + Result.ThrowError("failed cancelling job"sv); + return 1; + } + } + else if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON))) + { + ZEN_CONSOLE("{}", Result.AsText()); + } + else + { + Result.ThrowError("failed fetching job info"sv); + return 1; + } + + return 0; +} + +} // namespace zen diff --git a/src/zen/cmds/jobs.h b/src/zen/cmds/jobs.h new file mode 100644 index 000000000..2c523f24a --- /dev/null +++ b/src/zen/cmds/jobs.h @@ -0,0 +1,27 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +namespace zen { + +//////////////////////////////////////////// + +class JobCommand : public ZenCmdBase +{ +public: + JobCommand(); + ~JobCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{"jobs", "Show/cancel zen background jobs"}; + std::string m_HostName; + std::uint64_t m_JobId = 0; + bool m_Cancel = 0; +}; + +} // namespace zen diff --git a/src/zen/cmds/projectstore.cpp b/src/zen/cmds/projectstore.cpp index c3bfeaa4f..edeff7d85 100644 --- a/src/zen/cmds/projectstore.cpp +++ b/src/zen/cmds/projectstore.cpp @@ -17,6 +17,8 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> ZEN_THIRD_PARTY_INCLUDES_END +#include <signal.h> + namespace zen { namespace { @@ -42,6 +44,116 @@ namespace { return Payload; }; + static std::atomic_uint32_t SignalCounter[NSIG] = {0}; + + static void SignalCallbackHandler(int SigNum) + { + if (SigNum >= 0 && SigNum < NSIG) + { + SignalCounter[SigNum].fetch_add(1); + } + } + + void AsyncPost(HttpClient& Http, std::string_view Url, IoBuffer&& Payload) + { + if (HttpClient::Response Result = Http.Post(Url, Payload)) + { + if (Result.StatusCode == HttpResponseCode::Accepted) + { + signal(SIGINT, SignalCallbackHandler); + bool Cancelled = false; + + std::string_view JobIdText = Result.AsText(); + std::optional<uint64_t> JobIdMaybe = ParseInt<uint64_t>(JobIdText); + if (!JobIdMaybe) + { + Result.ThrowError("invalid job id"sv); + } + + std::string LastCurrentOp; + uint32_t LastCurrentOpPercentComplete = 0; + + uint64_t JobId = JobIdMaybe.value(); + while (true) + { + HttpClient::Response StatusResult = + Http.Get(fmt::format("/admin/jobs/{}", JobId), HttpClient::Accept(ZenContentType::kCbObject)); + if (!StatusResult) + { + StatusResult.ThrowError("failed to create project"sv); + } + CbObject StatusObject = StatusResult.AsObject(); + std::string_view Status = StatusObject["Status"sv].AsString(); + CbArrayView Messages = StatusObject["Messages"sv].AsArrayView(); + for (auto M : Messages) + { + std::string_view Message = M.AsString(); + ZEN_CONSOLE("{}", Message); + } + if (Status == "Complete") + { + if (Cancelled) + { + ZEN_CONSOLE("Cancelled"); + } + else + { + double QueueTimeS = StatusObject["QueueTimeS"].AsDouble(); + double RuntimeS = StatusObject["RunTimeS"].AsDouble(); + ZEN_CONSOLE("Completed: QueueTime: {:.3} s, RunTime: {:.3} s", QueueTimeS, RuntimeS); + } + break; + } + if (Status == "Aborted") + { + Result.ThrowError("Aborted"); + break; + } + if (Status == "Queued") + { + double QueueTimeS = StatusObject["QueueTimeS"].AsDouble(); + ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS); + } + if (Status == "Running") + { + std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString(); + uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32(); + if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete) + { + LastCurrentOp = CurrentOp; + LastCurrentOpPercentComplete = CurrentOpPercentComplete; + ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete); + } + } + uint32_t AbortCounter = SignalCounter[SIGINT].load(); + if (SignalCounter[SIGINT] > 0) + { + SignalCounter[SIGINT].fetch_sub(AbortCounter); + if (HttpClient::Response DeleteResult = Http.Delete(fmt::format("/admin/jobs/{}", JobId))) + { + ZEN_CONSOLE("Requested cancel..."); + Cancelled = true; + } + else + { + ZEN_CONSOLE("Failed cancelling job {}", DeleteResult); + } + continue; + } + Sleep(100); + } + } + else + { + ZEN_CONSOLE("{}", Result); + } + } + else + { + Result.ThrowError("failed to start operation"sv); + } + } + } // namespace /////////////////////////////////////// @@ -481,6 +593,7 @@ ExportOplogCommand::ExportOplogCommand() "Disable block creation and save all attachments individually (applies to file and cloud target)", cxxopts::value(m_DisableBlocks), "<disable>"); + m_Options.add_option("", "a", "async", "Trigger export but don't wait for completion", cxxopts::value(m_Async), "<async>"); m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); @@ -693,6 +806,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { Writer.AddBool("force"sv, true); } + Writer.AddBool("async"sv, true); if (!m_FileDirectoryPath.empty()) { Writer.BeginObject("file"sv); @@ -793,16 +907,26 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg ZEN_CONSOLE("Saving oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, m_HostName, TargetDescription); HttpClient Http(m_HostName); - if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload)) + if (m_Async) { - ZEN_CONSOLE("{}", Result); - return 0; + if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), + std::move(Payload), + HttpClient::Accept(ZenContentType::kJSON)); + Result) + { + ZEN_CONSOLE("{}", Result.AsText()); + } + else + { + Result.ThrowError("failed requesting loading oplog export"sv); + return 1; + } } else { - Result.ThrowError("failed to create project"sv); - return 1; + AsyncPost(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload)); } + return 0; } //////////////////////////// @@ -821,6 +945,7 @@ ImportOplogCommand::ImportOplogCommand() cxxopts::value(m_MaxChunkEmbedSize), "<chunksize>"); m_Options.add_option("", "f", "force", "Force import of all attachments", cxxopts::value(m_Force), "<force>"); + m_Options.add_option("", "a", "async", "Trigger import but don't wait for completion", cxxopts::value(m_Async), "<async>"); m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); @@ -1050,16 +1175,27 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg }); ZEN_CONSOLE("Loading oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, SourceDescription, m_HostName); - if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload)) + + if (m_Async) { - ZEN_CONSOLE("{}", Result); - return 0; + if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), + std::move(Payload), + HttpClient::Accept(ZenContentType::kJSON)); + Result) + { + ZEN_CONSOLE("{}", Result.AsText()); + } + else + { + Result.ThrowError("failed requesting loading oplog import"sv); + return 1; + } } else { - Result.ThrowError("failed to create project"sv); - return 1; + AsyncPost(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload)); } + return 0; } //////////////////////////// diff --git a/src/zen/cmds/projectstore.h b/src/zen/cmds/projectstore.h index e22357d09..fd1590423 100644 --- a/src/zen/cmds/projectstore.h +++ b/src/zen/cmds/projectstore.h @@ -126,6 +126,7 @@ private: bool m_EmbedLooseFiles = false; bool m_Force = false; bool m_DisableBlocks = false; + bool m_Async = false; std::string m_CloudUrl; std::string m_CloudNamespace; @@ -167,6 +168,7 @@ private: size_t m_MaxBlockSize = 0; size_t m_MaxChunkEmbedSize = 0; bool m_Force = false; + bool m_Async = false; std::string m_CloudUrl; std::string m_CloudNamespace; diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 0ed9226fc..5530b57b6 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -10,6 +10,7 @@ #include "cmds/copy.h" #include "cmds/dedup.h" #include "cmds/hash.h" +#include "cmds/jobs.h" #include "cmds/print.h" #include "cmds/projectstore.h" #include "cmds/rpcreplay.h" @@ -224,6 +225,7 @@ main(int argc, char** argv) ServeCommand ServeCmd; SnapshotOplogCommand SnapshotOplogCmd; StatusCommand StatusCmd; + JobCommand JobCmd; TopCommand TopCmd; UpCommand UpCmd; AttachCommand AttachCmd; @@ -272,6 +274,7 @@ main(int argc, char** argv) {"scrub", &ScrubCmd, "Scrub zen storage (verify data integrity)"}, {"serve", &ServeCmd, "Serve files from a directory"}, {"status", &StatusCmd, "Show zen status"}, + {"jobs", &JobCmd, "Show/cancel zen background jobs"}, {"top", &TopCmd, "Monitor zen server activity"}, {"up", &UpCmd, "Bring zen server up"}, {"attach", &AttachCmd, "Add a sponsor process to a running zen service"}, diff --git a/src/zencore/include/zencore/jobqueue.h b/src/zencore/include/zencore/jobqueue.h new file mode 100644 index 000000000..41a3288e1 --- /dev/null +++ b/src/zencore/include/zencore/jobqueue.h @@ -0,0 +1,84 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <chrono> +#include <functional> +#include <memory> +#include <optional> +#include <string> +#include <string_view> +#include <vector> + +namespace zen { + +struct JobId +{ + uint64_t Id; +}; + +class JobQueue; + +struct JobContext +{ + JobQueue& Queue; + const JobId Id; + std::atomic_bool& CancelFlag; +}; + +class JobQueue +{ +public: + typedef std::function<void(JobContext& Context)> JobFunction; + virtual ~JobQueue() = default; + + virtual JobId QueueJob(JobFunction&& JobFunc) = 0; + virtual bool CancelJob(JobId Id) = 0; + virtual void Stop() = 0; + + virtual void ReportMessage(JobId Id, std::string_view Message) = 0; + virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) = 0; + + enum class Status : uint32_t + { + Queued, + Running, + Aborted, + Completed + }; + + struct State + { + std::string CurrentOp; + uint32_t CurrentOpPercentComplete = 0; + std::vector<std::string> Messages; + }; + + struct JobInfo + { + JobId Id; + Status Status; + }; + + virtual std::vector<JobInfo> GetJobs() = 0; + + struct JobDetails + { + Status Status; + State State; + std::chrono::system_clock::time_point CreateTime; + std::chrono::system_clock::time_point StartTime; + std::chrono::system_clock::time_point EndTime; + }; + + // Will only respond once when status is Complete or Aborted + virtual std::optional<JobDetails> Get(JobId Id) = 0; + + static std::string_view ToString(Status Status); +}; + +std::unique_ptr<JobQueue> MakeJobQueue(int WorkerCount, std::string_view QueueName); + +void jobqueue_forcelink(); + +} // namespace zen diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp new file mode 100644 index 000000000..34530b3a2 --- /dev/null +++ b/src/zencore/jobqueue.cpp @@ -0,0 +1,486 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencore/jobqueue.h> + +#include <zencore/except.h> +#include <zencore/scopeguard.h> +#include <zencore/thread.h> +#include <zencore/workthreadpool.h> + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +#endif // ZEN_WITH_TESTS + +#include <deque> +#include <thread> +#include <unordered_map> + +namespace zen { + +namespace JobClock { + using ClockSource = std::chrono::system_clock; + + using Tick = ClockSource::rep; + + Tick Never() { return ClockSource::time_point::min().time_since_epoch().count(); } + Tick Always() { return ClockSource::time_point::max().time_since_epoch().count(); } + Tick Now() { return ClockSource::now().time_since_epoch().count(); } + + ClockSource::time_point TimePointFromTick(const Tick TickCount) { return ClockSource::time_point{ClockSource::duration{TickCount}}; } +} // namespace JobClock + +class JobQueueImpl : public JobQueue +{ +public: + struct Job : public RefCounted + { + JobId Id; + JobFunction Callback; + std::atomic_bool CancelFlag; + State State; + JobClock::Tick CreateTick; + JobClock::Tick StartTick; + JobClock::Tick EndTick; + }; + + JobQueueImpl(int WorkerCount, std::string_view QueueName) : WorkerPool(WorkerCount, QueueName), WorkerCounter(1) + { + InitializedFlag.store(true); + } + + virtual ~JobQueueImpl() + { + try + { + if (InitializedFlag) + { + Stop(); + } + } + catch (std::exception& Ex) + { + ZEN_WARN("Failed shutting down jobqueue. Reason: '{}'", Ex.what()); + } + } + + virtual JobId QueueJob(JobFunction&& JobFunc) override + { + ZEN_ASSERT(InitializedFlag); + + uint64_t NewJobId = IdGenerator.fetch_add(1); + if (NewJobId == 0) + { + IdGenerator.fetch_add(1); + } + RefPtr<Job> NewJob(new Job()); + NewJob->Callback = std::move(JobFunc); + NewJob->CancelFlag = false; + NewJob->Id = JobId{.Id = NewJobId}; + NewJob->CreateTick = JobClock::Now(); + NewJob->StartTick = JobClock::Never(); + NewJob->EndTick = JobClock::Never(); + + QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); }); + WorkerCounter.AddCount(1); + try + { + WorkerPool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); }); + Worker(); + }); + return {.Id = NewJobId}; + } + catch (std::exception& Ex) + { + WorkerCounter.CountDown(); + QueueLock.WithExclusiveLock([&]() { + if (auto It = std::find_if(QueuedJobs.begin(), + QueuedJobs.end(), + [NewJobId](const RefPtr<Job>& Job) { return Job->Id.Id == NewJobId; }); + It != QueuedJobs.end()) + { + QueuedJobs.erase(It); + } + }); + ZEN_ERROR("Failed to schedule job to job queue. Reason: ''", Ex.what()); + throw; + } + } + + virtual bool CancelJob(JobId Id) override + { + bool Result = false; + QueueLock.WithExclusiveLock([&]() { + if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end()) + { + It->second->CancelFlag.store(true); + Result = true; + return; + } + if (auto It = CompletedJobs.find(Id.Id); It != CompletedJobs.end()) + { + Result = true; + return; + } + if (auto It = AbortedJobs.find(Id.Id); It != AbortedJobs.end()) + { + Result = true; + return; + } + if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; }); + It != QueuedJobs.end()) + { + QueuedJobs.erase(It); + Result = true; + return; + } + }); + return Result; + } + + virtual void Stop() override + { + InitializedFlag.store(false); + QueueLock.WithExclusiveLock([&]() { + for (auto& Job : RunningJobs) + { + Job.second->CancelFlag.store(true); + } + QueuedJobs.clear(); + }); + WorkerCounter.CountDown(); + ptrdiff_t Remaining = WorkerCounter.Remaining(); + while (Remaining > 0) + { + ZEN_INFO("Waiting for {} background jobs to complete", Remaining); + WorkerCounter.Wait(500); + QueueLock.WithExclusiveLock([&]() { + for (auto& Job : RunningJobs) + { + Job.second->CancelFlag.store(true); + } + QueuedJobs.clear(); + }); + Remaining = WorkerCounter.Remaining(); + } + } + + virtual void ReportMessage(JobId Id, std::string_view Message) override + { + QueueLock.WithSharedLock([&]() { + auto It = RunningJobs.find(Id.Id); + ZEN_ASSERT(It != RunningJobs.end()); + It->second->State.Messages.push_back(std::string(Message)); + }); + } + + virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override + { + QueueLock.WithSharedLock([&]() { + auto It = RunningJobs.find(Id.Id); + ZEN_ASSERT(It != RunningJobs.end()); + It->second->State.CurrentOp = CurrentOp; + It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete; + }); + } + + virtual std::vector<JobInfo> GetJobs() override + { + std::vector<JobId> DeadJobs; + auto IsStale = [](JobClock::Tick Time) { + ZEN_ASSERT_SLOW(Time != JobClock::Never()); + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + std::chrono::system_clock::duration Age = Now - JobClock::TimePointFromTick(Time); + return std::chrono::duration_cast<std::chrono::days>(Age) > std::chrono::days(1); + }; + + std::vector<JobInfo> Jobs; + QueueLock.WithSharedLock([&]() { + for (auto It : RunningJobs) + { + Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Running}); + } + for (auto It : CompletedJobs) + { + if (IsStale(It.second->EndTick)) + { + DeadJobs.push_back(JobId{It.first}); + continue; + } + Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Completed}); + } + for (auto It : AbortedJobs) + { + if (IsStale(It.second->EndTick)) + { + DeadJobs.push_back(JobId{It.first}); + continue; + } + Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Aborted}); + } + for (auto It : QueuedJobs) + { + Jobs.push_back({.Id = It->Id, .Status = Status::Queued}); + } + }); + if (!DeadJobs.empty()) + { + QueueLock.WithExclusiveLock([&]() { + for (JobId Id : DeadJobs) + { + AbortedJobs.erase(Id.Id); + CompletedJobs.erase(Id.Id); + } + }); + } + return Jobs; + } + + // Will only respond once when Complete is true + virtual std::optional<JobDetails> Get(JobId Id) override + { + auto Convert = [](Status Status, Job& Job) -> JobDetails { + return JobDetails{.Status = Status, + .State = {.CurrentOp = Job.State.CurrentOp, + .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete, + .Messages = std::move(Job.State.Messages)}, + .CreateTime = JobClock::TimePointFromTick(Job.CreateTick), + .StartTime = JobClock::TimePointFromTick(Job.StartTick), + .EndTime = JobClock::TimePointFromTick(Job.EndTick)}; + }; + + std::optional<JobDetails> Result; + QueueLock.WithExclusiveLock([&]() { + if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end()) + { + Result = Convert(Status::Running, *It->second); + return; + } + if (auto It = CompletedJobs.find(Id.Id); It != CompletedJobs.end()) + { + Result = Convert(Status::Completed, *It->second); + CompletedJobs.erase(It); + return; + } + if (auto It = AbortedJobs.find(Id.Id); It != AbortedJobs.end()) + { + Result = Convert(Status::Aborted, *It->second); + AbortedJobs.erase(It); + return; + } + if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; }); + It != QueuedJobs.end()) + { + Result = Convert(Status::Queued, *(*It)); + return; + } + }); + return Result; + } + + std::atomic_uint64_t IdGenerator = 1; + + std::atomic_bool InitializedFlag = false; + RwLock QueueLock; + std::deque<RefPtr<Job>> QueuedJobs; + std::unordered_map<uint64_t, Job*> RunningJobs; + std::unordered_map<uint64_t, RefPtr<Job>> CompletedJobs; + std::unordered_map<uint64_t, RefPtr<Job>> AbortedJobs; + + WorkerThreadPool WorkerPool; + Latch WorkerCounter; + + void Worker() + { + RefPtr<Job> CurrentJob; + QueueLock.WithExclusiveLock([&]() { + if (!QueuedJobs.empty()) + { + CurrentJob = std::move(QueuedJobs.front()); + ZEN_ASSERT(CurrentJob); + QueuedJobs.pop_front(); + RunningJobs.insert_or_assign(CurrentJob->Id.Id, CurrentJob); + CurrentJob->StartTick = JobClock::Now(); + } + }); + if (!CurrentJob) + { + return; + } + + try + { + JobContext Context{.Queue = *this, .Id = CurrentJob->Id, .CancelFlag = CurrentJob->CancelFlag}; + CurrentJob->Callback(Context); + QueueLock.WithExclusiveLock([&]() { + CurrentJob->EndTick = JobClock::Now(); + RunningJobs.erase(CurrentJob->Id.Id); + CompletedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); + }); + } + catch (std::exception& Ex) + { + QueueLock.WithExclusiveLock([&]() { + CurrentJob->State.Messages.push_back(Ex.what()); + CurrentJob->EndTick = JobClock::Now(); + RunningJobs.erase(CurrentJob->Id.Id); + AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); + }); + } + } +}; + +std::string_view +JobQueue::ToString(Status Status) +{ + using namespace std::literals; + + switch (Status) + { + case JobQueue::Status::Queued: + return "Queued"sv; + break; + case JobQueue::Status::Running: + return "Running"sv; + break; + case JobQueue::Status::Aborted: + return "Aborted"sv; + break; + case JobQueue::Status::Completed: + return "Completed"sv; + break; + default: + ZEN_ASSERT(false); + } + return ""sv; +} + +std::unique_ptr<JobQueue> +MakeJobQueue(int WorkerCount, std::string_view QueueName) +{ + return std::make_unique<JobQueueImpl>(WorkerCount, QueueName); +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +void +jobqueue_forcelink() +{ +} + +TEST_CASE("JobQueue") +{ + std::unique_ptr<JobQueue> Queue(MakeJobQueue(2, "queue")); + WorkerThreadPool Pool(4); + Latch JobsLatch(1); + for (uint32_t I = 0; I < 100; I++) + { + JobsLatch.AddCount(1); + Pool.ScheduleWork([&Queue, &JobsLatch]() { + auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); + auto Id = Queue->QueueJob([&](JobContext& Context) { + if (Context.CancelFlag.load()) + { + return; + } + Context.Queue.ReportProgress(Context.Id, "going to sleep", 0); + Sleep(10); + if (Context.CancelFlag.load()) + { + return; + } + Context.Queue.ReportProgress(Context.Id, "going to sleep again", 50); + if ((Context.Id.Id & 0xFF) == 0x10) + { + zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", Context.Id.Id)); + } + Sleep(10); + if (Context.CancelFlag.load()) + { + return; + } + Context.Queue.ReportProgress(Context.Id, "done", 100); + }); + }); + } + + auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string { + ExtendableStringBuilder<128> SB; + if (Strings.empty()) + { + return {}; + } + auto It = Strings.begin(); + SB.Append(*It); + It++; + while (It != Strings.end()) + { + SB.Append(Delimiter); + SB.Append(*It); + It++; + } + return SB.ToString(); + }; + + JobsLatch.CountDown(); + while (true) + { + bool PendingQueue = JobsLatch.Remaining() > 0; + size_t PendingCount = 0; + std::vector<JobId> RemainingJobs; + std::vector<JobQueue::JobInfo> Statuses = Queue->GetJobs(); + RemainingJobs.reserve(Statuses.size()); + for (const auto& It : Statuses) + { + JobQueue::Status Status = It.Status; + + JobId Id = It.Id; + std::optional<JobQueue::JobDetails> CurrentState; + if (Status != JobQueue::Status::Queued) + { + CurrentState = Queue->Get(Id); + CHECK(CurrentState.has_value()); + } + switch (Status) + { + case JobQueue::Status::Queued: + PendingCount++; + RemainingJobs.push_back(Id); + break; + case JobQueue::Status::Running: + ZEN_DEBUG("{} running. '{}' {}% '{}'", + Id.Id, + CurrentState->State.CurrentOp, + CurrentState->State.CurrentOpPercentComplete, + Join(CurrentState->State.Messages, " "sv)); + RemainingJobs.push_back(Id); + break; + case JobQueue::Status::Aborted: + ZEN_DEBUG("{} aborted. Reason: '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv)); + break; + case JobQueue::Status::Completed: + ZEN_DEBUG("{} completed. '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv)); + break; + default: + CHECK(false); + break; + } + } + if (RemainingJobs.empty() && !PendingQueue) + { + break; + } + ZEN_INFO("{} jobs active, {} pending in queue, {} running", + RemainingJobs.size(), + PendingCount, + RemainingJobs.size() - PendingCount); + Sleep(100); + } + JobsLatch.Wait(); +} + +#endif + +} // namespace zen diff --git a/src/zencore/zencore.cpp b/src/zencore/zencore.cpp index 83e6ea772..84ffee45b 100644 --- a/src/zencore/zencore.cpp +++ b/src/zencore/zencore.cpp @@ -20,6 +20,7 @@ #include <zencore/filesystem.h> #include <zencore/intmath.h> #include <zencore/iobuffer.h> +#include <zencore/jobqueue.h> #include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/mpscqueue.h> @@ -115,6 +116,7 @@ zencore_forcelinktests() zen::compress_forcelink(); zen::crypto_forcelink(); zen::filesystem_forcelink(); + zen::jobqueue_forcelink(); zen::intmath_forcelink(); zen::iobuffer_forcelink(); zen::logging_forcelink(); diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 8c95835fd..2f52e3225 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -2933,6 +2933,28 @@ TEST_CASE("project.remote") CHECK(SourceOps == TargetOps); }; + auto WaitForCompletion = [&Session](ZenServerInstance& Server, const cpr::Response& Response) { + CHECK(IsHttpSuccessCode(Response.status_code)); + uint64_t JobId = ParseInt<uint64_t>(Response.text).value_or(0); + CHECK(JobId != 0); + Session.SetUrl(fmt::format("{}/admin/jobs/{}", Server.GetBaseUri(), JobId)); + Session.SetHeader(cpr::Header{{"Accept", std::string(ToString(ZenContentType::kCbObject))}}); + while (true) + { + cpr::Response StatusResponse = Session.Get(); + CHECK(IsHttpSuccessCode(StatusResponse.status_code)); + CbObject ReponseObject = + LoadCompactBinaryObject(IoBuffer(IoBuffer::Wrap, StatusResponse.text.data(), StatusResponse.text.size())); + std::string_view Status = ReponseObject["Status"sv].AsString(); + CHECK(Status != "Aborted"sv); + if (Status == "Complete"sv) + { + return; + } + Sleep(10); + } + }; + SUBCASE("File") { ScopedTemporaryDirectory TempDir; @@ -2961,7 +2983,7 @@ TEST_CASE("project.remote") Session.SetBody(AsBody(Payload)); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); + WaitForCompletion(Servers.GetInstance(0), Response); } { MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); @@ -2990,7 +3012,7 @@ TEST_CASE("project.remote") Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); + WaitForCompletion(Servers.GetInstance(1), Response); } ValidateAttachments(1, "proj0_copy", "oplog0_copy"); ValidateOplog(1, "proj0_copy", "oplog0_copy"); @@ -3025,7 +3047,7 @@ TEST_CASE("project.remote") Session.SetBody(AsBody(Payload)); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); + WaitForCompletion(Servers.GetInstance(0), Response); } { MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); @@ -3052,7 +3074,7 @@ TEST_CASE("project.remote") Session.SetBody(AsBody(Payload)); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); + WaitForCompletion(Servers.GetInstance(1), Response); } ValidateAttachments(1, "proj0_copy", "oplog0_copy"); ValidateOplog(1, "proj0_copy", "oplog0_copy"); @@ -3086,7 +3108,7 @@ TEST_CASE("project.remote") Session.SetBody(AsBody(Payload)); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); + WaitForCompletion(Servers.GetInstance(0), Response); } { MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); @@ -3113,7 +3135,7 @@ TEST_CASE("project.remote") Session.SetBody(AsBody(Payload)); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); + WaitForCompletion(Servers.GetInstance(1), Response); } ValidateAttachments(1, "proj0_copy", "oplog0_copy"); ValidateOplog(1, "proj0_copy", "oplog0_copy"); @@ -3154,7 +3176,7 @@ TEST_CASE("project.remote") Session.SetBody(AsBody(Payload)); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); + WaitForCompletion(Servers.GetInstance(0), Response); } ValidateAttachments(1, "proj0_copy", "oplog0_copy"); ValidateOplog(1, "proj0_copy", "oplog0_copy"); @@ -3188,7 +3210,7 @@ TEST_CASE("project.remote") Session.SetBody(AsBody(Payload)); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); + WaitForCompletion(Servers.GetInstance(2), Response); } ValidateAttachments(2, "proj1", "oplog1"); ValidateOplog(2, "proj1", "oplog1"); diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 575a10d83..74131e624 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -3,6 +3,7 @@ #include "admin.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/jobqueue.h> #include <zencore/string.h> #include <zenstore/gc.h> @@ -10,7 +11,9 @@ namespace zen { -HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Scheduler) +HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJobQueue) +: m_GcScheduler(Scheduler) +, m_BackgroundJobQueue(BackgroundJobQueue) { using namespace std::literals; @@ -23,6 +26,141 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Sched }, HttpVerb::kGet); + m_Router.AddPattern("jobid", "([[:digit:]]+?)"); + + m_Router.RegisterRoute( + "jobs", + [&](HttpRouterRequest& Req) { + std::vector<JobQueue::JobInfo> Jobs = m_BackgroundJobQueue.GetJobs(); + CbObjectWriter Obj; + Obj.BeginArray("jobs"); + for (const auto& Job : Jobs) + { + Obj.BeginObject(); + Obj.AddInteger("Id", Job.Id.Id); + Obj.AddString("Status", JobQueue::ToString(Job.Status)); + Obj.EndObject(); + } + Obj.EndArray(); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/{jobid}", + [&](HttpRouterRequest& Req) { + const auto& JobIdString = Req.GetCapture(1); + std::optional<uint64_t> JobIdArg = ParseInt<uint64_t>(JobIdString); + if (!JobIdArg) + { + Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + } + JobId Id{.Id = JobIdArg.value_or(0)}; + if (Id.Id == 0) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, + ZenContentType::kText, + fmt::format("Invalid Job Id: {}", Id.Id)); + } + + std::optional<JobQueue::JobDetails> CurrentState = m_BackgroundJobQueue.Get(Id); + if (!CurrentState) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); + } + + auto WriteState = [](CbObjectWriter& Obj, const JobQueue::State& State) { + if (!State.CurrentOp.empty()) + { + Obj.AddString("CurrentOp"sv, State.CurrentOp); + Obj.AddInteger("CurrentOpPercentComplete"sv, State.CurrentOpPercentComplete); + } + if (!State.Messages.empty()) + { + Obj.BeginArray("Messages"); + for (const std::string& Message : State.Messages) + { + Obj.AddString(Message); + } + Obj.EndArray(); + } + }; + + auto GetAgeAsSeconds = [](std::chrono::system_clock::time_point Start, std::chrono::system_clock::time_point End) { + auto Age = End - Start; + auto Milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(Age); + return Milliseconds.count() / 1000.0; + }; + + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + + switch (CurrentState->Status) + { + case JobQueue::Status::Queued: + { + CbObjectWriter Obj; + Obj.AddString("Status"sv, "Queued"sv); + Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, Now)); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + break; + case JobQueue::Status::Running: + { + CbObjectWriter Obj; + Obj.AddString("Status"sv, "Running"sv); + WriteState(Obj, CurrentState->State); + Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); + Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, Now)); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + break; + case JobQueue::Status::Aborted: + { + CbObjectWriter Obj; + Obj.AddString("Status"sv, "Aborted"sv); + WriteState(Obj, CurrentState->State); + Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); + Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime)); + Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now)); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + break; + case JobQueue::Status::Completed: + { + CbObjectWriter Obj; + Obj.AddString("Status"sv, "Complete"sv); + WriteState(Obj, CurrentState->State); + Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); + Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime)); + Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now)); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + break; + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/{jobid}", + [&](HttpRouterRequest& Req) { + const auto& JobIdString = Req.GetCapture(1); + std::optional<uint64_t> JobIdArg = ParseInt<uint64_t>(JobIdString); + if (!JobIdArg) + { + Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + } + JobId Id{.Id = JobIdArg.value_or(0)}; + if (m_BackgroundJobQueue.CancelJob(Id)) + { + Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + } + else + { + Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); + } + }, + HttpVerb::kDelete); + m_Router.RegisterRoute( "gc", [this](HttpRouterRequest& Req) { diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h index 9463ffbb3..3152f87ab 100644 --- a/src/zenserver/admin/admin.h +++ b/src/zenserver/admin/admin.h @@ -8,11 +8,12 @@ namespace zen { class GcScheduler; +class JobQueue; class HttpAdminService : public zen::HttpService { public: - HttpAdminService(GcScheduler& Scheduler); + HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJobQueue); ~HttpAdminService(); virtual const char* BaseUri() const override; @@ -21,6 +22,7 @@ public: private: HttpRequestRouter m_Router; GcScheduler& m_GcScheduler; + JobQueue& m_BackgroundJobQueue; }; } // namespace zen diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index faa748c5d..eeb1f71c4 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -244,10 +244,10 @@ private: bool m_UseTempBlocks = false; }; -std::unique_ptr<RemoteProjectStore> +std::shared_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options) { - std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<LocalExportProjectStore>(Options.Name, + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<LocalExportProjectStore>(Options.Name, Options.OptionalBaseName, std::filesystem::path(Options.FolderPath), Options.ForceDisableBlocks, diff --git a/src/zenserver/projectstore/fileremoteprojectstore.h b/src/zenserver/projectstore/fileremoteprojectstore.h index f398bbfbc..8da9692d5 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.h +++ b/src/zenserver/projectstore/fileremoteprojectstore.h @@ -15,6 +15,6 @@ struct FileRemoteStoreOptions : RemoteStoreOptions bool ForceEnableTempBlocks = false; }; -std::unique_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options); +std::shared_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options); } // namespace zen diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index e1a4a9dd4..e59bac6d6 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -44,13 +44,12 @@ public: { return {.CreateBlocks = m_EnableBlocks, .UseTempBlockFiles = m_UseTempBlocks, - .Description = fmt::format("[cloud] {} as {}/{}/{}{}{}"sv, + .Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key, - m_OptionalBaseKey == IoHash::Zero ? "" : " Base: ", - m_OptionalBaseKey)}; + m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format(" Base {}", m_OptionalBaseKey))}; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override @@ -283,7 +282,7 @@ private: bool m_UseTempBlocks = true; }; -std::unique_ptr<RemoteProjectStore> +std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) { std::string Url = Options.Url; @@ -319,7 +318,7 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); - std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<JupiterRemoteStore>(std::move(CloudClient), + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(CloudClient), Options.Namespace, Options.Bucket, Options.Key, diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h index 4ae6c88cb..27f3d9b73 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.h +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h @@ -23,7 +23,7 @@ struct JupiterRemoteStoreOptions : RemoteStoreOptions bool AssumeHttp2 = false; }; -std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, +std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath); } // namespace zen diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 6ec18904e..2fd6d492e 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -8,6 +8,7 @@ #include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/jobqueue.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> @@ -72,15 +73,20 @@ namespace { } while (true); } - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params, - AuthMgr& AuthManager, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - const std::filesystem::path& TempFilePath) + struct CreateRemoteStoreResult + { + std::shared_ptr<RemoteProjectStore> Store; + std::string Description; + }; + CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params, + AuthMgr& AuthManager, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + const std::filesystem::path& TempFilePath) { using namespace std::literals; - std::unique_ptr<RemoteProjectStore> RemoteStore; + std::shared_ptr<RemoteProjectStore> RemoteStore; if (CbObjectView File = Params["file"sv].AsObjectView(); File) { @@ -1565,11 +1571,12 @@ ProjectStore::Project::TouchOplog(std::string_view Oplog) const ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc) +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue) : GcStorage(Gc) , GcContributor(Gc) , m_Log(logging::Get("project")) , m_CidStore(Store) +, m_JobQueue(JobQueue) , m_ProjectBasePath(BasePath) , m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) { @@ -2330,7 +2337,8 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie Attachments.insert(RawHash); }; - RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + RemoteProjectStore::Result RemoteResult = + SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, nullptr); if (RemoteResult.ErrorCode) { @@ -2536,12 +2544,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } else if (Method == "export"sv) { - std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - if (Result.second.empty()) - { - HttpReq.WriteResponse(Result.first); - return Result.first != HttpResponseCode::BadRequest; - } + std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager); HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); return true; } @@ -2748,7 +2751,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } std::pair<HttpResponseCode, std::string> -ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) +ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) { ZEN_TRACE_CPU("ProjectStore::Export"); @@ -2759,35 +2762,52 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, bool Force = Params["force"sv].AsBool(false); bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = - CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); + CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); - if (RemoteStoreResult.first == nullptr) + if (RemoteStoreResult.Store == nullptr) { - return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + return {HttpResponseCode::BadRequest, RemoteStoreResult.Description}; } - std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", - Project.Identifier, + Project->Identifier, Oplog.OplogId(), StoreInfo.Description, NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize)); - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *RemoteStore, - Project, - Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - StoreInfo.CreateBlocks, - StoreInfo.UseTempBlockFiles, - Force); + JobId JobId = m_JobQueue.QueueJob([this, + ActualRemoteStore = std::move(RemoteStore), + Project, + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks = StoreInfo.CreateBlocks, + UseTempBlockFiles = StoreInfo.UseTempBlockFiles, + Force](JobContext& Context) { + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project.Get(), + *OplogPtr, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks, + UseTempBlockFiles, + Force, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error(fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); + } + }); - return ConvertResult(Result); + return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } std::pair<HttpResponseCode, std::string> @@ -2801,19 +2821,29 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); bool Force = Params["force"sv].AsBool(false); - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = - CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); + CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); - if (RemoteStoreResult.first == nullptr) + if (RemoteStoreResult.Store == nullptr) { - return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + return {HttpResponseCode::BadRequest, RemoteStoreResult.Description}; } - std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); - RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force); - return ConvertResult(Result); + JobId JobId = m_JobQueue.QueueJob( + [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) { + RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error( + fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); + } + }); + + return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } bool @@ -2909,6 +2939,7 @@ TEST_CASE("project.store.create") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; @@ -2916,7 +2947,7 @@ TEST_CASE("project.store.create") std::string_view ProjectName("proj1"sv); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -2938,13 +2969,14 @@ TEST_CASE("project.store.lifetimes") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -2976,13 +3008,14 @@ TEST_CASE("project.store.gc") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; @@ -3135,13 +3168,14 @@ TEST_CASE("project.store.partial.read") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index a2f92fb25..aa84d04ca 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -20,6 +20,7 @@ class CbPackage; class CidStore; class AuthMgr; class ScrubContext; +class JobQueue; enum class HttpResponseCode; @@ -64,7 +65,7 @@ class ProjectStore : public RefCounted, public GcStorage, public GcContributor struct OplogStorage; public: - ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc); + ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue); ~ProjectStore(); struct Project; @@ -329,10 +330,10 @@ public: IoBuffer&& Payload, AuthMgr& AuthManager); - std::pair<HttpResponseCode, std::string> Export(ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - CbObjectView&& Params, - AuthMgr& AuthManager); + std::pair<HttpResponseCode, std::string> Export(Ref<ProjectStore::Project> Project, + ProjectStore::Oplog& Oplog, + CbObjectView&& Params, + AuthMgr& AuthManager); std::pair<HttpResponseCode, std::string> Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, @@ -344,6 +345,7 @@ public: private: spdlog::logger& m_Log; CidStore& m_CidStore; + JobQueue& m_JobQueue; std::filesystem::path m_ProjectBasePath; mutable RwLock m_ProjectsLock; std::map<std::string, Ref<Project>> m_Projects; diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 235166659..aca9410a2 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -71,6 +71,27 @@ private: std::string m_ErrorText; }; +void +ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_t Total, ptrdiff_t Remaining) +{ + if (OptionalContext) + { + ZEN_ASSERT(Total > 0); + OptionalContext->Queue.ReportProgress(OptionalContext->Id, CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total)); + } + ZEN_INFO("{}", CurrentOp); +} + +void +ReportMessage(JobContext* OptionalContext, std::string_view Message) +{ + if (OptionalContext) + { + OptionalContext->Queue.ReportMessage(OptionalContext->Id, Message); + } + ZEN_INFO("{}", Message); +} + bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) { @@ -201,6 +222,7 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(const IoHash&)>& OnLargeAttachment, const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments, + JobContext* OptionalContext, AsyncRemoteResult& RemoteResult) { using namespace std::literals; @@ -217,8 +239,6 @@ BuildContainer(CidStore& ChunkStore, std::vector<Block> Blocks; CompressedBuffer OpsBuffer; - Latch BlockCreateLatch(1); - std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; size_t BlockSize = 0; @@ -365,11 +385,15 @@ BuildContainer(CidStore& ChunkStore, CB(RewrittenOp); }; - ZEN_INFO("Building exported oplog and fetching attachments"); + ReportMessage(OptionalContext, "Building exported oplog and fetching attachments"); tsl::robin_map<int, std::string> OpLSNToKey; Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObject Op) { + if (RemoteResult.IsError()) + { + return; + } std::string_view Key = Op["key"sv].AsString(); OpLSNToKey.insert({LSN, std::string(Key)}); Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); }); @@ -382,12 +406,23 @@ BuildContainer(CidStore& ChunkStore, SectionOpsWriter << Op; } OpCount++; + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } }); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + return {}; + } + if (!Attachments.empty() && !KnownBlocks.empty()) { + ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size())); + size_t ReusedBlockCount = 0; - ZEN_INFO("Checking {} known blocks for reuse", KnownBlocks.size()); for (const Block& KnownBlock : KnownBlocks) { size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); @@ -428,10 +463,10 @@ BuildContainer(CidStore& ChunkStore, ReusePercent); } } - ZEN_INFO("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size()); + ReportMessage(OptionalContext, fmt::format("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size())); } - ZEN_INFO("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size()); + ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size())); // Sort attachments so we get predictable blocks for the same oplog upload std::vector<IoHash> SortedAttachments; @@ -456,7 +491,15 @@ BuildContainer(CidStore& ChunkStore, return LhsKeyIt->second < RhsKeyIt->second; }); - ZEN_INFO("Assembling {} attachments from {} ops into blocks and loose attachments", SortedAttachments.size(), OpLSNToKey.size()); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + return {}; + } + ReportMessage(OptionalContext, + fmt::format("Assembling {} attachments from {} ops into blocks and loose attachments", + SortedAttachments.size(), + OpLSNToKey.size())); auto GetPayload = [&](const IoHash& AttachmentHash) { if (OutLooseAttachments != nullptr) @@ -474,8 +517,20 @@ BuildContainer(CidStore& ChunkStore, size_t GeneratedBlockCount = 0; size_t LargeAttachmentCount = 0; + Latch BlockCreateLatch(1); for (const IoHash& AttachmentHash : SortedAttachments) { + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + } + return {}; + } + auto It = Attachments.find(AttachmentHash); ZEN_ASSERT(It != Attachments.end()); IoBuffer Payload = GetPayload(AttachmentHash); @@ -595,19 +650,67 @@ BuildContainer(CidStore& ChunkStore, } SectionOpsWriter.EndArray(); // "ops" - ZEN_INFO("Assembled {} attachments from {} ops into {} blocks and {} loose attachments", - SortedAttachments.size(), - OpLSNToKey.size(), - GeneratedBlockCount, - LargeAttachmentCount); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + } + return {}; + } + ReportMessage(OptionalContext, + fmt::format("Assembled {} attachments from {} ops into {} blocks and {} loose attachments", + SortedAttachments.size(), + OpLSNToKey.size(), + GeneratedBlockCount, + LargeAttachmentCount)); CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); - ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize())); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ptrdiff_t Remaining = BlockCreateLatch.Remaining(); + ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining); + } + if (GeneratedBlockCount > 0) + { + ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0); + } + return {}; + } + ReportMessage(OptionalContext, + fmt::format("Added oplog section {}, {}", + CompressedOpsSection.DecodeRawHash(), + NiceBytes(CompressedOpsSection.GetCompressedSize()))); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { - ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining()); + ptrdiff_t Remaining = BlockCreateLatch.Remaining(); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + while (!BlockCreateLatch.Wait(1000)) + { + Remaining = BlockCreateLatch.Remaining(); + ReportProgress(OptionalContext, + fmt::format("Aborting, {} blocks remaining...", Remaining), + GeneratedBlockCount, + Remaining); + } + ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0); + return {}; + } + ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining); + } + if (GeneratedBlockCount > 0) + { + ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0); } if (!RemoteResult.IsError()) @@ -703,6 +806,7 @@ BuildContainer(CidStore& ChunkStore, OnLargeAttachment, OnBlockChunks, OutOptionalTempAttachments, + nullptr, RemoteResult); return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } @@ -717,7 +821,8 @@ SaveOplog(CidStore& ChunkStore, bool EmbedLooseFiles, bool BuildBlocks, bool UseTempBlocks, - bool ForceUpload) + bool ForceUpload, + JobContext* OptionalContext) { using namespace std::literals; @@ -831,7 +936,7 @@ SaveOplog(CidStore& ChunkStore, if (BuildBlocks) { - ZEN_INFO("Loading oplog base container"); + ReportMessage(OptionalContext, "Loading oplog base container"); RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer(); if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent)) { @@ -864,6 +969,7 @@ SaveOplog(CidStore& ChunkStore, KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)}); }; } + ReportMessage(OptionalContext, fmt::format("Loading oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds)); } } @@ -880,13 +986,22 @@ SaveOplog(CidStore& ChunkStore, OnLargeAttachment, OnBlockChunks, EmbedLooseFiles ? &TempAttachments : nullptr, + OptionalContext, /* out */ RemoteResult); if (!RemoteResult.IsError()) { + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteProjectStore::Result Result = {.ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Text = "Operation cancelled"}; + return Result; + } + uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); - ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount); + ReportMessage(OptionalContext, fmt::format("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount)); RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); if (ContainerSaveResult.ErrorCode) @@ -901,7 +1016,16 @@ SaveOplog(CidStore& ChunkStore, if (!ContainerSaveResult.Needs.empty() || ForceUpload) { - ZEN_INFO("Filtering needed attachments..."); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteProjectStore::Result Result = {.ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Text = "Operation cancelled"}; + return Result; + } + + ReportMessage(OptionalContext, "Filtering needed attachments..."); + std::vector<IoHash> NeededLargeAttachments; std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments; NeededLargeAttachments.reserve(LargeAttachments.size()); @@ -924,10 +1048,18 @@ SaveOplog(CidStore& ChunkStore, } } - Latch SaveAttachmentsLatch(1); + ptrdiff_t AttachmentsToSave(0); + Latch SaveAttachmentsLatch(1); if (!NeededLargeAttachments.empty()) { - ZEN_INFO("Saving large attachments..."); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteProjectStore::Result Result = {.ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Text = "Operation cancelled"}; + return Result; + } + ReportMessage(OptionalContext, "Saving large attachments..."); for (const IoHash& RawHash : NeededLargeAttachments) { if (RemoteResult.IsError()) @@ -946,6 +1078,7 @@ SaveOplog(CidStore& ChunkStore, } SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, @@ -994,7 +1127,7 @@ SaveOplog(CidStore& ChunkStore, if (!CreatedBlocks.empty()) { - ZEN_INFO("Saving created block attachments..."); + ReportMessage(OptionalContext, "Saving created block attachments..."); for (auto& It : CreatedBlocks) { if (RemoteResult.IsError()) @@ -1007,6 +1140,7 @@ SaveOplog(CidStore& ChunkStore, IoBuffer Payload = It.second; ZEN_ASSERT(Payload); SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; WorkerPool.ScheduleWork( [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); @@ -1041,7 +1175,7 @@ SaveOplog(CidStore& ChunkStore, if (!BlockChunks.empty()) { - ZEN_INFO("Saving chunk block attachments..."); + ReportMessage(OptionalContext, "Saving chunk block attachments..."); for (const std::vector<IoHash>& Chunks : BlockChunks) { if (RemoteResult.IsError()) @@ -1069,6 +1203,7 @@ SaveOplog(CidStore& ChunkStore, } } SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &SaveAttachmentsLatch, @@ -1111,14 +1246,28 @@ SaveOplog(CidStore& ChunkStore, SaveAttachmentsLatch.CountDown(); while (!SaveAttachmentsLatch.Wait(1000)) { - ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining()); + ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); + if (OptionalContext && OptionalContext->CancelFlag) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + ReportProgress(OptionalContext, + fmt::format("Saving attachments, {} remaining...", Remaining), + AttachmentsToSave, + Remaining); + } + if (AttachmentsToSave > 0) + { + ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0); } - SaveAttachmentsLatch.Wait(); } if (!RemoteResult.IsError()) { - ZEN_INFO("Finalizing oplog container..."); + ReportMessage(OptionalContext, "Finalizing oplog container..."); RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); if (ContainerFinalizeResult.ErrorCode) { @@ -1145,13 +1294,15 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment) + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + JobContext* OptionalContext) { using namespace std::literals; Stopwatch Timer; - CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); + size_t NeedAttachmentCount = 0; + CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); for (CbFieldView LargeChunksField : LargeChunksArray) { IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); @@ -1161,8 +1312,10 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } OnNeedAttachment(AttachmentHash); }; + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num())); - CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + size_t NeedBlockCount = 0; + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) { CbObjectView BlockView = BlockField.AsObjectView(); @@ -1202,6 +1355,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, break; } }; + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); @@ -1218,6 +1372,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num())); for (CbFieldView OpEntry : OpsArray) { CbObjectView Core = OpEntry.AsObjectView(); @@ -1240,7 +1395,11 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } RemoteProjectStore::Result -LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload) +LoadOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + bool ForceDownload, + JobContext* OptionalContext) { using namespace std::literals; @@ -1263,20 +1422,23 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text}; } - ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))); + ReportMessage(OptionalContext, + fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)))); - AsyncRemoteResult RemoteResult; - Latch AttachmentsWorkLatch(1); + AsyncRemoteResult RemoteResult; + Latch AttachmentsWorkLatch(1); + std::atomic_size_t AttachmentCount = 0; auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { return !ForceDownload && ChunkStore.ContainsChunk(RawHash); }; - auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult]( + auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult]( const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { if (BlockHash == IoHash::Zero) { AttachmentsWorkLatch.AddCount(1); + AttachmentCount.fetch_add(1); WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) @@ -1305,6 +1467,7 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O return; } AttachmentsWorkLatch.AddCount(1); + AttachmentCount.fetch_add(1); WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) @@ -1339,57 +1502,70 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O }); }; - auto OnNeedAttachment = - [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) { - if (!Attachments.insert(RawHash).second) + auto OnNeedAttachment = [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, &AttachmentCount]( + const IoHash& RawHash) { + if (!Attachments.insert(RawHash).second) + { + return; + } + + AttachmentsWorkLatch.AddCount(1); + AttachmentCount.fetch_add(1); + WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) { return; } - - AttachmentsWorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); - ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode); - return; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); - ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); - }); - }; + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode); + return; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); + }); + }; RemoteProjectStore::Result Result = - SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext); if (!Attachments.empty()) { - ZEN_INFO("Found {} attachments to download", Attachments.size()); + ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size())); } AttachmentsWorkLatch.CountDown(); while (!AttachmentsWorkLatch.Wait(1000)) { - ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining()); + ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining(); + if (OptionalContext && OptionalContext->CancelFlag) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining); + } + if (AttachmentCount.load() > 0) + { + ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0); } - AttachmentsWorkLatch.Wait(); if (Result.ErrorCode == 0) { Result = RemoteResult.ConvertResult(); } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - ZEN_INFO("Loaded oplog {} in {}", - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))); + ReportMessage(OptionalContext, + fmt::format("Loaded oplog {} in {}", + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)))); return Result; } diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 3134fdb4a..501a5eeec 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/jobqueue.h> #include "projectstore.h" #include <unordered_set> @@ -93,11 +94,14 @@ RemoteProjectStore::LoadContainerResult BuildContainer( tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files" +struct JobContext; + RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment); + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + JobContext* OptionalContext); RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, @@ -108,9 +112,14 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, bool EmbedLooseFiles, bool BuildBlocks, bool UseTempBlocks, - bool ForceUpload); - -RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload); + bool ForceUpload, + JobContext* OptionalContext); + +RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + bool ForceDownload, + JobContext* OptionalContext); CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks); bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index 72a7f00f8..c25fd2388 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -376,7 +376,7 @@ private: const size_t m_MaxChunkEmbedSize; }; -std::unique_ptr<RemoteProjectStore> +std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) { std::string Url = Options.Url; @@ -385,8 +385,8 @@ CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) // Assume http URL Url = fmt::format("http://{}"sv, Url); } - std::unique_ptr<RemoteProjectStore> RemoteStore = - std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); + std::shared_ptr<RemoteProjectStore> RemoteStore = + std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); return RemoteStore; } diff --git a/src/zenserver/projectstore/zenremoteprojectstore.h b/src/zenserver/projectstore/zenremoteprojectstore.h index ef9dcad8c..9f079ee74 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.h +++ b/src/zenserver/projectstore/zenremoteprojectstore.h @@ -13,6 +13,6 @@ struct ZenRemoteStoreOptions : RemoteStoreOptions std::string OplogId; }; -std::unique_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options); +std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options); } // namespace zen diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index cfc4a228b..8056b6506 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -6,6 +6,7 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> +#include <zencore/jobqueue.h> #include <zencore/logging.h> #include <zencore/refcount.h> #include <zencore/scopeguard.h> @@ -271,6 +272,8 @@ public: InitializeState(ServerOptions); + m_JobQueue = MakeJobQueue(8, "backgroundjobs"); + m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot, .AbsLogPath = ServerOptions.AbsLogFile, .HttpServerClass = std::string(ServerOptions.HttpServerClass), @@ -341,7 +344,7 @@ public: ZEN_INFO("instantiating project service"); - m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager); + m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue); m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr}); #if ZEN_WITH_COMPUTE_SERVICES @@ -365,7 +368,6 @@ public: } m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics - m_Http->RegisterService(m_AdminService); #if ZEN_WITH_TESTS m_Http->RegisterService(m_TestingService); @@ -431,6 +433,10 @@ public: .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites}; m_GcScheduler.Initialize(GcConfig); + // Create and register admin interface last to make sure all is properly initialized + m_AdminService = std::make_unique<HttpAdminService>(m_GcScheduler, *m_JobQueue); + m_Http->RegisterService(*m_AdminService); + return EffectiveBasePort; } @@ -498,6 +504,11 @@ public: } Flush(); + + if (m_JobQueue) + { + m_JobQueue->Stop(); + } } void RequestExit(int ExitCode) @@ -733,13 +744,14 @@ private: std::unique_ptr<zen::UpstreamCache> m_UpstreamCache; std::unique_ptr<zen::HttpUpstreamService> m_UpstreamService; std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; - zen::HttpAdminService m_AdminService{m_GcScheduler}; zen::HttpHealthService m_HealthService; #if ZEN_WITH_COMPUTE_SERVICES std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; #endif // ZEN_WITH_COMPUTE_SERVICES std::unique_ptr<zen::HttpFrontendService> m_FrontendService; std::unique_ptr<zen::HttpObjectStoreService> m_ObjStoreService; + std::unique_ptr<JobQueue> m_JobQueue; + std::unique_ptr<zen::HttpAdminService> m_AdminService; bool m_DebugOptionForcedCrash = false; bool m_UseSentry = false; |