diff options
| author | Stefan Boberg <[email protected]> | 2026-03-30 15:07:08 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-30 15:07:08 +0200 |
| commit | 3540d676733efaddecf504b30e9a596465bd43f8 (patch) | |
| tree | 7a8d8b3d2da993e30c34e3ff36f659b90a2b228e /src/zencompute/httpcomputeservice.cpp | |
| parent | include rawHash in structure output for builds ls command (#903) (diff) | |
| download | zen-3540d676733efaddecf504b30e9a596465bd43f8.tar.xz zen-3540d676733efaddecf504b30e9a596465bd43f8.zip | |
Request validation and resilience improvements (#864)
### Security: Input validation & path safety
- **Reject local file references by default** in package parsing — only allow when explicitly opted in by the service (`ParseFlags::kAllowLocalReferences`) and validated by an `ILocalRefPolicy` (fail-closed: no policy = rejected)
- **`DataRootLocalRefPolicy`** restricts local ref paths to the server's data root via canonical path prefix matching
- **Validate attachment hashes** in compute HTTP handlers — decompresses and re-hashes each attachment at ingestion time to reject tampered payloads
- **Path traversal validation** for worker descriptions (`pathvalidation.h`) — rejects absolute paths, `..` components, Windows reserved device names, and invalid filename characters
- **Harden CbPackage parsing** against corrupt inputs — overflow-safe attachment count, bounds checks on local ref offset/size, graceful failure instead of `ZEN_ASSERT` for untrusted data
- **Harden legacy package parser** — reject zero-size binary fields, missing mappers, and optionally validate resolved attachment hashes
- **Bounds check in `CbPackageReader::MarshalLocalChunkReference`** — detect when `MakeFromFile` silently clamps offset+size to file size
### Reliability: Lock consolidation & bug fixes
- **Consolidate three action map locks into one** (`m_ActionMapLock`) — eliminates deadlock risk from multi-lock ordering, simplifies state transitions, and fixes a race where newly enqueued actions were briefly invisible to `GetActionResult`/`FindActionResult`
- **Fix infinite loop in `BaseRunnerGroup::SubmitActions`** when actions exceed total runner capacity — cap round-robin at `TotalCapacity` and default unassigned results to "No capacity"
- **Fix `MakeSafeAbsolutePathInPlace` for UNC paths** — `\server\share` now correctly becomes `\?\UNC\server\share` instead of `\?\server\share`
- **Fix `max_retries=0`** — previously fell through to the default of 3; now correctly means "no retries"
### New: ManagedProcessRunner
- Cross-platform process runner backed by `SubprocessManager` — uses async exit callbacks instead of polling, delegates CPU/memory metrics to the manager's built-in sampler
- `ProcessGroup` (JobObject on Windows, process group on POSIX) for bulk cancellation on shutdown
- `--managed` flag on `zen exec inproc` to select this runner
- Refactored monitor thread lifecycle — `StartMonitorThread()` now called from derived constructors to avoid calling virtual functions from base constructor
### Process management
- **Suppress crash dialogs** via `JOB_OBJECT_UILIMIT_ERRORMODE` + `SEM_NOGPFAULTERRORBOX` in both `WindowsProcessRunner` and `JobObject::Initialize` — prevents WER/Dr. Watson modal dialogs from blocking the monitor thread
- **CREATE_SUSPENDED → AssignProcessToJobObject → ResumeThread** pattern in `WindowsProcessRunner` — ensures job object assignment before process execution
- **Move stdout/stderr callbacks to `Spawn()` parameters** in `SubprocessManager` — prevents race where early output could be missed before callback installation
- Consistent PID logging across all runner types
### Test infrastructure
- **`zentest-appstub`**: Added `Fail` (configurable exit code) and `Crash` (abort / nullptr deref) test functions
- **Compute integration tests**: exit code handling, auto-retry exhaustion, manual reschedule after failure, mixed success/failure queues, crash handling (abort + nullptr), crash auto-retry, immediate query visibility after enqueue
- **Package format tests**: truncated header, bad magic, attachment count overflow, truncated data, local ref rejection/acceptance, policy enforcement (inside/outside root, traversal, no-policy fail-closed)
- **Legacy package parser tests**: empty input, zero-size binary, hash resolution with/without mapper, hash mismatch detection
- **UNC path tests** for `MakeSafeAbsolutePath`
### Misc
- ANSI color helper macros (`ZEN_RED`, `ZEN_BRIGHT_WHITE`, etc.) and `ZEN_BOLD`/`ZEN_DIM`/etc.
- Generic `fmt::formatter` for types with free `ToString` functions
- Compute dashboard: truncated hash display with monospace font and hover for full value
- Renamed `usonpackage_forcelink` → `cbpackage_forcelink`
- Compute enabled by default in xmake config (releases still explicitly disable)
Diffstat (limited to 'src/zencompute/httpcomputeservice.cpp')
| -rw-r--r-- | src/zencompute/httpcomputeservice.cpp | 104 |
1 files changed, 78 insertions, 26 deletions
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp index bdfd9d197..bd3f4e70e 100644 --- a/src/zencompute/httpcomputeservice.cpp +++ b/src/zencompute/httpcomputeservice.cpp @@ -93,13 +93,14 @@ struct HttpComputeService::Impl uint64_t NewBytes = 0; }; - IngestStats IngestPackageAttachments(const CbPackage& Package); - bool CheckAttachments(const CbObject& ActionObj, std::vector<IoHash>& NeedList); - void HandleWorkersGet(HttpServerRequest& HttpReq); - void HandleWorkersAllGet(HttpServerRequest& HttpReq); - void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status); - void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId); - void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker); + bool IngestPackageAttachments(HttpServerRequest& HttpReq, const CbPackage& Package, IngestStats& OutStats); + bool CheckAttachments(const CbObject& ActionObj, std::vector<IoHash>& NeedList); + bool ValidateAttachmentHash(HttpServerRequest& HttpReq, const CbAttachment& Attachment); + void HandleWorkersGet(HttpServerRequest& HttpReq); + void HandleWorkersAllGet(HttpServerRequest& HttpReq); + void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status); + void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId); + void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker); // WebSocket / observer void OnWebSocketOpen(Ref<WebSocketConnection> Connection); @@ -373,7 +374,7 @@ HttpComputeService::Impl::RegisterRoutes() if (HttpResponseCode ResponseCode = m_ComputeService.FindActionResult(ActionId, /* out */ Output); ResponseCode != HttpResponseCode::OK) { - ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode)) + ZEN_DEBUG("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode)) if (ResponseCode == HttpResponseCode::NotFound) { @@ -1167,35 +1168,81 @@ HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::strin return ParseInt<int>(Capture).value_or(0); } -HttpComputeService::Impl::IngestStats -HttpComputeService::Impl::IngestPackageAttachments(const CbPackage& Package) +bool +HttpComputeService::Impl::ValidateAttachmentHash(HttpServerRequest& HttpReq, const CbAttachment& Attachment) { - IngestStats Stats; + const IoHash ClaimedHash = Attachment.GetHash(); + CompressedBuffer Buffer = Attachment.AsCompressedBinary(); + const IoHash HeaderHash = Buffer.DecodeRawHash(); + + if (HeaderHash != ClaimedHash) + { + ZEN_WARN("attachment header hash mismatch: claimed {} but header contains {}", ClaimedHash, HeaderHash); + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + return false; + } + + IoHashStream Hasher; + + bool DecompressOk = Buffer.DecompressToStream( + 0, + Buffer.DecodeRawSize(), + [&](uint64_t /*SourceOffset*/, uint64_t /*SourceSize*/, uint64_t /*Offset*/, const CompositeBuffer& Range) -> bool { + for (const SharedBuffer& Segment : Range.GetSegments()) + { + Hasher.Append(Segment.GetView()); + } + return true; + }); + + if (!DecompressOk) + { + ZEN_WARN("attachment {}: failed to decompress", ClaimedHash); + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + return false; + } + + const IoHash ActualHash = Hasher.GetHash(); + + if (ActualHash != ClaimedHash) + { + ZEN_WARN("attachment hash mismatch: claimed {} but decompressed data hashes to {}", ClaimedHash, ActualHash); + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + return false; + } + + return true; +} +bool +HttpComputeService::Impl::IngestPackageAttachments(HttpServerRequest& HttpReq, const CbPackage& Package, IngestStats& OutStats) +{ for (const CbAttachment& Attachment : Package.GetAttachments()) { ZEN_ASSERT(Attachment.IsCompressedBinary()); - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); + if (!ValidateAttachmentHash(HttpReq, Attachment)) + { + return false; + } - const uint64_t CompressedSize = DataView.GetCompressedSize(); + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + const uint64_t CompressedSize = DataView.GetCompressedSize(); - Stats.Bytes += CompressedSize; - ++Stats.Count; + OutStats.Bytes += CompressedSize; + ++OutStats.Count; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { - Stats.NewBytes += CompressedSize; - ++Stats.NewCount; + OutStats.NewBytes += CompressedSize; + ++OutStats.NewCount; } } - return Stats; + return true; } bool @@ -1253,7 +1300,10 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que { CbPackage Package = HttpReq.ReadPayloadPackage(); Body = Package.GetObject(); - Stats = IngestPackageAttachments(Package); + if (!IngestPackageAttachments(HttpReq, Package, Stats)) + { + return; // validation failed, response already written + } break; } @@ -1268,8 +1318,7 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que { // --- Batch path --- - // For CbObject payloads, check all attachments upfront before enqueuing anything - if (HttpReq.RequestContentType() == HttpContentType::kCbObject) + // Verify all action attachment references exist in the store { std::vector<IoHash> NeedList; @@ -1345,7 +1394,6 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que // --- Single-action path: Body is the action itself --- - if (HttpReq.RequestContentType() == HttpContentType::kCbObject) { std::vector<IoHash> NeedList; @@ -1491,10 +1539,14 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const { ZEN_ASSERT(Attachment.IsCompressedBinary()); + if (!ValidateAttachmentHash(HttpReq, Attachment)) + { + return; + } + const IoHash DataHash = Attachment.GetHash(); CompressedBuffer Buffer = Attachment.AsCompressedBinary(); - ZEN_UNUSED(DataHash); TotalAttachmentBytes += Buffer.GetCompressedSize(); ++AttachmentCount; |