aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/httpcomputeservice.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/httpcomputeservice.cpp')
-rw-r--r--src/zencompute/httpcomputeservice.cpp170
1 files changed, 133 insertions, 37 deletions
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp
index bd3f4e70e..5ab189d89 100644
--- a/src/zencompute/httpcomputeservice.cpp
+++ b/src/zencompute/httpcomputeservice.cpp
@@ -21,12 +21,14 @@
# include <zencore/thread.h>
# include <zencore/trace.h>
# include <zencore/uid.h>
-# include <zenstore/cidstore.h>
+# include <zenstore/hashkeyset.h>
+# include <zenstore/zenstore.h>
# include <zentelemetry/stats.h>
# include <algorithm>
# include <span>
# include <unordered_map>
+# include <utility>
# include <vector>
using namespace std::literals;
@@ -45,7 +47,9 @@ auto OidMatcher = [](std::string_view Str) { return Str.size() == 24 && AsciiSe
struct HttpComputeService::Impl
{
HttpComputeService* m_Self;
- CidStore& m_CidStore;
+ ChunkStore& m_ActionStore;
+ ChunkStore& m_WorkerStore;
+ FallbackChunkResolver m_CombinedResolver;
IHttpStatsService& m_StatsService;
LoggerRef m_Log;
std::filesystem::path m_BaseDir;
@@ -58,6 +62,8 @@ struct HttpComputeService::Impl
RwLock m_WsConnectionsLock;
std::vector<Ref<WebSocketConnection>> m_WsConnections;
+ std::function<void()> m_ShutdownCallback;
+
// Metrics
metrics::OperationTiming m_HttpRequests;
@@ -72,13 +78,13 @@ struct HttpComputeService::Impl
std::string ClientHostname; // empty if no hostname was provided
};
- // Remote queue registry — all three maps share the same RemoteQueueInfo objects.
+ // Remote queue registry - all three maps share the same RemoteQueueInfo objects.
// All maps are guarded by m_RemoteQueueLock.
RwLock m_RemoteQueueLock;
- std::unordered_map<Oid, Ref<RemoteQueueInfo>, Oid::Hasher> m_RemoteQueuesByToken; // Token → info
- std::unordered_map<int, Ref<RemoteQueueInfo>> m_RemoteQueuesByQueueId; // QueueId → info
- std::unordered_map<std::string, Ref<RemoteQueueInfo>> m_RemoteQueuesByTag; // idempotency key → info
+ std::unordered_map<Oid, Ref<RemoteQueueInfo>, Oid::Hasher> m_RemoteQueuesByToken; // Token -> info
+ std::unordered_map<int, Ref<RemoteQueueInfo>> m_RemoteQueuesByQueueId; // QueueId -> info
+ std::unordered_map<std::string, Ref<RemoteQueueInfo>> m_RemoteQueuesByTag; // idempotency key -> info
LoggerRef Log() { return m_Log; }
@@ -103,25 +109,28 @@ struct HttpComputeService::Impl
void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker);
// WebSocket / observer
- void OnWebSocketOpen(Ref<WebSocketConnection> Connection);
+ void OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri);
void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code);
void OnActionsCompleted(std::span<const IComputeCompletionObserver::CompletedActionNotification> Actions);
void RegisterRoutes();
- Impl(HttpComputeService* Self,
- CidStore& InCidStore,
- IHttpStatsService& StatsService,
- const std::filesystem::path& BaseDir,
- int32_t MaxConcurrentActions)
+ Impl(HttpComputeService* Self,
+ ChunkStore& InActionStore,
+ ChunkStore& InWorkerStore,
+ IHttpStatsService& StatsService,
+ std::filesystem::path BaseDir,
+ int32_t MaxConcurrentActions)
: m_Self(Self)
- , m_CidStore(InCidStore)
+ , m_ActionStore(InActionStore)
+ , m_WorkerStore(InWorkerStore)
+ , m_CombinedResolver(InActionStore, InWorkerStore)
, m_StatsService(StatsService)
, m_Log(logging::Get("compute"))
- , m_BaseDir(BaseDir)
- , m_ComputeService(InCidStore)
+ , m_BaseDir(std::move(BaseDir))
+ , m_ComputeService(m_CombinedResolver)
{
- m_ComputeService.AddLocalRunner(InCidStore, m_BaseDir / "local", MaxConcurrentActions);
+ m_ComputeService.AddLocalRunner(m_CombinedResolver, m_BaseDir / "local", MaxConcurrentActions);
m_ComputeService.WaitUntilReady();
m_StatsService.RegisterHandler("compute", *m_Self);
RegisterRoutes();
@@ -183,6 +192,65 @@ HttpComputeService::Impl::RegisterRoutes()
HttpVerb::kPost);
m_Router.RegisterRoute(
+ "session/drain",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Draining))
+ {
+ CbObjectWriter Cbo;
+ Cbo << "state"sv << ToString(m_ComputeService.GetSessionState());
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
+ CbObjectWriter Cbo;
+ Cbo << "error"sv
+ << "Cannot transition to Draining from current state"sv;
+ HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "session/status",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ CbObjectWriter Cbo;
+ Cbo << "state"sv << ToString(m_ComputeService.GetSessionState());
+ auto Counts = m_ComputeService.GetActionCounts();
+ Cbo << "actions_pending"sv << Counts.Pending;
+ Cbo << "actions_running"sv << Counts.Running;
+ Cbo << "actions_completed"sv << Counts.Completed;
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "session/sunset",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Sunset))
+ {
+ CbObjectWriter Cbo;
+ Cbo << "state"sv << ToString(m_ComputeService.GetSessionState());
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+
+ if (m_ShutdownCallback)
+ {
+ m_ShutdownCallback();
+ }
+ return;
+ }
+
+ CbObjectWriter Cbo;
+ Cbo << "error"sv
+ << "Cannot transition to Sunset from current state"sv;
+ HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"workers",
[this](HttpRouterRequest& Req) { HandleWorkersGet(Req.ServerRequest()); },
HttpVerb::kGet);
@@ -499,9 +567,19 @@ HttpComputeService::Impl::RegisterRoutes()
return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
}
- m_ComputeService.StartRecording(m_CidStore, m_BaseDir / "recording");
+ std::filesystem::path RecordingPath = m_BaseDir / "recording";
- return HttpReq.WriteResponse(HttpResponseCode::OK);
+ if (!m_ComputeService.StartRecording(m_CombinedResolver, RecordingPath))
+ {
+ CbObjectWriter Cbo;
+ Cbo << "error"
+ << "recording is already active";
+ return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ }
+
+ CbObjectWriter Cbo;
+ Cbo << "path" << RecordingPath.string();
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
},
HttpVerb::kPost);
@@ -515,9 +593,19 @@ HttpComputeService::Impl::RegisterRoutes()
return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
}
- m_ComputeService.StopRecording();
+ std::filesystem::path RecordingPath = m_BaseDir / "recording";
+
+ if (!m_ComputeService.StopRecording())
+ {
+ CbObjectWriter Cbo;
+ Cbo << "error"
+ << "no recording is active";
+ return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ }
- return HttpReq.WriteResponse(HttpResponseCode::OK);
+ CbObjectWriter Cbo;
+ Cbo << "path" << RecordingPath.string();
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
},
HttpVerb::kPost);
@@ -584,7 +672,7 @@ HttpComputeService::Impl::RegisterRoutes()
},
HttpVerb::kGet | HttpVerb::kPost);
- // Queue creation routes — these remain separate since local creates a plain queue
+ // Queue creation routes - these remain separate since local creates a plain queue
// while remote additionally generates an OID token for external access.
m_Router.RegisterRoute(
@@ -638,7 +726,7 @@ HttpComputeService::Impl::RegisterRoutes()
return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
- // Queue has since expired — clean up stale entries and fall through to create a new one
+ // Queue has since expired - clean up stale entries and fall through to create a new one
m_RemoteQueuesByToken.erase(Existing->Token);
m_RemoteQueuesByQueueId.erase(Existing->QueueId);
m_RemoteQueuesByTag.erase(It);
@@ -667,7 +755,7 @@ HttpComputeService::Impl::RegisterRoutes()
},
HttpVerb::kPost);
- // Unified queue routes — {queueref} accepts both local integer IDs and remote OID tokens.
+ // Unified queue routes - {queueref} accepts both local integer IDs and remote OID tokens.
// ResolveQueueRef() handles access control (local-only for integer IDs) and token resolution.
m_Router.RegisterRoute(
@@ -1017,7 +1105,7 @@ HttpComputeService::Impl::RegisterRoutes()
},
HttpVerb::kPost);
- // WebSocket upgrade endpoint — the handler logic lives in
+ // WebSocket upgrade endpoint - the handler logic lives in
// HttpComputeService::OnWebSocket* methods; this route merely
// satisfies the router so the upgrade request isn't rejected.
m_Router.RegisterRoute(
@@ -1028,11 +1116,12 @@ HttpComputeService::Impl::RegisterRoutes()
//////////////////////////////////////////////////////////////////////////
-HttpComputeService::HttpComputeService(CidStore& InCidStore,
+HttpComputeService::HttpComputeService(ChunkStore& InActionStore,
+ ChunkStore& InWorkerStore,
IHttpStatsService& StatsService,
const std::filesystem::path& BaseDir,
int32_t MaxConcurrentActions)
-: m_Impl(std::make_unique<Impl>(this, InCidStore, StatsService, BaseDir, MaxConcurrentActions))
+: m_Impl(std::make_unique<Impl>(this, InActionStore, InWorkerStore, StatsService, BaseDir, MaxConcurrentActions))
{
}
@@ -1058,6 +1147,12 @@ HttpComputeService::GetActionCounts()
return m_Impl->m_ComputeService.GetActionCounts();
}
+void
+HttpComputeService::SetShutdownCallback(std::function<void()> Callback)
+{
+ m_Impl->m_ShutdownCallback = std::move(Callback);
+}
+
const char*
HttpComputeService::BaseUri() const
{
@@ -1146,7 +1241,7 @@ HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::strin
{
if (OidMatcher(Capture))
{
- // Remote OID token — accessible from any client
+ // Remote OID token - accessible from any client
const Oid Token = Oid::FromHexString(Capture);
const int QueueId = ResolveQueueToken(Token);
@@ -1158,7 +1253,7 @@ HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::strin
return QueueId;
}
- // Local integer queue ID — restricted to local machine requests
+ // Local integer queue ID - restricted to local machine requests
if (!HttpReq.IsLocalMachineRequest())
{
HttpReq.WriteResponse(HttpResponseCode::Forbidden);
@@ -1233,7 +1328,7 @@ HttpComputeService::Impl::IngestPackageAttachments(HttpServerRequest& HttpReq, c
OutStats.Bytes += CompressedSize;
++OutStats.Count;
- const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+ const ChunkStore::InsertResult InsertResult = m_ActionStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
if (InsertResult.New)
{
@@ -1251,7 +1346,7 @@ HttpComputeService::Impl::CheckAttachments(const CbObject& ActionObj, std::vecto
ActionObj.IterateAttachments([&](CbFieldView Field) {
const IoHash FileHash = Field.AsHash();
- if (!m_CidStore.ContainsChunk(FileHash))
+ if (!m_ActionStore.ContainsChunk(FileHash))
{
NeedList.push_back(FileHash);
}
@@ -1501,7 +1596,7 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const
CbPackage WorkerPackage;
WorkerPackage.SetObject(WorkerSpec);
- m_CidStore.FilterChunks(ChunkSet);
+ m_WorkerStore.FilterChunks(ChunkSet);
if (ChunkSet.IsEmpty())
{
@@ -1550,8 +1645,8 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const
TotalAttachmentBytes += Buffer.GetCompressedSize();
++AttachmentCount;
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+ const ChunkStore::InsertResult InsertResult =
+ m_WorkerStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
if (InsertResult.New)
{
@@ -1589,9 +1684,9 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const
//
void
-HttpComputeService::OnWebSocketOpen(Ref<WebSocketConnection> Connection)
+HttpComputeService::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri)
{
- m_Impl->OnWebSocketOpen(std::move(Connection));
+ m_Impl->OnWebSocketOpen(std::move(Connection), RelativeUri);
}
void
@@ -1614,12 +1709,13 @@ HttpComputeService::OnActionsCompleted(std::span<const CompletedActionNotificati
//////////////////////////////////////////////////////////////////////////
//
-// Impl — WebSocket / observer
+// Impl - WebSocket / observer
//
void
-HttpComputeService::Impl::OnWebSocketOpen(Ref<WebSocketConnection> Connection)
+HttpComputeService::Impl::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri)
{
+ ZEN_UNUSED(RelativeUri);
ZEN_INFO("compute WebSocket client connected");
m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
}