aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-01-22 13:21:55 +0100
committerGitHub <[email protected]>2024-01-22 13:21:55 +0100
commitcccb2d71885d1211b3eb2fc9531826457846b014 (patch)
tree9424b687d76bf937304ce14aaeef9308a31e02c2 /src
parentmake sure to advance read buffer pointer in BasicFileWriter::Write (#633) (diff)
downloadzen-cccb2d71885d1211b3eb2fc9531826457846b014.tar.xz
zen-cccb2d71885d1211b3eb2fc9531826457846b014.zip
jobqueue - allow multiple threads to report progress/messages (#635)
jobqueue - add AbortReason and properly propagate error when running async command
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp36
-rw-r--r--src/zencore/include/zencore/jobqueue.h1
-rw-r--r--src/zencore/jobqueue.cpp14
-rw-r--r--src/zenserver/admin/admin.cpp5
4 files changed, 37 insertions, 19 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 5795b3190..723b45cda 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -84,7 +84,20 @@ namespace {
}
CbObject StatusObject = StatusResult.AsObject();
std::string_view Status = StatusObject["Status"sv].AsString();
- CbArrayView Messages = StatusObject["Messages"sv].AsArrayView();
+
+ if (Status == "Running")
+ {
+ std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString();
+ uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32();
+ if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete)
+ {
+ LastCurrentOp = CurrentOp;
+ LastCurrentOpPercentComplete = CurrentOpPercentComplete;
+ ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete);
+ }
+ }
+
+ CbArrayView Messages = StatusObject["Messages"sv].AsArrayView();
for (auto M : Messages)
{
std::string_view Message = M.AsString();
@@ -106,7 +119,15 @@ namespace {
}
if (Status == "Aborted")
{
- Result.ThrowError("Aborted");
+ std::string_view AbortReason = StatusObject["AbortReason"].AsString();
+ if (!AbortReason.empty())
+ {
+ throw std::runtime_error(std::string(AbortReason));
+ }
+ else
+ {
+ throw std::runtime_error("Aborted");
+ }
break;
}
if (Status == "Queued")
@@ -114,17 +135,6 @@ namespace {
double QueueTimeS = StatusObject["QueueTimeS"].AsDouble();
ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS);
}
- if (Status == "Running")
- {
- std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString();
- uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32();
- if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete)
- {
- LastCurrentOp = CurrentOp;
- LastCurrentOpPercentComplete = CurrentOpPercentComplete;
- ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete);
- }
- }
uint32_t AbortCounter = SignalCounter[SIGINT].load();
if (SignalCounter[SIGINT] > 0)
{
diff --git a/src/zencore/include/zencore/jobqueue.h b/src/zencore/include/zencore/jobqueue.h
index 91ca24b34..06143791c 100644
--- a/src/zencore/include/zencore/jobqueue.h
+++ b/src/zencore/include/zencore/jobqueue.h
@@ -50,6 +50,7 @@ public:
std::string CurrentOp;
uint32_t CurrentOpPercentComplete = 0;
std::vector<std::string> Messages;
+ std::string AbortReason;
};
struct JobInfo
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index 4bcc5c885..e92817431 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -258,7 +258,8 @@ public:
.Status = Status,
.State = {.CurrentOp = Job.State.CurrentOp,
.CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete,
- .Messages = std::move(Job.State.Messages)},
+ .Messages = std::move(Job.State.Messages),
+ .AbortReason = Job.State.AbortReason},
.CreateTime = JobClock::TimePointFromTick(Job.CreateTick),
.StartTime = JobClock::TimePointFromTick(Job.StartTick),
.EndTime = JobClock::TimePointFromTick(Job.EndTick),
@@ -296,7 +297,7 @@ public:
void ReportMessage(JobId Id, std::string_view Message)
{
- QueueLock.WithSharedLock([&]() {
+ QueueLock.WithExclusiveLock([&]() {
auto It = RunningJobs.find(Id.Id);
ZEN_ASSERT(It != RunningJobs.end());
It->second->State.Messages.push_back(std::string(Message));
@@ -305,7 +306,7 @@ public:
void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete)
{
- QueueLock.WithSharedLock([&]() {
+ QueueLock.WithExclusiveLock([&]() {
auto It = RunningJobs.find(Id.Id);
ZEN_ASSERT(It != RunningJobs.end());
It->second->State.CurrentOp = CurrentOp;
@@ -363,8 +364,9 @@ public:
ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what());
QueueLock.WithExclusiveLock([&]() {
CurrentJob->State.Messages.push_back(Ex.what());
- CurrentJob->EndTick = JobClock::Now();
- CurrentJob->WorkerThreadId = 0;
+ CurrentJob->State.AbortReason = Ex.what();
+ CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->WorkerThreadId = 0;
RunningJobs.erase(CurrentJob->Id.Id);
AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
});
@@ -503,7 +505,7 @@ TEST_CASE("JobQueue")
RemainingJobs.push_back(Id);
break;
case JobQueue::Status::Aborted:
- ZEN_DEBUG("{} aborted. Reason: '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv));
+ ZEN_DEBUG("{} aborted. Reason: '{}'", Id.Id, CurrentState->State.AbortReason);
break;
case JobQueue::Status::Completed:
ZEN_DEBUG("{} completed. '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv));
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 8b3f5a785..8093a0735 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -171,6 +171,10 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
}
Obj.EndArray();
}
+ if (!State.AbortReason.empty())
+ {
+ Obj.AddString("AbortReason"sv, State.AbortReason);
+ }
};
auto GetAgeAsSeconds = [](std::chrono::system_clock::time_point Start, std::chrono::system_clock::time_point End) {
@@ -210,6 +214,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
CbObjectWriter Obj;
Obj.AddString("Name"sv, CurrentState->Name);
Obj.AddString("Status"sv, "Aborted"sv);
+
WriteState(Obj, CurrentState->State);
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime));