aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-13 16:13:30 -0400
committerGitHub <[email protected]>2023-09-13 22:13:30 +0200
commitb2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch)
treee9085a92e9499bca55dfda9b63779be94218409f /src/zen/cmds/projectstore.cpp
parentscan oplog object for fields (#397) (diff)
downloadarchived-zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz
archived-zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status - Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status - GET will return a response detailing status, pending messages and progress status - DELETE will mark the job for cancelling and return without waiting for completion - If status returned is "Complete" or "Aborted" the jobid will be removed from the server and can not be queried again - Feature: New zen command `jobs` to list, get info about and cancel background jobs - If no options are given it will display a list of active background jobs - `--jobid` accepts an id (returned from for example `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job - `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job - Feature: oplog import and export http rpc requests are now async operations that will run in the background - Feature: `oplog-export` and `oplog-import` now reports progress to the console as work progress by default - Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C - Feature: `oplog-export` and `oplog-import` has a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src/zen/cmds/projectstore.cpp')
-rw-r--r--src/zen/cmds/projectstore.cpp156
1 files changed, 146 insertions, 10 deletions
diff --git a/src/zen/cmds/projectstore.cpp b/src/zen/cmds/projectstore.cpp
index c3bfeaa4f..edeff7d85 100644
--- a/src/zen/cmds/projectstore.cpp
+++ b/src/zen/cmds/projectstore.cpp
@@ -17,6 +17,8 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <signal.h>
+
namespace zen {
namespace {
@@ -42,6 +44,116 @@ namespace {
return Payload;
};
+ static std::atomic_uint32_t SignalCounter[NSIG] = {0};
+
+ static void SignalCallbackHandler(int SigNum)
+ {
+ if (SigNum >= 0 && SigNum < NSIG)
+ {
+ SignalCounter[SigNum].fetch_add(1);
+ }
+ }
+
+ void AsyncPost(HttpClient& Http, std::string_view Url, IoBuffer&& Payload)
+ {
+ if (HttpClient::Response Result = Http.Post(Url, Payload))
+ {
+ if (Result.StatusCode == HttpResponseCode::Accepted)
+ {
+ signal(SIGINT, SignalCallbackHandler);
+ bool Cancelled = false;
+
+ std::string_view JobIdText = Result.AsText();
+ std::optional<uint64_t> JobIdMaybe = ParseInt<uint64_t>(JobIdText);
+ if (!JobIdMaybe)
+ {
+ Result.ThrowError("invalid job id"sv);
+ }
+
+ std::string LastCurrentOp;
+ uint32_t LastCurrentOpPercentComplete = 0;
+
+ uint64_t JobId = JobIdMaybe.value();
+ while (true)
+ {
+ HttpClient::Response StatusResult =
+ Http.Get(fmt::format("/admin/jobs/{}", JobId), HttpClient::Accept(ZenContentType::kCbObject));
+ if (!StatusResult)
+ {
+ StatusResult.ThrowError("failed to create project"sv);
+ }
+ CbObject StatusObject = StatusResult.AsObject();
+ std::string_view Status = StatusObject["Status"sv].AsString();
+ CbArrayView Messages = StatusObject["Messages"sv].AsArrayView();
+ for (auto M : Messages)
+ {
+ std::string_view Message = M.AsString();
+ ZEN_CONSOLE("{}", Message);
+ }
+ if (Status == "Complete")
+ {
+ if (Cancelled)
+ {
+ ZEN_CONSOLE("Cancelled");
+ }
+ else
+ {
+ double QueueTimeS = StatusObject["QueueTimeS"].AsDouble();
+ double RuntimeS = StatusObject["RunTimeS"].AsDouble();
+ ZEN_CONSOLE("Completed: QueueTime: {:.3} s, RunTime: {:.3} s", QueueTimeS, RuntimeS);
+ }
+ break;
+ }
+ if (Status == "Aborted")
+ {
+ Result.ThrowError("Aborted");
+ break;
+ }
+ if (Status == "Queued")
+ {
+ double QueueTimeS = StatusObject["QueueTimeS"].AsDouble();
+ ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS);
+ }
+ if (Status == "Running")
+ {
+ std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString();
+ uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32();
+ if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete)
+ {
+ LastCurrentOp = CurrentOp;
+ LastCurrentOpPercentComplete = CurrentOpPercentComplete;
+ ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete);
+ }
+ }
+ uint32_t AbortCounter = SignalCounter[SIGINT].load();
+ if (SignalCounter[SIGINT] > 0)
+ {
+ SignalCounter[SIGINT].fetch_sub(AbortCounter);
+ if (HttpClient::Response DeleteResult = Http.Delete(fmt::format("/admin/jobs/{}", JobId)))
+ {
+ ZEN_CONSOLE("Requested cancel...");
+ Cancelled = true;
+ }
+ else
+ {
+ ZEN_CONSOLE("Failed cancelling job {}", DeleteResult);
+ }
+ continue;
+ }
+ Sleep(100);
+ }
+ }
+ else
+ {
+ ZEN_CONSOLE("{}", Result);
+ }
+ }
+ else
+ {
+ Result.ThrowError("failed to start operation"sv);
+ }
+ }
+
} // namespace
///////////////////////////////////////
@@ -481,6 +593,7 @@ ExportOplogCommand::ExportOplogCommand()
"Disable block creation and save all attachments individually (applies to file and cloud target)",
cxxopts::value(m_DisableBlocks),
"<disable>");
+ m_Options.add_option("", "a", "async", "Trigger export but don't wait for completion", cxxopts::value(m_Async), "<async>");
m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>");
@@ -693,6 +806,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
Writer.AddBool("force"sv, true);
}
+ Writer.AddBool("async"sv, true);
if (!m_FileDirectoryPath.empty())
{
Writer.BeginObject("file"sv);
@@ -793,16 +907,26 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
ZEN_CONSOLE("Saving oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, m_HostName, TargetDescription);
HttpClient Http(m_HostName);
- if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload))
+ if (m_Async)
{
- ZEN_CONSOLE("{}", Result);
- return 0;
+ if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName),
+ std::move(Payload),
+ HttpClient::Accept(ZenContentType::kJSON));
+ Result)
+ {
+ ZEN_CONSOLE("{}", Result.AsText());
+ }
+ else
+ {
+ Result.ThrowError("failed requesting loading oplog export"sv);
+ return 1;
+ }
}
else
{
- Result.ThrowError("failed to create project"sv);
- return 1;
+ AsyncPost(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload));
}
+ return 0;
}
////////////////////////////
@@ -821,6 +945,7 @@ ImportOplogCommand::ImportOplogCommand()
cxxopts::value(m_MaxChunkEmbedSize),
"<chunksize>");
m_Options.add_option("", "f", "force", "Force import of all attachments", cxxopts::value(m_Force), "<force>");
+ m_Options.add_option("", "a", "async", "Trigger import but don't wait for completion", cxxopts::value(m_Async), "<async>");
m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>");
@@ -1050,16 +1175,27 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
});
ZEN_CONSOLE("Loading oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, SourceDescription, m_HostName);
- if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload))
+
+ if (m_Async)
{
- ZEN_CONSOLE("{}", Result);
- return 0;
+ if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName),
+ std::move(Payload),
+ HttpClient::Accept(ZenContentType::kJSON));
+ Result)
+ {
+ ZEN_CONSOLE("{}", Result.AsText());
+ }
+ else
+ {
+ Result.ThrowError("failed requesting loading oplog import"sv);
+ return 1;
+ }
}
else
{
- Result.ThrowError("failed to create project"sv);
- return 1;
+ AsyncPost(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload));
}
+ return 0;
}
////////////////////////////