aboutsummaryrefslogtreecommitdiff
path: root/zenhttp
diff options
context:
space:
mode:
Diffstat (limited to 'zenhttp')
-rw-r--r--zenhttp/httpserver.cpp2
-rw-r--r--zenhttp/httpsys.cpp248
-rw-r--r--zenhttp/httpsys.h21
-rw-r--r--zenhttp/include/zenhttp/httpserver.h2
-rw-r--r--zenhttp/workthreadpool.cpp77
-rw-r--r--zenhttp/workthreadpool.h47
-rw-r--r--zenhttp/zenhttp.vcxproj2
-rw-r--r--zenhttp/zenhttp.vcxproj.filters2
8 files changed, 341 insertions, 60 deletions
diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp
index 193426ed2..69974ca06 100644
--- a/zenhttp/httpserver.cpp
+++ b/zenhttp/httpserver.cpp
@@ -571,7 +571,7 @@ CreateHttpServer()
#if 0
return new HttpUwsServer;
#elif ZEN_WITH_HTTPSYS
- return new HttpSysServer{std::thread::hardware_concurrency()};
+ return new HttpSysServer(std::thread::hardware_concurrency(), /* background worker threads */ 16);
#else
return new HttpNullServer;
#endif
diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp
index de3069bb8..f88563097 100644
--- a/zenhttp/httpsys.cpp
+++ b/zenhttp/httpsys.cpp
@@ -129,16 +129,18 @@ GetAcceptType(const HTTP_REQUEST* HttpRequest)
class HttpSysRequestHandler
{
public:
- explicit HttpSysRequestHandler(HttpSysTransaction& InRequest) : m_Request(InRequest) {}
+ explicit HttpSysRequestHandler(HttpSysTransaction& Transaction) : m_Transaction(Transaction) {}
virtual ~HttpSysRequestHandler() = default;
virtual void IssueRequest(std::error_code& ErrorCode) = 0;
virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) = 0;
+ HttpSysTransaction& Transaction() { return m_Transaction; }
- HttpSysTransaction& Transaction() { return m_Request; }
+ HttpSysRequestHandler(const HttpSysRequestHandler&) = delete;
+ HttpSysRequestHandler& operator=(const HttpSysRequestHandler&) = delete;
private:
- HttpSysTransaction& m_Request; // Related HTTP transaction object
+ HttpSysTransaction& m_Transaction;
};
/**
@@ -184,12 +186,16 @@ public:
virtual void WriteResponse(HttpResponseCode ResponseCode) override;
virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span<IoBuffer> Blobs) override;
virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) override;
+ virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) override;
using HttpServerRequest::WriteResponse;
- HttpSysTransaction& m_HttpTx;
- HttpMessageResponseRequest* m_Response = nullptr; // TODO: make this more general
- IoBuffer m_PayloadBuffer;
+ HttpSysServerRequest(const HttpSysServerRequest&) = delete;
+ HttpSysServerRequest& operator=(const HttpSysServerRequest&) = delete;
+
+ HttpSysTransaction& m_HttpTx;
+ HttpSysRequestHandler* m_NextCompletionHandler = nullptr;
+ IoBuffer m_PayloadBuffer;
};
/** HTTP transaction
@@ -218,7 +224,9 @@ public:
ULONG_PTR NumberOfBytesTransferred,
PTP_IO Io);
- void IssueInitialRequest(std::error_code& ErrorCode);
+ void IssueInitialRequest(std::error_code& ErrorCode);
+ bool IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler);
+
PTP_IO Iocp();
HANDLE RequestQueueHandle();
inline OVERLAPPED* Overlapped() { return &m_HttpOverlapped; }
@@ -227,6 +235,8 @@ public:
HttpSysServerRequest& InvokeRequestHandler(HttpService& Service, IoBuffer Payload);
+ HttpSysServerRequest& ServerRequest() { return m_HandlerRequest.value(); }
+
private:
OVERLAPPED m_HttpOverlapped{};
HttpSysServer& m_HttpServer;
@@ -239,8 +249,6 @@ private:
Ref<IHttpPackageHandler> m_PackageHandler;
};
-//////////////////////////////////////////////////////////////////////////
-
/**
* @brief HTTP request response I/O request handler
*
@@ -588,6 +596,108 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
}
}
+/** HTTP completion handler for async work
+
+ This is used to allow work to be taken off the request handler threads
+ and to support posting responses asynchronously.
+ */
+
+class HttpAsyncWorkRequest : public HttpSysRequestHandler
+{
+public:
+ HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response);
+ ~HttpAsyncWorkRequest();
+
+ virtual void IssueRequest(std::error_code& ErrorCode) override final;
+ virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) override;
+
+private:
+ struct AsyncWorkItem : public IWork
+ {
+ virtual void Execute() override;
+
+ AsyncWorkItem(HttpSysTransaction& InTx, std::function<void(HttpServerRequest&)>&& InHandler)
+ : Tx(InTx)
+ , Handler(std::move(InHandler))
+ {
+ }
+
+ HttpSysTransaction& Tx;
+ std::function<void(HttpServerRequest&)> Handler;
+ };
+
+ Ref<AsyncWorkItem> m_WorkItem;
+};
+
+HttpAsyncWorkRequest::HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response)
+: HttpSysRequestHandler(Tx)
+{
+ m_WorkItem = new AsyncWorkItem(Tx, std::move(Response));
+}
+
+HttpAsyncWorkRequest::~HttpAsyncWorkRequest()
+{
+}
+
+void
+HttpAsyncWorkRequest::IssueRequest(std::error_code& ErrorCode)
+{
+ ErrorCode.clear();
+
+ Transaction().Server().WorkPool().ScheduleWork(m_WorkItem);
+}
+
+HttpSysRequestHandler*
+HttpAsyncWorkRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred)
+{
+ // This ought to not be called since there should be no outstanding I/O request
+ // when this completion handler is active
+
+ ZEN_UNUSED(IoResult, NumberOfBytesTransferred);
+
+ ZEN_WARN("Unexpected I/O completion during async work! IoResult: {}, NumberOfBytesTransferred: {}", IoResult, NumberOfBytesTransferred);
+
+ return this;
+}
+
+void
+HttpAsyncWorkRequest::AsyncWorkItem::Execute()
+{
+ using namespace fmt::literals;
+
+ try
+ {
+ HttpSysServerRequest& ThisRequest = Tx.ServerRequest();
+
+ ThisRequest.m_NextCompletionHandler = nullptr;
+
+ Handler(ThisRequest);
+
+ // TODO: should Handler be destroyed at this point to ensure there
+ // are no outstanding references into state which could be
+ // deleted asynchronously as a result of issuing the response?
+
+ if (HttpSysRequestHandler* NextHandler = ThisRequest.m_NextCompletionHandler)
+ {
+ return (void)Tx.IssueNextRequest(NextHandler);
+ }
+ else if (!ThisRequest.IsHandled())
+ {
+ return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 404, "Not found"sv));
+ }
+ else
+ {
+ // "Handled" but no request handler? Shouldn't ever happen
+ return (void)Tx.IssueNextRequest(
+ new HttpMessageResponseRequest(Tx, 500, "Response generated but no request handler scheduled"sv));
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 500, "Exception thrown in async work: '{}'"_format(Ex.what())));
+ }
+}
+
/**
_________
/ _____/ ______________ __ ___________
@@ -597,10 +707,11 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
\/ \/ \/
*/
-HttpSysServer::HttpSysServer(unsigned int ThreadCount)
+HttpSysServer::HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount)
: m_Log(logging::Get("http"))
, m_RequestLog(logging::Get("http_requests"))
, m_ThreadPool(ThreadCount)
+, m_AsyncWorkPool(AsyncWorkThreadCount)
{
ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr);
@@ -611,6 +722,8 @@ HttpSysServer::HttpSysServer(unsigned int ThreadCount)
m_IsHttpInitialized = true;
m_IsOk = true;
+
+ ZEN_INFO("http.sys server started, using {} I/O threads and {} async worker threads", ThreadCount, AsyncWorkThreadCount);
}
HttpSysServer::~HttpSysServer()
@@ -915,6 +1028,47 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance,
}
}
+bool
+HttpSysTransaction::IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler)
+{
+ HttpSysRequestHandler* CurrentHandler = m_CompletionHandler;
+ m_CompletionHandler = NewCompletionHandler;
+
+ auto _ = MakeGuard([this, CurrentHandler] {
+ if ((CurrentHandler != &m_InitialHttpHandler) && (CurrentHandler != m_CompletionHandler))
+ {
+ delete CurrentHandler;
+ }
+ });
+
+ if (NewCompletionHandler == nullptr)
+ {
+ return false;
+ }
+
+ try
+ {
+ std::error_code ErrorCode;
+ m_CompletionHandler->IssueRequest(ErrorCode);
+
+ if (!ErrorCode)
+ {
+ return true;
+ }
+
+ ZEN_ERROR("IssueRequest() failed: '{}'", ErrorCode.message());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("exception caught in IssueNextRequest(): '{}'", Ex.what());
+ }
+
+ // something went wrong, no request is pending
+ m_CompletionHandler = nullptr;
+
+ return false;
+}
+
HttpSysTransaction::Status
HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred)
{
@@ -934,38 +1088,9 @@ HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTran
m_HttpServer.OnHandlingRequest();
}
- m_CompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred);
+ auto NewCompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred);
- if (m_CompletionHandler)
- {
- try
- {
- std::error_code ErrorCode;
- m_CompletionHandler->IssueRequest(ErrorCode);
-
- if (ErrorCode)
- {
- ZEN_ERROR("IssueRequest() failed {}", ErrorCode.message());
- }
- else
- {
- IsRequestPending = true;
- }
- }
- catch (std::exception& Ex)
- {
- ZEN_ERROR("exception caught from IssueRequest(): {}", Ex.what());
-
- // something went wrong, no request is pending
- }
- }
- else
- {
- if (CurrentHandler != &m_InitialHttpHandler)
- {
- delete CurrentHandler;
- }
- }
+ IsRequestPending = IssueNextRequest(NewCompletionHandler);
}
// Ensure new requests are enqueued as necessary
@@ -1213,13 +1338,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode)
{
ZEN_ASSERT(IsHandled() == false);
- m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode);
+ auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode);
if (SuppressBody())
{
- m_Response->SuppressResponseBody();
+ Response->SuppressResponseBody();
}
+ m_NextCompletionHandler = Response;
+
SetIsHandled();
}
@@ -1228,13 +1355,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy
{
ZEN_ASSERT(IsHandled() == false);
- m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs);
+ auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs);
if (SuppressBody())
{
- m_Response->SuppressResponseBody();
+ Response->SuppressResponseBody();
}
+ m_NextCompletionHandler = Response;
+
SetIsHandled();
}
@@ -1243,17 +1372,32 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy
{
ZEN_ASSERT(IsHandled() == false);
- m_Response =
+ auto Response =
new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, ResponseString.data(), ResponseString.size());
if (SuppressBody())
{
- m_Response->SuppressResponseBody();
+ Response->SuppressResponseBody();
}
+ m_NextCompletionHandler = Response;
+
SetIsHandled();
}
+void
+HttpSysServerRequest::WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler)
+{
+ if (m_HttpTx.Server().IsAsyncResponseEnabled())
+ {
+ ContinuationHandler(m_HttpTx.ServerRequest());
+ }
+ else
+ {
+ m_NextCompletionHandler = new HttpAsyncWorkRequest(m_HttpTx, std::move(ContinuationHandler));
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
InitialRequestHandler::InitialRequestHandler(HttpSysTransaction& InRequest) : HttpSysRequestHandler(InRequest)
@@ -1452,14 +1596,14 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT
HttpSysServerRequest& ThisRequest = Transaction().InvokeRequestHandler(*Service, m_PayloadBuffer);
- if (!ThisRequest.IsHandled())
+ if (HttpSysRequestHandler* Response = ThisRequest.m_NextCompletionHandler)
{
- return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv);
+ return Response;
}
- if (HttpMessageResponseRequest* Response = ThisRequest.m_Response)
+ if (!ThisRequest.IsHandled())
{
- return Response;
+ return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv);
}
}
@@ -1503,4 +1647,4 @@ HttpSysServer::RegisterService(HttpService& Service)
}
} // namespace zen
-#endif \ No newline at end of file
+#endif
diff --git a/zenhttp/httpsys.h b/zenhttp/httpsys.h
index a8395b283..46ba122cc 100644
--- a/zenhttp/httpsys.h
+++ b/zenhttp/httpsys.h
@@ -16,6 +16,7 @@
# define _WINSOCKAPI_
# include <zencore/windows.h>
# include "iothreadpool.h"
+# include "workthreadpool.h"
# include <http.h>
@@ -35,7 +36,7 @@ class HttpSysServer : public HttpServer
friend class HttpSysTransaction;
public:
- explicit HttpSysServer(unsigned int ThreadCount);
+ explicit HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount);
~HttpSysServer();
// HttpServer interface implementation
@@ -45,6 +46,11 @@ public:
virtual void RequestExit() override;
virtual void RegisterService(HttpService& Service) override;
+ WorkerThreadPool& WorkPool() { return m_AsyncWorkPool; }
+
+ inline bool IsOk() const { return m_IsOk; }
+ inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; }
+
private:
void Initialize(const wchar_t* UrlPath);
void Cleanup();
@@ -53,8 +59,6 @@ private:
void OnHandlingRequest();
void IssueNewRequestMaybe();
- inline bool IsOk() const { return m_IsOk; }
-
void RegisterService(const char* Endpoint, HttpService& Service);
void UnregisterService(const char* Endpoint, HttpService& Service);
@@ -63,10 +67,13 @@ private:
spdlog::logger& m_RequestLog;
spdlog::logger& Log() { return m_Log; }
- bool m_IsOk = false;
- bool m_IsHttpInitialized = false;
- bool m_IsRequestLoggingEnabled = false;
- WinIoThreadPool m_ThreadPool;
+ bool m_IsOk = false;
+ bool m_IsHttpInitialized = false;
+ bool m_IsRequestLoggingEnabled = false;
+ bool m_IsAsyncResponseEnabled = true;
+
+ WinIoThreadPool m_ThreadPool;
+ WorkerThreadPool m_AsyncWorkPool;
std::wstring m_BaseUri; // http://*:nnnn/
HTTP_SERVER_SESSION_ID m_HttpSessionId = 0;
diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h
index 6a7dc8a70..3e6608f11 100644
--- a/zenhttp/include/zenhttp/httpserver.h
+++ b/zenhttp/include/zenhttp/httpserver.h
@@ -97,6 +97,8 @@ public:
void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::string_view ResponseString);
void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, IoBuffer Blob);
+ virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) = 0;
+
protected:
enum
{
diff --git a/zenhttp/workthreadpool.cpp b/zenhttp/workthreadpool.cpp
new file mode 100644
index 000000000..41eaaae94
--- /dev/null
+++ b/zenhttp/workthreadpool.cpp
@@ -0,0 +1,77 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "workthreadpool.h"
+
+#include <zencore/logging.h>
+
+namespace zen {
+
+namespace detail {
+ struct LambdaWork : IWork
+ {
+ LambdaWork(auto Work) : WorkFunction(Work) {}
+ virtual void Execute() override { WorkFunction(); }
+
+ std::function<void()> WorkFunction;
+ };
+} // namespace detail
+
+WorkerThreadPool::WorkerThreadPool(int InThreadCount)
+{
+ for (int i = 0; i < InThreadCount; ++i)
+ {
+ m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this);
+ }
+}
+
+WorkerThreadPool::~WorkerThreadPool()
+{
+ m_WorkQueue.CompleteAdding();
+
+ for (std::thread& Thread : m_WorkerThreads)
+ {
+ Thread.join();
+ }
+
+ m_WorkerThreads.clear();
+}
+
+void
+WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
+{
+ m_WorkQueue.Enqueue(std::move(Work));
+}
+
+void
+WorkerThreadPool::ScheduleWork(std::function<void()>&& Work)
+{
+ m_WorkQueue.Enqueue(new detail::LambdaWork(Work));
+}
+
+void
+WorkerThreadPool::WorkerThreadFunction()
+{
+ do
+ {
+ Ref<IWork> Work;
+ if (m_WorkQueue.WaitAndDequeue(Work))
+ {
+ try
+ {
+ Work->Execute();
+ }
+ catch (std::exception& e)
+ {
+ Work->m_Exception = std::current_exception();
+
+ ZEN_WARN("Caught exception in worker thread: {}", e.what());
+ }
+ }
+ else
+ {
+ return;
+ }
+ } while (true);
+}
+
+} // namespace zen \ No newline at end of file
diff --git a/zenhttp/workthreadpool.h b/zenhttp/workthreadpool.h
new file mode 100644
index 000000000..6581cc08f
--- /dev/null
+++ b/zenhttp/workthreadpool.h
@@ -0,0 +1,47 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#include <zencore/blockingqueue.h>
+#include <zencore/refcount.h>
+#include <zencore/windows.h>
+
+#include <exception>
+#include <functional>
+#include <system_error>
+#include <thread>
+#include <vector>
+
+namespace zen {
+
+struct IWork : public RefCounted
+{
+ virtual void Execute() = 0;
+
+ inline std::exception_ptr GetException() { return m_Exception; }
+
+private:
+ std::exception_ptr m_Exception;
+
+ friend class WorkerThreadPool;
+};
+
+class WorkerThreadPool
+{
+public:
+ WorkerThreadPool(int InThreadCount);
+ ~WorkerThreadPool();
+
+ void ScheduleWork(Ref<IWork> Work);
+ void ScheduleWork(std::function<void()>&& Work);
+
+ void WorkerThreadFunction();
+
+private:
+ std::vector<std::thread> m_WorkerThreads;
+ BlockingQueue<Ref<IWork>> m_WorkQueue;
+};
+
+} // namespace zen
diff --git a/zenhttp/zenhttp.vcxproj b/zenhttp/zenhttp.vcxproj
index 899cf4bd1..1fc64bfc2 100644
--- a/zenhttp/zenhttp.vcxproj
+++ b/zenhttp/zenhttp.vcxproj
@@ -100,6 +100,7 @@
<ClCompile Include="httpsys.cpp" />
<ClCompile Include="httpuws.cpp" />
<ClCompile Include="iothreadpool.cpp" />
+ <ClCompile Include="workthreadpool.cpp" />
<ClCompile Include="zenhttp.cpp" />
</ItemGroup>
<ItemGroup>
@@ -112,6 +113,7 @@
<ClInclude Include="include\zenhttp\httpshared.h" />
<ClInclude Include="include\zenhttp\zenhttp.h" />
<ClInclude Include="iothreadpool.h" />
+ <ClInclude Include="workthreadpool.h" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\zencore\zencore.vcxproj">
diff --git a/zenhttp/zenhttp.vcxproj.filters b/zenhttp/zenhttp.vcxproj.filters
index 2e968055c..e57e7a712 100644
--- a/zenhttp/zenhttp.vcxproj.filters
+++ b/zenhttp/zenhttp.vcxproj.filters
@@ -9,6 +9,7 @@
<ClCompile Include="httpuws.cpp" />
<ClCompile Include="httpshared.cpp" />
<ClCompile Include="zenhttp.cpp" />
+ <ClCompile Include="workthreadpool.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="include\zenhttp\httpclient.h" />
@@ -20,6 +21,7 @@
<ClInclude Include="httpuws.h" />
<ClInclude Include="include\zenhttp\httpcommon.h" />
<ClInclude Include="include\zenhttp\httpshared.h" />
+ <ClInclude Include="workthreadpool.h" />
</ItemGroup>
<ItemGroup>
<None Include="xmake.lua" />