aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Stefan Boberg <[email protected]> 2026-03-20 00:10:22 +0100
committer: Stefan Boberg <[email protected]> 2026-03-20 00:10:22 +0100
commit: acbec2de68598fd64179387158d4c808220241cf (patch)
tree: 6e88c2aadd4b428887fec5bca96ed7fa0bf4d97d
parent: Fix HordeClient to use HttpClientAccessToken constructor (diff)
download: zen-sb/compute-auth.tar.xz
          zen-sb/compute-auth.zip
Consolidate action map locks and fix immediate query visibility (branch: sb/compute-auth)
Replace three separate RwLocks (m_PendingLock, m_RunningLock, m_ResultsLock) with a single m_ActionMapLock guarding all three action maps. This eliminates lock-ordering requirements and prevents actions from being temporarily absent from all maps during state transitions.

Additionally, insert actions into m_PendingActions immediately during EnqueueResolvedAction so they are visible to GetActionResult and FindActionResult right away, without waiting for the scheduler thread to process the update. Previously, eager clients could get a spurious 404 if they queried before the scheduler's HandleActionUpdates ran.

Also adds debug logging for worker manifest, process launch and exit on Windows, and ANSI color helper macros for log formatting.
-rw-r--r--  src/zencompute/CLAUDE.md                   6
-rw-r--r--  src/zencompute/computeservice.cpp        234
-rw-r--r--  src/zencompute/httpcomputeservice.cpp      2
-rw-r--r--  src/zencompute/runners/localrunner.cpp     2
-rw-r--r--  src/zencompute/runners/localrunner.h       2
-rw-r--r--  src/zencompute/runners/windowsrunner.cpp   9
-rw-r--r--  src/zencore/include/zencore/logging.h     20
-rw-r--r--  src/zenserver-test/compute-tests.cpp      35
8 files changed, 167 insertions, 143 deletions
diff --git a/src/zencompute/CLAUDE.md b/src/zencompute/CLAUDE.md
index a1a39fc3c..e72677cb0 100644
--- a/src/zencompute/CLAUDE.md
+++ b/src/zencompute/CLAUDE.md
@@ -216,11 +216,7 @@ Worker handler logic is extracted into private helpers (`HandleWorkersGet`, `Han
## Concurrency Model
-**Locking discipline:** When multiple locks must be held simultaneously, always acquire in this order to prevent deadlocks:
-1. `m_ResultsLock`
-2. `m_RunningLock` (comment in localrunner.h: "must be taken *after* m_ResultsLock")
-3. `m_PendingLock`
-4. `m_QueueLock`
+**Locking discipline:** The three action maps (`m_PendingActions`, `m_RunningMap`, `m_ResultsMap`) are guarded by a single `m_ActionMapLock`. This eliminates lock-ordering concerns between maps and prevents actions from being temporarily absent from all maps during state transitions. Runner-level `m_RunningLock` in `LocalProcessRunner` / `RemoteHttpRunner` is a separate lock on a different class — unrelated to the session-level action map lock.
**Atomic fields** for counters and simple state: queue counts, `CpuUsagePercent`, `CpuSeconds`, `RetryCount`, `RunnerAction::m_ActionState`.
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp
index f1dcb337d..d10144d08 100644
--- a/src/zencompute/computeservice.cpp
+++ b/src/zencompute/computeservice.cpp
@@ -195,13 +195,9 @@ struct ComputeServiceSession::Impl
std::atomic<IComputeCompletionObserver*> m_CompletionObserver{nullptr};
- RwLock m_PendingLock;
- std::map<int, Ref<RunnerAction>> m_PendingActions;
-
- RwLock m_RunningLock;
+ RwLock m_ActionMapLock; // Guards m_PendingActions, m_RunningMap, m_ResultsMap
+ std::map<int, Ref<RunnerAction>> m_PendingActions;
std::unordered_map<int, Ref<RunnerAction>> m_RunningMap;
-
- RwLock m_ResultsLock;
std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap;
metrics::Meter m_ResultRate;
std::atomic<uint64_t> m_RetiredCount{0};
@@ -343,9 +339,12 @@ struct ComputeServiceSession::Impl
ActionCounts GetActionCounts()
{
ActionCounts Counts;
- Counts.Pending = (int)m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
- Counts.Running = (int)m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
- Counts.Completed = (int)m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }) + (int)m_RetiredCount.load();
+ m_ActionMapLock.WithSharedLock([&] {
+ Counts.Pending = (int)m_PendingActions.size();
+ Counts.Running = (int)m_RunningMap.size();
+ Counts.Completed = (int)m_ResultsMap.size();
+ });
+ Counts.Completed += (int)m_RetiredCount.load();
Counts.ActiveQueues = (int)m_QueueLock.WithSharedLock([&] {
size_t Count = 0;
for (const auto& [Id, Queue] : m_Queues)
@@ -364,8 +363,10 @@ struct ComputeServiceSession::Impl
{
Cbo << "session_state"sv << ToString(m_SessionState.load(std::memory_order_relaxed));
m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); });
- m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); });
- m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); });
+ m_ActionMapLock.WithSharedLock([&] {
+ Cbo << "actions_complete"sv << m_ResultsMap.size();
+ Cbo << "actions_pending"sv << m_PendingActions.size();
+ });
Cbo << "actions_submitted"sv << GetSubmittedActionCount();
EmitSnapshot("actions_arrival"sv, m_ArrivalRate, Cbo);
EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo);
@@ -450,27 +451,17 @@ ComputeServiceSession::Impl::RequestStateTransition(SessionState NewState)
void
ComputeServiceSession::Impl::AbandonAllActions()
{
- // Collect all pending actions and mark them as Abandoned
+ // Collect all pending and running actions under a single lock scope
std::vector<Ref<RunnerAction>> PendingToAbandon;
+ std::vector<Ref<RunnerAction>> RunningToAbandon;
- m_PendingLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
PendingToAbandon.reserve(m_PendingActions.size());
for (auto& [Lsn, Action] : m_PendingActions)
{
PendingToAbandon.push_back(Action);
}
- });
-
- for (auto& Action : PendingToAbandon)
- {
- Action->SetActionState(RunnerAction::State::Abandoned);
- }
- // Collect all running actions and mark them as Abandoned, then
- // best-effort cancel via the local runner group
- std::vector<Ref<RunnerAction>> RunningToAbandon;
-
- m_RunningLock.WithSharedLock([&] {
RunningToAbandon.reserve(m_RunningMap.size());
for (auto& [Lsn, Action] : m_RunningMap)
{
@@ -478,6 +469,11 @@ ComputeServiceSession::Impl::AbandonAllActions()
}
});
+ for (auto& Action : PendingToAbandon)
+ {
+ Action->SetActionState(RunnerAction::State::Abandoned);
+ }
+
for (auto& Action : RunningToAbandon)
{
Action->SetActionState(RunnerAction::State::Abandoned);
@@ -742,7 +738,7 @@ std::vector<ComputeServiceSession::RunningActionInfo>
ComputeServiceSession::Impl::GetRunningActions()
{
std::vector<ComputeServiceSession::RunningActionInfo> Result;
- m_RunningLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
Result.reserve(m_RunningMap.size());
for (const auto& [Lsn, Action] : m_RunningMap)
{
@@ -994,10 +990,15 @@ ComputeServiceSession::Impl::EnqueueResolvedAction(int QueueId, WorkerDesc Worke
Pending->ActionObj = ActionObj;
Pending->Priority = RequestPriority;
- // For now simply put action into pending state, so we can do batch scheduling
+ // Insert into the pending map immediately so the action is visible to
+ // FindActionResult/GetActionResult right away. SetActionState will call
+ // PostUpdate which adds the action to m_UpdatedActions and signals the
+ // scheduler, but the scheduler's HandleActionUpdates inserts with
+ // std::map::insert which is a no-op for existing keys.
ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn);
+ m_ActionMapLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Pending}); });
Pending->SetActionState(RunnerAction::State::Pending);
if (m_Recorder)
@@ -1043,11 +1044,7 @@ ComputeServiceSession::Impl::GetSubmittedActionCount()
HttpResponseCode
ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
{
- // This lock is held for the duration of the function since we need to
- // be sure that the action doesn't change state while we are checking the
- // different data structures
-
- RwLock::ExclusiveLockScope _(m_ResultsLock);
+ RwLock::ExclusiveLockScope _(m_ActionMapLock);
if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end())
{
@@ -1058,25 +1055,14 @@ ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResult
return HttpResponseCode::OK;
}
+ if (m_PendingActions.find(ActionLsn) != m_PendingActions.end())
{
- RwLock::SharedLockScope __(m_PendingLock);
-
- if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end())
- {
- return HttpResponseCode::Accepted;
- }
+ return HttpResponseCode::Accepted;
}
- // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
- // always be taken after m_ResultsLock if both are needed
-
+ if (m_RunningMap.find(ActionLsn) != m_RunningMap.end())
{
- RwLock::SharedLockScope __(m_RunningLock);
-
- if (m_RunningMap.find(ActionLsn) != m_RunningMap.end())
- {
- return HttpResponseCode::Accepted;
- }
+ return HttpResponseCode::Accepted;
}
return HttpResponseCode::NotFound;
@@ -1085,11 +1071,7 @@ ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResult
HttpResponseCode
ComputeServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
{
- // This lock is held for the duration of the function since we need to
- // be sure that the action doesn't change state while we are checking the
- // different data structures
-
- RwLock::ExclusiveLockScope _(m_ResultsLock);
+ RwLock::ExclusiveLockScope _(m_ActionMapLock);
for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It)
{
@@ -1103,30 +1085,19 @@ ComputeServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage&
}
}
+ for (const auto& [K, Pending] : m_PendingActions)
{
- RwLock::SharedLockScope __(m_PendingLock);
-
- for (const auto& [K, Pending] : m_PendingActions)
+ if (Pending->ActionId == ActionId)
{
- if (Pending->ActionId == ActionId)
- {
- return HttpResponseCode::Accepted;
- }
+ return HttpResponseCode::Accepted;
}
}
- // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
- // always be taken after m_ResultsLock if both are needed
-
+ for (const auto& [K, v] : m_RunningMap)
{
- RwLock::SharedLockScope __(m_RunningLock);
-
- for (const auto& [K, v] : m_RunningMap)
+ if (v->ActionId == ActionId)
{
- if (v->ActionId == ActionId)
- {
- return HttpResponseCode::Accepted;
- }
+ return HttpResponseCode::Accepted;
}
}
@@ -1144,7 +1115,7 @@ ComputeServiceSession::Impl::GetCompleted(CbWriter& Cbo)
{
Cbo.BeginArray("completed");
- m_ResultsLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
for (auto& [Lsn, Action] : m_ResultsMap)
{
Cbo.BeginObject();
@@ -1275,20 +1246,14 @@ ComputeServiceSession::Impl::CancelQueue(int QueueId)
std::vector<Ref<RunnerAction>> PendingActionsToCancel;
std::vector<int> RunningLsnsToCancel;
- m_PendingLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
for (int Lsn : LsnsToCancel)
{
if (auto It = m_PendingActions.find(Lsn); It != m_PendingActions.end())
{
PendingActionsToCancel.push_back(It->second);
}
- }
- });
-
- m_RunningLock.WithSharedLock([&] {
- for (int Lsn : LsnsToCancel)
- {
- if (m_RunningMap.find(Lsn) != m_RunningMap.end())
+ else if (m_RunningMap.find(Lsn) != m_RunningMap.end())
{
RunningLsnsToCancel.push_back(Lsn);
}
@@ -1307,7 +1272,7 @@ ComputeServiceSession::Impl::CancelQueue(int QueueId)
// transition from the runner is blocked (Cancelled > Failed in the enum).
for (int Lsn : RunningLsnsToCancel)
{
- m_RunningLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
if (auto It = m_RunningMap.find(Lsn); It != m_RunningMap.end())
{
It->second->SetActionState(RunnerAction::State::Cancelled);
@@ -1445,7 +1410,7 @@ ComputeServiceSession::Impl::GetQueueCompleted(int QueueId, CbWriter& Cbo)
if (Queue)
{
Queue->m_Lock.WithSharedLock([&] {
- m_ResultsLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
for (int Lsn : Queue->FinishedLsns)
{
if (m_ResultsMap.contains(Lsn))
@@ -1541,9 +1506,15 @@ ComputeServiceSession::Impl::SchedulePendingActions()
{
ZEN_TRACE_CPU("ComputeServiceSession::SchedulePendingActions");
int ScheduledCount = 0;
- size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
- size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
- size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); });
+ size_t RunningCount = 0;
+ size_t PendingCount = 0;
+ size_t ResultCount = 0;
+
+ m_ActionMapLock.WithSharedLock([&] {
+ RunningCount = m_RunningMap.size();
+ PendingCount = m_PendingActions.size();
+ ResultCount = m_ResultsMap.size();
+ });
static Stopwatch DumpRunningTimer;
@@ -1560,7 +1531,7 @@ ComputeServiceSession::Impl::SchedulePendingActions()
DumpRunningTimer.Reset();
std::set<int> RunningList;
- m_RunningLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
for (auto& [K, V] : m_RunningMap)
{
RunningList.insert(K);
@@ -1602,7 +1573,7 @@ ComputeServiceSession::Impl::SchedulePendingActions()
// Also note that the m_PendingActions list is not maintained
// here, that's done periodically in SchedulePendingActions()
- m_PendingLock.WithExclusiveLock([&] {
+ m_ActionMapLock.WithExclusiveLock([&] {
if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused)
{
return;
@@ -1701,7 +1672,7 @@ ComputeServiceSession::Impl::SchedulerThreadFunction()
{
int TimeoutMs = 500;
- auto PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
+ auto PendingCount = m_ActionMapLock.WithSharedLock([&] { return m_PendingActions.size(); });
if (PendingCount)
{
@@ -1720,22 +1691,22 @@ ComputeServiceSession::Impl::SchedulerThreadFunction()
m_SchedulingThreadEvent.Reset();
}
- ZEN_DEBUG("compute scheduler TICK (Pending: {} was {}, Running: {}, Results: {}) timeout: {}",
- m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }),
- PendingCount,
- m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }),
- m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }),
- TimeoutMs);
+ m_ActionMapLock.WithSharedLock([&] {
+ ZEN_DEBUG("compute scheduler TICK (Pending: {}, Running: {}, Results: {}) timeout: {}",
+ m_PendingActions.size(),
+ m_RunningMap.size(),
+ m_ResultsMap.size(),
+ TimeoutMs);
+ });
HandleActionUpdates();
// Auto-transition Draining → Paused when all work is done
if (m_SessionState.load(std::memory_order_relaxed) == SessionState::Draining)
{
- size_t Pending = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
- size_t Running = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
+ bool AllDrained = m_ActionMapLock.WithSharedLock([&] { return m_PendingActions.empty() && m_RunningMap.empty(); });
- if (Pending == 0 && Running == 0)
+ if (AllDrained)
{
SessionState Expected = SessionState::Draining;
if (m_SessionState.compare_exchange_strong(Expected, SessionState::Paused, std::memory_order_acq_rel))
@@ -1797,7 +1768,7 @@ ComputeServiceSession::Impl::RescheduleAction(int ActionLsn)
// Find, validate, and remove atomically under a single lock scope to prevent
// concurrent RescheduleAction calls from double-removing the same action.
- m_ResultsLock.WithExclusiveLock([&] {
+ m_ActionMapLock.WithExclusiveLock([&] {
auto It = m_ResultsMap.find(ActionLsn);
if (It == m_ResultsMap.end())
{
@@ -1871,26 +1842,20 @@ ComputeServiceSession::Impl::RetractAction(int ActionLsn)
bool WasRunning = false;
// Look for the action in pending or running maps
- m_RunningLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
if (auto It = m_RunningMap.find(ActionLsn); It != m_RunningMap.end())
{
Action = It->second;
WasRunning = true;
}
+ else if (auto PIt = m_PendingActions.find(ActionLsn); PIt != m_PendingActions.end())
+ {
+ Action = PIt->second;
+ }
});
if (!Action)
{
- m_PendingLock.WithSharedLock([&] {
- if (auto It = m_PendingActions.find(ActionLsn); It != m_PendingActions.end())
- {
- Action = It->second;
- }
- });
- }
-
- if (!Action)
- {
return {.Success = false, .Error = "Action not found in pending or running maps"};
}
@@ -1912,18 +1877,15 @@ ComputeServiceSession::Impl::RetractAction(int ActionLsn)
void
ComputeServiceSession::Impl::RemoveActionFromActiveMaps(int ActionLsn)
{
- m_RunningLock.WithExclusiveLock([&] {
- m_PendingLock.WithExclusiveLock([&] {
- if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
- {
- m_PendingActions.erase(ActionLsn);
- }
- else
- {
- m_RunningMap.erase(FindIt);
- }
- });
- });
+ // Caller must hold m_ActionMapLock exclusively.
+ if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
+ {
+ m_PendingActions.erase(ActionLsn);
+ }
+ else
+ {
+ m_RunningMap.erase(FindIt);
+ }
}
void
@@ -1973,7 +1935,7 @@ ComputeServiceSession::Impl::HandleActionUpdates()
}
else
{
- m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
+ m_ActionMapLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
}
break;
@@ -1983,11 +1945,9 @@ ComputeServiceSession::Impl::HandleActionUpdates()
// Dispatched to a runner — move from pending to running
case RunnerAction::State::Running:
- m_RunningLock.WithExclusiveLock([&] {
- m_PendingLock.WithExclusiveLock([&] {
- m_RunningMap.insert({ActionLsn, Action});
- m_PendingActions.erase(ActionLsn);
- });
+ m_ActionMapLock.WithExclusiveLock([&] {
+ m_RunningMap.insert({ActionLsn, Action});
+ m_PendingActions.erase(ActionLsn);
});
ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn);
break;
@@ -1995,7 +1955,10 @@ ComputeServiceSession::Impl::HandleActionUpdates()
// Retracted — pull back for rescheduling without counting against retry limit
case RunnerAction::State::Retracted:
{
- RemoveActionFromActiveMaps(ActionLsn);
+ m_ActionMapLock.WithExclusiveLock([&] {
+ m_RunningMap.erase(ActionLsn);
+ m_PendingActions[ActionLsn] = Action;
+ });
Action->ResetActionStateToPending();
ZEN_INFO("action {} ({}) retracted for rescheduling", Action->ActionId, ActionLsn);
break;
@@ -2019,7 +1982,10 @@ ComputeServiceSession::Impl::HandleActionUpdates()
if (Action->RetryCount.load(std::memory_order_relaxed) < MaxRetries)
{
- RemoveActionFromActiveMaps(ActionLsn);
+ m_ActionMapLock.WithExclusiveLock([&] {
+ m_RunningMap.erase(ActionLsn);
+ m_PendingActions[ActionLsn] = Action;
+ });
// Reset triggers PostUpdate() which re-enters the action as Pending
Action->ResetActionStateToPending();
@@ -2034,16 +2000,16 @@ ComputeServiceSession::Impl::HandleActionUpdates()
}
}
- RemoveActionFromActiveMaps(ActionLsn);
+ m_ActionMapLock.WithExclusiveLock([&] {
+ RemoveActionFromActiveMaps(ActionLsn);
- // Update queue counters BEFORE publishing the result into
- // m_ResultsMap. GetActionResult erases from m_ResultsMap
- // under m_ResultsLock, so if we updated counters after
- // releasing that lock, a caller could observe ActiveCount
- // still at 1 immediately after GetActionResult returned OK.
- NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState);
+ // Update queue counters BEFORE publishing the result into
+ // m_ResultsMap. GetActionResult erases from m_ResultsMap
+ // under m_ActionMapLock, so if we updated counters after
+ // releasing that lock, a caller could observe ActiveCount
+ // still at 1 immediately after GetActionResult returned OK.
+ NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState);
- m_ResultsLock.WithExclusiveLock([&] {
m_ResultsMap[ActionLsn] = Action;
// Append to bounded action history ring
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp
index bdfd9d197..3c56c732a 100644
--- a/src/zencompute/httpcomputeservice.cpp
+++ b/src/zencompute/httpcomputeservice.cpp
@@ -373,7 +373,7 @@ HttpComputeService::Impl::RegisterRoutes()
if (HttpResponseCode ResponseCode = m_ComputeService.FindActionResult(ActionId, /* out */ Output);
ResponseCode != HttpResponseCode::OK)
{
- ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))
+ ZEN_DEBUG("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))
if (ResponseCode == HttpResponseCode::NotFound)
{
diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp
index b61e0a46f..362f917ca 100644
--- a/src/zencompute/runners/localrunner.cpp
+++ b/src/zencompute/runners/localrunner.cpp
@@ -482,6 +482,8 @@ LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage,
}
WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer());
+
+ ZEN_INFO("manifested worker '{}' in '{}'", WorkerPackage.GetObjectHash(), SandboxPath);
}
CbPackage
diff --git a/src/zencompute/runners/localrunner.h b/src/zencompute/runners/localrunner.h
index b8cff6826..576892e4e 100644
--- a/src/zencompute/runners/localrunner.h
+++ b/src/zencompute/runners/localrunner.h
@@ -83,8 +83,6 @@ protected:
std::filesystem::path m_SandboxPath;
int32_t m_MaxRunningActions = 64; // arbitrary limit for testing
- // if used in conjuction with m_ResultsLock, this lock must be taken *after*
- // m_ResultsLock to avoid deadlocks
RwLock m_RunningLock;
std::unordered_map<int, Ref<RunningAction>> m_RunningMap;
diff --git a/src/zencompute/runners/windowsrunner.cpp b/src/zencompute/runners/windowsrunner.cpp
index a646e91c8..d124481a8 100644
--- a/src/zencompute/runners/windowsrunner.cpp
+++ b/src/zencompute/runners/windowsrunner.cpp
@@ -207,7 +207,7 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action)
BOOL bInheritHandles = FALSE;
DWORD dwCreationFlags = CREATE_SUSPENDED;
- ZEN_DEBUG("Executing: {} (sandboxed={})", WideToUtf8(CommandLine.c_str()), m_Sandboxed);
+ ZEN_DEBUG("{}: '{}' (sandbox='{}')", m_Sandboxed ? "Sandboxing" : "Executing", WideToUtf8(CommandLine.c_str()), Prepared->SandboxPath);
CommandLine.EnsureNulTerminated();
@@ -314,6 +314,8 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action)
Action->SetActionState(RunnerAction::State::Running);
+ ZEN_DEBUG("Local runner: action LSN {} -> PID {}", Action->ActionLsn, ProcessInformation.dwProcessId);
+
return SubmitResult{.IsAccepted = true};
}
@@ -333,6 +335,11 @@ WindowsProcessRunner::SweepRunningActions()
if (IsSuccess && ExitCode != STILL_ACTIVE)
{
+ ZEN_DEBUG("Local runner: action LSN {} + PID {} exited with code " ZEN_WHITE("{}"),
+ Running->Action->ActionLsn,
+ Running->ProcessHandle,
+ ExitCode);
+
CloseHandle(Running->ProcessHandle);
Running->ProcessHandle = INVALID_HANDLE_VALUE;
Running->ExitCode = ExitCode;
diff --git a/src/zencore/include/zencore/logging.h b/src/zencore/include/zencore/logging.h
index 3427991d2..9923c8531 100644
--- a/src/zencore/include/zencore/logging.h
+++ b/src/zencore/include/zencore/logging.h
@@ -90,6 +90,26 @@ using zen::ConsoleLog;
using zen::ErrorLog;
using zen::Log;
+////////////////////////////////////////////////////////////////////////
+// Color helpers
+
+#define ZEN_RED(str) "\033[31m" str "\033[0m"
+#define ZEN_GREEN(str) "\033[32m" str "\033[0m"
+#define ZEN_YELLOW(str) "\033[33m" str "\033[0m"
+#define ZEN_BLUE(str) "\033[34m" str "\033[0m"
+#define ZEN_MAGENTA(str) "\033[35m" str "\033[0m"
+#define ZEN_CYAN(str) "\033[36m" str "\033[0m"
+#define ZEN_WHITE(str) "\033[37m" str "\033[0m"
+
+#define ZEN_BOLD(str) "\033[1m" str "\033[0m"
+#define ZEN_UNDERLINE(str) "\033[4m" str "\033[0m"
+#define ZEN_DIM(str) "\033[2m" str "\033[0m"
+#define ZEN_ITALIC(str) "\033[3m" str "\033[0m"
+#define ZEN_STRIKETHROUGH(str) "\033[9m" str "\033[0m"
+#define ZEN_INVERSE(str) "\033[7m" str "\033[0m"
+
+////////////////////////////////////////////////////////////////////////
+
#if ZEN_BUILD_DEBUG
# define ZEN_CHECK_FORMAT_STRING(fmtstr, ...) \
while (false) \
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp
index d088c2bee..e522ed203 100644
--- a/src/zenserver-test/compute-tests.cpp
+++ b/src/zenserver-test/compute-tests.cpp
@@ -2542,6 +2542,41 @@ TEST_CASE("function.retract_http")
fmt::format("Second retract failed: status={}, body={}", int(RetractResp2.StatusCode), RetractResp2.ToText()));
}
+TEST_CASE("function.session.immediate_query_after_enqueue")
+{
+ // Verify that actions are immediately visible to GetActionResult and
+ // FindActionResult right after enqueue, without waiting for the
+ // scheduler thread to process the update.
+
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.Ready();
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ CbObject ActionObj = BuildRot13ActionForSession("immediate-query"sv, Resolver);
+
+ auto EnqueueRes = Session.EnqueueAction(ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Failed to enqueue action");
+
+ // Query by LSN immediately — must not return NotFound
+ CbPackage Result;
+ HttpResponseCode Code = Session.GetActionResult(EnqueueRes.Lsn, Result);
+ CHECK_MESSAGE(Code == HttpResponseCode::Accepted,
+ fmt::format("GetActionResult returned {} immediately after enqueue, expected Accepted", int(Code)));
+
+ // Query by ActionId immediately — must not return NotFound
+ const IoHash ActionId = ActionObj.GetHash();
+ CbPackage FindResult;
+ HttpResponseCode FindCode = Session.FindActionResult(ActionId, FindResult);
+ CHECK_MESSAGE(FindCode == HttpResponseCode::Accepted,
+ fmt::format("FindActionResult returned {} immediately after enqueue, expected Accepted", int(FindCode)));
+
+ Session.Shutdown();
+}
+
TEST_SUITE_END();
} // namespace zen::tests::compute