diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-13 16:13:30 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-13 22:13:30 +0200 |
| commit | b2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch) | |
| tree | e9085a92e9499bca55dfda9b63779be94218409f /src/zenserver | |
| parent | scan oplog object for fields (#397) (diff) | |
| download | zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip | |
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status
- Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status
- GET will return a response detailing status, pending messages and progress status
- DELETE will mark the job for cancelling and return without waiting for completion
- If status returned is "Complete" or "Aborted" the jobid will be removed from the server and can not be queried again
- Feature: New zen command `jobs` to list, get info about and cancel background jobs
- If no options are given it will display a list of active background jobs
- `--jobid` accepts an id (returned from for example `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job
- `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job
- Feature: oplog import and export http rpc requests are now async operations that will run in the background
- Feature: `oplog-export` and `oplog-import` now reports progress to the console as work progress by default
- Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C
- Feature: `oplog-export` and `oplog-import` has a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/admin/admin.cpp | 140 | ||||
| -rw-r--r-- | src/zenserver/admin/admin.h | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.h | 2 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 9 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.h | 2 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 120 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 12 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 306 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 17 | ||||
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.cpp | 6 | ||||
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.h | 2 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 18 |
13 files changed, 507 insertions, 135 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 575a10d83..74131e624 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -3,6 +3,7 @@ #include "admin.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/jobqueue.h> #include <zencore/string.h> #include <zenstore/gc.h> @@ -10,7 +11,9 @@ namespace zen { -HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Scheduler) +HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJobQueue) +: m_GcScheduler(Scheduler) +, m_BackgroundJobQueue(BackgroundJobQueue) { using namespace std::literals; @@ -23,6 +26,141 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Sched }, HttpVerb::kGet); + m_Router.AddPattern("jobid", "([[:digit:]]+?)"); + + m_Router.RegisterRoute( + "jobs", + [&](HttpRouterRequest& Req) { + std::vector<JobQueue::JobInfo> Jobs = m_BackgroundJobQueue.GetJobs(); + CbObjectWriter Obj; + Obj.BeginArray("jobs"); + for (const auto& Job : Jobs) + { + Obj.BeginObject(); + Obj.AddInteger("Id", Job.Id.Id); + Obj.AddString("Status", JobQueue::ToString(Job.Status)); + Obj.EndObject(); + } + Obj.EndArray(); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/{jobid}", + [&](HttpRouterRequest& Req) { + const auto& JobIdString = Req.GetCapture(1); + std::optional<uint64_t> JobIdArg = ParseInt<uint64_t>(JobIdString); + if (!JobIdArg) + { + Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + } + JobId Id{.Id = JobIdArg.value_or(0)}; + if (Id.Id == 0) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, + ZenContentType::kText, + fmt::format("Invalid Job Id: {}", Id.Id)); + } + + std::optional<JobQueue::JobDetails> CurrentState = m_BackgroundJobQueue.Get(Id); + if (!CurrentState) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); + } + + auto WriteState = [](CbObjectWriter& Obj, const JobQueue::State& State) { + if (!State.CurrentOp.empty()) + { + Obj.AddString("CurrentOp"sv, State.CurrentOp); + Obj.AddInteger("CurrentOpPercentComplete"sv, State.CurrentOpPercentComplete); + } + if (!State.Messages.empty()) + { + Obj.BeginArray("Messages"); + for (const std::string& Message : State.Messages) + { + Obj.AddString(Message); + } + Obj.EndArray(); + } + }; + + auto GetAgeAsSeconds = [](std::chrono::system_clock::time_point Start, std::chrono::system_clock::time_point End) { + auto Age = End - Start; + auto Milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(Age); + return Milliseconds.count() / 1000.0; + }; + + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + + switch (CurrentState->Status) + { + case JobQueue::Status::Queued: + { + CbObjectWriter Obj; + Obj.AddString("Status"sv, "Queued"sv); + Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, Now)); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + break; + case JobQueue::Status::Running: + { + CbObjectWriter Obj; + Obj.AddString("Status"sv, "Running"sv); + WriteState(Obj, CurrentState->State); + Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); + Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, Now)); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + break; + case JobQueue::Status::Aborted: + { + CbObjectWriter Obj; + Obj.AddString("Status"sv, "Aborted"sv); + WriteState(Obj, CurrentState->State); + Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); + Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime)); + Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now)); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + break; + case JobQueue::Status::Completed: + { + CbObjectWriter Obj; + Obj.AddString("Status"sv, "Complete"sv); + WriteState(Obj, CurrentState->State); + Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); + Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime)); + Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now)); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + break; + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/{jobid}", + [&](HttpRouterRequest& Req) { + const auto& JobIdString = Req.GetCapture(1); + std::optional<uint64_t> JobIdArg = ParseInt<uint64_t>(JobIdString); + if (!JobIdArg) + { + Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + } + JobId Id{.Id = JobIdArg.value_or(0)}; + if (m_BackgroundJobQueue.CancelJob(Id)) + { + Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + } + else + { + Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); + } + }, + HttpVerb::kDelete); + m_Router.RegisterRoute( "gc", [this](HttpRouterRequest& Req) { diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h index 9463ffbb3..3152f87ab 100644 --- a/src/zenserver/admin/admin.h +++ b/src/zenserver/admin/admin.h @@ -8,11 +8,12 @@ namespace zen { class GcScheduler; +class JobQueue; class HttpAdminService : public zen::HttpService { public: - HttpAdminService(GcScheduler& Scheduler); + HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJobQueue); ~HttpAdminService(); virtual const char* BaseUri() const override; @@ -21,6 +22,7 @@ public: private: HttpRequestRouter m_Router; GcScheduler& m_GcScheduler; + JobQueue& m_BackgroundJobQueue; }; } // namespace zen diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index faa748c5d..eeb1f71c4 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -244,10 +244,10 @@ private: bool m_UseTempBlocks = false; }; -std::unique_ptr<RemoteProjectStore> +std::shared_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options) { - std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<LocalExportProjectStore>(Options.Name, + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<LocalExportProjectStore>(Options.Name, Options.OptionalBaseName, std::filesystem::path(Options.FolderPath), Options.ForceDisableBlocks, diff --git a/src/zenserver/projectstore/fileremoteprojectstore.h b/src/zenserver/projectstore/fileremoteprojectstore.h index f398bbfbc..8da9692d5 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.h +++ b/src/zenserver/projectstore/fileremoteprojectstore.h @@ -15,6 +15,6 @@ struct FileRemoteStoreOptions : RemoteStoreOptions bool ForceEnableTempBlocks = false; }; -std::unique_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options); +std::shared_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options); } // namespace zen diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index e1a4a9dd4..e59bac6d6 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -44,13 +44,12 @@ public: { return {.CreateBlocks = m_EnableBlocks, .UseTempBlockFiles = m_UseTempBlocks, - .Description = fmt::format("[cloud] {} as {}/{}/{}{}{}"sv, + .Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key, - m_OptionalBaseKey == IoHash::Zero ? "" : " Base: ", - m_OptionalBaseKey)}; + m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format(" Base {}", m_OptionalBaseKey))}; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override @@ -283,7 +282,7 @@ private: bool m_UseTempBlocks = true; }; -std::unique_ptr<RemoteProjectStore> +std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) { std::string Url = Options.Url; @@ -319,7 +318,7 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); - std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<JupiterRemoteStore>(std::move(CloudClient), + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(CloudClient), Options.Namespace, Options.Bucket, Options.Key, diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h index 4ae6c88cb..27f3d9b73 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.h +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h @@ -23,7 +23,7 @@ struct JupiterRemoteStoreOptions : RemoteStoreOptions bool AssumeHttp2 = false; }; -std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, +std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath); } // namespace zen diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 6ec18904e..2fd6d492e 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -8,6 +8,7 @@ #include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/jobqueue.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> @@ -72,15 +73,20 @@ namespace { } while (true); } - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params, - AuthMgr& AuthManager, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - const std::filesystem::path& TempFilePath) + struct CreateRemoteStoreResult + { + std::shared_ptr<RemoteProjectStore> Store; + std::string Description; + }; + CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params, + AuthMgr& AuthManager, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + const std::filesystem::path& TempFilePath) { using namespace std::literals; - std::unique_ptr<RemoteProjectStore> RemoteStore; + std::shared_ptr<RemoteProjectStore> RemoteStore; if (CbObjectView File = Params["file"sv].AsObjectView(); File) { @@ -1565,11 +1571,12 @@ ProjectStore::Project::TouchOplog(std::string_view Oplog) const ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc) +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue) : GcStorage(Gc) , GcContributor(Gc) , m_Log(logging::Get("project")) , m_CidStore(Store) +, m_JobQueue(JobQueue) , m_ProjectBasePath(BasePath) , m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) { @@ -2330,7 +2337,8 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie Attachments.insert(RawHash); }; - RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + RemoteProjectStore::Result RemoteResult = + SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, nullptr); if (RemoteResult.ErrorCode) { @@ -2536,12 +2544,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } else if (Method == "export"sv) { - std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - if (Result.second.empty()) - { - HttpReq.WriteResponse(Result.first); - return Result.first != HttpResponseCode::BadRequest; - } + std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager); HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); return true; } @@ -2748,7 +2751,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } std::pair<HttpResponseCode, std::string> -ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) +ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) { ZEN_TRACE_CPU("ProjectStore::Export"); @@ -2759,35 +2762,52 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, bool Force = Params["force"sv].AsBool(false); bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = - CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); + CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); - if (RemoteStoreResult.first == nullptr) + if (RemoteStoreResult.Store == nullptr) { - return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + return {HttpResponseCode::BadRequest, RemoteStoreResult.Description}; } - std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", - Project.Identifier, + Project->Identifier, Oplog.OplogId(), StoreInfo.Description, NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize)); - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *RemoteStore, - Project, - Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - StoreInfo.CreateBlocks, - StoreInfo.UseTempBlockFiles, - Force); + JobId JobId = m_JobQueue.QueueJob([this, + ActualRemoteStore = std::move(RemoteStore), + Project, + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks = StoreInfo.CreateBlocks, + UseTempBlockFiles = StoreInfo.UseTempBlockFiles, + Force](JobContext& Context) { + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project.Get(), + *OplogPtr, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks, + UseTempBlockFiles, + Force, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error(fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); + } + }); - return ConvertResult(Result); + return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } std::pair<HttpResponseCode, std::string> @@ -2801,19 +2821,29 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); bool Force = Params["force"sv].AsBool(false); - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = - CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); + CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); - if (RemoteStoreResult.first == nullptr) + if (RemoteStoreResult.Store == nullptr) { - return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + return {HttpResponseCode::BadRequest, RemoteStoreResult.Description}; } - std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); - RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force); - return ConvertResult(Result); + JobId JobId = m_JobQueue.QueueJob( + [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) { + RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error( + fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); + } + }); + + return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } bool @@ -2909,6 +2939,7 @@ TEST_CASE("project.store.create") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; @@ -2916,7 +2947,7 @@ TEST_CASE("project.store.create") std::string_view ProjectName("proj1"sv); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -2938,13 +2969,14 @@ TEST_CASE("project.store.lifetimes") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -2976,13 +3008,14 @@ TEST_CASE("project.store.gc") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; @@ -3135,13 +3168,14 @@ TEST_CASE("project.store.partial.read") ScopedTemporaryDirectory TempDir; + auto JobQueue = MakeJobQueue(1, ""sv); GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index a2f92fb25..aa84d04ca 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -20,6 +20,7 @@ class CbPackage; class CidStore; class AuthMgr; class ScrubContext; +class JobQueue; enum class HttpResponseCode; @@ -64,7 +65,7 @@ class ProjectStore : public RefCounted, public GcStorage, public GcContributor struct OplogStorage; public: - ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc); + ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue); ~ProjectStore(); struct Project; @@ -329,10 +330,10 @@ public: IoBuffer&& Payload, AuthMgr& AuthManager); - std::pair<HttpResponseCode, std::string> Export(ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - CbObjectView&& Params, - AuthMgr& AuthManager); + std::pair<HttpResponseCode, std::string> Export(Ref<ProjectStore::Project> Project, + ProjectStore::Oplog& Oplog, + CbObjectView&& Params, + AuthMgr& AuthManager); std::pair<HttpResponseCode, std::string> Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, @@ -344,6 +345,7 @@ public: private: spdlog::logger& m_Log; CidStore& m_CidStore; + JobQueue& m_JobQueue; std::filesystem::path m_ProjectBasePath; mutable RwLock m_ProjectsLock; std::map<std::string, Ref<Project>> m_Projects; diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 235166659..aca9410a2 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -71,6 +71,27 @@ private: std::string m_ErrorText; }; +void +ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_t Total, ptrdiff_t Remaining) +{ + if (OptionalContext) + { + ZEN_ASSERT(Total > 0); + OptionalContext->Queue.ReportProgress(OptionalContext->Id, CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total)); + } + ZEN_INFO("{}", CurrentOp); +} + +void +ReportMessage(JobContext* OptionalContext, std::string_view Message) +{ + if (OptionalContext) + { + OptionalContext->Queue.ReportMessage(OptionalContext->Id, Message); + } + ZEN_INFO("{}", Message); +} + bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) { @@ -201,6 +222,7 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(const IoHash&)>& OnLargeAttachment, const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments, + JobContext* OptionalContext, AsyncRemoteResult& RemoteResult) { using namespace std::literals; @@ -217,8 +239,6 @@ BuildContainer(CidStore& ChunkStore, std::vector<Block> Blocks; CompressedBuffer OpsBuffer; - Latch BlockCreateLatch(1); - std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; size_t BlockSize = 0; @@ -365,11 +385,15 @@ BuildContainer(CidStore& ChunkStore, CB(RewrittenOp); }; - ZEN_INFO("Building exported oplog and fetching attachments"); + ReportMessage(OptionalContext, "Building exported oplog and fetching attachments"); tsl::robin_map<int, std::string> OpLSNToKey; Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObject Op) { + if (RemoteResult.IsError()) + { + return; + } std::string_view Key = Op["key"sv].AsString(); OpLSNToKey.insert({LSN, std::string(Key)}); Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); }); @@ -382,12 +406,23 @@ BuildContainer(CidStore& ChunkStore, SectionOpsWriter << Op; } OpCount++; + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } }); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + return {}; + } + if (!Attachments.empty() && !KnownBlocks.empty()) { + ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size())); + size_t ReusedBlockCount = 0; - ZEN_INFO("Checking {} known blocks for reuse", KnownBlocks.size()); for (const Block& KnownBlock : KnownBlocks) { size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); @@ -428,10 +463,10 @@ BuildContainer(CidStore& ChunkStore, ReusePercent); } } - ZEN_INFO("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size()); + ReportMessage(OptionalContext, fmt::format("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size())); } - ZEN_INFO("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size()); + ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size())); // Sort attachments so we get predictable blocks for the same oplog upload std::vector<IoHash> SortedAttachments; @@ -456,7 +491,15 @@ BuildContainer(CidStore& ChunkStore, return LhsKeyIt->second < RhsKeyIt->second; }); - ZEN_INFO("Assembling {} attachments from {} ops into blocks and loose attachments", SortedAttachments.size(), OpLSNToKey.size()); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + return {}; + } + ReportMessage(OptionalContext, + fmt::format("Assembling {} attachments from {} ops into blocks and loose attachments", + SortedAttachments.size(), + OpLSNToKey.size())); auto GetPayload = [&](const IoHash& AttachmentHash) { if (OutLooseAttachments != nullptr) @@ -474,8 +517,20 @@ BuildContainer(CidStore& ChunkStore, size_t GeneratedBlockCount = 0; size_t LargeAttachmentCount = 0; + Latch BlockCreateLatch(1); for (const IoHash& AttachmentHash : SortedAttachments) { + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + } + return {}; + } + auto It = Attachments.find(AttachmentHash); ZEN_ASSERT(It != Attachments.end()); IoBuffer Payload = GetPayload(AttachmentHash); @@ -595,19 +650,67 @@ BuildContainer(CidStore& ChunkStore, } SectionOpsWriter.EndArray(); // "ops" - ZEN_INFO("Assembled {} attachments from {} ops into {} blocks and {} loose attachments", - SortedAttachments.size(), - OpLSNToKey.size(), - GeneratedBlockCount, - LargeAttachmentCount); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + } + return {}; + } + ReportMessage(OptionalContext, + fmt::format("Assembled {} attachments from {} ops into {} blocks and {} loose attachments", + SortedAttachments.size(), + OpLSNToKey.size(), + GeneratedBlockCount, + LargeAttachmentCount)); CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); - ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize())); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ptrdiff_t Remaining = BlockCreateLatch.Remaining(); + ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining); + } + if (GeneratedBlockCount > 0) + { + ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0); + } + return {}; + } + ReportMessage(OptionalContext, + fmt::format("Added oplog section {}, {}", + CompressedOpsSection.DecodeRawHash(), + NiceBytes(CompressedOpsSection.GetCompressedSize()))); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { - ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining()); + ptrdiff_t Remaining = BlockCreateLatch.Remaining(); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + while (!BlockCreateLatch.Wait(1000)) + { + Remaining = BlockCreateLatch.Remaining(); + ReportProgress(OptionalContext, + fmt::format("Aborting, {} blocks remaining...", Remaining), + GeneratedBlockCount, + Remaining); + } + ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0); + return {}; + } + ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining); + } + if (GeneratedBlockCount > 0) + { + ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0); } if (!RemoteResult.IsError()) @@ -703,6 +806,7 @@ BuildContainer(CidStore& ChunkStore, OnLargeAttachment, OnBlockChunks, OutOptionalTempAttachments, + nullptr, RemoteResult); return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } @@ -717,7 +821,8 @@ SaveOplog(CidStore& ChunkStore, bool EmbedLooseFiles, bool BuildBlocks, bool UseTempBlocks, - bool ForceUpload) + bool ForceUpload, + JobContext* OptionalContext) { using namespace std::literals; @@ -831,7 +936,7 @@ SaveOplog(CidStore& ChunkStore, if (BuildBlocks) { - ZEN_INFO("Loading oplog base container"); + ReportMessage(OptionalContext, "Loading oplog base container"); RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer(); if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent)) { @@ -864,6 +969,7 @@ SaveOplog(CidStore& ChunkStore, KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)}); }; } + ReportMessage(OptionalContext, fmt::format("Loading oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds)); } } @@ -880,13 +986,22 @@ SaveOplog(CidStore& ChunkStore, OnLargeAttachment, OnBlockChunks, EmbedLooseFiles ? &TempAttachments : nullptr, + OptionalContext, /* out */ RemoteResult); if (!RemoteResult.IsError()) { + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteProjectStore::Result Result = {.ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Text = "Operation cancelled"}; + return Result; + } + uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); - ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount); + ReportMessage(OptionalContext, fmt::format("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount)); RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); if (ContainerSaveResult.ErrorCode) @@ -901,7 +1016,16 @@ SaveOplog(CidStore& ChunkStore, if (!ContainerSaveResult.Needs.empty() || ForceUpload) { - ZEN_INFO("Filtering needed attachments..."); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteProjectStore::Result Result = {.ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Text = "Operation cancelled"}; + return Result; + } + + ReportMessage(OptionalContext, "Filtering needed attachments..."); + std::vector<IoHash> NeededLargeAttachments; std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments; NeededLargeAttachments.reserve(LargeAttachments.size()); @@ -924,10 +1048,18 @@ SaveOplog(CidStore& ChunkStore, } } - Latch SaveAttachmentsLatch(1); + ptrdiff_t AttachmentsToSave(0); + Latch SaveAttachmentsLatch(1); if (!NeededLargeAttachments.empty()) { - ZEN_INFO("Saving large attachments..."); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteProjectStore::Result Result = {.ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Text = "Operation cancelled"}; + return Result; + } + ReportMessage(OptionalContext, "Saving large attachments..."); for (const IoHash& RawHash : NeededLargeAttachments) { if (RemoteResult.IsError()) @@ -946,6 +1078,7 @@ SaveOplog(CidStore& ChunkStore, } SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, @@ -994,7 +1127,7 @@ SaveOplog(CidStore& ChunkStore, if (!CreatedBlocks.empty()) { - ZEN_INFO("Saving created block attachments..."); + ReportMessage(OptionalContext, "Saving created block attachments..."); for (auto& It : CreatedBlocks) { if (RemoteResult.IsError()) @@ -1007,6 +1140,7 @@ SaveOplog(CidStore& ChunkStore, IoBuffer Payload = It.second; ZEN_ASSERT(Payload); SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; WorkerPool.ScheduleWork( [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); @@ -1041,7 +1175,7 @@ SaveOplog(CidStore& ChunkStore, if (!BlockChunks.empty()) { - ZEN_INFO("Saving chunk block attachments..."); + ReportMessage(OptionalContext, "Saving chunk block attachments..."); for (const std::vector<IoHash>& Chunks : BlockChunks) { if (RemoteResult.IsError()) @@ -1069,6 +1203,7 @@ SaveOplog(CidStore& ChunkStore, } } SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &SaveAttachmentsLatch, @@ -1111,14 +1246,28 @@ SaveOplog(CidStore& ChunkStore, SaveAttachmentsLatch.CountDown(); while (!SaveAttachmentsLatch.Wait(1000)) { - ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining()); + ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); + if (OptionalContext && OptionalContext->CancelFlag) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + ReportProgress(OptionalContext, + fmt::format("Saving attachments, {} remaining...", Remaining), + AttachmentsToSave, + Remaining); + } + if (AttachmentsToSave > 0) + { + ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0); } - SaveAttachmentsLatch.Wait(); } if (!RemoteResult.IsError()) { - ZEN_INFO("Finalizing oplog container..."); + ReportMessage(OptionalContext, "Finalizing oplog container..."); RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); if (ContainerFinalizeResult.ErrorCode) { @@ -1145,13 +1294,15 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment) + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + JobContext* OptionalContext) { using namespace std::literals; Stopwatch Timer; - CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); + size_t NeedAttachmentCount = 0; + CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); for (CbFieldView LargeChunksField : LargeChunksArray) { IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); @@ -1161,8 +1312,10 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } OnNeedAttachment(AttachmentHash); }; + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num())); - CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + size_t NeedBlockCount = 0; + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) { CbObjectView BlockView = BlockField.AsObjectView(); @@ -1202,6 +1355,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, break; } }; + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); @@ -1218,6 +1372,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num())); for (CbFieldView OpEntry : OpsArray) { CbObjectView Core = OpEntry.AsObjectView(); @@ -1240,7 +1395,11 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } RemoteProjectStore::Result -LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload) +LoadOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + bool ForceDownload, + JobContext* OptionalContext) { using namespace std::literals; @@ -1263,20 +1422,23 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text}; } - ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))); + ReportMessage(OptionalContext, + fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)))); - AsyncRemoteResult RemoteResult; - Latch AttachmentsWorkLatch(1); + AsyncRemoteResult RemoteResult; + Latch AttachmentsWorkLatch(1); + std::atomic_size_t AttachmentCount = 0; auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { return !ForceDownload && ChunkStore.ContainsChunk(RawHash); }; - auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult]( + auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult]( const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { if (BlockHash == IoHash::Zero) { AttachmentsWorkLatch.AddCount(1); + AttachmentCount.fetch_add(1); WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) @@ -1305,6 +1467,7 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O return; } AttachmentsWorkLatch.AddCount(1); + AttachmentCount.fetch_add(1); WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) @@ -1339,57 +1502,70 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O }); }; - auto OnNeedAttachment = - [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) { - if (!Attachments.insert(RawHash).second) + auto OnNeedAttachment = [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, &AttachmentCount]( + const IoHash& RawHash) { + if (!Attachments.insert(RawHash).second) + { + return; + } + + AttachmentsWorkLatch.AddCount(1); + AttachmentCount.fetch_add(1); + WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) { return; } - - AttachmentsWorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); - ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode); - return; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); - ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); - }); - }; + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode); + return; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); + }); + }; RemoteProjectStore::Result Result = - SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext); if (!Attachments.empty()) { - ZEN_INFO("Found {} attachments to download", Attachments.size()); + ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size())); } AttachmentsWorkLatch.CountDown(); while (!AttachmentsWorkLatch.Wait(1000)) { - ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining()); + ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining(); + if (OptionalContext && OptionalContext->CancelFlag) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining); + } + if (AttachmentCount.load() > 0) + { + ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0); } - AttachmentsWorkLatch.Wait(); if (Result.ErrorCode == 0) { Result = RemoteResult.ConvertResult(); } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - ZEN_INFO("Loaded oplog {} in {}", - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))); + ReportMessage(OptionalContext, + fmt::format("Loaded oplog {} in {}", + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)))); return Result; } diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 3134fdb4a..501a5eeec 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/jobqueue.h> #include "projectstore.h" #include <unordered_set> @@ -93,11 +94,14 @@ RemoteProjectStore::LoadContainerResult BuildContainer( tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files" +struct JobContext; + RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment); + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + JobContext* OptionalContext); RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, @@ -108,9 +112,14 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, bool EmbedLooseFiles, bool BuildBlocks, bool UseTempBlocks, - bool ForceUpload); - -RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload); + bool ForceUpload, + JobContext* OptionalContext); + +RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + bool ForceDownload, + JobContext* OptionalContext); CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks); bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index 72a7f00f8..c25fd2388 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -376,7 +376,7 @@ private: const size_t m_MaxChunkEmbedSize; }; -std::unique_ptr<RemoteProjectStore> +std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) { std::string Url = Options.Url; @@ -385,8 +385,8 @@ CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) // Assume http URL Url = fmt::format("http://{}"sv, Url); } - std::unique_ptr<RemoteProjectStore> RemoteStore = - std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); + std::shared_ptr<RemoteProjectStore> RemoteStore = + std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); return RemoteStore; } diff --git a/src/zenserver/projectstore/zenremoteprojectstore.h b/src/zenserver/projectstore/zenremoteprojectstore.h index ef9dcad8c..9f079ee74 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.h +++ b/src/zenserver/projectstore/zenremoteprojectstore.h @@ -13,6 +13,6 @@ struct ZenRemoteStoreOptions : RemoteStoreOptions std::string OplogId; }; -std::unique_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options); +std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options); } // namespace zen diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index cfc4a228b..8056b6506 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -6,6 +6,7 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> +#include <zencore/jobqueue.h> #include <zencore/logging.h> #include <zencore/refcount.h> #include <zencore/scopeguard.h> @@ -271,6 +272,8 @@ public: InitializeState(ServerOptions); + m_JobQueue = MakeJobQueue(8, "backgroundjobs"); + m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot, .AbsLogPath = ServerOptions.AbsLogFile, .HttpServerClass = std::string(ServerOptions.HttpServerClass), @@ -341,7 +344,7 @@ public: ZEN_INFO("instantiating project service"); - m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager); + m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue); m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr}); #if ZEN_WITH_COMPUTE_SERVICES @@ -365,7 +368,6 @@ public: } m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics - m_Http->RegisterService(m_AdminService); #if ZEN_WITH_TESTS m_Http->RegisterService(m_TestingService); @@ -431,6 +433,10 @@ public: .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites}; m_GcScheduler.Initialize(GcConfig); + // Create and register admin interface last to make sure all is properly initialized + m_AdminService = std::make_unique<HttpAdminService>(m_GcScheduler, *m_JobQueue); + m_Http->RegisterService(*m_AdminService); + return EffectiveBasePort; } @@ -498,6 +504,11 @@ public: } Flush(); + + if (m_JobQueue) + { + m_JobQueue->Stop(); + } } void RequestExit(int ExitCode) @@ -733,13 +744,14 @@ private: std::unique_ptr<zen::UpstreamCache> m_UpstreamCache; std::unique_ptr<zen::HttpUpstreamService> m_UpstreamService; std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; - zen::HttpAdminService m_AdminService{m_GcScheduler}; zen::HttpHealthService m_HealthService; #if ZEN_WITH_COMPUTE_SERVICES std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; #endif // ZEN_WITH_COMPUTE_SERVICES std::unique_ptr<zen::HttpFrontendService> m_FrontendService; std::unique_ptr<zen::HttpObjectStoreService> m_ObjStoreService; + std::unique_ptr<JobQueue> m_JobQueue; + std::unique_ptr<zen::HttpAdminService> m_AdminService; bool m_DebugOptionForcedCrash = false; bool m_UseSentry = false; |