aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-13 16:13:30 -0400
committerGitHub <[email protected]>2023-09-13 22:13:30 +0200
commitb2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch)
treee9085a92e9499bca55dfda9b63779be94218409f /src/zenserver/projectstore/projectstore.cpp
parentscan oplog object for fields (#397) (diff)
downloadzen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz
zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status - Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status - GET will return a response detailing status, pending messages and progress status - DELETE will mark the job for cancelling and return without waiting for completion - If status returned is "Complete" or "Aborted" the jobid will be removed from the server and can not be queried again - Feature: New zen command `jobs` to list, get info about and cancel background jobs - If no options are given it will display a list of active background jobs - `--jobid` accepts an id (returned from for example `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job - `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job - Feature: oplog import and export http rpc requests are now async operations that will run in the background - Feature: `oplog-export` and `oplog-import` now reports progress to the console as work progress by default - Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C - Feature: `oplog-export` and `oplog-import` has a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp120
1 files changed, 77 insertions, 43 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 6ec18904e..2fd6d492e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -8,6 +8,7 @@
#include <zencore/compactbinaryvalidation.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/jobqueue.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
@@ -72,15 +73,20 @@ namespace {
} while (true);
}
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params,
- AuthMgr& AuthManager,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize,
- const std::filesystem::path& TempFilePath)
+ struct CreateRemoteStoreResult
+ {
+ std::shared_ptr<RemoteProjectStore> Store;
+ std::string Description;
+ };
+ CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params,
+ AuthMgr& AuthManager,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ const std::filesystem::path& TempFilePath)
{
using namespace std::literals;
- std::unique_ptr<RemoteProjectStore> RemoteStore;
+ std::shared_ptr<RemoteProjectStore> RemoteStore;
if (CbObjectView File = Params["file"sv].AsObjectView(); File)
{
@@ -1565,11 +1571,12 @@ ProjectStore::Project::TouchOplog(std::string_view Oplog) const
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc)
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue)
: GcStorage(Gc)
, GcContributor(Gc)
, m_Log(logging::Get("project"))
, m_CidStore(Store)
+, m_JobQueue(JobQueue)
, m_ProjectBasePath(BasePath)
, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker())
{
@@ -2330,7 +2337,8 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie
Attachments.insert(RawHash);
};
- RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+ RemoteProjectStore::Result RemoteResult =
+ SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, nullptr);
if (RemoteResult.ErrorCode)
{
@@ -2536,12 +2544,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
else if (Method == "export"sv)
{
- std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- if (Result.second.empty())
- {
- HttpReq.WriteResponse(Result.first);
- return Result.first != HttpResponseCode::BadRequest;
- }
+ std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
return true;
}
@@ -2748,7 +2751,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
std::pair<HttpResponseCode, std::string>
-ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
+ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
{
ZEN_TRACE_CPU("ProjectStore::Export");
@@ -2759,35 +2762,52 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
bool Force = Params["force"sv].AsBool(false);
bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
- CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
+ CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
- if (RemoteStoreResult.first == nullptr)
+ if (RemoteStoreResult.Store == nullptr)
{
- return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.Description};
}
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
- Project.Identifier,
+ Project->Identifier,
Oplog.OplogId(),
StoreInfo.Description,
NiceBytes(MaxBlockSize),
NiceBytes(MaxChunkEmbedSize));
- RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
- *RemoteStore,
- Project,
- Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- EmbedLooseFile,
- StoreInfo.CreateBlocks,
- StoreInfo.UseTempBlockFiles,
- Force);
+ JobId JobId = m_JobQueue.QueueJob([this,
+ ActualRemoteStore = std::move(RemoteStore),
+ Project,
+ OplogPtr = &Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks = StoreInfo.CreateBlocks,
+ UseTempBlockFiles = StoreInfo.UseTempBlockFiles,
+ Force](JobContext& Context) {
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *ActualRemoteStore,
+ *Project.Get(),
+ *OplogPtr,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks,
+ UseTempBlockFiles,
+ Force,
+ &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
+ }
+ });
- return ConvertResult(Result);
+ return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
std::pair<HttpResponseCode, std::string>
@@ -2801,19 +2821,29 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
bool Force = Params["force"sv].AsBool(false);
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
- CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
+ CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
- if (RemoteStoreResult.first == nullptr)
+ if (RemoteStoreResult.Store == nullptr)
{
- return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.Description};
}
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
- RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force);
- return ConvertResult(Result);
+ JobId JobId = m_JobQueue.QueueJob(
+ [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) {
+ RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(
+ fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
+ }
+ });
+
+ return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
bool
@@ -2909,6 +2939,7 @@ TEST_CASE("project.store.create")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
@@ -2916,7 +2947,7 @@ TEST_CASE("project.store.create")
std::string_view ProjectName("proj1"sv);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -2938,13 +2969,14 @@ TEST_CASE("project.store.lifetimes")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -2976,13 +3008,14 @@ TEST_CASE("project.store.gc")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -3135,13 +3168,14 @@ TEST_CASE("project.store.partial.read")
ScopedTemporaryDirectory TempDir;
+ auto JobQueue = MakeJobQueue(1, ""sv);
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;