diff options
Diffstat (limited to 'src/zencompute/httpcomputeservice.cpp')
| -rw-r--r-- | src/zencompute/httpcomputeservice.cpp | 170 |
1 files changed, 133 insertions, 37 deletions
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp index bd3f4e70e..5ab189d89 100644 --- a/src/zencompute/httpcomputeservice.cpp +++ b/src/zencompute/httpcomputeservice.cpp @@ -21,12 +21,14 @@ # include <zencore/thread.h> # include <zencore/trace.h> # include <zencore/uid.h> -# include <zenstore/cidstore.h> +# include <zenstore/hashkeyset.h> +# include <zenstore/zenstore.h> # include <zentelemetry/stats.h> # include <algorithm> # include <span> # include <unordered_map> +# include <utility> # include <vector> using namespace std::literals; @@ -45,7 +47,9 @@ auto OidMatcher = [](std::string_view Str) { return Str.size() == 24 && AsciiSe struct HttpComputeService::Impl { HttpComputeService* m_Self; - CidStore& m_CidStore; + ChunkStore& m_ActionStore; + ChunkStore& m_WorkerStore; + FallbackChunkResolver m_CombinedResolver; IHttpStatsService& m_StatsService; LoggerRef m_Log; std::filesystem::path m_BaseDir; @@ -58,6 +62,8 @@ struct HttpComputeService::Impl RwLock m_WsConnectionsLock; std::vector<Ref<WebSocketConnection>> m_WsConnections; + std::function<void()> m_ShutdownCallback; + // Metrics metrics::OperationTiming m_HttpRequests; @@ -72,13 +78,13 @@ struct HttpComputeService::Impl std::string ClientHostname; // empty if no hostname was provided }; - // Remote queue registry — all three maps share the same RemoteQueueInfo objects. + // Remote queue registry - all three maps share the same RemoteQueueInfo objects. // All maps are guarded by m_RemoteQueueLock. RwLock m_RemoteQueueLock; - std::unordered_map<Oid, Ref<RemoteQueueInfo>, Oid::Hasher> m_RemoteQueuesByToken; // Token → info - std::unordered_map<int, Ref<RemoteQueueInfo>> m_RemoteQueuesByQueueId; // QueueId → info - std::unordered_map<std::string, Ref<RemoteQueueInfo>> m_RemoteQueuesByTag; // idempotency key → info + std::unordered_map<Oid, Ref<RemoteQueueInfo>, Oid::Hasher> m_RemoteQueuesByToken; // Token -> info + std::unordered_map<int, Ref<RemoteQueueInfo>> m_RemoteQueuesByQueueId; // QueueId -> info + std::unordered_map<std::string, Ref<RemoteQueueInfo>> m_RemoteQueuesByTag; // idempotency key -> info LoggerRef Log() { return m_Log; } @@ -103,25 +109,28 @@ struct HttpComputeService::Impl void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker); // WebSocket / observer - void OnWebSocketOpen(Ref<WebSocketConnection> Connection); + void OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri); void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code); void OnActionsCompleted(std::span<const IComputeCompletionObserver::CompletedActionNotification> Actions); void RegisterRoutes(); - Impl(HttpComputeService* Self, - CidStore& InCidStore, - IHttpStatsService& StatsService, - const std::filesystem::path& BaseDir, - int32_t MaxConcurrentActions) + Impl(HttpComputeService* Self, + ChunkStore& InActionStore, + ChunkStore& InWorkerStore, + IHttpStatsService& StatsService, + std::filesystem::path BaseDir, + int32_t MaxConcurrentActions) : m_Self(Self) - , m_CidStore(InCidStore) + , m_ActionStore(InActionStore) + , m_WorkerStore(InWorkerStore) + , m_CombinedResolver(InActionStore, InWorkerStore) , m_StatsService(StatsService) , m_Log(logging::Get("compute")) - , m_BaseDir(BaseDir) - , m_ComputeService(InCidStore) + , m_BaseDir(std::move(BaseDir)) + , m_ComputeService(m_CombinedResolver) { - m_ComputeService.AddLocalRunner(InCidStore, m_BaseDir / "local", MaxConcurrentActions); + m_ComputeService.AddLocalRunner(m_CombinedResolver, m_BaseDir / "local", MaxConcurrentActions); m_ComputeService.WaitUntilReady(); m_StatsService.RegisterHandler("compute", *m_Self); RegisterRoutes(); @@ -183,6 +192,65 @@ HttpComputeService::Impl::RegisterRoutes() HttpVerb::kPost); m_Router.RegisterRoute( + "session/drain", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Draining)) + { + CbObjectWriter Cbo; + Cbo << "state"sv << ToString(m_ComputeService.GetSessionState()); + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + + CbObjectWriter Cbo; + Cbo << "error"sv + << "Cannot transition to Draining from current state"sv; + HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save()); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "session/status", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObjectWriter Cbo; + Cbo << "state"sv << ToString(m_ComputeService.GetSessionState()); + auto Counts = m_ComputeService.GetActionCounts(); + Cbo << "actions_pending"sv << Counts.Pending; + Cbo << "actions_running"sv << Counts.Running; + Cbo << "actions_completed"sv << Counts.Completed; + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "session/sunset", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Sunset)) + { + CbObjectWriter Cbo; + Cbo << "state"sv << ToString(m_ComputeService.GetSessionState()); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + + if (m_ShutdownCallback) + { + m_ShutdownCallback(); + } + return; + } + + CbObjectWriter Cbo; + Cbo << "error"sv + << "Cannot transition to Sunset from current state"sv; + HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save()); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( "workers", [this](HttpRouterRequest& Req) { HandleWorkersGet(Req.ServerRequest()); }, HttpVerb::kGet); @@ -499,9 +567,19 @@ HttpComputeService::Impl::RegisterRoutes() return HttpReq.WriteResponse(HttpResponseCode::Forbidden); } - m_ComputeService.StartRecording(m_CidStore, m_BaseDir / "recording"); + std::filesystem::path RecordingPath = m_BaseDir / "recording"; - return HttpReq.WriteResponse(HttpResponseCode::OK); + if (!m_ComputeService.StartRecording(m_CombinedResolver, RecordingPath)) + { + CbObjectWriter Cbo; + Cbo << "error" + << "recording is already active"; + return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save()); + } + + CbObjectWriter Cbo; + Cbo << "path" << RecordingPath.string(); + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kPost); @@ -515,9 +593,19 @@ HttpComputeService::Impl::RegisterRoutes() return HttpReq.WriteResponse(HttpResponseCode::Forbidden); } - m_ComputeService.StopRecording(); + std::filesystem::path RecordingPath = m_BaseDir / "recording"; + + if (!m_ComputeService.StopRecording()) + { + CbObjectWriter Cbo; + Cbo << "error" + << "no recording is active"; + return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save()); + } - return HttpReq.WriteResponse(HttpResponseCode::OK); + CbObjectWriter Cbo; + Cbo << "path" << RecordingPath.string(); + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kPost); @@ -584,7 +672,7 @@ HttpComputeService::Impl::RegisterRoutes() }, HttpVerb::kGet | HttpVerb::kPost); - // Queue creation routes — these remain separate since local creates a plain queue + // Queue creation routes - these remain separate since local creates a plain queue // while remote additionally generates an OID token for external access. m_Router.RegisterRoute( @@ -638,7 +726,7 @@ HttpComputeService::Impl::RegisterRoutes() return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } - // Queue has since expired — clean up stale entries and fall through to create a new one + // Queue has since expired - clean up stale entries and fall through to create a new one m_RemoteQueuesByToken.erase(Existing->Token); m_RemoteQueuesByQueueId.erase(Existing->QueueId); m_RemoteQueuesByTag.erase(It); @@ -667,7 +755,7 @@ HttpComputeService::Impl::RegisterRoutes() }, HttpVerb::kPost); - // Unified queue routes — {queueref} accepts both local integer IDs and remote OID tokens. + // Unified queue routes - {queueref} accepts both local integer IDs and remote OID tokens. // ResolveQueueRef() handles access control (local-only for integer IDs) and token resolution. m_Router.RegisterRoute( @@ -1017,7 +1105,7 @@ HttpComputeService::Impl::RegisterRoutes() }, HttpVerb::kPost); - // WebSocket upgrade endpoint — the handler logic lives in + // WebSocket upgrade endpoint - the handler logic lives in // HttpComputeService::OnWebSocket* methods; this route merely // satisfies the router so the upgrade request isn't rejected. m_Router.RegisterRoute( @@ -1028,11 +1116,12 @@ HttpComputeService::Impl::RegisterRoutes() ////////////////////////////////////////////////////////////////////////// -HttpComputeService::HttpComputeService(CidStore& InCidStore, +HttpComputeService::HttpComputeService(ChunkStore& InActionStore, + ChunkStore& InWorkerStore, IHttpStatsService& StatsService, const std::filesystem::path& BaseDir, int32_t MaxConcurrentActions) -: m_Impl(std::make_unique<Impl>(this, InCidStore, StatsService, BaseDir, MaxConcurrentActions)) +: m_Impl(std::make_unique<Impl>(this, InActionStore, InWorkerStore, StatsService, BaseDir, MaxConcurrentActions)) { } @@ -1058,6 +1147,12 @@ HttpComputeService::GetActionCounts() return m_Impl->m_ComputeService.GetActionCounts(); } +void +HttpComputeService::SetShutdownCallback(std::function<void()> Callback) +{ + m_Impl->m_ShutdownCallback = std::move(Callback); +} + const char* HttpComputeService::BaseUri() const { @@ -1146,7 +1241,7 @@ HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::strin { if (OidMatcher(Capture)) { - // Remote OID token — accessible from any client + // Remote OID token - accessible from any client const Oid Token = Oid::FromHexString(Capture); const int QueueId = ResolveQueueToken(Token); @@ -1158,7 +1253,7 @@ HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::strin return QueueId; } - // Local integer queue ID — restricted to local machine requests + // Local integer queue ID - restricted to local machine requests if (!HttpReq.IsLocalMachineRequest()) { HttpReq.WriteResponse(HttpResponseCode::Forbidden); @@ -1233,7 +1328,7 @@ HttpComputeService::Impl::IngestPackageAttachments(HttpServerRequest& HttpReq, c OutStats.Bytes += CompressedSize; ++OutStats.Count; - const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); + const ChunkStore::InsertResult InsertResult = m_ActionStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { @@ -1251,7 +1346,7 @@ HttpComputeService::Impl::CheckAttachments(const CbObject& ActionObj, std::vecto ActionObj.IterateAttachments([&](CbFieldView Field) { const IoHash FileHash = Field.AsHash(); - if (!m_CidStore.ContainsChunk(FileHash)) + if (!m_ActionStore.ContainsChunk(FileHash)) { NeedList.push_back(FileHash); } @@ -1501,7 +1596,7 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const CbPackage WorkerPackage; WorkerPackage.SetObject(WorkerSpec); - m_CidStore.FilterChunks(ChunkSet); + m_WorkerStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) { @@ -1550,8 +1645,8 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const TotalAttachmentBytes += Buffer.GetCompressedSize(); ++AttachmentCount; - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); + const ChunkStore::InsertResult InsertResult = + m_WorkerStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { @@ -1589,9 +1684,9 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const // void -HttpComputeService::OnWebSocketOpen(Ref<WebSocketConnection> Connection) +HttpComputeService::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri) { - m_Impl->OnWebSocketOpen(std::move(Connection)); + m_Impl->OnWebSocketOpen(std::move(Connection), RelativeUri); } void @@ -1614,12 +1709,13 @@ HttpComputeService::OnActionsCompleted(std::span<const CompletedActionNotificati ////////////////////////////////////////////////////////////////////////// // -// Impl — WebSocket / observer +// Impl - WebSocket / observer // void -HttpComputeService::Impl::OnWebSocketOpen(Ref<WebSocketConnection> Connection) +HttpComputeService::Impl::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri) { + ZEN_UNUSED(RelativeUri); ZEN_INFO("compute WebSocket client connected"); m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); } |