aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-13 16:13:30 -0400
committerGitHub <[email protected]>2023-09-13 22:13:30 +0200
commitb2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch)
treee9085a92e9499bca55dfda9b63779be94218409f /src/zenserver
parentscan oplog object for fields (#397) (diff)
downloadzen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz
zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status - Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status - GET will return a response detailing status, pending messages and progress status - DELETE will mark the job for cancelling and return without waiting for completion - If status returned is "Complete" or "Aborted" the jobid will be removed from the server and can not be queried again - Feature: New zen command `jobs` to list, get info about and cancel background jobs - If no options are given it will display a list of active background jobs - `--jobid` accepts an id (returned from for example `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job - `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job - Feature: oplog import and export http rpc requests are now async operations that will run in the background - Feature: `oplog-export` and `oplog-import` now reports progress to the console as work progress by default - Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C - Feature: `oplog-export` and `oplog-import` has a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/admin/admin.cpp140
-rw-r--r--src/zenserver/admin/admin.h4
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.cpp4
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.h2
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp9
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.h2
-rw-r--r--src/zenserver/projectstore/projectstore.cpp120
-rw-r--r--src/zenserver/projectstore/projectstore.h12
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp306
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h17
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp6
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.h2
-rw-r--r--src/zenserver/zenserver.cpp18
13 files changed, 507 insertions, 135 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 575a10d83..74131e624 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -3,6 +3,7 @@
#include "admin.h"
#include <zencore/compactbinarybuilder.h>
+#include <zencore/jobqueue.h>
#include <zencore/string.h>
#include <zenstore/gc.h>
@@ -10,7 +11,9 @@
namespace zen {
-HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Scheduler)
+HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJobQueue)
+: m_GcScheduler(Scheduler)
+, m_BackgroundJobQueue(BackgroundJobQueue)
{
using namespace std::literals;
@@ -23,6 +26,141 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Sched
},
HttpVerb::kGet);
+ m_Router.AddPattern("jobid", "([[:digit:]]+?)");
+
+ m_Router.RegisterRoute(
+ "jobs",
+ [&](HttpRouterRequest& Req) {
+ std::vector<JobQueue::JobInfo> Jobs = m_BackgroundJobQueue.GetJobs();
+ CbObjectWriter Obj;
+ Obj.BeginArray("jobs");
+ for (const auto& Job : Jobs)
+ {
+ Obj.BeginObject();
+ Obj.AddInteger("Id", Job.Id.Id);
+ Obj.AddString("Status", JobQueue::ToString(Job.Status));
+ Obj.EndObject();
+ }
+ Obj.EndArray();
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "jobs/{jobid}",
+ [&](HttpRouterRequest& Req) {
+ const auto& JobIdString = Req.GetCapture(1);
+ std::optional<uint64_t> JobIdArg = ParseInt<uint64_t>(JobIdString);
+ if (!JobIdArg)
+ {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ }
+ JobId Id{.Id = JobIdArg.value_or(0)};
+ if (Id.Id == 0)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest,
+ ZenContentType::kText,
+ fmt::format("Invalid Job Id: {}", Id.Id));
+ }
+
+ std::optional<JobQueue::JobDetails> CurrentState = m_BackgroundJobQueue.Get(Id);
+ if (!CurrentState)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ auto WriteState = [](CbObjectWriter& Obj, const JobQueue::State& State) {
+ if (!State.CurrentOp.empty())
+ {
+ Obj.AddString("CurrentOp"sv, State.CurrentOp);
+ Obj.AddInteger("CurrentOpPercentComplete"sv, State.CurrentOpPercentComplete);
+ }
+ if (!State.Messages.empty())
+ {
+ Obj.BeginArray("Messages");
+ for (const std::string& Message : State.Messages)
+ {
+ Obj.AddString(Message);
+ }
+ Obj.EndArray();
+ }
+ };
+
+ auto GetAgeAsSeconds = [](std::chrono::system_clock::time_point Start, std::chrono::system_clock::time_point End) {
+ auto Age = End - Start;
+ auto Milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(Age);
+ return Milliseconds.count() / 1000.0;
+ };
+
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+
+ switch (CurrentState->Status)
+ {
+ case JobQueue::Status::Queued:
+ {
+ CbObjectWriter Obj;
+ Obj.AddString("Status"sv, "Queued"sv);
+ Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, Now));
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ }
+ break;
+ case JobQueue::Status::Running:
+ {
+ CbObjectWriter Obj;
+ Obj.AddString("Status"sv, "Running"sv);
+ WriteState(Obj, CurrentState->State);
+ Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
+ Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, Now));
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ }
+ break;
+ case JobQueue::Status::Aborted:
+ {
+ CbObjectWriter Obj;
+ Obj.AddString("Status"sv, "Aborted"sv);
+ WriteState(Obj, CurrentState->State);
+ Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
+ Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime));
+ Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now));
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ }
+ break;
+ case JobQueue::Status::Completed:
+ {
+ CbObjectWriter Obj;
+ Obj.AddString("Status"sv, "Complete"sv);
+ WriteState(Obj, CurrentState->State);
+ Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
+ Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime));
+ Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now));
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ }
+ break;
+ }
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "jobs/{jobid}",
+ [&](HttpRouterRequest& Req) {
+ const auto& JobIdString = Req.GetCapture(1);
+ std::optional<uint64_t> JobIdArg = ParseInt<uint64_t>(JobIdString);
+ if (!JobIdArg)
+ {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ }
+ JobId Id{.Id = JobIdArg.value_or(0)};
+ if (m_BackgroundJobQueue.CancelJob(Id))
+ {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
+ }
+ },
+ HttpVerb::kDelete);
+
m_Router.RegisterRoute(
"gc",
[this](HttpRouterRequest& Req) {
diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h
index 9463ffbb3..3152f87ab 100644
--- a/src/zenserver/admin/admin.h
+++ b/src/zenserver/admin/admin.h
@@ -8,11 +8,12 @@
namespace zen {
class GcScheduler;
+class JobQueue;
class HttpAdminService : public zen::HttpService
{
public:
- HttpAdminService(GcScheduler& Scheduler);
+ HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJobQueue);
~HttpAdminService();
virtual const char* BaseUri() const override;
@@ -21,6 +22,7 @@ public:
private:
HttpRequestRouter m_Router;
GcScheduler& m_GcScheduler;
+ JobQueue& m_BackgroundJobQueue;
};
} // namespace zen
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp
index faa748c5d..eeb1f71c4 100644
--- a/src/zenserver/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp
@@ -244,10 +244,10 @@ private:
bool m_UseTempBlocks = false;
};
-std::unique_ptr<RemoteProjectStore>
+std::shared_ptr<RemoteProjectStore>
CreateFileRemoteStore(const FileRemoteStoreOptions& Options)
{
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<LocalExportProjectStore>(Options.Name,
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<LocalExportProjectStore>(Options.Name,
Options.OptionalBaseName,
std::filesystem::path(Options.FolderPath),
Options.ForceDisableBlocks,
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.h b/src/zenserver/projectstore/fileremoteprojectstore.h
index f398bbfbc..8da9692d5 100644
--- a/src/zenserver/projectstore/fileremoteprojectstore.h
+++ b/src/zenserver/projectstore/fileremoteprojectstore.h
@@ -15,6 +15,6 @@ struct FileRemoteStoreOptions : RemoteStoreOptions
bool ForceEnableTempBlocks = false;
};
-std::unique_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options);
+std::shared_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options);
} // namespace zen
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index e1a4a9dd4..e59bac6d6 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -44,13 +44,12 @@ public:
{
return {.CreateBlocks = m_EnableBlocks,
.UseTempBlockFiles = m_UseTempBlocks,
- .Description = fmt::format("[cloud] {} as {}/{}/{}{}{}"sv,
+ .Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv,
m_CloudClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_Key,
- m_OptionalBaseKey == IoHash::Zero ? "" : " Base: ",
- m_OptionalBaseKey)};
+ m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format(" Base {}", m_OptionalBaseKey))};
}
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
@@ -283,7 +282,7 @@ private:
bool m_UseTempBlocks = true;
};
-std::unique_ptr<RemoteProjectStore>
+std::shared_ptr<RemoteProjectStore>
CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
{
std::string Url = Options.Url;
@@ -319,7 +318,7 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi
Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider)));
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<JupiterRemoteStore>(std::move(CloudClient),
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(CloudClient),
Options.Namespace,
Options.Bucket,
Options.Key,
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h
index 4ae6c88cb..27f3d9b73 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.h
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h
@@ -23,7 +23,7 @@ struct JupiterRemoteStoreOptions : RemoteStoreOptions
bool AssumeHttp2 = false;
};
-std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options,
+std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options,
const std::filesystem::path& TempFilePath);
} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 6ec18904e..2fd6d492e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -8,6 +8,7 @@
#include <zencore/compactbinaryvalidation.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/jobqueue.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
@@ -72,15 +73,20 @@ namespace {
} while (true);
}
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params,
- AuthMgr& AuthManager,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize,
- const std::filesystem::path& TempFilePath)
+ struct CreateRemoteStoreResult
+ {
+ std::shared_ptr<RemoteProjectStore> Store;
+ std::string Description;
+ };
+ CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params,
+ AuthMgr& AuthManager,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ const std::filesystem::path& TempFilePath)
{
using namespace std::literals;
- std::unique_ptr<RemoteProjectStore> RemoteStore;
+ std::shared_ptr<RemoteProjectStore> RemoteStore;
if (CbObjectView File = Params["file"sv].AsObjectView(); File)
{
@@ -1565,11 +1571,12 @@ ProjectStore::Project::TouchOplog(std::string_view Oplog) const
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc)
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue)
: GcStorage(Gc)
, GcContributor(Gc)
, m_Log(logging::Get("project"))
, m_CidStore(Store)
+, m_JobQueue(JobQueue)
, m_ProjectBasePath(BasePath)
, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker())
{
@@ -2330,7 +2337,8 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie
Attachments.insert(RawHash);
};
- RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+ RemoteProjectStore::Result RemoteResult =
+ SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, nullptr);
if (RemoteResult.ErrorCode)
{
@@ -2536,12 +2544,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
else if (Method == "export"sv)
{
- std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- if (Result.second.empty())
- {
- HttpReq.WriteResponse(Result.first);
- return Result.first != HttpResponseCode::BadRequest;
- }
+ std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
return true;
}
@@ -2748,7 +2751,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
std::pair<HttpResponseCode, std::string>
-ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
+ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
{
ZEN_TRACE_CPU("ProjectStore::Export");
@@ -2759,35 +2762,52 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
bool Force = Params["force"sv].AsBool(false);
bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
- CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
+ CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
- if (RemoteStoreResult.first == nullptr)
+ if (RemoteStoreResult.Store == nullptr)
{
- return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.Description};
}
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
- Project.Identifier,
+ Project->Identifier,
Oplog.OplogId(),
StoreInfo.Description,
NiceBytes(MaxBlockSize),
NiceBytes(MaxChunkEmbedSize));
- RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
- *RemoteStore,
- Project,
- Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- EmbedLooseFile,
- StoreInfo.CreateBlocks,
- StoreInfo.UseTempBlockFiles,
- Force);
+ JobId JobId = m_JobQueue.QueueJob([this,
+ ActualRemoteStore = std::move(RemoteStore),
+ Project,
+ OplogPtr = &Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks = StoreInfo.CreateBlocks,
+ UseTempBlockFiles = StoreInfo.UseTempBlockFiles,
+ Force](JobContext& Context) {
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *ActualRemoteStore,
+ *Project.Get(),
+ *OplogPtr,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks,
+ UseTempBlockFiles,
+ Force,
+ &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
+ }
+ });
- return ConvertResult(Result);
+ return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
std::pair<HttpResponseCode, std::string>
@@ -2801,19 +2821,29 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
bool Force = Params["force"sv].AsBool(false);
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
- CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
+ CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
- if (RemoteStoreResult.first == nullptr)
+ if (RemoteStoreResult.Store == nullptr)
{
- return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.Description};
}
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
- RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force);
- return ConvertResult(Result);
+ JobId JobId = m_JobQueue.QueueJob(
+ [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) {
+ RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(
+ fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
+ }
+ });
+
+ return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
bool
@@ -2909,6 +2939,7 @@ TEST_CASE("project.store.create")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
@@ -2916,7 +2947,7 @@ TEST_CASE("project.store.create")
std::string_view ProjectName("proj1"sv);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -2938,13 +2969,14 @@ TEST_CASE("project.store.lifetimes")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -2976,13 +3008,14 @@ TEST_CASE("project.store.gc")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -3135,13 +3168,14 @@ TEST_CASE("project.store.partial.read")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index a2f92fb25..aa84d04ca 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -20,6 +20,7 @@ class CbPackage;
class CidStore;
class AuthMgr;
class ScrubContext;
+class JobQueue;
enum class HttpResponseCode;
@@ -64,7 +65,7 @@ class ProjectStore : public RefCounted, public GcStorage, public GcContributor
struct OplogStorage;
public:
- ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc);
+ ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue);
~ProjectStore();
struct Project;
@@ -329,10 +330,10 @@ public:
IoBuffer&& Payload,
AuthMgr& AuthManager);
- std::pair<HttpResponseCode, std::string> Export(ProjectStore::Project& Project,
- ProjectStore::Oplog& Oplog,
- CbObjectView&& Params,
- AuthMgr& AuthManager);
+ std::pair<HttpResponseCode, std::string> Export(Ref<ProjectStore::Project> Project,
+ ProjectStore::Oplog& Oplog,
+ CbObjectView&& Params,
+ AuthMgr& AuthManager);
std::pair<HttpResponseCode, std::string> Import(ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
@@ -344,6 +345,7 @@ public:
private:
spdlog::logger& m_Log;
CidStore& m_CidStore;
+ JobQueue& m_JobQueue;
std::filesystem::path m_ProjectBasePath;
mutable RwLock m_ProjectsLock;
std::map<std::string, Ref<Project>> m_Projects;
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 235166659..aca9410a2 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -71,6 +71,27 @@ private:
std::string m_ErrorText;
};
+void
+ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_t Total, ptrdiff_t Remaining)
+{
+ if (OptionalContext)
+ {
+ ZEN_ASSERT(Total > 0);
+ OptionalContext->Queue.ReportProgress(OptionalContext->Id, CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total));
+ }
+ ZEN_INFO("{}", CurrentOp);
+}
+
+void
+ReportMessage(JobContext* OptionalContext, std::string_view Message)
+{
+ if (OptionalContext)
+ {
+ OptionalContext->Queue.ReportMessage(OptionalContext->Id, Message);
+ }
+ ZEN_INFO("{}", Message);
+}
+
bool
IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
@@ -201,6 +222,7 @@ BuildContainer(CidStore& ChunkStore,
const std::function<void(const IoHash&)>& OnLargeAttachment,
const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments,
+ JobContext* OptionalContext,
AsyncRemoteResult& RemoteResult)
{
using namespace std::literals;
@@ -217,8 +239,6 @@ BuildContainer(CidStore& ChunkStore,
std::vector<Block> Blocks;
CompressedBuffer OpsBuffer;
- Latch BlockCreateLatch(1);
-
std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
size_t BlockSize = 0;
@@ -365,11 +385,15 @@ BuildContainer(CidStore& ChunkStore,
CB(RewrittenOp);
};
- ZEN_INFO("Building exported oplog and fetching attachments");
+ ReportMessage(OptionalContext, "Building exported oplog and fetching attachments");
tsl::robin_map<int, std::string> OpLSNToKey;
Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObject Op) {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
std::string_view Key = Op["key"sv].AsString();
OpLSNToKey.insert({LSN, std::string(Key)});
Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
@@ -382,12 +406,23 @@ BuildContainer(CidStore& ChunkStore,
SectionOpsWriter << Op;
}
OpCount++;
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
});
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
+ }
+
if (!Attachments.empty() && !KnownBlocks.empty())
{
+ ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size()));
+
size_t ReusedBlockCount = 0;
- ZEN_INFO("Checking {} known blocks for reuse", KnownBlocks.size());
for (const Block& KnownBlock : KnownBlocks)
{
size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
@@ -428,10 +463,10 @@ BuildContainer(CidStore& ChunkStore,
ReusePercent);
}
}
- ZEN_INFO("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size());
+ ReportMessage(OptionalContext, fmt::format("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size()));
}
- ZEN_INFO("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size());
+ ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size()));
// Sort attachments so we get predictable blocks for the same oplog upload
std::vector<IoHash> SortedAttachments;
@@ -456,7 +491,15 @@ BuildContainer(CidStore& ChunkStore,
return LhsKeyIt->second < RhsKeyIt->second;
});
- ZEN_INFO("Assembling {} attachments from {} ops into blocks and loose attachments", SortedAttachments.size(), OpLSNToKey.size());
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Assembling {} attachments from {} ops into blocks and loose attachments",
+ SortedAttachments.size(),
+ OpLSNToKey.size()));
auto GetPayload = [&](const IoHash& AttachmentHash) {
if (OutLooseAttachments != nullptr)
@@ -474,8 +517,20 @@ BuildContainer(CidStore& ChunkStore,
size_t GeneratedBlockCount = 0;
size_t LargeAttachmentCount = 0;
+ Latch BlockCreateLatch(1);
for (const IoHash& AttachmentHash : SortedAttachments)
{
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ }
+ return {};
+ }
+
auto It = Attachments.find(AttachmentHash);
ZEN_ASSERT(It != Attachments.end());
IoBuffer Payload = GetPayload(AttachmentHash);
@@ -595,19 +650,67 @@ BuildContainer(CidStore& ChunkStore,
}
SectionOpsWriter.EndArray(); // "ops"
- ZEN_INFO("Assembled {} attachments from {} ops into {} blocks and {} loose attachments",
- SortedAttachments.size(),
- OpLSNToKey.size(),
- GeneratedBlockCount,
- LargeAttachmentCount);
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ }
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Assembled {} attachments from {} ops into {} blocks and {} loose attachments",
+ SortedAttachments.size(),
+ OpLSNToKey.size(),
+ GeneratedBlockCount,
+ LargeAttachmentCount));
CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
- ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize()));
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = BlockCreateLatch.Remaining();
+ ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining);
+ }
+ if (GeneratedBlockCount > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0);
+ }
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Added oplog section {}, {}",
+ CompressedOpsSection.DecodeRawHash(),
+ NiceBytes(CompressedOpsSection.GetCompressedSize())));
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
{
- ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining());
+ ptrdiff_t Remaining = BlockCreateLatch.Remaining();
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ Remaining = BlockCreateLatch.Remaining();
+ ReportProgress(OptionalContext,
+ fmt::format("Aborting, {} blocks remaining...", Remaining),
+ GeneratedBlockCount,
+ Remaining);
+ }
+ ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0);
+ return {};
+ }
+ ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining);
+ }
+ if (GeneratedBlockCount > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0);
}
if (!RemoteResult.IsError())
@@ -703,6 +806,7 @@ BuildContainer(CidStore& ChunkStore,
OnLargeAttachment,
OnBlockChunks,
OutOptionalTempAttachments,
+ nullptr,
RemoteResult);
return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
}
@@ -717,7 +821,8 @@ SaveOplog(CidStore& ChunkStore,
bool EmbedLooseFiles,
bool BuildBlocks,
bool UseTempBlocks,
- bool ForceUpload)
+ bool ForceUpload,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -831,7 +936,7 @@ SaveOplog(CidStore& ChunkStore,
if (BuildBlocks)
{
- ZEN_INFO("Loading oplog base container");
+ ReportMessage(OptionalContext, "Loading oplog base container");
RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer();
if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent))
{
@@ -864,6 +969,7 @@ SaveOplog(CidStore& ChunkStore,
KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)});
};
}
+ ReportMessage(OptionalContext, fmt::format("Loading oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds));
}
}
@@ -880,13 +986,22 @@ SaveOplog(CidStore& ChunkStore,
OnLargeAttachment,
OnBlockChunks,
EmbedLooseFiles ? &TempAttachments : nullptr,
+ OptionalContext,
/* out */ RemoteResult);
if (!RemoteResult.IsError())
{
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteProjectStore::Result Result = {.ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Text = "Operation cancelled"};
+ return Result;
+ }
+
uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num();
uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num();
- ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount);
+ ReportMessage(OptionalContext, fmt::format("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount));
RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
if (ContainerSaveResult.ErrorCode)
@@ -901,7 +1016,16 @@ SaveOplog(CidStore& ChunkStore,
if (!ContainerSaveResult.Needs.empty() || ForceUpload)
{
- ZEN_INFO("Filtering needed attachments...");
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteProjectStore::Result Result = {.ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Text = "Operation cancelled"};
+ return Result;
+ }
+
+ ReportMessage(OptionalContext, "Filtering needed attachments...");
+
std::vector<IoHash> NeededLargeAttachments;
std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments;
NeededLargeAttachments.reserve(LargeAttachments.size());
@@ -924,10 +1048,18 @@ SaveOplog(CidStore& ChunkStore,
}
}
- Latch SaveAttachmentsLatch(1);
+ ptrdiff_t AttachmentsToSave(0);
+ Latch SaveAttachmentsLatch(1);
if (!NeededLargeAttachments.empty())
{
- ZEN_INFO("Saving large attachments...");
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteProjectStore::Result Result = {.ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Text = "Operation cancelled"};
+ return Result;
+ }
+ ReportMessage(OptionalContext, "Saving large attachments...");
for (const IoHash& RawHash : NeededLargeAttachments)
{
if (RemoteResult.IsError())
@@ -946,6 +1078,7 @@ SaveOplog(CidStore& ChunkStore,
}
SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
WorkerPool.ScheduleWork([&ChunkStore,
&RemoteStore,
&SaveAttachmentsLatch,
@@ -994,7 +1127,7 @@ SaveOplog(CidStore& ChunkStore,
if (!CreatedBlocks.empty())
{
- ZEN_INFO("Saving created block attachments...");
+ ReportMessage(OptionalContext, "Saving created block attachments...");
for (auto& It : CreatedBlocks)
{
if (RemoteResult.IsError())
@@ -1007,6 +1140,7 @@ SaveOplog(CidStore& ChunkStore,
IoBuffer Payload = It.second;
ZEN_ASSERT(Payload);
SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
WorkerPool.ScheduleWork(
[&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() {
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
@@ -1041,7 +1175,7 @@ SaveOplog(CidStore& ChunkStore,
if (!BlockChunks.empty())
{
- ZEN_INFO("Saving chunk block attachments...");
+ ReportMessage(OptionalContext, "Saving chunk block attachments...");
for (const std::vector<IoHash>& Chunks : BlockChunks)
{
if (RemoteResult.IsError())
@@ -1069,6 +1203,7 @@ SaveOplog(CidStore& ChunkStore,
}
}
SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
WorkerPool.ScheduleWork([&RemoteStore,
&ChunkStore,
&SaveAttachmentsLatch,
@@ -1111,14 +1246,28 @@ SaveOplog(CidStore& ChunkStore,
SaveAttachmentsLatch.CountDown();
while (!SaveAttachmentsLatch.Wait(1000))
{
- ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining());
+ ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ ReportProgress(OptionalContext,
+ fmt::format("Saving attachments, {} remaining...", Remaining),
+ AttachmentsToSave,
+ Remaining);
+ }
+ if (AttachmentsToSave > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0);
}
- SaveAttachmentsLatch.Wait();
}
if (!RemoteResult.IsError())
{
- ZEN_INFO("Finalizing oplog container...");
+ ReportMessage(OptionalContext, "Finalizing oplog container...");
RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
if (ContainerFinalizeResult.ErrorCode)
{
@@ -1145,13 +1294,15 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
const CbObject& ContainerObject,
const std::function<bool(const IoHash& RawHash)>& HasAttachment,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment)
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ JobContext* OptionalContext)
{
using namespace std::literals;
Stopwatch Timer;
- CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
+ size_t NeedAttachmentCount = 0;
+ CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
for (CbFieldView LargeChunksField : LargeChunksArray)
{
IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
@@ -1161,8 +1312,10 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
OnNeedAttachment(AttachmentHash);
};
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num()));
- CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+ size_t NeedBlockCount = 0;
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
{
CbObjectView BlockView = BlockField.AsObjectView();
@@ -1202,6 +1355,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
break;
}
};
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
@@ -1218,6 +1372,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num()));
for (CbFieldView OpEntry : OpsArray)
{
CbObjectView Core = OpEntry.AsObjectView();
@@ -1240,7 +1395,11 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
RemoteProjectStore::Result
-LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload)
+LoadOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ bool ForceDownload,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -1263,20 +1422,23 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
.Reason = LoadContainerResult.Reason,
.Text = LoadContainerResult.Text};
}
- ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)));
+ ReportMessage(OptionalContext,
+ fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))));
- AsyncRemoteResult RemoteResult;
- Latch AttachmentsWorkLatch(1);
+ AsyncRemoteResult RemoteResult;
+ Latch AttachmentsWorkLatch(1);
+ std::atomic_size_t AttachmentCount = 0;
auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
return !ForceDownload && ChunkStore.ContainsChunk(RawHash);
};
- auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult](
+ auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult](
const IoHash& BlockHash,
std::vector<IoHash>&& Chunks) {
if (BlockHash == IoHash::Zero)
{
AttachmentsWorkLatch.AddCount(1);
+ AttachmentCount.fetch_add(1);
WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
@@ -1305,6 +1467,7 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
return;
}
AttachmentsWorkLatch.AddCount(1);
+ AttachmentCount.fetch_add(1);
WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
@@ -1339,57 +1502,70 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
});
};
- auto OnNeedAttachment =
- [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) {
- if (!Attachments.insert(RawHash).second)
+ auto OnNeedAttachment = [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, &AttachmentCount](
+ const IoHash& RawHash) {
+ if (!Attachments.insert(RawHash).second)
+ {
+ return;
+ }
+
+ AttachmentsWorkLatch.AddCount(1);
+ AttachmentCount.fetch_add(1);
+ WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
return;
}
-
- AttachmentsWorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
- {
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
- ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode);
- return;
- }
- ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
- ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
- });
- };
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode);
+ return;
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
+ ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
+ });
+ };
RemoteProjectStore::Result Result =
- SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+ SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext);
if (!Attachments.empty())
{
- ZEN_INFO("Found {} attachments to download", Attachments.size());
+ ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size()));
}
AttachmentsWorkLatch.CountDown();
while (!AttachmentsWorkLatch.Wait(1000))
{
- ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining());
+ ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining();
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining);
+ }
+ if (AttachmentCount.load() > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0);
}
- AttachmentsWorkLatch.Wait();
if (Result.ErrorCode == 0)
{
Result = RemoteResult.ConvertResult();
}
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
- ZEN_INFO("Loaded oplog {} in {}",
- RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)));
+ ReportMessage(OptionalContext,
+ fmt::format("Loaded oplog {} in {}",
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))));
return Result;
}
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 3134fdb4a..501a5eeec 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/jobqueue.h>
#include "projectstore.h"
#include <unordered_set>
@@ -93,11 +94,14 @@ RemoteProjectStore::LoadContainerResult BuildContainer(
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>*
OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files"
+struct JobContext;
+
RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
const CbObject& ContainerObject,
const std::function<bool(const IoHash& RawHash)>& HasAttachment,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment);
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ JobContext* OptionalContext);
RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
@@ -108,9 +112,14 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
bool EmbedLooseFiles,
bool BuildBlocks,
bool UseTempBlocks,
- bool ForceUpload);
-
-RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload);
+ bool ForceUpload,
+ JobContext* OptionalContext);
+
+RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ bool ForceDownload,
+ JobContext* OptionalContext);
CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks);
bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index 72a7f00f8..c25fd2388 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -376,7 +376,7 @@ private:
const size_t m_MaxChunkEmbedSize;
};
-std::unique_ptr<RemoteProjectStore>
+std::shared_ptr<RemoteProjectStore>
CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
{
std::string Url = Options.Url;
@@ -385,8 +385,8 @@ CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
// Assume http URL
Url = fmt::format("http://{}"sv, Url);
}
- std::unique_ptr<RemoteProjectStore> RemoteStore =
- std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize);
+ std::shared_ptr<RemoteProjectStore> RemoteStore =
+ std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize);
return RemoteStore;
}
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.h b/src/zenserver/projectstore/zenremoteprojectstore.h
index ef9dcad8c..9f079ee74 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.h
+++ b/src/zenserver/projectstore/zenremoteprojectstore.h
@@ -13,6 +13,6 @@ struct ZenRemoteStoreOptions : RemoteStoreOptions
std::string OplogId;
};
-std::unique_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options);
+std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options);
} // namespace zen
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index cfc4a228b..8056b6506 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -6,6 +6,7 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
+#include <zencore/jobqueue.h>
#include <zencore/logging.h>
#include <zencore/refcount.h>
#include <zencore/scopeguard.h>
@@ -271,6 +272,8 @@ public:
InitializeState(ServerOptions);
+ m_JobQueue = MakeJobQueue(8, "backgroundjobs");
+
m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot,
.AbsLogPath = ServerOptions.AbsLogFile,
.HttpServerClass = std::string(ServerOptions.HttpServerClass),
@@ -341,7 +344,7 @@ public:
ZEN_INFO("instantiating project service");
- m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager);
+ m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue);
m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr});
#if ZEN_WITH_COMPUTE_SERVICES
@@ -365,7 +368,6 @@ public:
}
m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
- m_Http->RegisterService(m_AdminService);
#if ZEN_WITH_TESTS
m_Http->RegisterService(m_TestingService);
@@ -431,6 +433,10 @@ public:
.MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites};
m_GcScheduler.Initialize(GcConfig);
+ // Create and register admin interface last to make sure all is properly initialized
+ m_AdminService = std::make_unique<HttpAdminService>(m_GcScheduler, *m_JobQueue);
+ m_Http->RegisterService(*m_AdminService);
+
return EffectiveBasePort;
}
@@ -498,6 +504,11 @@ public:
}
Flush();
+
+ if (m_JobQueue)
+ {
+ m_JobQueue->Stop();
+ }
}
void RequestExit(int ExitCode)
@@ -733,13 +744,14 @@ private:
std::unique_ptr<zen::UpstreamCache> m_UpstreamCache;
std::unique_ptr<zen::HttpUpstreamService> m_UpstreamService;
std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService;
- zen::HttpAdminService m_AdminService{m_GcScheduler};
zen::HttpHealthService m_HealthService;
#if ZEN_WITH_COMPUTE_SERVICES
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
#endif // ZEN_WITH_COMPUTE_SERVICES
std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
std::unique_ptr<zen::HttpObjectStoreService> m_ObjStoreService;
+ std::unique_ptr<JobQueue> m_JobQueue;
+ std::unique_ptr<zen::HttpAdminService> m_AdminService;
bool m_DebugOptionForcedCrash = false;
bool m_UseSentry = false;