aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/httpcomputeservice.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-30 15:07:08 +0200
committerGitHub Enterprise <[email protected]>2026-03-30 15:07:08 +0200
commit3540d676733efaddecf504b30e9a596465bd43f8 (patch)
tree7a8d8b3d2da993e30c34e3ff36f659b90a2b228e /src/zencompute/httpcomputeservice.cpp
parentinclude rawHash in structure output for builds ls command (#903) (diff)
downloadzen-3540d676733efaddecf504b30e9a596465bd43f8.tar.xz
zen-3540d676733efaddecf504b30e9a596465bd43f8.zip
Request validation and resilience improvements (#864)
### Security: Input validation & path safety - **Reject local file references by default** in package parsing — only allow when explicitly opted in by the service (`ParseFlags::kAllowLocalReferences`) and validated by an `ILocalRefPolicy` (fail-closed: no policy = rejected) - **`DataRootLocalRefPolicy`** restricts local ref paths to the server's data root via canonical path prefix matching - **Validate attachment hashes** in compute HTTP handlers — decompresses and re-hashes each attachment at ingestion time to reject tampered payloads - **Path traversal validation** for worker descriptions (`pathvalidation.h`) — rejects absolute paths, `..` components, Windows reserved device names, and invalid filename characters - **Harden CbPackage parsing** against corrupt inputs — overflow-safe attachment count, bounds checks on local ref offset/size, graceful failure instead of `ZEN_ASSERT` for untrusted data - **Harden legacy package parser** — reject zero-size binary fields, missing mappers, and optionally validate resolved attachment hashes - **Bounds check in `CbPackageReader::MarshalLocalChunkReference`** — detect when `MakeFromFile` silently clamps offset+size to file size ### Reliability: Lock consolidation & bug fixes - **Consolidate three action map locks into one** (`m_ActionMapLock`) — eliminates deadlock risk from multi-lock ordering, simplifies state transitions, and fixes a race where newly enqueued actions were briefly invisible to `GetActionResult`/`FindActionResult` - **Fix infinite loop in `BaseRunnerGroup::SubmitActions`** when actions exceed total runner capacity — cap round-robin at `TotalCapacity` and default unassigned results to "No capacity" - **Fix `MakeSafeAbsolutePathInPlace` for UNC paths** — `\server\share` now correctly becomes `\?\UNC\server\share` instead of `\?\server\share` - **Fix `max_retries=0`** — previously fell through to the default of 3; now correctly means "no retries" ### New: ManagedProcessRunner - Cross-platform process runner backed by `SubprocessManager` — uses async exit callbacks instead of polling, delegates CPU/memory metrics to the manager's built-in sampler - `ProcessGroup` (JobObject on Windows, process group on POSIX) for bulk cancellation on shutdown - `--managed` flag on `zen exec inproc` to select this runner - Refactored monitor thread lifecycle — `StartMonitorThread()` now called from derived constructors to avoid calling virtual functions from base constructor ### Process management - **Suppress crash dialogs** via `JOB_OBJECT_UILIMIT_ERRORMODE` + `SEM_NOGPFAULTERRORBOX` in both `WindowsProcessRunner` and `JobObject::Initialize` — prevents WER/Dr. Watson modal dialogs from blocking the monitor thread - **CREATE_SUSPENDED → AssignProcessToJobObject → ResumeThread** pattern in `WindowsProcessRunner` — ensures job object assignment before process execution - **Move stdout/stderr callbacks to `Spawn()` parameters** in `SubprocessManager` — prevents race where early output could be missed before callback installation - Consistent PID logging across all runner types ### Test infrastructure - **`zentest-appstub`**: Added `Fail` (configurable exit code) and `Crash` (abort / nullptr deref) test functions - **Compute integration tests**: exit code handling, auto-retry exhaustion, manual reschedule after failure, mixed success/failure queues, crash handling (abort + nullptr), crash auto-retry, immediate query visibility after enqueue - **Package format tests**: truncated header, bad magic, attachment count overflow, truncated data, local ref rejection/acceptance, policy enforcement (inside/outside root, traversal, no-policy fail-closed) - **Legacy package parser tests**: empty input, zero-size binary, hash resolution with/without mapper, hash mismatch detection - **UNC path tests** for `MakeSafeAbsolutePath` ### Misc - ANSI color helper macros (`ZEN_RED`, `ZEN_BRIGHT_WHITE`, etc.) and `ZEN_BOLD`/`ZEN_DIM`/etc. - Generic `fmt::formatter` for types with free `ToString` functions - Compute dashboard: truncated hash display with monospace font and hover for full value - Renamed `usonpackage_forcelink` → `cbpackage_forcelink` - Compute enabled by default in xmake config (releases still explicitly disable)
Diffstat (limited to 'src/zencompute/httpcomputeservice.cpp')
-rw-r--r--src/zencompute/httpcomputeservice.cpp104
1 files changed, 78 insertions, 26 deletions
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp
index bdfd9d197..bd3f4e70e 100644
--- a/src/zencompute/httpcomputeservice.cpp
+++ b/src/zencompute/httpcomputeservice.cpp
@@ -93,13 +93,14 @@ struct HttpComputeService::Impl
uint64_t NewBytes = 0;
};
- IngestStats IngestPackageAttachments(const CbPackage& Package);
- bool CheckAttachments(const CbObject& ActionObj, std::vector<IoHash>& NeedList);
- void HandleWorkersGet(HttpServerRequest& HttpReq);
- void HandleWorkersAllGet(HttpServerRequest& HttpReq);
- void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status);
- void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId);
- void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker);
+ bool IngestPackageAttachments(HttpServerRequest& HttpReq, const CbPackage& Package, IngestStats& OutStats);
+ bool CheckAttachments(const CbObject& ActionObj, std::vector<IoHash>& NeedList);
+ bool ValidateAttachmentHash(HttpServerRequest& HttpReq, const CbAttachment& Attachment);
+ void HandleWorkersGet(HttpServerRequest& HttpReq);
+ void HandleWorkersAllGet(HttpServerRequest& HttpReq);
+ void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status);
+ void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId);
+ void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker);
// WebSocket / observer
void OnWebSocketOpen(Ref<WebSocketConnection> Connection);
@@ -373,7 +374,7 @@ HttpComputeService::Impl::RegisterRoutes()
if (HttpResponseCode ResponseCode = m_ComputeService.FindActionResult(ActionId, /* out */ Output);
ResponseCode != HttpResponseCode::OK)
{
- ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))
+ ZEN_DEBUG("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))
if (ResponseCode == HttpResponseCode::NotFound)
{
@@ -1167,35 +1168,81 @@ HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::strin
return ParseInt<int>(Capture).value_or(0);
}
-HttpComputeService::Impl::IngestStats
-HttpComputeService::Impl::IngestPackageAttachments(const CbPackage& Package)
+bool
+HttpComputeService::Impl::ValidateAttachmentHash(HttpServerRequest& HttpReq, const CbAttachment& Attachment)
{
- IngestStats Stats;
+ const IoHash ClaimedHash = Attachment.GetHash();
+ CompressedBuffer Buffer = Attachment.AsCompressedBinary();
+ const IoHash HeaderHash = Buffer.DecodeRawHash();
+
+ if (HeaderHash != ClaimedHash)
+ {
+ ZEN_WARN("attachment header hash mismatch: claimed {} but header contains {}", ClaimedHash, HeaderHash);
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ return false;
+ }
+
+ IoHashStream Hasher;
+
+ bool DecompressOk = Buffer.DecompressToStream(
+ 0,
+ Buffer.DecodeRawSize(),
+ [&](uint64_t /*SourceOffset*/, uint64_t /*SourceSize*/, uint64_t /*Offset*/, const CompositeBuffer& Range) -> bool {
+ for (const SharedBuffer& Segment : Range.GetSegments())
+ {
+ Hasher.Append(Segment.GetView());
+ }
+ return true;
+ });
+
+ if (!DecompressOk)
+ {
+ ZEN_WARN("attachment {}: failed to decompress", ClaimedHash);
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ return false;
+ }
+
+ const IoHash ActualHash = Hasher.GetHash();
+
+ if (ActualHash != ClaimedHash)
+ {
+ ZEN_WARN("attachment hash mismatch: claimed {} but decompressed data hashes to {}", ClaimedHash, ActualHash);
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ return false;
+ }
+
+ return true;
+}
+bool
+HttpComputeService::Impl::IngestPackageAttachments(HttpServerRequest& HttpReq, const CbPackage& Package, IngestStats& OutStats)
+{
for (const CbAttachment& Attachment : Package.GetAttachments())
{
ZEN_ASSERT(Attachment.IsCompressedBinary());
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
+ if (!ValidateAttachmentHash(HttpReq, Attachment))
+ {
+ return false;
+ }
- const uint64_t CompressedSize = DataView.GetCompressedSize();
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
+ const uint64_t CompressedSize = DataView.GetCompressedSize();
- Stats.Bytes += CompressedSize;
- ++Stats.Count;
+ OutStats.Bytes += CompressedSize;
+ ++OutStats.Count;
const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
if (InsertResult.New)
{
- Stats.NewBytes += CompressedSize;
- ++Stats.NewCount;
+ OutStats.NewBytes += CompressedSize;
+ ++OutStats.NewCount;
}
}
- return Stats;
+ return true;
}
bool
@@ -1253,7 +1300,10 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que
{
CbPackage Package = HttpReq.ReadPayloadPackage();
Body = Package.GetObject();
- Stats = IngestPackageAttachments(Package);
+ if (!IngestPackageAttachments(HttpReq, Package, Stats))
+ {
+ return; // validation failed, response already written
+ }
break;
}
@@ -1268,8 +1318,7 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que
{
// --- Batch path ---
- // For CbObject payloads, check all attachments upfront before enqueuing anything
- if (HttpReq.RequestContentType() == HttpContentType::kCbObject)
+ // Verify all action attachment references exist in the store
{
std::vector<IoHash> NeedList;
@@ -1345,7 +1394,6 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que
// --- Single-action path: Body is the action itself ---
- if (HttpReq.RequestContentType() == HttpContentType::kCbObject)
{
std::vector<IoHash> NeedList;
@@ -1491,10 +1539,14 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const
{
ZEN_ASSERT(Attachment.IsCompressedBinary());
+ if (!ValidateAttachmentHash(HttpReq, Attachment))
+ {
+ return;
+ }
+
const IoHash DataHash = Attachment.GetHash();
CompressedBuffer Buffer = Attachment.AsCompressedBinary();
- ZEN_UNUSED(DataHash);
TotalAttachmentBytes += Buffer.GetCompressedSize();
++AttachmentCount;