diff options
Diffstat (limited to 'zenhttp')
| -rw-r--r-- | zenhttp/httpserver.cpp | 2 | ||||
| -rw-r--r-- | zenhttp/httpsys.cpp | 248 | ||||
| -rw-r--r-- | zenhttp/httpsys.h | 21 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpserver.h | 2 | ||||
| -rw-r--r-- | zenhttp/workthreadpool.cpp | 77 | ||||
| -rw-r--r-- | zenhttp/workthreadpool.h | 47 | ||||
| -rw-r--r-- | zenhttp/zenhttp.vcxproj | 2 | ||||
| -rw-r--r-- | zenhttp/zenhttp.vcxproj.filters | 2 |
8 files changed, 341 insertions, 60 deletions
diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp index 193426ed2..69974ca06 100644 --- a/zenhttp/httpserver.cpp +++ b/zenhttp/httpserver.cpp @@ -571,7 +571,7 @@ CreateHttpServer() #if 0 return new HttpUwsServer; #elif ZEN_WITH_HTTPSYS - return new HttpSysServer{std::thread::hardware_concurrency()}; + return new HttpSysServer(std::thread::hardware_concurrency(), /* background worker threads */ 16); #else return new HttpNullServer; #endif diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index de3069bb8..f88563097 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -129,16 +129,18 @@ GetAcceptType(const HTTP_REQUEST* HttpRequest) class HttpSysRequestHandler { public: - explicit HttpSysRequestHandler(HttpSysTransaction& InRequest) : m_Request(InRequest) {} + explicit HttpSysRequestHandler(HttpSysTransaction& Transaction) : m_Transaction(Transaction) {} virtual ~HttpSysRequestHandler() = default; virtual void IssueRequest(std::error_code& ErrorCode) = 0; virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) = 0; + HttpSysTransaction& Transaction() { return m_Transaction; } - HttpSysTransaction& Transaction() { return m_Request; } + HttpSysRequestHandler(const HttpSysRequestHandler&) = delete; + HttpSysRequestHandler& operator=(const HttpSysRequestHandler&) = delete; private: - HttpSysTransaction& m_Request; // Related HTTP transaction object + HttpSysTransaction& m_Transaction; }; /** @@ -184,12 +186,16 @@ public: virtual void WriteResponse(HttpResponseCode ResponseCode) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span<IoBuffer> Blobs) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) override; + virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) override; using HttpServerRequest::WriteResponse; - HttpSysTransaction& m_HttpTx; - HttpMessageResponseRequest* m_Response = nullptr; // TODO: make this more general - IoBuffer m_PayloadBuffer; + HttpSysServerRequest(const HttpSysServerRequest&) = delete; + HttpSysServerRequest& operator=(const HttpSysServerRequest&) = delete; + + HttpSysTransaction& m_HttpTx; + HttpSysRequestHandler* m_NextCompletionHandler = nullptr; + IoBuffer m_PayloadBuffer; }; /** HTTP transaction @@ -218,7 +224,9 @@ public: ULONG_PTR NumberOfBytesTransferred, PTP_IO Io); - void IssueInitialRequest(std::error_code& ErrorCode); + void IssueInitialRequest(std::error_code& ErrorCode); + bool IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler); + PTP_IO Iocp(); HANDLE RequestQueueHandle(); inline OVERLAPPED* Overlapped() { return &m_HttpOverlapped; } @@ -227,6 +235,8 @@ public: HttpSysServerRequest& InvokeRequestHandler(HttpService& Service, IoBuffer Payload); + HttpSysServerRequest& ServerRequest() { return m_HandlerRequest.value(); } + private: OVERLAPPED m_HttpOverlapped{}; HttpSysServer& m_HttpServer; @@ -239,8 +249,6 @@ private: Ref<IHttpPackageHandler> m_PackageHandler; }; -////////////////////////////////////////////////////////////////////////// - /** * @brief HTTP request response I/O request handler * @@ -588,6 +596,108 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) } } +/** HTTP completion handler for async work + + This is used to allow work to be taken off the request handler threads + and to support posting responses asynchronously. + */ + +class HttpAsyncWorkRequest : public HttpSysRequestHandler +{ +public: + HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response); + ~HttpAsyncWorkRequest(); + + virtual void IssueRequest(std::error_code& ErrorCode) override final; + virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) override; + +private: + struct AsyncWorkItem : public IWork + { + virtual void Execute() override; + + AsyncWorkItem(HttpSysTransaction& InTx, std::function<void(HttpServerRequest&)>&& InHandler) + : Tx(InTx) + , Handler(std::move(InHandler)) + { + } + + HttpSysTransaction& Tx; + std::function<void(HttpServerRequest&)> Handler; + }; + + Ref<AsyncWorkItem> m_WorkItem; +}; + +HttpAsyncWorkRequest::HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response) +: HttpSysRequestHandler(Tx) +{ + m_WorkItem = new AsyncWorkItem(Tx, std::move(Response)); +} + +HttpAsyncWorkRequest::~HttpAsyncWorkRequest() +{ +} + +void +HttpAsyncWorkRequest::IssueRequest(std::error_code& ErrorCode) +{ + ErrorCode.clear(); + + Transaction().Server().WorkPool().ScheduleWork(m_WorkItem); +} + +HttpSysRequestHandler* +HttpAsyncWorkRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) +{ + // This ought to not be called since there should be no outstanding I/O request + // when this completion handler is active + + ZEN_UNUSED(IoResult, NumberOfBytesTransferred); + + ZEN_WARN("Unexpected I/O completion during async work! IoResult: {}, NumberOfBytesTransferred: {}", IoResult, NumberOfBytesTransferred); + + return this; +} + +void +HttpAsyncWorkRequest::AsyncWorkItem::Execute() +{ + using namespace fmt::literals; + + try + { + HttpSysServerRequest& ThisRequest = Tx.ServerRequest(); + + ThisRequest.m_NextCompletionHandler = nullptr; + + Handler(ThisRequest); + + // TODO: should Handler be destroyed at this point to ensure there + // are no outstanding references into state which could be + // deleted asynchronously as a result of issuing the response? + + if (HttpSysRequestHandler* NextHandler = ThisRequest.m_NextCompletionHandler) + { + return (void)Tx.IssueNextRequest(NextHandler); + } + else if (!ThisRequest.IsHandled()) + { + return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 404, "Not found"sv)); + } + else + { + // "Handled" but no request handler? Shouldn't ever happen + return (void)Tx.IssueNextRequest( + new HttpMessageResponseRequest(Tx, 500, "Response generated but no request handler scheduled"sv)); + } + } + catch (std::exception& Ex) + { + return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 500, "Exception thrown in async work: '{}'"_format(Ex.what()))); + } +} + /** _________ / _____/ ______________ __ ___________ @@ -597,10 +707,11 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) \/ \/ \/ */ -HttpSysServer::HttpSysServer(unsigned int ThreadCount) +HttpSysServer::HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount) : m_Log(logging::Get("http")) , m_RequestLog(logging::Get("http_requests")) , m_ThreadPool(ThreadCount) +, m_AsyncWorkPool(AsyncWorkThreadCount) { ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr); @@ -611,6 +722,8 @@ HttpSysServer::HttpSysServer(unsigned int ThreadCount) m_IsHttpInitialized = true; m_IsOk = true; + + ZEN_INFO("http.sys server started, using {} I/O threads and {} async worker threads", ThreadCount, AsyncWorkThreadCount); } HttpSysServer::~HttpSysServer() @@ -915,6 +1028,47 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance, } } +bool +HttpSysTransaction::IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler) +{ + HttpSysRequestHandler* CurrentHandler = m_CompletionHandler; + m_CompletionHandler = NewCompletionHandler; + + auto _ = MakeGuard([this, CurrentHandler] { + if ((CurrentHandler != &m_InitialHttpHandler) && (CurrentHandler != m_CompletionHandler)) + { + delete CurrentHandler; + } + }); + + if (NewCompletionHandler == nullptr) + { + return false; + } + + try + { + std::error_code ErrorCode; + m_CompletionHandler->IssueRequest(ErrorCode); + + if (!ErrorCode) + { + return true; + } + + ZEN_ERROR("IssueRequest() failed: '{}'", ErrorCode.message()); + } + catch (std::exception& Ex) + { + ZEN_ERROR("exception caught in IssueNextRequest(): '{}'", Ex.what()); + } + + // something went wrong, no request is pending + m_CompletionHandler = nullptr; + + return false; +} + HttpSysTransaction::Status HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) { @@ -934,38 +1088,9 @@ HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTran m_HttpServer.OnHandlingRequest(); } - m_CompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred); + auto NewCompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred); - if (m_CompletionHandler) - { - try - { - std::error_code ErrorCode; - m_CompletionHandler->IssueRequest(ErrorCode); - - if (ErrorCode) - { - ZEN_ERROR("IssueRequest() failed {}", ErrorCode.message()); - } - else - { - IsRequestPending = true; - } - } - catch (std::exception& Ex) - { - ZEN_ERROR("exception caught from IssueRequest(): {}", Ex.what()); - - // something went wrong, no request is pending - } - } - else - { - if (CurrentHandler != &m_InitialHttpHandler) - { - delete CurrentHandler; - } - } + IsRequestPending = IssueNextRequest(NewCompletionHandler); } // Ensure new requests are enqueued as necessary @@ -1213,13 +1338,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode) { ZEN_ASSERT(IsHandled() == false); - m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } @@ -1228,13 +1355,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy { ZEN_ASSERT(IsHandled() == false); - m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } @@ -1243,17 +1372,32 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy { ZEN_ASSERT(IsHandled() == false); - m_Response = + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, ResponseString.data(), ResponseString.size()); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } +void +HttpSysServerRequest::WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) +{ + if (m_HttpTx.Server().IsAsyncResponseEnabled()) + { + ContinuationHandler(m_HttpTx.ServerRequest()); + } + else + { + m_NextCompletionHandler = new HttpAsyncWorkRequest(m_HttpTx, std::move(ContinuationHandler)); + } +} + ////////////////////////////////////////////////////////////////////////// InitialRequestHandler::InitialRequestHandler(HttpSysTransaction& InRequest) : HttpSysRequestHandler(InRequest) @@ -1452,14 +1596,14 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT HttpSysServerRequest& ThisRequest = Transaction().InvokeRequestHandler(*Service, m_PayloadBuffer); - if (!ThisRequest.IsHandled()) + if (HttpSysRequestHandler* Response = ThisRequest.m_NextCompletionHandler) { - return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv); + return Response; } - if (HttpMessageResponseRequest* Response = ThisRequest.m_Response) + if (!ThisRequest.IsHandled()) { - return Response; + return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv); } } @@ -1503,4 +1647,4 @@ HttpSysServer::RegisterService(HttpService& Service) } } // namespace zen -#endif
\ No newline at end of file +#endif diff --git a/zenhttp/httpsys.h b/zenhttp/httpsys.h index a8395b283..46ba122cc 100644 --- a/zenhttp/httpsys.h +++ b/zenhttp/httpsys.h @@ -16,6 +16,7 @@ # define _WINSOCKAPI_ # include <zencore/windows.h> # include "iothreadpool.h" +# include "workthreadpool.h" # include <http.h> @@ -35,7 +36,7 @@ class HttpSysServer : public HttpServer friend class HttpSysTransaction; public: - explicit HttpSysServer(unsigned int ThreadCount); + explicit HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount); ~HttpSysServer(); // HttpServer interface implementation @@ -45,6 +46,11 @@ public: virtual void RequestExit() override; virtual void RegisterService(HttpService& Service) override; + WorkerThreadPool& WorkPool() { return m_AsyncWorkPool; } + + inline bool IsOk() const { return m_IsOk; } + inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; } + private: void Initialize(const wchar_t* UrlPath); void Cleanup(); @@ -53,8 +59,6 @@ private: void OnHandlingRequest(); void IssueNewRequestMaybe(); - inline bool IsOk() const { return m_IsOk; } - void RegisterService(const char* Endpoint, HttpService& Service); void UnregisterService(const char* Endpoint, HttpService& Service); @@ -63,10 +67,13 @@ private: spdlog::logger& m_RequestLog; spdlog::logger& Log() { return m_Log; } - bool m_IsOk = false; - bool m_IsHttpInitialized = false; - bool m_IsRequestLoggingEnabled = false; - WinIoThreadPool m_ThreadPool; + bool m_IsOk = false; + bool m_IsHttpInitialized = false; + bool m_IsRequestLoggingEnabled = false; + bool m_IsAsyncResponseEnabled = true; + + WinIoThreadPool m_ThreadPool; + WorkerThreadPool m_AsyncWorkPool; std::wstring m_BaseUri; // http://*:nnnn/ HTTP_SERVER_SESSION_ID m_HttpSessionId = 0; diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h index 6a7dc8a70..3e6608f11 100644 --- a/zenhttp/include/zenhttp/httpserver.h +++ b/zenhttp/include/zenhttp/httpserver.h @@ -97,6 +97,8 @@ public: void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::string_view ResponseString); void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, IoBuffer Blob); + virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) = 0; + protected: enum { diff --git a/zenhttp/workthreadpool.cpp b/zenhttp/workthreadpool.cpp new file mode 100644 index 000000000..41eaaae94 --- /dev/null +++ b/zenhttp/workthreadpool.cpp @@ -0,0 +1,77 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "workthreadpool.h" + +#include <zencore/logging.h> + +namespace zen { + +namespace detail { + struct LambdaWork : IWork + { + LambdaWork(auto Work) : WorkFunction(Work) {} + virtual void Execute() override { WorkFunction(); } + + std::function<void()> WorkFunction; + }; +} // namespace detail + +WorkerThreadPool::WorkerThreadPool(int InThreadCount) +{ + for (int i = 0; i < InThreadCount; ++i) + { + m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this); + } +} + +WorkerThreadPool::~WorkerThreadPool() +{ + m_WorkQueue.CompleteAdding(); + + for (std::thread& Thread : m_WorkerThreads) + { + Thread.join(); + } + + m_WorkerThreads.clear(); +} + +void +WorkerThreadPool::ScheduleWork(Ref<IWork> Work) +{ + m_WorkQueue.Enqueue(std::move(Work)); +} + +void +WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) +{ + m_WorkQueue.Enqueue(new detail::LambdaWork(Work)); +} + +void +WorkerThreadPool::WorkerThreadFunction() +{ + do + { + Ref<IWork> Work; + if (m_WorkQueue.WaitAndDequeue(Work)) + { + try + { + Work->Execute(); + } + catch (std::exception& e) + { + Work->m_Exception = std::current_exception(); + + ZEN_WARN("Caught exception in worker thread: {}", e.what()); + } + } + else + { + return; + } + } while (true); +} + +} // namespace zen
\ No newline at end of file diff --git a/zenhttp/workthreadpool.h b/zenhttp/workthreadpool.h new file mode 100644 index 000000000..6581cc08f --- /dev/null +++ b/zenhttp/workthreadpool.h @@ -0,0 +1,47 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#include <zencore/blockingqueue.h> +#include <zencore/refcount.h> +#include <zencore/windows.h> + +#include <exception> +#include <functional> +#include <system_error> +#include <thread> +#include <vector> + +namespace zen { + +struct IWork : public RefCounted +{ + virtual void Execute() = 0; + + inline std::exception_ptr GetException() { return m_Exception; } + +private: + std::exception_ptr m_Exception; + + friend class WorkerThreadPool; +}; + +class WorkerThreadPool +{ +public: + WorkerThreadPool(int InThreadCount); + ~WorkerThreadPool(); + + void ScheduleWork(Ref<IWork> Work); + void ScheduleWork(std::function<void()>&& Work); + + void WorkerThreadFunction(); + +private: + std::vector<std::thread> m_WorkerThreads; + BlockingQueue<Ref<IWork>> m_WorkQueue; +}; + +} // namespace zen diff --git a/zenhttp/zenhttp.vcxproj b/zenhttp/zenhttp.vcxproj index 899cf4bd1..1fc64bfc2 100644 --- a/zenhttp/zenhttp.vcxproj +++ b/zenhttp/zenhttp.vcxproj @@ -100,6 +100,7 @@ <ClCompile Include="httpsys.cpp" /> <ClCompile Include="httpuws.cpp" /> <ClCompile Include="iothreadpool.cpp" /> + <ClCompile Include="workthreadpool.cpp" /> <ClCompile Include="zenhttp.cpp" /> </ItemGroup> <ItemGroup> @@ -112,6 +113,7 @@ <ClInclude Include="include\zenhttp\httpshared.h" /> <ClInclude Include="include\zenhttp\zenhttp.h" /> <ClInclude Include="iothreadpool.h" /> + <ClInclude Include="workthreadpool.h" /> </ItemGroup> <ItemGroup> <ProjectReference Include="..\zencore\zencore.vcxproj"> diff --git a/zenhttp/zenhttp.vcxproj.filters b/zenhttp/zenhttp.vcxproj.filters index 2e968055c..e57e7a712 100644 --- a/zenhttp/zenhttp.vcxproj.filters +++ b/zenhttp/zenhttp.vcxproj.filters @@ -9,6 +9,7 @@ <ClCompile Include="httpuws.cpp" /> <ClCompile Include="httpshared.cpp" /> <ClCompile Include="zenhttp.cpp" /> + <ClCompile Include="workthreadpool.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="include\zenhttp\httpclient.h" /> @@ -20,6 +21,7 @@ <ClInclude Include="httpuws.h" /> <ClInclude Include="include\zenhttp\httpcommon.h" /> <ClInclude Include="include\zenhttp\httpshared.h" /> + <ClInclude Include="workthreadpool.h" /> </ItemGroup> <ItemGroup> <None Include="xmake.lua" /> |