diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-13 16:13:30 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-13 22:13:30 +0200 |
| commit | b2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch) | |
| tree | e9085a92e9499bca55dfda9b63779be94218409f /src/zen/cmds/projectstore.cpp | |
| parent | scan oplog object for fields (#397) (diff) | |
| download | archived-zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz archived-zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip | |
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status
- Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status
- GET will return a response detailing status, pending messages and progress status
- DELETE will mark the job for cancelling and return without waiting for completion
- If status returned is "Complete" or "Aborted" the jobid will be removed from the server and can not be queried again
- Feature: New zen command `jobs` to list, get info about and cancel background jobs
- If no options are given it will display a list of active background jobs
- `--jobid` accepts an id (returned from for example `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job
- `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job
- Feature: oplog import and export http rpc requests are now async operations that will run in the background
- Feature: `oplog-export` and `oplog-import` now reports progress to the console as work progress by default
- Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C
- Feature: `oplog-export` and `oplog-import` has a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src/zen/cmds/projectstore.cpp')
| -rw-r--r-- | src/zen/cmds/projectstore.cpp | 156 |
1 files changed, 146 insertions, 10 deletions
diff --git a/src/zen/cmds/projectstore.cpp b/src/zen/cmds/projectstore.cpp index c3bfeaa4f..edeff7d85 100644 --- a/src/zen/cmds/projectstore.cpp +++ b/src/zen/cmds/projectstore.cpp @@ -17,6 +17,8 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> ZEN_THIRD_PARTY_INCLUDES_END +#include <signal.h> + namespace zen { namespace { @@ -42,6 +44,116 @@ namespace { return Payload; }; + static std::atomic_uint32_t SignalCounter[NSIG] = {0}; + + static void SignalCallbackHandler(int SigNum) + { + if (SigNum >= 0 && SigNum < NSIG) + { + SignalCounter[SigNum].fetch_add(1); + } + } + + void AsyncPost(HttpClient& Http, std::string_view Url, IoBuffer&& Payload) + { + if (HttpClient::Response Result = Http.Post(Url, Payload)) + { + if (Result.StatusCode == HttpResponseCode::Accepted) + { + signal(SIGINT, SignalCallbackHandler); + bool Cancelled = false; + + std::string_view JobIdText = Result.AsText(); + std::optional<uint64_t> JobIdMaybe = ParseInt<uint64_t>(JobIdText); + if (!JobIdMaybe) + { + Result.ThrowError("invalid job id"sv); + } + + std::string LastCurrentOp; + uint32_t LastCurrentOpPercentComplete = 0; + + uint64_t JobId = JobIdMaybe.value(); + while (true) + { + HttpClient::Response StatusResult = + Http.Get(fmt::format("/admin/jobs/{}", JobId), HttpClient::Accept(ZenContentType::kCbObject)); + if (!StatusResult) + { + StatusResult.ThrowError("failed to create project"sv); + } + CbObject StatusObject = StatusResult.AsObject(); + std::string_view Status = StatusObject["Status"sv].AsString(); + CbArrayView Messages = StatusObject["Messages"sv].AsArrayView(); + for (auto M : Messages) + { + std::string_view Message = M.AsString(); + ZEN_CONSOLE("{}", Message); + } + if (Status == "Complete") + { + if (Cancelled) + { + ZEN_CONSOLE("Cancelled"); + } + else + { + double QueueTimeS = StatusObject["QueueTimeS"].AsDouble(); + double RuntimeS = StatusObject["RunTimeS"].AsDouble(); + ZEN_CONSOLE("Completed: QueueTime: {:.3} s, RunTime: {:.3} s", QueueTimeS, RuntimeS); + } + break; + } + if (Status == "Aborted") + { + Result.ThrowError("Aborted"); + break; + } + if (Status == "Queued") + { + double QueueTimeS = StatusObject["QueueTimeS"].AsDouble(); + ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS); + } + if (Status == "Running") + { + std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString(); + uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32(); + if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete) + { + LastCurrentOp = CurrentOp; + LastCurrentOpPercentComplete = CurrentOpPercentComplete; + ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete); + } + } + uint32_t AbortCounter = SignalCounter[SIGINT].load(); + if (SignalCounter[SIGINT] > 0) + { + SignalCounter[SIGINT].fetch_sub(AbortCounter); + if (HttpClient::Response DeleteResult = Http.Delete(fmt::format("/admin/jobs/{}", JobId))) + { + ZEN_CONSOLE("Requested cancel..."); + Cancelled = true; + } + else + { + ZEN_CONSOLE("Failed cancelling job {}", DeleteResult); + } + continue; + } + Sleep(100); + } + } + else + { + ZEN_CONSOLE("{}", Result); + } + } + else + { + Result.ThrowError("failed to start operation"sv); + } + } + } // namespace /////////////////////////////////////// @@ -481,6 +593,7 @@ ExportOplogCommand::ExportOplogCommand() "Disable block creation and save all attachments individually (applies to file and cloud target)", cxxopts::value(m_DisableBlocks), "<disable>"); + m_Options.add_option("", "a", "async", "Trigger export but don't wait for completion", cxxopts::value(m_Async), "<async>"); m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); @@ -693,6 +806,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { Writer.AddBool("force"sv, true); } + Writer.AddBool("async"sv, true); if (!m_FileDirectoryPath.empty()) { Writer.BeginObject("file"sv); @@ -793,16 +907,26 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg ZEN_CONSOLE("Saving oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, m_HostName, TargetDescription); HttpClient Http(m_HostName); - if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload)) + if (m_Async) { - ZEN_CONSOLE("{}", Result); - return 0; + if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), + std::move(Payload), + HttpClient::Accept(ZenContentType::kJSON)); + Result) + { + ZEN_CONSOLE("{}", Result.AsText()); + } + else + { + Result.ThrowError("failed requesting loading oplog export"sv); + return 1; + } } else { - Result.ThrowError("failed to create project"sv); - return 1; + AsyncPost(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload)); } + return 0; } //////////////////////////// @@ -821,6 +945,7 @@ ImportOplogCommand::ImportOplogCommand() cxxopts::value(m_MaxChunkEmbedSize), "<chunksize>"); m_Options.add_option("", "f", "force", "Force import of all attachments", cxxopts::value(m_Force), "<force>"); + m_Options.add_option("", "a", "async", "Trigger import but don't wait for completion", cxxopts::value(m_Async), "<async>"); m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); @@ -1050,16 +1175,27 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg }); ZEN_CONSOLE("Loading oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, SourceDescription, m_HostName); - if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload)) + + if (m_Async) { - ZEN_CONSOLE("{}", Result); - return 0; + if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), + std::move(Payload), + HttpClient::Accept(ZenContentType::kJSON)); + Result) + { + ZEN_CONSOLE("{}", Result.AsText()); + } + else + { + Result.ThrowError("failed requesting loading oplog import"sv); + return 1; + } } else { - Result.ThrowError("failed to create project"sv); - return 1; + AsyncPost(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload)); } + return 0; } //////////////////////////// |