aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-01-22 13:21:55 +0100
committerGitHub <[email protected]>2024-01-22 13:21:55 +0100
commitcccb2d71885d1211b3eb2fc9531826457846b014 (patch)
tree9424b687d76bf937304ce14aaeef9308a31e02c2
parentmake sure to advance read buffer pointer in BasicFileWriter::Write (#633) (diff)
downloadzen-cccb2d71885d1211b3eb2fc9531826457846b014.tar.xz
zen-cccb2d71885d1211b3eb2fc9531826457846b014.zip
jobqueue - allow multiple threads to report progress/messages (#635)
jobqueue - add AbortReason and properly propagate error when running async command
-rw-r--r--CHANGELOG.md2
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp36
-rw-r--r--src/zencore/include/zencore/jobqueue.h1
-rw-r--r--src/zencore/jobqueue.cpp14
-rw-r--r--src/zenserver/admin/admin.cpp5
5 files changed, 39 insertions, 19 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bab4ef63a..4e98a1bff 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,8 @@
- Improvement: Separated cache RPC handling code from general structured cache HTTP code
- Bugfix: RPC recording would not release memory as early as intended which resulted in memory buildup during long recording sessions. Previously certain memory was only released when recording stopped, now it gets released immediately when a segment is complete and written to disk.
- Bugfix: File log format now contains dates again (PR #631)
+- Bugfix: Jobqueue - Allow multiple threads to report progress/messages (oplog import/export)
+- Bugfix: Jobqueue - Add AbortReason and properly propagate error when running async command (oplog import/export)
- Bugfix: Make sure to write the correct data in BasicFileWriter when writing items that are not a multiple of the buffer size
## 0.2.38
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 5795b3190..723b45cda 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -84,7 +84,20 @@ namespace {
}
CbObject StatusObject = StatusResult.AsObject();
std::string_view Status = StatusObject["Status"sv].AsString();
- CbArrayView Messages = StatusObject["Messages"sv].AsArrayView();
+
+ if (Status == "Running")
+ {
+ std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString();
+ uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32();
+ if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete)
+ {
+ LastCurrentOp = CurrentOp;
+ LastCurrentOpPercentComplete = CurrentOpPercentComplete;
+ ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete);
+ }
+ }
+
+ CbArrayView Messages = StatusObject["Messages"sv].AsArrayView();
for (auto M : Messages)
{
std::string_view Message = M.AsString();
@@ -106,7 +119,15 @@ namespace {
}
if (Status == "Aborted")
{
- Result.ThrowError("Aborted");
+ std::string_view AbortReason = StatusObject["AbortReason"].AsString();
+ if (!AbortReason.empty())
+ {
+ throw std::runtime_error(std::string(AbortReason));
+ }
+ else
+ {
+ throw std::runtime_error("Aborted");
+ }
break;
}
if (Status == "Queued")
@@ -114,17 +135,6 @@ namespace {
double QueueTimeS = StatusObject["QueueTimeS"].AsDouble();
ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS);
}
- if (Status == "Running")
- {
- std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString();
- uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32();
- if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete)
- {
- LastCurrentOp = CurrentOp;
- LastCurrentOpPercentComplete = CurrentOpPercentComplete;
- ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete);
- }
- }
uint32_t AbortCounter = SignalCounter[SIGINT].load();
if (SignalCounter[SIGINT] > 0)
{
diff --git a/src/zencore/include/zencore/jobqueue.h b/src/zencore/include/zencore/jobqueue.h
index 91ca24b34..06143791c 100644
--- a/src/zencore/include/zencore/jobqueue.h
+++ b/src/zencore/include/zencore/jobqueue.h
@@ -50,6 +50,7 @@ public:
std::string CurrentOp;
uint32_t CurrentOpPercentComplete = 0;
std::vector<std::string> Messages;
+ std::string AbortReason;
};
struct JobInfo
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index 4bcc5c885..e92817431 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -258,7 +258,8 @@ public:
.Status = Status,
.State = {.CurrentOp = Job.State.CurrentOp,
.CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete,
- .Messages = std::move(Job.State.Messages)},
+ .Messages = std::move(Job.State.Messages),
+ .AbortReason = Job.State.AbortReason},
.CreateTime = JobClock::TimePointFromTick(Job.CreateTick),
.StartTime = JobClock::TimePointFromTick(Job.StartTick),
.EndTime = JobClock::TimePointFromTick(Job.EndTick),
@@ -296,7 +297,7 @@ public:
void ReportMessage(JobId Id, std::string_view Message)
{
- QueueLock.WithSharedLock([&]() {
+ QueueLock.WithExclusiveLock([&]() {
auto It = RunningJobs.find(Id.Id);
ZEN_ASSERT(It != RunningJobs.end());
It->second->State.Messages.push_back(std::string(Message));
@@ -305,7 +306,7 @@ public:
void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete)
{
- QueueLock.WithSharedLock([&]() {
+ QueueLock.WithExclusiveLock([&]() {
auto It = RunningJobs.find(Id.Id);
ZEN_ASSERT(It != RunningJobs.end());
It->second->State.CurrentOp = CurrentOp;
@@ -363,8 +364,9 @@ public:
ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what());
QueueLock.WithExclusiveLock([&]() {
CurrentJob->State.Messages.push_back(Ex.what());
- CurrentJob->EndTick = JobClock::Now();
- CurrentJob->WorkerThreadId = 0;
+ CurrentJob->State.AbortReason = Ex.what();
+ CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->WorkerThreadId = 0;
RunningJobs.erase(CurrentJob->Id.Id);
AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
});
@@ -503,7 +505,7 @@ TEST_CASE("JobQueue")
RemainingJobs.push_back(Id);
break;
case JobQueue::Status::Aborted:
- ZEN_DEBUG("{} aborted. Reason: '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv));
+ ZEN_DEBUG("{} aborted. Reason: '{}'", Id.Id, CurrentState->State.AbortReason);
break;
case JobQueue::Status::Completed:
ZEN_DEBUG("{} completed. '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv));
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 8b3f5a785..8093a0735 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -171,6 +171,10 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
}
Obj.EndArray();
}
+ if (!State.AbortReason.empty())
+ {
+ Obj.AddString("AbortReason"sv, State.AbortReason);
+ }
};
auto GetAgeAsSeconds = [](std::chrono::system_clock::time_point Start, std::chrono::system_clock::time_point End) {
@@ -210,6 +214,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
CbObjectWriter Obj;
Obj.AddString("Name"sv, CurrentState->Name);
Obj.AddString("Status"sv, "Aborted"sv);
+
WriteState(Obj, CurrentState->State);
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime));