aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-13 16:13:30 -0400
committerGitHub <[email protected]>2023-09-13 22:13:30 +0200
commitb2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch)
treee9085a92e9499bca55dfda9b63779be94218409f /src
parentscan oplog object for fields (#397) (diff)
downloadzen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz
zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status - Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status - GET will return a response detailing status, pending messages and progress status - DELETE will mark the job for cancelling and return without waiting for completion - If status returned is "Complete" or "Aborted" the jobid will be removed from the server and can not be queried again - Feature: New zen command `jobs` to list, get info about and cancel background jobs - If no options are given it will display a list of active background jobs - `--jobid` accepts an id (returned from for example `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job - `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job - Feature: oplog import and export http rpc requests are now async operations that will run in the background - Feature: `oplog-export` and `oplog-import` now reports progress to the console as work progress by default - Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C - Feature: `oplog-export` and `oplog-import` has a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/jobs.cpp82
-rw-r--r--src/zen/cmds/jobs.h27
-rw-r--r--src/zen/cmds/projectstore.cpp156
-rw-r--r--src/zen/cmds/projectstore.h2
-rw-r--r--src/zen/zen.cpp3
-rw-r--r--src/zencore/include/zencore/jobqueue.h84
-rw-r--r--src/zencore/jobqueue.cpp486
-rw-r--r--src/zencore/zencore.cpp2
-rw-r--r--src/zenserver-test/zenserver-test.cpp38
-rw-r--r--src/zenserver/admin/admin.cpp140
-rw-r--r--src/zenserver/admin/admin.h4
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.cpp4
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.h2
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp9
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.h2
-rw-r--r--src/zenserver/projectstore/projectstore.cpp120
-rw-r--r--src/zenserver/projectstore/projectstore.h12
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp306
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h17
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp6
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.h2
-rw-r--r--src/zenserver/zenserver.cpp18
22 files changed, 1369 insertions, 153 deletions
diff --git a/src/zen/cmds/jobs.cpp b/src/zen/cmds/jobs.cpp
new file mode 100644
index 000000000..137c321af
--- /dev/null
+++ b/src/zen/cmds/jobs.cpp
@@ -0,0 +1,82 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "jobs.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/string.h>
+#include <zencore/uid.h>
+#include <zenhttp/formatters.h>
+#include <zenhttp/httpclient.h>
+
+namespace zen {
+
+////////////////////////////////////////////
+
+JobCommand::JobCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
+ m_Options.add_option("", "j", "jobid", "Job id", cxxopts::value(m_JobId), "<jobid>");
+ m_Options.add_option("", "c", "cancel", "Cancel job id", cxxopts::value(m_Cancel), "<cancel>");
+}
+
+JobCommand::~JobCommand() = default;
+
+int
+JobCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ ZEN_UNUSED(GlobalOptions);
+
+ using namespace std::literals;
+
+ if (!ParseOptions(argc, argv))
+ {
+ return 0;
+ }
+
+ m_HostName = ResolveTargetHostSpec(m_HostName);
+
+ if (m_HostName.empty())
+ {
+ throw OptionParseException("unable to resolve server specification");
+ }
+
+ HttpClient Http(m_HostName);
+
+ if (m_Cancel)
+ {
+ if (m_JobId == 0)
+ {
+ ZEN_ERROR("Job id must be given");
+ return 1;
+ }
+ }
+ std::string Url = m_JobId != 0 ? fmt::format("/admin/jobs/{}", m_JobId) : "/admin/jobs";
+
+ if (m_Cancel)
+ {
+ if (HttpClient::Response Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kJSON)))
+ {
+ ZEN_CONSOLE("{}", Result);
+ }
+ else
+ {
+ Result.ThrowError("failed cancelling job"sv);
+ return 1;
+ }
+ }
+ else if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)))
+ {
+ ZEN_CONSOLE("{}", Result.AsText());
+ }
+ else
+ {
+ Result.ThrowError("failed fetching job info"sv);
+ return 1;
+ }
+
+ return 0;
+}
+
+} // namespace zen
diff --git a/src/zen/cmds/jobs.h b/src/zen/cmds/jobs.h
new file mode 100644
index 000000000..2c523f24a
--- /dev/null
+++ b/src/zen/cmds/jobs.h
@@ -0,0 +1,27 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "../zen.h"
+
+namespace zen {
+
+////////////////////////////////////////////
+
+class JobCommand : public ZenCmdBase
+{
+public:
+ JobCommand();
+ ~JobCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ cxxopts::Options m_Options{"jobs", "Show/cancel zen background jobs"};
+ std::string m_HostName;
+ std::uint64_t m_JobId = 0;
+ bool m_Cancel = 0;
+};
+
+} // namespace zen
diff --git a/src/zen/cmds/projectstore.cpp b/src/zen/cmds/projectstore.cpp
index c3bfeaa4f..edeff7d85 100644
--- a/src/zen/cmds/projectstore.cpp
+++ b/src/zen/cmds/projectstore.cpp
@@ -17,6 +17,8 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <signal.h>
+
namespace zen {
namespace {
@@ -42,6 +44,116 @@ namespace {
return Payload;
};
+ static std::atomic_uint32_t SignalCounter[NSIG] = {0};
+
+ static void SignalCallbackHandler(int SigNum)
+ {
+ if (SigNum >= 0 && SigNum < NSIG)
+ {
+ SignalCounter[SigNum].fetch_add(1);
+ }
+ }
+
+ void AsyncPost(HttpClient& Http, std::string_view Url, IoBuffer&& Payload)
+ {
+ if (HttpClient::Response Result = Http.Post(Url, Payload))
+ {
+ if (Result.StatusCode == HttpResponseCode::Accepted)
+ {
+ signal(SIGINT, SignalCallbackHandler);
+ bool Cancelled = false;
+
+ std::string_view JobIdText = Result.AsText();
+ std::optional<uint64_t> JobIdMaybe = ParseInt<uint64_t>(JobIdText);
+ if (!JobIdMaybe)
+ {
+ Result.ThrowError("invalid job id"sv);
+ }
+
+ std::string LastCurrentOp;
+ uint32_t LastCurrentOpPercentComplete = 0;
+
+ uint64_t JobId = JobIdMaybe.value();
+ while (true)
+ {
+ HttpClient::Response StatusResult =
+ Http.Get(fmt::format("/admin/jobs/{}", JobId), HttpClient::Accept(ZenContentType::kCbObject));
+ if (!StatusResult)
+ {
+ StatusResult.ThrowError("failed to create project"sv);
+ }
+ CbObject StatusObject = StatusResult.AsObject();
+ std::string_view Status = StatusObject["Status"sv].AsString();
+ CbArrayView Messages = StatusObject["Messages"sv].AsArrayView();
+ for (auto M : Messages)
+ {
+ std::string_view Message = M.AsString();
+ ZEN_CONSOLE("{}", Message);
+ }
+ if (Status == "Complete")
+ {
+ if (Cancelled)
+ {
+ ZEN_CONSOLE("Cancelled");
+ }
+ else
+ {
+ double QueueTimeS = StatusObject["QueueTimeS"].AsDouble();
+ double RuntimeS = StatusObject["RunTimeS"].AsDouble();
+ ZEN_CONSOLE("Completed: QueueTime: {:.3} s, RunTime: {:.3} s", QueueTimeS, RuntimeS);
+ }
+ break;
+ }
+ if (Status == "Aborted")
+ {
+ Result.ThrowError("Aborted");
+ break;
+ }
+ if (Status == "Queued")
+ {
+ double QueueTimeS = StatusObject["QueueTimeS"].AsDouble();
+ ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS);
+ }
+ if (Status == "Running")
+ {
+ std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString();
+ uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32();
+ if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete)
+ {
+ LastCurrentOp = CurrentOp;
+ LastCurrentOpPercentComplete = CurrentOpPercentComplete;
+ ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete);
+ }
+ }
+ uint32_t AbortCounter = SignalCounter[SIGINT].load();
+ if (SignalCounter[SIGINT] > 0)
+ {
+ SignalCounter[SIGINT].fetch_sub(AbortCounter);
+ if (HttpClient::Response DeleteResult = Http.Delete(fmt::format("/admin/jobs/{}", JobId)))
+ {
+ ZEN_CONSOLE("Requested cancel...");
+ Cancelled = true;
+ }
+ else
+ {
+ ZEN_CONSOLE("Failed cancelling job {}", DeleteResult);
+ }
+ continue;
+ }
+ Sleep(100);
+ }
+ }
+ else
+ {
+ ZEN_CONSOLE("{}", Result);
+ }
+ }
+ else
+ {
+ Result.ThrowError("failed to start operation"sv);
+ }
+ }
+
} // namespace
///////////////////////////////////////
@@ -481,6 +593,7 @@ ExportOplogCommand::ExportOplogCommand()
"Disable block creation and save all attachments individually (applies to file and cloud target)",
cxxopts::value(m_DisableBlocks),
"<disable>");
+ m_Options.add_option("", "a", "async", "Trigger export but don't wait for completion", cxxopts::value(m_Async), "<async>");
m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>");
@@ -693,6 +806,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
Writer.AddBool("force"sv, true);
}
+ Writer.AddBool("async"sv, true);
if (!m_FileDirectoryPath.empty())
{
Writer.BeginObject("file"sv);
@@ -793,16 +907,26 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
ZEN_CONSOLE("Saving oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, m_HostName, TargetDescription);
HttpClient Http(m_HostName);
- if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload))
+ if (m_Async)
{
- ZEN_CONSOLE("{}", Result);
- return 0;
+ if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName),
+ std::move(Payload),
+ HttpClient::Accept(ZenContentType::kJSON));
+ Result)
+ {
+ ZEN_CONSOLE("{}", Result.AsText());
+ }
+ else
+ {
+ Result.ThrowError("failed requesting loading oplog export"sv);
+ return 1;
+ }
}
else
{
- Result.ThrowError("failed to create project"sv);
- return 1;
+ AsyncPost(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload));
}
+ return 0;
}
////////////////////////////
@@ -821,6 +945,7 @@ ImportOplogCommand::ImportOplogCommand()
cxxopts::value(m_MaxChunkEmbedSize),
"<chunksize>");
m_Options.add_option("", "f", "force", "Force import of all attachments", cxxopts::value(m_Force), "<force>");
+ m_Options.add_option("", "a", "async", "Trigger import but don't wait for completion", cxxopts::value(m_Async), "<async>");
m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>");
@@ -1050,16 +1175,27 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
});
ZEN_CONSOLE("Loading oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, SourceDescription, m_HostName);
- if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload))
+
+ if (m_Async)
{
- ZEN_CONSOLE("{}", Result);
- return 0;
+ if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName),
+ std::move(Payload),
+ HttpClient::Accept(ZenContentType::kJSON));
+ Result)
+ {
+ ZEN_CONSOLE("{}", Result.AsText());
+ }
+ else
+ {
+ Result.ThrowError("failed requesting loading oplog import"sv);
+ return 1;
+ }
}
else
{
- Result.ThrowError("failed to create project"sv);
- return 1;
+ AsyncPost(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload));
}
+ return 0;
}
////////////////////////////
diff --git a/src/zen/cmds/projectstore.h b/src/zen/cmds/projectstore.h
index e22357d09..fd1590423 100644
--- a/src/zen/cmds/projectstore.h
+++ b/src/zen/cmds/projectstore.h
@@ -126,6 +126,7 @@ private:
bool m_EmbedLooseFiles = false;
bool m_Force = false;
bool m_DisableBlocks = false;
+ bool m_Async = false;
std::string m_CloudUrl;
std::string m_CloudNamespace;
@@ -167,6 +168,7 @@ private:
size_t m_MaxBlockSize = 0;
size_t m_MaxChunkEmbedSize = 0;
bool m_Force = false;
+ bool m_Async = false;
std::string m_CloudUrl;
std::string m_CloudNamespace;
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 0ed9226fc..5530b57b6 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -10,6 +10,7 @@
#include "cmds/copy.h"
#include "cmds/dedup.h"
#include "cmds/hash.h"
+#include "cmds/jobs.h"
#include "cmds/print.h"
#include "cmds/projectstore.h"
#include "cmds/rpcreplay.h"
@@ -224,6 +225,7 @@ main(int argc, char** argv)
ServeCommand ServeCmd;
SnapshotOplogCommand SnapshotOplogCmd;
StatusCommand StatusCmd;
+ JobCommand JobCmd;
TopCommand TopCmd;
UpCommand UpCmd;
AttachCommand AttachCmd;
@@ -272,6 +274,7 @@ main(int argc, char** argv)
{"scrub", &ScrubCmd, "Scrub zen storage (verify data integrity)"},
{"serve", &ServeCmd, "Serve files from a directory"},
{"status", &StatusCmd, "Show zen status"},
+ {"jobs", &JobCmd, "Show/cancel zen background jobs"},
{"top", &TopCmd, "Monitor zen server activity"},
{"up", &UpCmd, "Bring zen server up"},
{"attach", &AttachCmd, "Add a sponsor process to a running zen service"},
diff --git a/src/zencore/include/zencore/jobqueue.h b/src/zencore/include/zencore/jobqueue.h
new file mode 100644
index 000000000..41a3288e1
--- /dev/null
+++ b/src/zencore/include/zencore/jobqueue.h
@@ -0,0 +1,84 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
+#include <vector>
+
+namespace zen {
+
+struct JobId
+{
+ uint64_t Id;
+};
+
+class JobQueue;
+
+struct JobContext
+{
+ JobQueue& Queue;
+ const JobId Id;
+ std::atomic_bool& CancelFlag;
+};
+
+class JobQueue
+{
+public:
+ typedef std::function<void(JobContext& Context)> JobFunction;
+ virtual ~JobQueue() = default;
+
+ virtual JobId QueueJob(JobFunction&& JobFunc) = 0;
+ virtual bool CancelJob(JobId Id) = 0;
+ virtual void Stop() = 0;
+
+ virtual void ReportMessage(JobId Id, std::string_view Message) = 0;
+ virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) = 0;
+
+ enum class Status : uint32_t
+ {
+ Queued,
+ Running,
+ Aborted,
+ Completed
+ };
+
+ struct State
+ {
+ std::string CurrentOp;
+ uint32_t CurrentOpPercentComplete = 0;
+ std::vector<std::string> Messages;
+ };
+
+ struct JobInfo
+ {
+ JobId Id;
+ Status Status;
+ };
+
+ virtual std::vector<JobInfo> GetJobs() = 0;
+
+ struct JobDetails
+ {
+ Status Status;
+ State State;
+ std::chrono::system_clock::time_point CreateTime;
+ std::chrono::system_clock::time_point StartTime;
+ std::chrono::system_clock::time_point EndTime;
+ };
+
+ // Will only respond once when status is Complete or Aborted
+ virtual std::optional<JobDetails> Get(JobId Id) = 0;
+
+ static std::string_view ToString(Status Status);
+};
+
+std::unique_ptr<JobQueue> MakeJobQueue(int WorkerCount, std::string_view QueueName);
+
+void jobqueue_forcelink();
+
+} // namespace zen
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
new file mode 100644
index 000000000..34530b3a2
--- /dev/null
+++ b/src/zencore/jobqueue.cpp
@@ -0,0 +1,486 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/jobqueue.h>
+
+#include <zencore/except.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/workthreadpool.h>
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+#endif // ZEN_WITH_TESTS
+
+#include <deque>
+#include <thread>
+#include <unordered_map>
+
+namespace zen {
+
+namespace JobClock {
+ using ClockSource = std::chrono::system_clock;
+
+ using Tick = ClockSource::rep;
+
+ Tick Never() { return ClockSource::time_point::min().time_since_epoch().count(); }
+ Tick Always() { return ClockSource::time_point::max().time_since_epoch().count(); }
+ Tick Now() { return ClockSource::now().time_since_epoch().count(); }
+
+ ClockSource::time_point TimePointFromTick(const Tick TickCount) { return ClockSource::time_point{ClockSource::duration{TickCount}}; }
+} // namespace JobClock
+
+class JobQueueImpl : public JobQueue
+{
+public:
+ struct Job : public RefCounted
+ {
+ JobId Id;
+ JobFunction Callback;
+ std::atomic_bool CancelFlag;
+ State State;
+ JobClock::Tick CreateTick;
+ JobClock::Tick StartTick;
+ JobClock::Tick EndTick;
+ };
+
+ JobQueueImpl(int WorkerCount, std::string_view QueueName) : WorkerPool(WorkerCount, QueueName), WorkerCounter(1)
+ {
+ InitializedFlag.store(true);
+ }
+
+ virtual ~JobQueueImpl()
+ {
+ try
+ {
+ if (InitializedFlag)
+ {
+ Stop();
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("Failed shutting down jobqueue. Reason: '{}'", Ex.what());
+ }
+ }
+
+ virtual JobId QueueJob(JobFunction&& JobFunc) override
+ {
+ ZEN_ASSERT(InitializedFlag);
+
+ uint64_t NewJobId = IdGenerator.fetch_add(1);
+ if (NewJobId == 0)
+ {
+ IdGenerator.fetch_add(1);
+ }
+ RefPtr<Job> NewJob(new Job());
+ NewJob->Callback = std::move(JobFunc);
+ NewJob->CancelFlag = false;
+ NewJob->Id = JobId{.Id = NewJobId};
+ NewJob->CreateTick = JobClock::Now();
+ NewJob->StartTick = JobClock::Never();
+ NewJob->EndTick = JobClock::Never();
+
+ QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); });
+ WorkerCounter.AddCount(1);
+ try
+ {
+ WorkerPool.ScheduleWork([&]() {
+ auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); });
+ Worker();
+ });
+ return {.Id = NewJobId};
+ }
+ catch (std::exception& Ex)
+ {
+ WorkerCounter.CountDown();
+ QueueLock.WithExclusiveLock([&]() {
+ if (auto It = std::find_if(QueuedJobs.begin(),
+ QueuedJobs.end(),
+ [NewJobId](const RefPtr<Job>& Job) { return Job->Id.Id == NewJobId; });
+ It != QueuedJobs.end())
+ {
+ QueuedJobs.erase(It);
+ }
+ });
+ ZEN_ERROR("Failed to schedule job to job queue. Reason: ''", Ex.what());
+ throw;
+ }
+ }
+
+ virtual bool CancelJob(JobId Id) override
+ {
+ bool Result = false;
+ QueueLock.WithExclusiveLock([&]() {
+ if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end())
+ {
+ It->second->CancelFlag.store(true);
+ Result = true;
+ return;
+ }
+ if (auto It = CompletedJobs.find(Id.Id); It != CompletedJobs.end())
+ {
+ Result = true;
+ return;
+ }
+ if (auto It = AbortedJobs.find(Id.Id); It != AbortedJobs.end())
+ {
+ Result = true;
+ return;
+ }
+ if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; });
+ It != QueuedJobs.end())
+ {
+ QueuedJobs.erase(It);
+ Result = true;
+ return;
+ }
+ });
+ return Result;
+ }
+
+ virtual void Stop() override
+ {
+ InitializedFlag.store(false);
+ QueueLock.WithExclusiveLock([&]() {
+ for (auto& Job : RunningJobs)
+ {
+ Job.second->CancelFlag.store(true);
+ }
+ QueuedJobs.clear();
+ });
+ WorkerCounter.CountDown();
+ ptrdiff_t Remaining = WorkerCounter.Remaining();
+ while (Remaining > 0)
+ {
+ ZEN_INFO("Waiting for {} background jobs to complete", Remaining);
+ WorkerCounter.Wait(500);
+ QueueLock.WithExclusiveLock([&]() {
+ for (auto& Job : RunningJobs)
+ {
+ Job.second->CancelFlag.store(true);
+ }
+ QueuedJobs.clear();
+ });
+ Remaining = WorkerCounter.Remaining();
+ }
+ }
+
+ virtual void ReportMessage(JobId Id, std::string_view Message) override
+ {
+ QueueLock.WithSharedLock([&]() {
+ auto It = RunningJobs.find(Id.Id);
+ ZEN_ASSERT(It != RunningJobs.end());
+ It->second->State.Messages.push_back(std::string(Message));
+ });
+ }
+
+ virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override
+ {
+ QueueLock.WithSharedLock([&]() {
+ auto It = RunningJobs.find(Id.Id);
+ ZEN_ASSERT(It != RunningJobs.end());
+ It->second->State.CurrentOp = CurrentOp;
+ It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete;
+ });
+ }
+
+ virtual std::vector<JobInfo> GetJobs() override
+ {
+ std::vector<JobId> DeadJobs;
+ auto IsStale = [](JobClock::Tick Time) {
+ ZEN_ASSERT_SLOW(Time != JobClock::Never());
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+ std::chrono::system_clock::duration Age = Now - JobClock::TimePointFromTick(Time);
+ return std::chrono::duration_cast<std::chrono::days>(Age) > std::chrono::days(1);
+ };
+
+ std::vector<JobInfo> Jobs;
+ QueueLock.WithSharedLock([&]() {
+ for (auto It : RunningJobs)
+ {
+ Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Running});
+ }
+ for (auto It : CompletedJobs)
+ {
+ if (IsStale(It.second->EndTick))
+ {
+ DeadJobs.push_back(JobId{It.first});
+ continue;
+ }
+ Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Completed});
+ }
+ for (auto It : AbortedJobs)
+ {
+ if (IsStale(It.second->EndTick))
+ {
+ DeadJobs.push_back(JobId{It.first});
+ continue;
+ }
+ Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Aborted});
+ }
+ for (auto It : QueuedJobs)
+ {
+ Jobs.push_back({.Id = It->Id, .Status = Status::Queued});
+ }
+ });
+ if (!DeadJobs.empty())
+ {
+ QueueLock.WithExclusiveLock([&]() {
+ for (JobId Id : DeadJobs)
+ {
+ AbortedJobs.erase(Id.Id);
+ CompletedJobs.erase(Id.Id);
+ }
+ });
+ }
+ return Jobs;
+ }
+
+ // Will only respond once when Complete is true
+ virtual std::optional<JobDetails> Get(JobId Id) override
+ {
+ auto Convert = [](Status Status, Job& Job) -> JobDetails {
+ return JobDetails{.Status = Status,
+ .State = {.CurrentOp = Job.State.CurrentOp,
+ .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete,
+ .Messages = std::move(Job.State.Messages)},
+ .CreateTime = JobClock::TimePointFromTick(Job.CreateTick),
+ .StartTime = JobClock::TimePointFromTick(Job.StartTick),
+ .EndTime = JobClock::TimePointFromTick(Job.EndTick)};
+ };
+
+ std::optional<JobDetails> Result;
+ QueueLock.WithExclusiveLock([&]() {
+ if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end())
+ {
+ Result = Convert(Status::Running, *It->second);
+ return;
+ }
+ if (auto It = CompletedJobs.find(Id.Id); It != CompletedJobs.end())
+ {
+ Result = Convert(Status::Completed, *It->second);
+ CompletedJobs.erase(It);
+ return;
+ }
+ if (auto It = AbortedJobs.find(Id.Id); It != AbortedJobs.end())
+ {
+ Result = Convert(Status::Aborted, *It->second);
+ AbortedJobs.erase(It);
+ return;
+ }
+ if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; });
+ It != QueuedJobs.end())
+ {
+ Result = Convert(Status::Queued, *(*It));
+ return;
+ }
+ });
+ return Result;
+ }
+
+ std::atomic_uint64_t IdGenerator = 1;
+
+ std::atomic_bool InitializedFlag = false;
+ RwLock QueueLock;
+ std::deque<RefPtr<Job>> QueuedJobs;
+ std::unordered_map<uint64_t, Job*> RunningJobs;
+ std::unordered_map<uint64_t, RefPtr<Job>> CompletedJobs;
+ std::unordered_map<uint64_t, RefPtr<Job>> AbortedJobs;
+
+ WorkerThreadPool WorkerPool;
+ Latch WorkerCounter;
+
+ void Worker()
+ {
+ RefPtr<Job> CurrentJob;
+ QueueLock.WithExclusiveLock([&]() {
+ if (!QueuedJobs.empty())
+ {
+ CurrentJob = std::move(QueuedJobs.front());
+ ZEN_ASSERT(CurrentJob);
+ QueuedJobs.pop_front();
+ RunningJobs.insert_or_assign(CurrentJob->Id.Id, CurrentJob);
+ CurrentJob->StartTick = JobClock::Now();
+ }
+ });
+ if (!CurrentJob)
+ {
+ return;
+ }
+
+ try
+ {
+ JobContext Context{.Queue = *this, .Id = CurrentJob->Id, .CancelFlag = CurrentJob->CancelFlag};
+ CurrentJob->Callback(Context);
+ QueueLock.WithExclusiveLock([&]() {
+ CurrentJob->EndTick = JobClock::Now();
+ RunningJobs.erase(CurrentJob->Id.Id);
+ CompletedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
+ });
+ }
+ catch (std::exception& Ex)
+ {
+ QueueLock.WithExclusiveLock([&]() {
+ CurrentJob->State.Messages.push_back(Ex.what());
+ CurrentJob->EndTick = JobClock::Now();
+ RunningJobs.erase(CurrentJob->Id.Id);
+ AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
+ });
+ }
+ }
+};
+
+std::string_view
+JobQueue::ToString(Status Status)
+{
+ using namespace std::literals;
+
+ switch (Status)
+ {
+ case JobQueue::Status::Queued:
+ return "Queued"sv;
+ break;
+ case JobQueue::Status::Running:
+ return "Running"sv;
+ break;
+ case JobQueue::Status::Aborted:
+ return "Aborted"sv;
+ break;
+ case JobQueue::Status::Completed:
+ return "Completed"sv;
+ break;
+ default:
+ ZEN_ASSERT(false);
+ }
+ return ""sv;
+}
+
+std::unique_ptr<JobQueue>
+MakeJobQueue(int WorkerCount, std::string_view QueueName)
+{
+ return std::make_unique<JobQueueImpl>(WorkerCount, QueueName);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+void
+jobqueue_forcelink()
+{
+}
+
+TEST_CASE("JobQueue")
+{
+ std::unique_ptr<JobQueue> Queue(MakeJobQueue(2, "queue"));
+ WorkerThreadPool Pool(4);
+ Latch JobsLatch(1);
+ for (uint32_t I = 0; I < 100; I++)
+ {
+ JobsLatch.AddCount(1);
+ Pool.ScheduleWork([&Queue, &JobsLatch]() {
+ auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
+ auto Id = Queue->QueueJob([&](JobContext& Context) {
+ if (Context.CancelFlag.load())
+ {
+ return;
+ }
+ Context.Queue.ReportProgress(Context.Id, "going to sleep", 0);
+ Sleep(10);
+ if (Context.CancelFlag.load())
+ {
+ return;
+ }
+ Context.Queue.ReportProgress(Context.Id, "going to sleep again", 50);
+ if ((Context.Id.Id & 0xFF) == 0x10)
+ {
+ zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", Context.Id.Id));
+ }
+ Sleep(10);
+ if (Context.CancelFlag.load())
+ {
+ return;
+ }
+ Context.Queue.ReportProgress(Context.Id, "done", 100);
+ });
+ });
+ }
+
+ auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string {
+ ExtendableStringBuilder<128> SB;
+ if (Strings.empty())
+ {
+ return {};
+ }
+ auto It = Strings.begin();
+ SB.Append(*It);
+ It++;
+ while (It != Strings.end())
+ {
+ SB.Append(Delimiter);
+ SB.Append(*It);
+ It++;
+ }
+ return SB.ToString();
+ };
+
+ JobsLatch.CountDown();
+ while (true)
+ {
+ bool PendingQueue = JobsLatch.Remaining() > 0;
+ size_t PendingCount = 0;
+ std::vector<JobId> RemainingJobs;
+ std::vector<JobQueue::JobInfo> Statuses = Queue->GetJobs();
+ RemainingJobs.reserve(Statuses.size());
+ for (const auto& It : Statuses)
+ {
+ JobQueue::Status Status = It.Status;
+
+ JobId Id = It.Id;
+ std::optional<JobQueue::JobDetails> CurrentState;
+ if (Status != JobQueue::Status::Queued)
+ {
+ CurrentState = Queue->Get(Id);
+ CHECK(CurrentState.has_value());
+ }
+ switch (Status)
+ {
+ case JobQueue::Status::Queued:
+ PendingCount++;
+ RemainingJobs.push_back(Id);
+ break;
+ case JobQueue::Status::Running:
+ ZEN_DEBUG("{} running. '{}' {}% '{}'",
+ Id.Id,
+ CurrentState->State.CurrentOp,
+ CurrentState->State.CurrentOpPercentComplete,
+ Join(CurrentState->State.Messages, " "sv));
+ RemainingJobs.push_back(Id);
+ break;
+ case JobQueue::Status::Aborted:
+ ZEN_DEBUG("{} aborted. Reason: '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv));
+ break;
+ case JobQueue::Status::Completed:
+ ZEN_DEBUG("{} completed. '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv));
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ }
+ if (RemainingJobs.empty() && !PendingQueue)
+ {
+ break;
+ }
+ ZEN_INFO("{} jobs active, {} pending in queue, {} running",
+ RemainingJobs.size(),
+ PendingCount,
+ RemainingJobs.size() - PendingCount);
+ Sleep(100);
+ }
+ JobsLatch.Wait();
+}
+
+#endif
+
+} // namespace zen
diff --git a/src/zencore/zencore.cpp b/src/zencore/zencore.cpp
index 83e6ea772..84ffee45b 100644
--- a/src/zencore/zencore.cpp
+++ b/src/zencore/zencore.cpp
@@ -20,6 +20,7 @@
#include <zencore/filesystem.h>
#include <zencore/intmath.h>
#include <zencore/iobuffer.h>
+#include <zencore/jobqueue.h>
#include <zencore/logging.h>
#include <zencore/memory.h>
#include <zencore/mpscqueue.h>
@@ -115,6 +116,7 @@ zencore_forcelinktests()
zen::compress_forcelink();
zen::crypto_forcelink();
zen::filesystem_forcelink();
+ zen::jobqueue_forcelink();
zen::intmath_forcelink();
zen::iobuffer_forcelink();
zen::logging_forcelink();
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 8c95835fd..2f52e3225 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -2933,6 +2933,28 @@ TEST_CASE("project.remote")
CHECK(SourceOps == TargetOps);
};
+ auto WaitForCompletion = [&Session](ZenServerInstance& Server, const cpr::Response& Response) {
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ uint64_t JobId = ParseInt<uint64_t>(Response.text).value_or(0);
+ CHECK(JobId != 0);
+ Session.SetUrl(fmt::format("{}/admin/jobs/{}", Server.GetBaseUri(), JobId));
+ Session.SetHeader(cpr::Header{{"Accept", std::string(ToString(ZenContentType::kCbObject))}});
+ while (true)
+ {
+ cpr::Response StatusResponse = Session.Get();
+ CHECK(IsHttpSuccessCode(StatusResponse.status_code));
+ CbObject ReponseObject =
+ LoadCompactBinaryObject(IoBuffer(IoBuffer::Wrap, StatusResponse.text.data(), StatusResponse.text.size()));
+ std::string_view Status = ReponseObject["Status"sv].AsString();
+ CHECK(Status != "Aborted"sv);
+ if (Status == "Complete"sv)
+ {
+ return;
+ }
+ Sleep(10);
+ }
+ };
+
SUBCASE("File")
{
ScopedTemporaryDirectory TempDir;
@@ -2961,7 +2983,7 @@ TEST_CASE("project.remote")
Session.SetBody(AsBody(Payload));
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
+ WaitForCompletion(Servers.GetInstance(0), Response);
}
{
MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
@@ -2990,7 +3012,7 @@ TEST_CASE("project.remote")
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
+ WaitForCompletion(Servers.GetInstance(1), Response);
}
ValidateAttachments(1, "proj0_copy", "oplog0_copy");
ValidateOplog(1, "proj0_copy", "oplog0_copy");
@@ -3025,7 +3047,7 @@ TEST_CASE("project.remote")
Session.SetBody(AsBody(Payload));
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
+ WaitForCompletion(Servers.GetInstance(0), Response);
}
{
MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
@@ -3052,7 +3074,7 @@ TEST_CASE("project.remote")
Session.SetBody(AsBody(Payload));
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
+ WaitForCompletion(Servers.GetInstance(1), Response);
}
ValidateAttachments(1, "proj0_copy", "oplog0_copy");
ValidateOplog(1, "proj0_copy", "oplog0_copy");
@@ -3086,7 +3108,7 @@ TEST_CASE("project.remote")
Session.SetBody(AsBody(Payload));
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
+ WaitForCompletion(Servers.GetInstance(0), Response);
}
{
MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
@@ -3113,7 +3135,7 @@ TEST_CASE("project.remote")
Session.SetBody(AsBody(Payload));
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
+ WaitForCompletion(Servers.GetInstance(1), Response);
}
ValidateAttachments(1, "proj0_copy", "oplog0_copy");
ValidateOplog(1, "proj0_copy", "oplog0_copy");
@@ -3154,7 +3176,7 @@ TEST_CASE("project.remote")
Session.SetBody(AsBody(Payload));
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
+ WaitForCompletion(Servers.GetInstance(0), Response);
}
ValidateAttachments(1, "proj0_copy", "oplog0_copy");
ValidateOplog(1, "proj0_copy", "oplog0_copy");
@@ -3188,7 +3210,7 @@ TEST_CASE("project.remote")
Session.SetBody(AsBody(Payload));
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
+ WaitForCompletion(Servers.GetInstance(2), Response);
}
ValidateAttachments(2, "proj1", "oplog1");
ValidateOplog(2, "proj1", "oplog1");
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 575a10d83..74131e624 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -3,6 +3,7 @@
#include "admin.h"
#include <zencore/compactbinarybuilder.h>
+#include <zencore/jobqueue.h>
#include <zencore/string.h>
#include <zenstore/gc.h>
@@ -10,7 +11,9 @@
namespace zen {
-HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Scheduler)
+HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJobQueue)
+: m_GcScheduler(Scheduler)
+, m_BackgroundJobQueue(BackgroundJobQueue)
{
using namespace std::literals;
@@ -23,6 +26,141 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Sched
},
HttpVerb::kGet);
+ m_Router.AddPattern("jobid", "([[:digit:]]+?)");
+
+ m_Router.RegisterRoute(
+ "jobs",
+ [&](HttpRouterRequest& Req) {
+ std::vector<JobQueue::JobInfo> Jobs = m_BackgroundJobQueue.GetJobs();
+ CbObjectWriter Obj;
+ Obj.BeginArray("jobs");
+ for (const auto& Job : Jobs)
+ {
+ Obj.BeginObject();
+ Obj.AddInteger("Id", Job.Id.Id);
+ Obj.AddString("Status", JobQueue::ToString(Job.Status));
+ Obj.EndObject();
+ }
+ Obj.EndArray();
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "jobs/{jobid}",
+ [&](HttpRouterRequest& Req) {
+ const auto& JobIdString = Req.GetCapture(1);
+ std::optional<uint64_t> JobIdArg = ParseInt<uint64_t>(JobIdString);
+ if (!JobIdArg)
+ {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ }
+ JobId Id{.Id = JobIdArg.value_or(0)};
+ if (Id.Id == 0)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest,
+ ZenContentType::kText,
+ fmt::format("Invalid Job Id: {}", Id.Id));
+ }
+
+ std::optional<JobQueue::JobDetails> CurrentState = m_BackgroundJobQueue.Get(Id);
+ if (!CurrentState)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ auto WriteState = [](CbObjectWriter& Obj, const JobQueue::State& State) {
+ if (!State.CurrentOp.empty())
+ {
+ Obj.AddString("CurrentOp"sv, State.CurrentOp);
+ Obj.AddInteger("CurrentOpPercentComplete"sv, State.CurrentOpPercentComplete);
+ }
+ if (!State.Messages.empty())
+ {
+ Obj.BeginArray("Messages");
+ for (const std::string& Message : State.Messages)
+ {
+ Obj.AddString(Message);
+ }
+ Obj.EndArray();
+ }
+ };
+
+ auto GetAgeAsSeconds = [](std::chrono::system_clock::time_point Start, std::chrono::system_clock::time_point End) {
+ auto Age = End - Start;
+ auto Milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(Age);
+ return Milliseconds.count() / 1000.0;
+ };
+
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+
+ switch (CurrentState->Status)
+ {
+ case JobQueue::Status::Queued:
+ {
+ CbObjectWriter Obj;
+ Obj.AddString("Status"sv, "Queued"sv);
+ Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, Now));
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ }
+ break;
+ case JobQueue::Status::Running:
+ {
+ CbObjectWriter Obj;
+ Obj.AddString("Status"sv, "Running"sv);
+ WriteState(Obj, CurrentState->State);
+ Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
+ Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, Now));
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ }
+ break;
+ case JobQueue::Status::Aborted:
+ {
+ CbObjectWriter Obj;
+ Obj.AddString("Status"sv, "Aborted"sv);
+ WriteState(Obj, CurrentState->State);
+ Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
+ Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime));
+ Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now));
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ }
+ break;
+ case JobQueue::Status::Completed:
+ {
+ CbObjectWriter Obj;
+ Obj.AddString("Status"sv, "Complete"sv);
+ WriteState(Obj, CurrentState->State);
+ Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
+ Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime));
+ Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now));
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ }
+ break;
+ }
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "jobs/{jobid}",
+ [&](HttpRouterRequest& Req) {
+ const auto& JobIdString = Req.GetCapture(1);
+ std::optional<uint64_t> JobIdArg = ParseInt<uint64_t>(JobIdString);
+ if (!JobIdArg)
+ {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ }
+ JobId Id{.Id = JobIdArg.value_or(0)};
+ if (m_BackgroundJobQueue.CancelJob(Id))
+ {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
+ }
+ },
+ HttpVerb::kDelete);
+
m_Router.RegisterRoute(
"gc",
[this](HttpRouterRequest& Req) {
diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h
index 9463ffbb3..3152f87ab 100644
--- a/src/zenserver/admin/admin.h
+++ b/src/zenserver/admin/admin.h
@@ -8,11 +8,12 @@
namespace zen {
class GcScheduler;
+class JobQueue;
class HttpAdminService : public zen::HttpService
{
public:
- HttpAdminService(GcScheduler& Scheduler);
+ HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJobQueue);
~HttpAdminService();
virtual const char* BaseUri() const override;
@@ -21,6 +22,7 @@ public:
private:
HttpRequestRouter m_Router;
GcScheduler& m_GcScheduler;
+ JobQueue& m_BackgroundJobQueue;
};
} // namespace zen
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp
index faa748c5d..eeb1f71c4 100644
--- a/src/zenserver/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp
@@ -244,10 +244,10 @@ private:
bool m_UseTempBlocks = false;
};
-std::unique_ptr<RemoteProjectStore>
+std::shared_ptr<RemoteProjectStore>
CreateFileRemoteStore(const FileRemoteStoreOptions& Options)
{
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<LocalExportProjectStore>(Options.Name,
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<LocalExportProjectStore>(Options.Name,
Options.OptionalBaseName,
std::filesystem::path(Options.FolderPath),
Options.ForceDisableBlocks,
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.h b/src/zenserver/projectstore/fileremoteprojectstore.h
index f398bbfbc..8da9692d5 100644
--- a/src/zenserver/projectstore/fileremoteprojectstore.h
+++ b/src/zenserver/projectstore/fileremoteprojectstore.h
@@ -15,6 +15,6 @@ struct FileRemoteStoreOptions : RemoteStoreOptions
bool ForceEnableTempBlocks = false;
};
-std::unique_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options);
+std::shared_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options);
} // namespace zen
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index e1a4a9dd4..e59bac6d6 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -44,13 +44,12 @@ public:
{
return {.CreateBlocks = m_EnableBlocks,
.UseTempBlockFiles = m_UseTempBlocks,
- .Description = fmt::format("[cloud] {} as {}/{}/{}{}{}"sv,
+ .Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv,
m_CloudClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_Key,
- m_OptionalBaseKey == IoHash::Zero ? "" : " Base: ",
- m_OptionalBaseKey)};
+ m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format(" Base {}", m_OptionalBaseKey))};
}
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
@@ -283,7 +282,7 @@ private:
bool m_UseTempBlocks = true;
};
-std::unique_ptr<RemoteProjectStore>
+std::shared_ptr<RemoteProjectStore>
CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
{
std::string Url = Options.Url;
@@ -319,7 +318,7 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi
Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider)));
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<JupiterRemoteStore>(std::move(CloudClient),
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(CloudClient),
Options.Namespace,
Options.Bucket,
Options.Key,
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h
index 4ae6c88cb..27f3d9b73 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.h
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h
@@ -23,7 +23,7 @@ struct JupiterRemoteStoreOptions : RemoteStoreOptions
bool AssumeHttp2 = false;
};
-std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options,
+std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options,
const std::filesystem::path& TempFilePath);
} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 6ec18904e..2fd6d492e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -8,6 +8,7 @@
#include <zencore/compactbinaryvalidation.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/jobqueue.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
@@ -72,15 +73,20 @@ namespace {
} while (true);
}
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params,
- AuthMgr& AuthManager,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize,
- const std::filesystem::path& TempFilePath)
+ struct CreateRemoteStoreResult
+ {
+ std::shared_ptr<RemoteProjectStore> Store;
+ std::string Description;
+ };
+ CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params,
+ AuthMgr& AuthManager,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ const std::filesystem::path& TempFilePath)
{
using namespace std::literals;
- std::unique_ptr<RemoteProjectStore> RemoteStore;
+ std::shared_ptr<RemoteProjectStore> RemoteStore;
if (CbObjectView File = Params["file"sv].AsObjectView(); File)
{
@@ -1565,11 +1571,12 @@ ProjectStore::Project::TouchOplog(std::string_view Oplog) const
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc)
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue)
: GcStorage(Gc)
, GcContributor(Gc)
, m_Log(logging::Get("project"))
, m_CidStore(Store)
+, m_JobQueue(JobQueue)
, m_ProjectBasePath(BasePath)
, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker())
{
@@ -2330,7 +2337,8 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie
Attachments.insert(RawHash);
};
- RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+ RemoteProjectStore::Result RemoteResult =
+ SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, nullptr);
if (RemoteResult.ErrorCode)
{
@@ -2536,12 +2544,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
else if (Method == "export"sv)
{
- std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- if (Result.second.empty())
- {
- HttpReq.WriteResponse(Result.first);
- return Result.first != HttpResponseCode::BadRequest;
- }
+ std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
return true;
}
@@ -2748,7 +2751,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
std::pair<HttpResponseCode, std::string>
-ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
+ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
{
ZEN_TRACE_CPU("ProjectStore::Export");
@@ -2759,35 +2762,52 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
bool Force = Params["force"sv].AsBool(false);
bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
- CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
+ CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
- if (RemoteStoreResult.first == nullptr)
+ if (RemoteStoreResult.Store == nullptr)
{
- return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.Description};
}
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
- Project.Identifier,
+ Project->Identifier,
Oplog.OplogId(),
StoreInfo.Description,
NiceBytes(MaxBlockSize),
NiceBytes(MaxChunkEmbedSize));
- RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
- *RemoteStore,
- Project,
- Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- EmbedLooseFile,
- StoreInfo.CreateBlocks,
- StoreInfo.UseTempBlockFiles,
- Force);
+ JobId JobId = m_JobQueue.QueueJob([this,
+ ActualRemoteStore = std::move(RemoteStore),
+ Project,
+ OplogPtr = &Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks = StoreInfo.CreateBlocks,
+ UseTempBlockFiles = StoreInfo.UseTempBlockFiles,
+ Force](JobContext& Context) {
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *ActualRemoteStore,
+ *Project.Get(),
+ *OplogPtr,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks,
+ UseTempBlockFiles,
+ Force,
+ &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
+ }
+ });
- return ConvertResult(Result);
+ return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
std::pair<HttpResponseCode, std::string>
@@ -2801,19 +2821,29 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
bool Force = Params["force"sv].AsBool(false);
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
- CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
+ CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
- if (RemoteStoreResult.first == nullptr)
+ if (RemoteStoreResult.Store == nullptr)
{
- return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.Description};
}
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
- RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force);
- return ConvertResult(Result);
+ JobId JobId = m_JobQueue.QueueJob(
+ [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) {
+ RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(
+ fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
+ }
+ });
+
+ return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
bool
@@ -2909,6 +2939,7 @@ TEST_CASE("project.store.create")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
@@ -2916,7 +2947,7 @@ TEST_CASE("project.store.create")
std::string_view ProjectName("proj1"sv);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -2938,13 +2969,14 @@ TEST_CASE("project.store.lifetimes")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -2976,13 +3008,14 @@ TEST_CASE("project.store.gc")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -3135,13 +3168,14 @@ TEST_CASE("project.store.partial.read")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index a2f92fb25..aa84d04ca 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -20,6 +20,7 @@ class CbPackage;
class CidStore;
class AuthMgr;
class ScrubContext;
+class JobQueue;
enum class HttpResponseCode;
@@ -64,7 +65,7 @@ class ProjectStore : public RefCounted, public GcStorage, public GcContributor
struct OplogStorage;
public:
- ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc);
+ ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue);
~ProjectStore();
struct Project;
@@ -329,10 +330,10 @@ public:
IoBuffer&& Payload,
AuthMgr& AuthManager);
- std::pair<HttpResponseCode, std::string> Export(ProjectStore::Project& Project,
- ProjectStore::Oplog& Oplog,
- CbObjectView&& Params,
- AuthMgr& AuthManager);
+ std::pair<HttpResponseCode, std::string> Export(Ref<ProjectStore::Project> Project,
+ ProjectStore::Oplog& Oplog,
+ CbObjectView&& Params,
+ AuthMgr& AuthManager);
std::pair<HttpResponseCode, std::string> Import(ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
@@ -344,6 +345,7 @@ public:
private:
spdlog::logger& m_Log;
CidStore& m_CidStore;
+ JobQueue& m_JobQueue;
std::filesystem::path m_ProjectBasePath;
mutable RwLock m_ProjectsLock;
std::map<std::string, Ref<Project>> m_Projects;
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 235166659..aca9410a2 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -71,6 +71,27 @@ private:
std::string m_ErrorText;
};
+void
+ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_t Total, ptrdiff_t Remaining)
+{
+ if (OptionalContext)
+ {
+ ZEN_ASSERT(Total > 0);
+ OptionalContext->Queue.ReportProgress(OptionalContext->Id, CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total));
+ }
+ ZEN_INFO("{}", CurrentOp);
+}
+
+void
+ReportMessage(JobContext* OptionalContext, std::string_view Message)
+{
+ if (OptionalContext)
+ {
+ OptionalContext->Queue.ReportMessage(OptionalContext->Id, Message);
+ }
+ ZEN_INFO("{}", Message);
+}
+
bool
IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
@@ -201,6 +222,7 @@ BuildContainer(CidStore& ChunkStore,
const std::function<void(const IoHash&)>& OnLargeAttachment,
const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments,
+ JobContext* OptionalContext,
AsyncRemoteResult& RemoteResult)
{
using namespace std::literals;
@@ -217,8 +239,6 @@ BuildContainer(CidStore& ChunkStore,
std::vector<Block> Blocks;
CompressedBuffer OpsBuffer;
- Latch BlockCreateLatch(1);
-
std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
size_t BlockSize = 0;
@@ -365,11 +385,15 @@ BuildContainer(CidStore& ChunkStore,
CB(RewrittenOp);
};
- ZEN_INFO("Building exported oplog and fetching attachments");
+ ReportMessage(OptionalContext, "Building exported oplog and fetching attachments");
tsl::robin_map<int, std::string> OpLSNToKey;
Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObject Op) {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
std::string_view Key = Op["key"sv].AsString();
OpLSNToKey.insert({LSN, std::string(Key)});
Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
@@ -382,12 +406,23 @@ BuildContainer(CidStore& ChunkStore,
SectionOpsWriter << Op;
}
OpCount++;
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
});
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
+ }
+
if (!Attachments.empty() && !KnownBlocks.empty())
{
+ ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size()));
+
size_t ReusedBlockCount = 0;
- ZEN_INFO("Checking {} known blocks for reuse", KnownBlocks.size());
for (const Block& KnownBlock : KnownBlocks)
{
size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
@@ -428,10 +463,10 @@ BuildContainer(CidStore& ChunkStore,
ReusePercent);
}
}
- ZEN_INFO("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size());
+ ReportMessage(OptionalContext, fmt::format("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size()));
}
- ZEN_INFO("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size());
+ ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size()));
// Sort attachments so we get predictable blocks for the same oplog upload
std::vector<IoHash> SortedAttachments;
@@ -456,7 +491,15 @@ BuildContainer(CidStore& ChunkStore,
return LhsKeyIt->second < RhsKeyIt->second;
});
- ZEN_INFO("Assembling {} attachments from {} ops into blocks and loose attachments", SortedAttachments.size(), OpLSNToKey.size());
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Assembling {} attachments from {} ops into blocks and loose attachments",
+ SortedAttachments.size(),
+ OpLSNToKey.size()));
auto GetPayload = [&](const IoHash& AttachmentHash) {
if (OutLooseAttachments != nullptr)
@@ -474,8 +517,20 @@ BuildContainer(CidStore& ChunkStore,
size_t GeneratedBlockCount = 0;
size_t LargeAttachmentCount = 0;
+ Latch BlockCreateLatch(1);
for (const IoHash& AttachmentHash : SortedAttachments)
{
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ }
+ return {};
+ }
+
auto It = Attachments.find(AttachmentHash);
ZEN_ASSERT(It != Attachments.end());
IoBuffer Payload = GetPayload(AttachmentHash);
@@ -595,19 +650,67 @@ BuildContainer(CidStore& ChunkStore,
}
SectionOpsWriter.EndArray(); // "ops"
- ZEN_INFO("Assembled {} attachments from {} ops into {} blocks and {} loose attachments",
- SortedAttachments.size(),
- OpLSNToKey.size(),
- GeneratedBlockCount,
- LargeAttachmentCount);
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ }
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Assembled {} attachments from {} ops into {} blocks and {} loose attachments",
+ SortedAttachments.size(),
+ OpLSNToKey.size(),
+ GeneratedBlockCount,
+ LargeAttachmentCount));
CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
- ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize()));
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = BlockCreateLatch.Remaining();
+ ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining);
+ }
+ if (GeneratedBlockCount > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0);
+ }
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Added oplog section {}, {}",
+ CompressedOpsSection.DecodeRawHash(),
+ NiceBytes(CompressedOpsSection.GetCompressedSize())));
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
{
- ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining());
+ ptrdiff_t Remaining = BlockCreateLatch.Remaining();
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ Remaining = BlockCreateLatch.Remaining();
+ ReportProgress(OptionalContext,
+ fmt::format("Aborting, {} blocks remaining...", Remaining),
+ GeneratedBlockCount,
+ Remaining);
+ }
+ ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0);
+ return {};
+ }
+ ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining);
+ }
+ if (GeneratedBlockCount > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0);
}
if (!RemoteResult.IsError())
@@ -703,6 +806,7 @@ BuildContainer(CidStore& ChunkStore,
OnLargeAttachment,
OnBlockChunks,
OutOptionalTempAttachments,
+ nullptr,
RemoteResult);
return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
}
@@ -717,7 +821,8 @@ SaveOplog(CidStore& ChunkStore,
bool EmbedLooseFiles,
bool BuildBlocks,
bool UseTempBlocks,
- bool ForceUpload)
+ bool ForceUpload,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -831,7 +936,7 @@ SaveOplog(CidStore& ChunkStore,
if (BuildBlocks)
{
- ZEN_INFO("Loading oplog base container");
+ ReportMessage(OptionalContext, "Loading oplog base container");
RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer();
if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent))
{
@@ -864,6 +969,7 @@ SaveOplog(CidStore& ChunkStore,
KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)});
};
}
+ ReportMessage(OptionalContext, fmt::format("Loading oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds));
}
}
@@ -880,13 +986,22 @@ SaveOplog(CidStore& ChunkStore,
OnLargeAttachment,
OnBlockChunks,
EmbedLooseFiles ? &TempAttachments : nullptr,
+ OptionalContext,
/* out */ RemoteResult);
if (!RemoteResult.IsError())
{
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteProjectStore::Result Result = {.ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Text = "Operation cancelled"};
+ return Result;
+ }
+
uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num();
uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num();
- ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount);
+ ReportMessage(OptionalContext, fmt::format("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount));
RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
if (ContainerSaveResult.ErrorCode)
@@ -901,7 +1016,16 @@ SaveOplog(CidStore& ChunkStore,
if (!ContainerSaveResult.Needs.empty() || ForceUpload)
{
- ZEN_INFO("Filtering needed attachments...");
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteProjectStore::Result Result = {.ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Text = "Operation cancelled"};
+ return Result;
+ }
+
+ ReportMessage(OptionalContext, "Filtering needed attachments...");
+
std::vector<IoHash> NeededLargeAttachments;
std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments;
NeededLargeAttachments.reserve(LargeAttachments.size());
@@ -924,10 +1048,18 @@ SaveOplog(CidStore& ChunkStore,
}
}
- Latch SaveAttachmentsLatch(1);
+ ptrdiff_t AttachmentsToSave(0);
+ Latch SaveAttachmentsLatch(1);
if (!NeededLargeAttachments.empty())
{
- ZEN_INFO("Saving large attachments...");
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteProjectStore::Result Result = {.ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Text = "Operation cancelled"};
+ return Result;
+ }
+ ReportMessage(OptionalContext, "Saving large attachments...");
for (const IoHash& RawHash : NeededLargeAttachments)
{
if (RemoteResult.IsError())
@@ -946,6 +1078,7 @@ SaveOplog(CidStore& ChunkStore,
}
SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
WorkerPool.ScheduleWork([&ChunkStore,
&RemoteStore,
&SaveAttachmentsLatch,
@@ -994,7 +1127,7 @@ SaveOplog(CidStore& ChunkStore,
if (!CreatedBlocks.empty())
{
- ZEN_INFO("Saving created block attachments...");
+ ReportMessage(OptionalContext, "Saving created block attachments...");
for (auto& It : CreatedBlocks)
{
if (RemoteResult.IsError())
@@ -1007,6 +1140,7 @@ SaveOplog(CidStore& ChunkStore,
IoBuffer Payload = It.second;
ZEN_ASSERT(Payload);
SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
WorkerPool.ScheduleWork(
[&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() {
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
@@ -1041,7 +1175,7 @@ SaveOplog(CidStore& ChunkStore,
if (!BlockChunks.empty())
{
- ZEN_INFO("Saving chunk block attachments...");
+ ReportMessage(OptionalContext, "Saving chunk block attachments...");
for (const std::vector<IoHash>& Chunks : BlockChunks)
{
if (RemoteResult.IsError())
@@ -1069,6 +1203,7 @@ SaveOplog(CidStore& ChunkStore,
}
}
SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
WorkerPool.ScheduleWork([&RemoteStore,
&ChunkStore,
&SaveAttachmentsLatch,
@@ -1111,14 +1246,28 @@ SaveOplog(CidStore& ChunkStore,
SaveAttachmentsLatch.CountDown();
while (!SaveAttachmentsLatch.Wait(1000))
{
- ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining());
+ ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ ReportProgress(OptionalContext,
+ fmt::format("Saving attachments, {} remaining...", Remaining),
+ AttachmentsToSave,
+ Remaining);
+ }
+ if (AttachmentsToSave > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0);
}
- SaveAttachmentsLatch.Wait();
}
if (!RemoteResult.IsError())
{
- ZEN_INFO("Finalizing oplog container...");
+ ReportMessage(OptionalContext, "Finalizing oplog container...");
RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
if (ContainerFinalizeResult.ErrorCode)
{
@@ -1145,13 +1294,15 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
const CbObject& ContainerObject,
const std::function<bool(const IoHash& RawHash)>& HasAttachment,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment)
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ JobContext* OptionalContext)
{
using namespace std::literals;
Stopwatch Timer;
- CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
+ size_t NeedAttachmentCount = 0;
+ CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
for (CbFieldView LargeChunksField : LargeChunksArray)
{
IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
@@ -1161,8 +1312,10 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
OnNeedAttachment(AttachmentHash);
};
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num()));
- CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+ size_t NeedBlockCount = 0;
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
{
CbObjectView BlockView = BlockField.AsObjectView();
@@ -1202,6 +1355,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
break;
}
};
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
@@ -1218,6 +1372,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num()));
for (CbFieldView OpEntry : OpsArray)
{
CbObjectView Core = OpEntry.AsObjectView();
@@ -1240,7 +1395,11 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
RemoteProjectStore::Result
-LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload)
+LoadOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ bool ForceDownload,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -1263,20 +1422,23 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
.Reason = LoadContainerResult.Reason,
.Text = LoadContainerResult.Text};
}
- ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)));
+ ReportMessage(OptionalContext,
+ fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))));
- AsyncRemoteResult RemoteResult;
- Latch AttachmentsWorkLatch(1);
+ AsyncRemoteResult RemoteResult;
+ Latch AttachmentsWorkLatch(1);
+ std::atomic_size_t AttachmentCount = 0;
auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
return !ForceDownload && ChunkStore.ContainsChunk(RawHash);
};
- auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult](
+ auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult](
const IoHash& BlockHash,
std::vector<IoHash>&& Chunks) {
if (BlockHash == IoHash::Zero)
{
AttachmentsWorkLatch.AddCount(1);
+ AttachmentCount.fetch_add(1);
WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
@@ -1305,6 +1467,7 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
return;
}
AttachmentsWorkLatch.AddCount(1);
+ AttachmentCount.fetch_add(1);
WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
@@ -1339,57 +1502,70 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
});
};
- auto OnNeedAttachment =
- [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) {
- if (!Attachments.insert(RawHash).second)
+ auto OnNeedAttachment = [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, &AttachmentCount](
+ const IoHash& RawHash) {
+ if (!Attachments.insert(RawHash).second)
+ {
+ return;
+ }
+
+ AttachmentsWorkLatch.AddCount(1);
+ AttachmentCount.fetch_add(1);
+ WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
return;
}
-
- AttachmentsWorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
- {
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
- ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode);
- return;
- }
- ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
- ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
- });
- };
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode);
+ return;
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
+ ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
+ });
+ };
RemoteProjectStore::Result Result =
- SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+ SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext);
if (!Attachments.empty())
{
- ZEN_INFO("Found {} attachments to download", Attachments.size());
+ ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size()));
}
AttachmentsWorkLatch.CountDown();
while (!AttachmentsWorkLatch.Wait(1000))
{
- ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining());
+ ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining();
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining);
+ }
+ if (AttachmentCount.load() > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0);
}
- AttachmentsWorkLatch.Wait();
if (Result.ErrorCode == 0)
{
Result = RemoteResult.ConvertResult();
}
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
- ZEN_INFO("Loaded oplog {} in {}",
- RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)));
+ ReportMessage(OptionalContext,
+ fmt::format("Loaded oplog {} in {}",
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))));
return Result;
}
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 3134fdb4a..501a5eeec 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/jobqueue.h>
#include "projectstore.h"
#include <unordered_set>
@@ -93,11 +94,14 @@ RemoteProjectStore::LoadContainerResult BuildContainer(
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>*
OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files"
+struct JobContext;
+
RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
const CbObject& ContainerObject,
const std::function<bool(const IoHash& RawHash)>& HasAttachment,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment);
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ JobContext* OptionalContext);
RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
@@ -108,9 +112,14 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
bool EmbedLooseFiles,
bool BuildBlocks,
bool UseTempBlocks,
- bool ForceUpload);
-
-RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload);
+ bool ForceUpload,
+ JobContext* OptionalContext);
+
+RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ bool ForceDownload,
+ JobContext* OptionalContext);
CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks);
bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index 72a7f00f8..c25fd2388 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -376,7 +376,7 @@ private:
const size_t m_MaxChunkEmbedSize;
};
-std::unique_ptr<RemoteProjectStore>
+std::shared_ptr<RemoteProjectStore>
CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
{
std::string Url = Options.Url;
@@ -385,8 +385,8 @@ CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
// Assume http URL
Url = fmt::format("http://{}"sv, Url);
}
- std::unique_ptr<RemoteProjectStore> RemoteStore =
- std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize);
+ std::shared_ptr<RemoteProjectStore> RemoteStore =
+ std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize);
return RemoteStore;
}
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.h b/src/zenserver/projectstore/zenremoteprojectstore.h
index ef9dcad8c..9f079ee74 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.h
+++ b/src/zenserver/projectstore/zenremoteprojectstore.h
@@ -13,6 +13,6 @@ struct ZenRemoteStoreOptions : RemoteStoreOptions
std::string OplogId;
};
-std::unique_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options);
+std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options);
} // namespace zen
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index cfc4a228b..8056b6506 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -6,6 +6,7 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
+#include <zencore/jobqueue.h>
#include <zencore/logging.h>
#include <zencore/refcount.h>
#include <zencore/scopeguard.h>
@@ -271,6 +272,8 @@ public:
InitializeState(ServerOptions);
+ m_JobQueue = MakeJobQueue(8, "backgroundjobs");
+
m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot,
.AbsLogPath = ServerOptions.AbsLogFile,
.HttpServerClass = std::string(ServerOptions.HttpServerClass),
@@ -341,7 +344,7 @@ public:
ZEN_INFO("instantiating project service");
- m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager);
+ m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue);
m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr});
#if ZEN_WITH_COMPUTE_SERVICES
@@ -365,7 +368,6 @@ public:
}
m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
- m_Http->RegisterService(m_AdminService);
#if ZEN_WITH_TESTS
m_Http->RegisterService(m_TestingService);
@@ -431,6 +433,10 @@ public:
.MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites};
m_GcScheduler.Initialize(GcConfig);
+ // Create and register admin interface last to make sure all is properly initialized
+ m_AdminService = std::make_unique<HttpAdminService>(m_GcScheduler, *m_JobQueue);
+ m_Http->RegisterService(*m_AdminService);
+
return EffectiveBasePort;
}
@@ -498,6 +504,11 @@ public:
}
Flush();
+
+ if (m_JobQueue)
+ {
+ m_JobQueue->Stop();
+ }
}
void RequestExit(int ExitCode)
@@ -733,13 +744,14 @@ private:
std::unique_ptr<zen::UpstreamCache> m_UpstreamCache;
std::unique_ptr<zen::HttpUpstreamService> m_UpstreamService;
std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService;
- zen::HttpAdminService m_AdminService{m_GcScheduler};
zen::HttpHealthService m_HealthService;
#if ZEN_WITH_COMPUTE_SERVICES
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
#endif // ZEN_WITH_COMPUTE_SERVICES
std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
std::unique_ptr<zen::HttpObjectStoreService> m_ObjStoreService;
+ std::unique_ptr<JobQueue> m_JobQueue;
+ std::unique_ptr<zen::HttpAdminService> m_AdminService;
bool m_DebugOptionForcedCrash = false;
bool m_UseSentry = false;