diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-13 16:13:30 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-13 22:13:30 +0200 |
| commit | b2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch) | |
| tree | e9085a92e9499bca55dfda9b63779be94218409f /src/zenserver/projectstore/projectstore.cpp | |
| parent | scan oplog object for fields (#397) (diff) | |
| download | zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip | |
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status
- Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status
- GET will return a response detailing status, pending messages and progress status
- DELETE will mark the job for cancelling and return without waiting for completion
- If status returned is "Complete" or "Aborted" the jobid will be removed from the server and can not be queried again
- Feature: New zen command `jobs` to list, get info about and cancel background jobs
- If no options are given it will display a list of active background jobs
- `--jobid` accepts an id (returned from for example `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job
- `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job
- Feature: oplog import and export http rpc requests are now async operations that will run in the background
- Feature: `oplog-export` and `oplog-import` now reports progress to the console as work progress by default
- Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C
- Feature: `oplog-export` and `oplog-import` has a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 120 |
1 files changed, 77 insertions, 43 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 6ec18904e..2fd6d492e 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -8,6 +8,7 @@ #include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/jobqueue.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> @@ -72,15 +73,20 @@ namespace { } while (true); } - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params, - AuthMgr& AuthManager, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - const std::filesystem::path& TempFilePath) + struct CreateRemoteStoreResult + { + std::shared_ptr<RemoteProjectStore> Store; + std::string Description; + }; + CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params, + AuthMgr& AuthManager, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + const std::filesystem::path& TempFilePath) { using namespace std::literals; - std::unique_ptr<RemoteProjectStore> RemoteStore; + std::shared_ptr<RemoteProjectStore> RemoteStore; if (CbObjectView File = Params["file"sv].AsObjectView(); File) { @@ -1565,11 +1571,12 @@ ProjectStore::Project::TouchOplog(std::string_view Oplog) const ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc) +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue) : GcStorage(Gc) , GcContributor(Gc) , m_Log(logging::Get("project")) , m_CidStore(Store) +, m_JobQueue(JobQueue) , m_ProjectBasePath(BasePath) , m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) { @@ -2330,7 +2337,8 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie Attachments.insert(RawHash); }; - RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + RemoteProjectStore::Result RemoteResult = + SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, nullptr); if (RemoteResult.ErrorCode) { @@ -2536,12 +2544,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } else if (Method == "export"sv) { - std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - if (Result.second.empty()) - { - HttpReq.WriteResponse(Result.first); - return Result.first != HttpResponseCode::BadRequest; - } + std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager); HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); return true; } @@ -2748,7 +2751,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } std::pair<HttpResponseCode, std::string> -ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) +ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) { ZEN_TRACE_CPU("ProjectStore::Export"); @@ -2759,35 +2762,52 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, bool Force = Params["force"sv].AsBool(false); bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = - CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); + CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); - if (RemoteStoreResult.first == nullptr) + if (RemoteStoreResult.Store == nullptr) { - return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + return {HttpResponseCode::BadRequest, RemoteStoreResult.Description}; } - std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", - Project.Identifier, + Project->Identifier, Oplog.OplogId(), StoreInfo.Description, NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize)); - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *RemoteStore, - Project, - Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - StoreInfo.CreateBlocks, - StoreInfo.UseTempBlockFiles, - Force); + JobId JobId = m_JobQueue.QueueJob([this, + ActualRemoteStore = std::move(RemoteStore), + Project, + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks = StoreInfo.CreateBlocks, + UseTempBlockFiles = StoreInfo.UseTempBlockFiles, + Force](JobContext& Context) { + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project.Get(), + *OplogPtr, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks, + UseTempBlockFiles, + Force, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error(fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); + } + }); - return ConvertResult(Result); + return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } std::pair<HttpResponseCode, std::string> @@ -2801,19 +2821,29 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); bool Force = Params["force"sv].AsBool(false); - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = - CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); + CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); - if (RemoteStoreResult.first == nullptr) + if (RemoteStoreResult.Store == nullptr) { - return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + return {HttpResponseCode::BadRequest, RemoteStoreResult.Description}; } - std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); - RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force); - return ConvertResult(Result); + JobId JobId = m_JobQueue.QueueJob( + [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) { + RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error( + fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); + } + }); + + return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } bool @@ -2909,6 +2939,7 @@ TEST_CASE("project.store.create") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; @@ -2916,7 +2947,7 @@ TEST_CASE("project.store.create") std::string_view ProjectName("proj1"sv); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -2938,13 +2969,14 @@ TEST_CASE("project.store.lifetimes") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -2976,13 +3008,14 @@ TEST_CASE("project.store.gc") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; @@ -3135,13 +3168,14 @@ TEST_CASE("project.store.partial.read") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; |