diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-20 17:22:49 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-20 23:22:49 +0200 |
| commit | 73b86bbbd15e22893f05261dee036b81bfff91cf (patch) | |
| tree | 45c36ed9f8c7e340bd1ba292ec11cab5186b6965 /src | |
| parent | controlled zenserver shutdown (#413) (diff) | |
| download | zen-73b86bbbd15e22893f05261dee036b81bfff91cf.tar.xz zen-73b86bbbd15e22893f05261dee036b81bfff91cf.zip | |
Improvement: Add names to background jobs for easier debugging (#412)
Improvement: Background jobs now temporarily sets thread name to background job name while executing
Improvement: Background jobs tracks worker thread id used while executing
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/jobqueue.h | 22 | ||||
| -rw-r--r-- | src/zencore/jobqueue.cpp | 155 | ||||
| -rw-r--r-- | src/zenserver/admin/admin.cpp | 6 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 60 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 44 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 2 |
6 files changed, 176 insertions, 113 deletions
diff --git a/src/zencore/include/zencore/jobqueue.h b/src/zencore/include/zencore/jobqueue.h index 41a3288e1..91ca24b34 100644 --- a/src/zencore/include/zencore/jobqueue.h +++ b/src/zencore/include/zencore/jobqueue.h @@ -14,16 +14,17 @@ namespace zen { struct JobId { - uint64_t Id; + uint64_t Id = 0; }; class JobQueue; -struct JobContext +class JobContext { - JobQueue& Queue; - const JobId Id; - std::atomic_bool& CancelFlag; +public: + virtual bool IsCancelled() const = 0; + virtual void ReportMessage(std::string_view Message) = 0; + virtual void ReportProgress(std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) = 0; }; class JobQueue @@ -32,12 +33,9 @@ public: typedef std::function<void(JobContext& Context)> JobFunction; virtual ~JobQueue() = default; - virtual JobId QueueJob(JobFunction&& JobFunc) = 0; - virtual bool CancelJob(JobId Id) = 0; - virtual void Stop() = 0; - - virtual void ReportMessage(JobId Id, std::string_view Message) = 0; - virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) = 0; + virtual JobId QueueJob(std::string_view Name, JobFunction&& JobFunc) = 0; + virtual bool CancelJob(JobId Id) = 0; + virtual void Stop() = 0; enum class Status : uint32_t { @@ -64,11 +62,13 @@ public: struct JobDetails { + std::string Name; Status Status; State State; std::chrono::system_clock::time_point CreateTime; std::chrono::system_clock::time_point StartTime; std::chrono::system_clock::time_point EndTime; + int WorkerThreadId; }; // Will only respond once when status is Complete or Aborted diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index 34530b3a2..1755b9fe9 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -29,11 +29,15 @@ namespace JobClock { ClockSource::time_point TimePointFromTick(const Tick TickCount) { return ClockSource::time_point{ClockSource::duration{TickCount}}; } } // namespace JobClock +class JobQueueImpl; + class JobQueueImpl : public JobQueue { public: - struct Job : public RefCounted + struct Job : public RefCounted, public JobContext { + JobQueueImpl* Queue; + std::string Name; JobId Id; JobFunction Callback; std::atomic_bool CancelFlag; @@ -41,6 +45,14 @@ public: JobClock::Tick CreateTick; JobClock::Tick StartTick; JobClock::Tick EndTick; + int WorkerThreadId; + + virtual bool IsCancelled() const override { return CancelFlag.load(); } + virtual void ReportMessage(std::string_view Message) override { Queue->ReportMessage(Id, Message); } + virtual void ReportProgress(std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override + { + Queue->ReportProgress(Id, CurrentOp, CurrentOpPercentComplete); + } }; JobQueueImpl(int WorkerCount, std::string_view QueueName) : WorkerPool(WorkerCount, QueueName), WorkerCounter(1) @@ -63,7 +75,7 @@ public: } } - virtual JobId QueueJob(JobFunction&& JobFunc) override + virtual JobId QueueJob(std::string_view Name, JobFunction&& JobFunc) override { ZEN_ASSERT(InitializedFlag); @@ -73,13 +85,17 @@ public: IdGenerator.fetch_add(1); } RefPtr<Job> NewJob(new Job()); - NewJob->Callback = std::move(JobFunc); - NewJob->CancelFlag = false; - NewJob->Id = JobId{.Id = NewJobId}; - NewJob->CreateTick = JobClock::Now(); - NewJob->StartTick = JobClock::Never(); - NewJob->EndTick = JobClock::Never(); - + NewJob->Queue = this; + NewJob->Name = Name; + NewJob->Callback = std::move(JobFunc); + NewJob->CancelFlag = false; + NewJob->Id = JobId{.Id = NewJobId}; + NewJob->CreateTick = JobClock::Now(); + NewJob->StartTick = JobClock::Never(); + NewJob->EndTick = JobClock::Never(); + NewJob->WorkerThreadId = 0; + + ZEN_DEBUG("Scheduling background job {}:'{}'", NewJob->Id.Id, NewJob->Name); QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); }); WorkerCounter.AddCount(1); try @@ -102,7 +118,7 @@ public: QueuedJobs.erase(It); } }); - ZEN_ERROR("Failed to schedule job to job queue. Reason: ''", Ex.what()); + ZEN_ERROR("Failed to schedule job {}:'{}' to job queue. Reason: ''", NewJob->Id.Id, NewJob->Name, Ex.what()); throw; } } @@ -113,6 +129,7 @@ public: QueueLock.WithExclusiveLock([&]() { if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end()) { + ZEN_DEBUG("Cancelling running background job {}:'{}'", It->second->Id.Id, It->second->Name); It->second->CancelFlag.store(true); Result = true; return; @@ -130,6 +147,7 @@ public: if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; }); It != QueuedJobs.end()) { + ZEN_DEBUG("Cancelling queued background job {}:'{}'", (*It)->Id.Id, (*It)->Name); QueuedJobs.erase(It); Result = true; return; @@ -140,6 +158,11 @@ public: virtual void Stop() override { + ZEN_DEBUG("Stopping jobqueue"); + if (!InitializedFlag) + { + return; + } InitializedFlag.store(false); QueueLock.WithExclusiveLock([&]() { for (auto& Job : RunningJobs) @@ -149,41 +172,32 @@ public: QueuedJobs.clear(); }); WorkerCounter.CountDown(); - ptrdiff_t Remaining = WorkerCounter.Remaining(); - while (Remaining > 0) + while (true) { - ZEN_INFO("Waiting for {} background jobs to complete", Remaining); - WorkerCounter.Wait(500); + size_t RunningJobCount = 0; QueueLock.WithExclusiveLock([&]() { for (auto& Job : RunningJobs) { Job.second->CancelFlag.store(true); + ZEN_INFO("Cancelling background job {}:'{}'", Job.second->Id.Id, Job.second->Name); + RunningJobCount++; } QueuedJobs.clear(); }); - Remaining = WorkerCounter.Remaining(); + if (RunningJobCount == 0) + { + WorkerCounter.Wait(); + break; + } + ptrdiff_t Remaining = WorkerCounter.Remaining(); + if (Remaining > 0) + { + ZEN_INFO("Waiting for {} background jobs to complete", Remaining); + WorkerCounter.Wait(500); + } } } - virtual void ReportMessage(JobId Id, std::string_view Message) override - { - QueueLock.WithSharedLock([&]() { - auto It = RunningJobs.find(Id.Id); - ZEN_ASSERT(It != RunningJobs.end()); - It->second->State.Messages.push_back(std::string(Message)); - }); - } - - virtual void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override - { - QueueLock.WithSharedLock([&]() { - auto It = RunningJobs.find(Id.Id); - ZEN_ASSERT(It != RunningJobs.end()); - It->second->State.CurrentOp = CurrentOp; - It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete; - }); - } - virtual std::vector<JobInfo> GetJobs() override { std::vector<JobId> DeadJobs; @@ -240,13 +254,15 @@ public: virtual std::optional<JobDetails> Get(JobId Id) override { auto Convert = [](Status Status, Job& Job) -> JobDetails { - return JobDetails{.Status = Status, - .State = {.CurrentOp = Job.State.CurrentOp, - .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete, - .Messages = std::move(Job.State.Messages)}, - .CreateTime = JobClock::TimePointFromTick(Job.CreateTick), - .StartTime = JobClock::TimePointFromTick(Job.StartTick), - .EndTime = JobClock::TimePointFromTick(Job.EndTick)}; + return JobDetails{.Name = Job.Name, + .Status = Status, + .State = {.CurrentOp = Job.State.CurrentOp, + .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete, + .Messages = std::move(Job.State.Messages)}, + .CreateTime = JobClock::TimePointFromTick(Job.CreateTick), + .StartTime = JobClock::TimePointFromTick(Job.StartTick), + .EndTime = JobClock::TimePointFromTick(Job.EndTick), + .WorkerThreadId = Job.WorkerThreadId}; }; std::optional<JobDetails> Result; @@ -278,6 +294,25 @@ public: return Result; } + void ReportMessage(JobId Id, std::string_view Message) + { + QueueLock.WithSharedLock([&]() { + auto It = RunningJobs.find(Id.Id); + ZEN_ASSERT(It != RunningJobs.end()); + It->second->State.Messages.push_back(std::string(Message)); + }); + } + + void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) + { + QueueLock.WithSharedLock([&]() { + auto It = RunningJobs.find(Id.Id); + ZEN_ASSERT(It != RunningJobs.end()); + It->second->State.CurrentOp = CurrentOp; + It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete; + }); + } + std::atomic_uint64_t IdGenerator = 1; std::atomic_bool InitializedFlag = false; @@ -292,6 +327,7 @@ public: void Worker() { + int CurrentThreadId = GetCurrentThreadId(); RefPtr<Job> CurrentJob; QueueLock.WithExclusiveLock([&]() { if (!QueuedJobs.empty()) @@ -300,7 +336,8 @@ public: ZEN_ASSERT(CurrentJob); QueuedJobs.pop_front(); RunningJobs.insert_or_assign(CurrentJob->Id.Id, CurrentJob); - CurrentJob->StartTick = JobClock::Now(); + CurrentJob->StartTick = JobClock::Now(); + CurrentJob->WorkerThreadId = CurrentThreadId; } }); if (!CurrentJob) @@ -310,23 +347,29 @@ public: try { - JobContext Context{.Queue = *this, .Id = CurrentJob->Id, .CancelFlag = CurrentJob->CancelFlag}; - CurrentJob->Callback(Context); + SetCurrentThreadName(fmt::format("BkgJob: {}", CurrentJob->Name)); + ZEN_DEBUG("Executing background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name); + CurrentJob->Callback(*CurrentJob); + ZEN_DEBUG("Completed background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name); QueueLock.WithExclusiveLock([&]() { - CurrentJob->EndTick = JobClock::Now(); + CurrentJob->EndTick = JobClock::Now(); + CurrentJob->WorkerThreadId = 0; RunningJobs.erase(CurrentJob->Id.Id); CompletedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); }); } catch (std::exception& Ex) { + ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what()); QueueLock.WithExclusiveLock([&]() { CurrentJob->State.Messages.push_back(Ex.what()); - CurrentJob->EndTick = JobClock::Now(); + CurrentJob->EndTick = JobClock::Now(); + CurrentJob->WorkerThreadId = 0; RunningJobs.erase(CurrentJob->Id.Id); AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); }); } + SetCurrentThreadName(fmt::format("JobQueueImpl::Worker {}", GetCurrentThreadId())); } }; @@ -378,30 +421,30 @@ TEST_CASE("JobQueue") for (uint32_t I = 0; I < 100; I++) { JobsLatch.AddCount(1); - Pool.ScheduleWork([&Queue, &JobsLatch]() { + Pool.ScheduleWork([&Queue, &JobsLatch, I]() { auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); - auto Id = Queue->QueueJob([&](JobContext& Context) { - if (Context.CancelFlag.load()) + auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&](JobContext& Context) { + if (Context.IsCancelled()) { return; } - Context.Queue.ReportProgress(Context.Id, "going to sleep", 0); + Context.ReportProgress("going to sleep", 0); Sleep(10); - if (Context.CancelFlag.load()) + if (Context.IsCancelled()) { return; } - Context.Queue.ReportProgress(Context.Id, "going to sleep again", 50); - if ((Context.Id.Id & 0xFF) == 0x10) + Context.ReportProgress("going to sleep again", 50); + if ((I & 0xFF) == 0x10) { - zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", Context.Id.Id)); + zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); } Sleep(10); - if (Context.CancelFlag.load()) + if (Context.IsCancelled()) { return; } - Context.Queue.ReportProgress(Context.Id, "done", 100); + Context.ReportProgress("done", 100); }); }); } diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 74131e624..cef2ba403 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -99,24 +99,29 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJ case JobQueue::Status::Queued: { CbObjectWriter Obj; + Obj.AddString("Name"sv, CurrentState->Name); Obj.AddString("Status"sv, "Queued"sv); Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, Now)); + Obj.AddInteger("WorkerThread", CurrentState->WorkerThreadId); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); } break; case JobQueue::Status::Running: { CbObjectWriter Obj; + Obj.AddString("Name"sv, CurrentState->Name); Obj.AddString("Status"sv, "Running"sv); WriteState(Obj, CurrentState->State); Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, Now)); + Obj.AddInteger("WorkerThread", CurrentState->WorkerThreadId); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); } break; case JobQueue::Status::Aborted: { CbObjectWriter Obj; + Obj.AddString("Name"sv, CurrentState->Name); Obj.AddString("Status"sv, "Aborted"sv); WriteState(Obj, CurrentState->State); Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); @@ -128,6 +133,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJ case JobQueue::Status::Completed: { CbObjectWriter Obj; + Obj.AddString("Name"sv, CurrentState->Name); Obj.AddString("Status"sv, "Complete"sv); WriteState(Obj, CurrentState->State); Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime)); diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 1ad4403f4..70dc678a9 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -2881,34 +2881,37 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize)); - JobId JobId = m_JobQueue.QueueJob([this, - ActualRemoteStore = std::move(RemoteStore), - Project, - OplogPtr = &Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - CreateBlocks = StoreInfo.CreateBlocks, - UseTempBlockFiles = StoreInfo.UseTempBlockFiles, - Force](JobContext& Context) { - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *ActualRemoteStore, - *Project.Get(), - *OplogPtr, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - CreateBlocks, - UseTempBlockFiles, - Force, - &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw std::runtime_error(fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); - } - }); + JobId JobId = + m_JobQueue.QueueJob(fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description), + [this, + ActualRemoteStore = std::move(RemoteStore), + Project, + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks = StoreInfo.CreateBlocks, + UseTempBlockFiles = StoreInfo.UseTempBlockFiles, + Force](JobContext& Context) { + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project.Get(), + *OplogPtr, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks, + UseTempBlockFiles, + Force, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error( + fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); + } + }); return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } @@ -2935,6 +2938,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); JobId JobId = m_JobQueue.QueueJob( + fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) { RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context); auto Response = ConvertResult(Result); diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index ea744eb35..ee3456ecc 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -77,7 +77,7 @@ ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_ if (OptionalContext) { ZEN_ASSERT(Total > 0); - OptionalContext->Queue.ReportProgress(OptionalContext->Id, CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total)); + OptionalContext->ReportProgress(CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total)); } ZEN_INFO("{}", CurrentOp); } @@ -87,12 +87,22 @@ ReportMessage(JobContext* OptionalContext, std::string_view Message) { if (OptionalContext) { - OptionalContext->Queue.ReportMessage(OptionalContext->Id, Message); + OptionalContext->ReportMessage(Message); } ZEN_INFO("{}", Message); } bool +IsCancelled(JobContext* OptionalContext) +{ + if (!OptionalContext) + { + return false; + } + return OptionalContext->IsCancelled(); +} + +bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) { IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer(); @@ -408,13 +418,13 @@ BuildContainer(CidStore& ChunkStore, SectionOpsWriter << Op; } OpCount++; - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); } }); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); return {}; @@ -493,7 +503,7 @@ BuildContainer(CidStore& ChunkStore, return LhsKeyIt->second < RhsKeyIt->second; }); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); return {}; @@ -522,7 +532,7 @@ BuildContainer(CidStore& ChunkStore, Latch BlockCreateLatch(1); for (const IoHash& AttachmentHash : SortedAttachments) { - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); BlockCreateLatch.CountDown(); @@ -652,7 +662,7 @@ BuildContainer(CidStore& ChunkStore, } SectionOpsWriter.EndArray(); // "ops" - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); BlockCreateLatch.CountDown(); @@ -670,7 +680,7 @@ BuildContainer(CidStore& ChunkStore, LargeAttachmentCount)); CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); BlockCreateLatch.CountDown(); @@ -694,7 +704,7 @@ BuildContainer(CidStore& ChunkStore, while (!BlockCreateLatch.Wait(1000)) { ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); while (!BlockCreateLatch.Wait(1000)) @@ -896,7 +906,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, return; } - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { @@ -978,7 +988,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, }); } - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { @@ -1029,7 +1039,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, }); } - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { @@ -1105,7 +1115,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, while (!SaveAttachmentsLatch.Wait(1000)) { ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { @@ -1304,7 +1314,7 @@ SaveOplog(CidStore& ChunkStore, /* out */ RemoteResult); if (!RemoteResult.IsError()) { - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, @@ -1341,7 +1351,7 @@ SaveOplog(CidStore& ChunkStore, while (!RemoteResult.IsError()) { - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, @@ -1365,7 +1375,7 @@ SaveOplog(CidStore& ChunkStore, break; } - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, @@ -1654,7 +1664,7 @@ LoadOplog(CidStore& ChunkStore, while (!AttachmentsWorkLatch.Wait(1000)) { ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining(); - if (OptionalContext && OptionalContext->CancelFlag) + if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 552b11380..be086084c 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -99,7 +99,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer( tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files" -struct JobContext; +class JobContext; RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, |