aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-20 17:22:49 -0400
committerGitHub <[email protected]>2023-09-20 23:22:49 +0200
commit73b86bbbd15e22893f05261dee036b81bfff91cf (patch)
tree45c36ed9f8c7e340bd1ba292ec11cab5186b6965 /src
parentcontrolled zenserver shutdown (#413) (diff)
downloadzen-73b86bbbd15e22893f05261dee036b81bfff91cf.tar.xz
zen-73b86bbbd15e22893f05261dee036b81bfff91cf.zip
Improvement: Add names to background jobs for easier debugging (#412)
Improvement: Background jobs now temporarily sets thread name to background job name while executing Improvement: Background jobs tracks worker thread id used while executing
Diffstat (limited to 'src')
-rw-r--r--src/zencore/include/zencore/jobqueue.h22
-rw-r--r--src/zencore/jobqueue.cpp155
-rw-r--r--src/zenserver/admin/admin.cpp6
-rw-r--r--src/zenserver/projectstore/projectstore.cpp60
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp44
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h2
6 files changed, 176 insertions, 113 deletions
diff --git a/src/zencore/include/zencore/jobqueue.h b/src/zencore/include/zencore/jobqueue.h
index 41a3288e1..91ca24b34 100644
--- a/src/zencore/include/zencore/jobqueue.h
+++ b/src/zencore/include/zencore/jobqueue.h
@@ -14,16 +14,17 @@ namespace zen {
struct JobId
{
- uint64_t Id;
+ uint64_t Id = 0;
};
class JobQueue;
-struct JobContext
+class JobContext
{
- JobQueue& Queue;
- const JobId Id;
- std::atomic_bool& CancelFlag;
+public:
+ virtual bool IsCancelled() const = 0;
+ virtual void ReportMessage(std::string_view Message) = 0;
+ virtual void ReportProgress(std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) = 0;
};
class JobQueue
@@ -32,12 +33,9 @@ public:
typedef std::function<void(JobContext& Context)> JobFunction;
virtual ~JobQueue() = default;
- virtual JobId QueueJob(JobFunction&& JobFunc) = 0;
- virtual bool CancelJob(JobId Id) = 0;
- virtual void Stop() = 0;
-
- virtual void ReportMessage(JobId Id, std::string_view Message) = 0;
- virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) = 0;
+ virtual JobId QueueJob(std::string_view Name, JobFunction&& JobFunc) = 0;
+ virtual bool CancelJob(JobId Id) = 0;
+ virtual void Stop() = 0;
enum class Status : uint32_t
{
@@ -64,11 +62,13 @@ public:
struct JobDetails
{
+ std::string Name;
Status Status;
State State;
std::chrono::system_clock::time_point CreateTime;
std::chrono::system_clock::time_point StartTime;
std::chrono::system_clock::time_point EndTime;
+ int WorkerThreadId;
};
// Will only respond once when status is Complete or Aborted
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index 34530b3a2..1755b9fe9 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -29,11 +29,15 @@ namespace JobClock {
ClockSource::time_point TimePointFromTick(const Tick TickCount) { return ClockSource::time_point{ClockSource::duration{TickCount}}; }
} // namespace JobClock
+class JobQueueImpl;
+
class JobQueueImpl : public JobQueue
{
public:
- struct Job : public RefCounted
+ struct Job : public RefCounted, public JobContext
{
+ JobQueueImpl* Queue;
+ std::string Name;
JobId Id;
JobFunction Callback;
std::atomic_bool CancelFlag;
@@ -41,6 +45,14 @@ public:
JobClock::Tick CreateTick;
JobClock::Tick StartTick;
JobClock::Tick EndTick;
+ int WorkerThreadId;
+
+ virtual bool IsCancelled() const override { return CancelFlag.load(); }
+ virtual void ReportMessage(std::string_view Message) override { Queue->ReportMessage(Id, Message); }
+ virtual void ReportProgress(std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override
+ {
+ Queue->ReportProgress(Id, CurrentOp, CurrentOpPercentComplete);
+ }
};
JobQueueImpl(int WorkerCount, std::string_view QueueName) : WorkerPool(WorkerCount, QueueName), WorkerCounter(1)
@@ -63,7 +75,7 @@ public:
}
}
- virtual JobId QueueJob(JobFunction&& JobFunc) override
+ virtual JobId QueueJob(std::string_view Name, JobFunction&& JobFunc) override
{
ZEN_ASSERT(InitializedFlag);
@@ -73,13 +85,17 @@ public:
IdGenerator.fetch_add(1);
}
RefPtr<Job> NewJob(new Job());
- NewJob->Callback = std::move(JobFunc);
- NewJob->CancelFlag = false;
- NewJob->Id = JobId{.Id = NewJobId};
- NewJob->CreateTick = JobClock::Now();
- NewJob->StartTick = JobClock::Never();
- NewJob->EndTick = JobClock::Never();
-
+ NewJob->Queue = this;
+ NewJob->Name = Name;
+ NewJob->Callback = std::move(JobFunc);
+ NewJob->CancelFlag = false;
+ NewJob->Id = JobId{.Id = NewJobId};
+ NewJob->CreateTick = JobClock::Now();
+ NewJob->StartTick = JobClock::Never();
+ NewJob->EndTick = JobClock::Never();
+ NewJob->WorkerThreadId = 0;
+
+ ZEN_DEBUG("Scheduling background job {}:'{}'", NewJob->Id.Id, NewJob->Name);
QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); });
WorkerCounter.AddCount(1);
try
@@ -102,7 +118,7 @@ public:
QueuedJobs.erase(It);
}
});
- ZEN_ERROR("Failed to schedule job to job queue. Reason: ''", Ex.what());
+ ZEN_ERROR("Failed to schedule job {}:'{}' to job queue. Reason: ''", NewJob->Id.Id, NewJob->Name, Ex.what());
throw;
}
}
@@ -113,6 +129,7 @@ public:
QueueLock.WithExclusiveLock([&]() {
if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end())
{
+ ZEN_DEBUG("Cancelling running background job {}:'{}'", It->second->Id.Id, It->second->Name);
It->second->CancelFlag.store(true);
Result = true;
return;
@@ -130,6 +147,7 @@ public:
if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; });
It != QueuedJobs.end())
{
+ ZEN_DEBUG("Cancelling queued background job {}:'{}'", (*It)->Id.Id, (*It)->Name);
QueuedJobs.erase(It);
Result = true;
return;
@@ -140,6 +158,11 @@ public:
virtual void Stop() override
{
+ ZEN_DEBUG("Stopping jobqueue");
+ if (!InitializedFlag)
+ {
+ return;
+ }
InitializedFlag.store(false);
QueueLock.WithExclusiveLock([&]() {
for (auto& Job : RunningJobs)
@@ -149,41 +172,32 @@ public:
QueuedJobs.clear();
});
WorkerCounter.CountDown();
- ptrdiff_t Remaining = WorkerCounter.Remaining();
- while (Remaining > 0)
+ while (true)
{
- ZEN_INFO("Waiting for {} background jobs to complete", Remaining);
- WorkerCounter.Wait(500);
+ size_t RunningJobCount = 0;
QueueLock.WithExclusiveLock([&]() {
for (auto& Job : RunningJobs)
{
Job.second->CancelFlag.store(true);
+ ZEN_INFO("Cancelling background job {}:'{}'", Job.second->Id.Id, Job.second->Name);
+ RunningJobCount++;
}
QueuedJobs.clear();
});
- Remaining = WorkerCounter.Remaining();
+ if (RunningJobCount == 0)
+ {
+ WorkerCounter.Wait();
+ break;
+ }
+ ptrdiff_t Remaining = WorkerCounter.Remaining();
+ if (Remaining > 0)
+ {
+ ZEN_INFO("Waiting for {} background jobs to complete", Remaining);
+ WorkerCounter.Wait(500);
+ }
}
}
- virtual void ReportMessage(JobId Id, std::string_view Message) override
- {
- QueueLock.WithSharedLock([&]() {
- auto It = RunningJobs.find(Id.Id);
- ZEN_ASSERT(It != RunningJobs.end());
- It->second->State.Messages.push_back(std::string(Message));
- });
- }
-
- virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override
- {
- QueueLock.WithSharedLock([&]() {
- auto It = RunningJobs.find(Id.Id);
- ZEN_ASSERT(It != RunningJobs.end());
- It->second->State.CurrentOp = CurrentOp;
- It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete;
- });
- }
-
virtual std::vector<JobInfo> GetJobs() override
{
std::vector<JobId> DeadJobs;
@@ -240,13 +254,15 @@ public:
virtual std::optional<JobDetails> Get(JobId Id) override
{
auto Convert = [](Status Status, Job& Job) -> JobDetails {
- return JobDetails{.Status = Status,
- .State = {.CurrentOp = Job.State.CurrentOp,
- .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete,
- .Messages = std::move(Job.State.Messages)},
- .CreateTime = JobClock::TimePointFromTick(Job.CreateTick),
- .StartTime = JobClock::TimePointFromTick(Job.StartTick),
- .EndTime = JobClock::TimePointFromTick(Job.EndTick)};
+ return JobDetails{.Name = Job.Name,
+ .Status = Status,
+ .State = {.CurrentOp = Job.State.CurrentOp,
+ .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete,
+ .Messages = std::move(Job.State.Messages)},
+ .CreateTime = JobClock::TimePointFromTick(Job.CreateTick),
+ .StartTime = JobClock::TimePointFromTick(Job.StartTick),
+ .EndTime = JobClock::TimePointFromTick(Job.EndTick),
+ .WorkerThreadId = Job.WorkerThreadId};
};
std::optional<JobDetails> Result;
@@ -278,6 +294,25 @@ public:
return Result;
}
+ void ReportMessage(JobId Id, std::string_view Message)
+ {
+ QueueLock.WithSharedLock([&]() {
+ auto It = RunningJobs.find(Id.Id);
+ ZEN_ASSERT(It != RunningJobs.end());
+ It->second->State.Messages.push_back(std::string(Message));
+ });
+ }
+
+ void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete)
+ {
+ QueueLock.WithSharedLock([&]() {
+ auto It = RunningJobs.find(Id.Id);
+ ZEN_ASSERT(It != RunningJobs.end());
+ It->second->State.CurrentOp = CurrentOp;
+ It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete;
+ });
+ }
+
std::atomic_uint64_t IdGenerator = 1;
std::atomic_bool InitializedFlag = false;
@@ -292,6 +327,7 @@ public:
void Worker()
{
+ int CurrentThreadId = GetCurrentThreadId();
RefPtr<Job> CurrentJob;
QueueLock.WithExclusiveLock([&]() {
if (!QueuedJobs.empty())
@@ -300,7 +336,8 @@ public:
ZEN_ASSERT(CurrentJob);
QueuedJobs.pop_front();
RunningJobs.insert_or_assign(CurrentJob->Id.Id, CurrentJob);
- CurrentJob->StartTick = JobClock::Now();
+ CurrentJob->StartTick = JobClock::Now();
+ CurrentJob->WorkerThreadId = CurrentThreadId;
}
});
if (!CurrentJob)
@@ -310,23 +347,29 @@ public:
try
{
- JobContext Context{.Queue = *this, .Id = CurrentJob->Id, .CancelFlag = CurrentJob->CancelFlag};
- CurrentJob->Callback(Context);
+ SetCurrentThreadName(fmt::format("BkgJob: {}", CurrentJob->Name));
+ ZEN_DEBUG("Executing background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name);
+ CurrentJob->Callback(*CurrentJob);
+ ZEN_DEBUG("Completed background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name);
QueueLock.WithExclusiveLock([&]() {
- CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->WorkerThreadId = 0;
RunningJobs.erase(CurrentJob->Id.Id);
CompletedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
});
}
catch (std::exception& Ex)
{
+ ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what());
QueueLock.WithExclusiveLock([&]() {
CurrentJob->State.Messages.push_back(Ex.what());
- CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->EndTick = JobClock::Now();
+ CurrentJob->WorkerThreadId = 0;
RunningJobs.erase(CurrentJob->Id.Id);
AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
});
}
+ SetCurrentThreadName(fmt::format("JobQueueImpl::Worker {}", GetCurrentThreadId()));
}
};
@@ -378,30 +421,30 @@ TEST_CASE("JobQueue")
for (uint32_t I = 0; I < 100; I++)
{
JobsLatch.AddCount(1);
- Pool.ScheduleWork([&Queue, &JobsLatch]() {
+ Pool.ScheduleWork([&Queue, &JobsLatch, I]() {
auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
- auto Id = Queue->QueueJob([&](JobContext& Context) {
- if (Context.CancelFlag.load())
+ auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&](JobContext& Context) {
+ if (Context.IsCancelled())
{
return;
}
- Context.Queue.ReportProgress(Context.Id, "going to sleep", 0);
+ Context.ReportProgress("going to sleep", 0);
Sleep(10);
- if (Context.CancelFlag.load())
+ if (Context.IsCancelled())
{
return;
}
- Context.Queue.ReportProgress(Context.Id, "going to sleep again", 50);
- if ((Context.Id.Id & 0xFF) == 0x10)
+ Context.ReportProgress("going to sleep again", 50);
+ if ((I & 0xFF) == 0x10)
{
- zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", Context.Id.Id));
+ zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
}
Sleep(10);
- if (Context.CancelFlag.load())
+ if (Context.IsCancelled())
{
return;
}
- Context.Queue.ReportProgress(Context.Id, "done", 100);
+ Context.ReportProgress("done", 100);
});
});
}
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 74131e624..cef2ba403 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -99,24 +99,29 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJ
case JobQueue::Status::Queued:
{
CbObjectWriter Obj;
+ Obj.AddString("Name"sv, CurrentState->Name);
Obj.AddString("Status"sv, "Queued"sv);
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, Now));
+ Obj.AddInteger("WorkerThread", CurrentState->WorkerThreadId);
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
}
break;
case JobQueue::Status::Running:
{
CbObjectWriter Obj;
+ Obj.AddString("Name"sv, CurrentState->Name);
Obj.AddString("Status"sv, "Running"sv);
WriteState(Obj, CurrentState->State);
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, Now));
+ Obj.AddInteger("WorkerThread", CurrentState->WorkerThreadId);
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
}
break;
case JobQueue::Status::Aborted:
{
CbObjectWriter Obj;
+ Obj.AddString("Name"sv, CurrentState->Name);
Obj.AddString("Status"sv, "Aborted"sv);
WriteState(Obj, CurrentState->State);
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
@@ -128,6 +133,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJ
case JobQueue::Status::Completed:
{
CbObjectWriter Obj;
+ Obj.AddString("Name"sv, CurrentState->Name);
Obj.AddString("Status"sv, "Complete"sv);
WriteState(Obj, CurrentState->State);
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 1ad4403f4..70dc678a9 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -2881,34 +2881,37 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
NiceBytes(MaxBlockSize),
NiceBytes(MaxChunkEmbedSize));
- JobId JobId = m_JobQueue.QueueJob([this,
- ActualRemoteStore = std::move(RemoteStore),
- Project,
- OplogPtr = &Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- EmbedLooseFile,
- CreateBlocks = StoreInfo.CreateBlocks,
- UseTempBlockFiles = StoreInfo.UseTempBlockFiles,
- Force](JobContext& Context) {
- RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
- *ActualRemoteStore,
- *Project.Get(),
- *OplogPtr,
- MaxBlockSize,
- MaxChunkEmbedSize,
- EmbedLooseFile,
- CreateBlocks,
- UseTempBlockFiles,
- Force,
- &Context);
- auto Response = ConvertResult(Result);
- ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
- if (!IsHttpSuccessCode(Response.first))
- {
- throw std::runtime_error(fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
- }
- });
+ JobId JobId =
+ m_JobQueue.QueueJob(fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description),
+ [this,
+ ActualRemoteStore = std::move(RemoteStore),
+ Project,
+ OplogPtr = &Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks = StoreInfo.CreateBlocks,
+ UseTempBlockFiles = StoreInfo.UseTempBlockFiles,
+ Force](JobContext& Context) {
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *ActualRemoteStore,
+ *Project.Get(),
+ *OplogPtr,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks,
+ UseTempBlockFiles,
+ Force,
+ &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(
+ fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
+ }
+ });
return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
@@ -2935,6 +2938,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
JobId JobId = m_JobQueue.QueueJob(
+ fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description),
[this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) {
RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context);
auto Response = ConvertResult(Result);
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index ea744eb35..ee3456ecc 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -77,7 +77,7 @@ ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_
if (OptionalContext)
{
ZEN_ASSERT(Total > 0);
- OptionalContext->Queue.ReportProgress(OptionalContext->Id, CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total));
+ OptionalContext->ReportProgress(CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total));
}
ZEN_INFO("{}", CurrentOp);
}
@@ -87,12 +87,22 @@ ReportMessage(JobContext* OptionalContext, std::string_view Message)
{
if (OptionalContext)
{
- OptionalContext->Queue.ReportMessage(OptionalContext->Id, Message);
+ OptionalContext->ReportMessage(Message);
}
ZEN_INFO("{}", Message);
}
bool
+IsCancelled(JobContext* OptionalContext)
+{
+ if (!OptionalContext)
+ {
+ return false;
+ }
+ return OptionalContext->IsCancelled();
+}
+
+bool
IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer();
@@ -408,13 +418,13 @@ BuildContainer(CidStore& ChunkStore,
SectionOpsWriter << Op;
}
OpCount++;
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
}
});
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
return {};
@@ -493,7 +503,7 @@ BuildContainer(CidStore& ChunkStore,
return LhsKeyIt->second < RhsKeyIt->second;
});
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
return {};
@@ -522,7 +532,7 @@ BuildContainer(CidStore& ChunkStore,
Latch BlockCreateLatch(1);
for (const IoHash& AttachmentHash : SortedAttachments)
{
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
BlockCreateLatch.CountDown();
@@ -652,7 +662,7 @@ BuildContainer(CidStore& ChunkStore,
}
SectionOpsWriter.EndArray(); // "ops"
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
BlockCreateLatch.CountDown();
@@ -670,7 +680,7 @@ BuildContainer(CidStore& ChunkStore,
LargeAttachmentCount));
CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
BlockCreateLatch.CountDown();
@@ -694,7 +704,7 @@ BuildContainer(CidStore& ChunkStore,
while (!BlockCreateLatch.Wait(1000))
{
ptrdiff_t Remaining = BlockCreateLatch.Remaining();
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
while (!BlockCreateLatch.Wait(1000))
@@ -896,7 +906,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
return;
}
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
@@ -978,7 +988,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
});
}
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
@@ -1029,7 +1039,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
});
}
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
@@ -1105,7 +1115,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
while (!SaveAttachmentsLatch.Wait(1000))
{
ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
@@ -1304,7 +1314,7 @@ SaveOplog(CidStore& ChunkStore,
/* out */ RemoteResult);
if (!RemoteResult.IsError())
{
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
@@ -1341,7 +1351,7 @@ SaveOplog(CidStore& ChunkStore,
while (!RemoteResult.IsError())
{
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
@@ -1365,7 +1375,7 @@ SaveOplog(CidStore& ChunkStore,
break;
}
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
@@ -1654,7 +1664,7 @@ LoadOplog(CidStore& ChunkStore,
while (!AttachmentsWorkLatch.Wait(1000))
{
ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining();
- if (OptionalContext && OptionalContext->CancelFlag)
+ if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 552b11380..be086084c 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -99,7 +99,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer(
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>*
OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files"
-struct JobContext;
+class JobContext;
RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
const CbObject& ContainerObject,