diff options
| author | Stefan Boberg <[email protected]> | 2021-10-06 13:59:18 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-10-06 13:59:18 +0200 |
| commit | fa48ebf89e06edc9d3bdd26b119417df20902bdd (patch) | |
| tree | 2ea8c3e06282ff537d5985b94f8dc129bd60e9e8 | |
| parent | Added option to specify path to logfile. (diff) | |
| download | zen-fa48ebf89e06edc9d3bdd26b119417df20902bdd.tar.xz zen-fa48ebf89e06edc9d3bdd26b119417df20902bdd.zip | |
Support for asynchronous HTTP response processing (#19)
This change introduces WriteResponseAsync which can be used to move potentially slow request handler code (like upstream lookups) off the I/O service thread to ensure we are always able to serve as many HTTP requests as possible. The current implementation defaults to 16 async worker threads and there is currently no back-pressure.
- Added RequestStats - Metrics for network requests. Aggregates tracking of duration, payload sizes into a single class for ease of use
- Added some metrics on upstream communication
Co-authored-by: Per Larsson <[email protected]>
| -rw-r--r-- | zencore/include/zencore/stats.h | 62 | ||||
| -rw-r--r-- | zencore/stats.cpp | 71 | ||||
| -rw-r--r-- | zenhttp/httpserver.cpp | 2 | ||||
| -rw-r--r-- | zenhttp/httpsys.cpp | 248 | ||||
| -rw-r--r-- | zenhttp/httpsys.h | 21 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpserver.h | 2 | ||||
| -rw-r--r-- | zenhttp/workthreadpool.cpp | 77 | ||||
| -rw-r--r-- | zenhttp/workthreadpool.h | 47 | ||||
| -rw-r--r-- | zenhttp/zenhttp.vcxproj | 2 | ||||
| -rw-r--r-- | zenhttp/zenhttp.vcxproj.filters | 2 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 228 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 12 | ||||
| -rw-r--r-- | zenserver/testing/httptest.cpp | 41 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 2 |
14 files changed, 627 insertions, 190 deletions
diff --git a/zencore/include/zencore/stats.h b/zencore/include/zencore/stats.h index 2e567d614..884bb53f6 100644 --- a/zencore/include/zencore/stats.h +++ b/zencore/include/zencore/stats.h @@ -221,10 +221,70 @@ private: Histogram m_Histogram; }; +/** Metrics for network requests + + Aggregates tracking of duration, payload sizes into a single + class + + */ +class RequestStats +{ +public: + RequestStats(int32_t SampleCount = 514); + ~RequestStats(); + + void Update(int64_t Duration, int64_t Bytes); + uint64_t Count() const; + + // Timing + + int64_t MaxDuration() const { return m_BytesHistogram.Max(); } + int64_t MinDuration() const { return m_BytesHistogram.Min(); } + double MeanDuration() const { return m_BytesHistogram.Mean(); } + SampleSnapshot DurationSnapshot() const { return m_RequestTimeHistogram.Snapshot(); } + double Rate1() { return m_RequestMeter.Rate1(); } + double Rate5() { return m_RequestMeter.Rate5(); } + double Rate15() { return m_RequestMeter.Rate15(); } + double MeanRate() const { return m_RequestMeter.MeanRate(); } + + // Bytes + + int64_t MaxBytes() const { return m_BytesHistogram.Max(); } + int64_t MinBytes() const { return m_BytesHistogram.Min(); } + double MeanBytes() const { return m_BytesHistogram.Mean(); } + SampleSnapshot BytesSnapshot() const { return m_BytesHistogram.Snapshot(); } + double ByteRate1() { return m_BytesMeter.Rate1(); } + double ByteRate5() { return m_BytesMeter.Rate5(); } + double ByteRate15() { return m_BytesMeter.Rate15(); } + double ByteMeanRate() const { return m_BytesMeter.MeanRate(); } + + struct Scope + { + Scope(OperationTiming& Outer); + ~Scope(); + + void Cancel(); + + private: + OperationTiming& m_Outer; + uint64_t m_StartTick; + }; + + void EmitSnapshot(std::string_view Tag, CbObjectWriter& Cbo); + +private: + Meter m_RequestMeter; + Meter m_BytesMeter; + Histogram m_RequestTimeHistogram; + Histogram m_BytesHistogram; +}; + void EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo); -void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo); +void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor); void EmitSnapshot(std::string_view Tag, Meter& Stat, CbObjectWriter& Cbo); +void EmitSnapshot(const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor); + } // namespace zen::metrics namespace zen { diff --git a/zencore/stats.cpp b/zencore/stats.cpp index 6afd217e4..0c0647999 100644 --- a/zencore/stats.cpp +++ b/zencore/stats.cpp @@ -419,6 +419,58 @@ OperationTiming::Scope::Cancel() ////////////////////////////////////////////////////////////////////////// +RequestStats::RequestStats(int32_t SampleCount) : m_RequestTimeHistogram{SampleCount}, m_BytesHistogram{SampleCount} +{ +} + +RequestStats::~RequestStats() +{ +} + +void +RequestStats::Update(int64_t Duration, int64_t Bytes) +{ + m_RequestMeter.Mark(1); + m_RequestTimeHistogram.Update(Duration); + + m_BytesMeter.Mark(Bytes); + m_BytesHistogram.Update(Bytes); +} + +uint64_t +RequestStats::Count() const +{ + return m_RequestMeter.Count(); +} + +////////////////////////////////////////////////////////////////////////// + +void +EmitSnapshot(Meter& Stat, CbObjectWriter& Cbo) +{ + Cbo << "count" << Stat.Count(); + Cbo << "rate_mean" << Stat.MeanRate(); + Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15(); +} + +void +RequestStats::EmitSnapshot(std::string_view Tag, CbObjectWriter& Cbo) +{ + Cbo.BeginObject(Tag); + + Cbo.BeginObject("requests"); + metrics::EmitSnapshot(m_RequestMeter, Cbo); + metrics::EmitSnapshot(m_RequestTimeHistogram, Cbo, GetHifreqTimerToSeconds()); + Cbo.EndObject(); + + Cbo.BeginObject("bytes"); + metrics::EmitSnapshot(m_BytesMeter, Cbo); + metrics::EmitSnapshot(m_BytesHistogram, Cbo, 1.0); + Cbo.EndObject(); + + Cbo.EndObject(); +} + void EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo) { @@ -427,7 +479,6 @@ EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo) SampleSnapshot Snap = Stat.Snapshot(); Cbo << "count" << Stat.Count(); - Cbo << "rate_mean" << Stat.MeanRate(); Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15(); @@ -442,18 +493,22 @@ EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo) } void -EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo) +EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor) { Cbo.BeginObject(Tag); + EmitSnapshot(Stat, Cbo, ConversionFactor); + Cbo.EndObject(); +} +void +EmitSnapshot(const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor) +{ SampleSnapshot Snap = Stat.Snapshot(); - Cbo << "count" << Stat.Count() << "avg" << Stat.Mean(); - Cbo << "min" << Stat.Min() << "max" << Stat.Max(); - Cbo << "p75" << Snap.Get75Percentile() << "p95" << Snap.Get95Percentile() << "p99" << Snap.Get99Percentile() << "p999" - << Snap.Get999Percentile(); - - Cbo.EndObject(); + Cbo << "count" << Stat.Count() * ConversionFactor << "avg" << Stat.Mean() * ConversionFactor; + Cbo << "min" << Stat.Min() * ConversionFactor << "max" << Stat.Max() * ConversionFactor; + Cbo << "p75" << Snap.Get75Percentile() * ConversionFactor << "p95" << Snap.Get95Percentile() * ConversionFactor << "p99" + << Snap.Get99Percentile() * ConversionFactor << "p999" << Snap.Get999Percentile() * ConversionFactor; } void diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp index 193426ed2..69974ca06 100644 --- a/zenhttp/httpserver.cpp +++ b/zenhttp/httpserver.cpp @@ -571,7 +571,7 @@ CreateHttpServer() #if 0 return new HttpUwsServer; #elif ZEN_WITH_HTTPSYS - return new HttpSysServer{std::thread::hardware_concurrency()}; + return new HttpSysServer(std::thread::hardware_concurrency(), /* background worker threads */ 16); #else return new HttpNullServer; #endif diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index de3069bb8..f88563097 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -129,16 +129,18 @@ GetAcceptType(const HTTP_REQUEST* HttpRequest) class HttpSysRequestHandler { public: - explicit HttpSysRequestHandler(HttpSysTransaction& InRequest) : m_Request(InRequest) {} + explicit HttpSysRequestHandler(HttpSysTransaction& Transaction) : m_Transaction(Transaction) {} virtual ~HttpSysRequestHandler() = default; virtual void IssueRequest(std::error_code& ErrorCode) = 0; virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) = 0; + HttpSysTransaction& Transaction() { return m_Transaction; } - HttpSysTransaction& Transaction() { return m_Request; } + HttpSysRequestHandler(const HttpSysRequestHandler&) = delete; + HttpSysRequestHandler& operator=(const HttpSysRequestHandler&) = delete; private: - HttpSysTransaction& m_Request; // Related HTTP transaction object + HttpSysTransaction& m_Transaction; }; /** @@ -184,12 +186,16 @@ public: virtual void WriteResponse(HttpResponseCode ResponseCode) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span<IoBuffer> Blobs) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) override; + virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) override; using HttpServerRequest::WriteResponse; - HttpSysTransaction& m_HttpTx; - HttpMessageResponseRequest* m_Response = nullptr; // TODO: make this more general - IoBuffer m_PayloadBuffer; + HttpSysServerRequest(const HttpSysServerRequest&) = delete; + HttpSysServerRequest& operator=(const HttpSysServerRequest&) = delete; + + HttpSysTransaction& m_HttpTx; + HttpSysRequestHandler* m_NextCompletionHandler = nullptr; + IoBuffer m_PayloadBuffer; }; /** HTTP transaction @@ -218,7 +224,9 @@ public: ULONG_PTR NumberOfBytesTransferred, PTP_IO Io); - void IssueInitialRequest(std::error_code& ErrorCode); + void IssueInitialRequest(std::error_code& ErrorCode); + bool IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler); + PTP_IO Iocp(); HANDLE RequestQueueHandle(); inline OVERLAPPED* Overlapped() { return &m_HttpOverlapped; } @@ -227,6 +235,8 @@ public: HttpSysServerRequest& InvokeRequestHandler(HttpService& Service, IoBuffer Payload); + HttpSysServerRequest& ServerRequest() { return m_HandlerRequest.value(); } + private: OVERLAPPED m_HttpOverlapped{}; HttpSysServer& m_HttpServer; @@ -239,8 +249,6 @@ private: Ref<IHttpPackageHandler> m_PackageHandler; }; -////////////////////////////////////////////////////////////////////////// - /** * @brief HTTP request response I/O request handler * @@ -588,6 +596,108 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) } } +/** HTTP completion handler for async work + + This is used to allow work to be taken off the request handler threads + and to support posting responses asynchronously. + */ + +class HttpAsyncWorkRequest : public HttpSysRequestHandler +{ +public: + HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response); + ~HttpAsyncWorkRequest(); + + virtual void IssueRequest(std::error_code& ErrorCode) override final; + virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) override; + +private: + struct AsyncWorkItem : public IWork + { + virtual void Execute() override; + + AsyncWorkItem(HttpSysTransaction& InTx, std::function<void(HttpServerRequest&)>&& InHandler) + : Tx(InTx) + , Handler(std::move(InHandler)) + { + } + + HttpSysTransaction& Tx; + std::function<void(HttpServerRequest&)> Handler; + }; + + Ref<AsyncWorkItem> m_WorkItem; +}; + +HttpAsyncWorkRequest::HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response) +: HttpSysRequestHandler(Tx) +{ + m_WorkItem = new AsyncWorkItem(Tx, std::move(Response)); +} + +HttpAsyncWorkRequest::~HttpAsyncWorkRequest() +{ +} + +void +HttpAsyncWorkRequest::IssueRequest(std::error_code& ErrorCode) +{ + ErrorCode.clear(); + + Transaction().Server().WorkPool().ScheduleWork(m_WorkItem); +} + +HttpSysRequestHandler* +HttpAsyncWorkRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) +{ + // This ought to not be called since there should be no outstanding I/O request + // when this completion handler is active + + ZEN_UNUSED(IoResult, NumberOfBytesTransferred); + + ZEN_WARN("Unexpected I/O completion during async work! IoResult: {}, NumberOfBytesTransferred: {}", IoResult, NumberOfBytesTransferred); + + return this; +} + +void +HttpAsyncWorkRequest::AsyncWorkItem::Execute() +{ + using namespace fmt::literals; + + try + { + HttpSysServerRequest& ThisRequest = Tx.ServerRequest(); + + ThisRequest.m_NextCompletionHandler = nullptr; + + Handler(ThisRequest); + + // TODO: should Handler be destroyed at this point to ensure there + // are no outstanding references into state which could be + // deleted asynchronously as a result of issuing the response? + + if (HttpSysRequestHandler* NextHandler = ThisRequest.m_NextCompletionHandler) + { + return (void)Tx.IssueNextRequest(NextHandler); + } + else if (!ThisRequest.IsHandled()) + { + return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 404, "Not found"sv)); + } + else + { + // "Handled" but no request handler? Shouldn't ever happen + return (void)Tx.IssueNextRequest( + new HttpMessageResponseRequest(Tx, 500, "Response generated but no request handler scheduled"sv)); + } + } + catch (std::exception& Ex) + { + return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 500, "Exception thrown in async work: '{}'"_format(Ex.what()))); + } +} + /** _________ / _____/ ______________ __ ___________ @@ -597,10 +707,11 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) \/ \/ \/ */ -HttpSysServer::HttpSysServer(unsigned int ThreadCount) +HttpSysServer::HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount) : m_Log(logging::Get("http")) , m_RequestLog(logging::Get("http_requests")) , m_ThreadPool(ThreadCount) +, m_AsyncWorkPool(AsyncWorkThreadCount) { ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr); @@ -611,6 +722,8 @@ HttpSysServer::HttpSysServer(unsigned int ThreadCount) m_IsHttpInitialized = true; m_IsOk = true; + + ZEN_INFO("http.sys server started, using {} I/O threads and {} async worker threads", ThreadCount, AsyncWorkThreadCount); } HttpSysServer::~HttpSysServer() @@ -915,6 +1028,47 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance, } } +bool +HttpSysTransaction::IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler) +{ + HttpSysRequestHandler* CurrentHandler = m_CompletionHandler; + m_CompletionHandler = NewCompletionHandler; + + auto _ = MakeGuard([this, CurrentHandler] { + if ((CurrentHandler != &m_InitialHttpHandler) && (CurrentHandler != m_CompletionHandler)) + { + delete CurrentHandler; + } + }); + + if (NewCompletionHandler == nullptr) + { + return false; + } + + try + { + std::error_code ErrorCode; + m_CompletionHandler->IssueRequest(ErrorCode); + + if (!ErrorCode) + { + return true; + } + + ZEN_ERROR("IssueRequest() failed: '{}'", ErrorCode.message()); + } + catch (std::exception& Ex) + { + ZEN_ERROR("exception caught in IssueNextRequest(): '{}'", Ex.what()); + } + + // something went wrong, no request is pending + m_CompletionHandler = nullptr; + + return false; +} + HttpSysTransaction::Status HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) { @@ -934,38 +1088,9 @@ HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTran m_HttpServer.OnHandlingRequest(); } - m_CompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred); + auto NewCompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred); - if (m_CompletionHandler) - { - try - { - std::error_code ErrorCode; - m_CompletionHandler->IssueRequest(ErrorCode); - - if (ErrorCode) - { - ZEN_ERROR("IssueRequest() failed {}", ErrorCode.message()); - } - else - { - IsRequestPending = true; - } - } - catch (std::exception& Ex) - { - ZEN_ERROR("exception caught from IssueRequest(): {}", Ex.what()); - - // something went wrong, no request is pending - } - } - else - { - if (CurrentHandler != &m_InitialHttpHandler) - { - delete CurrentHandler; - } - } + IsRequestPending = IssueNextRequest(NewCompletionHandler); } // Ensure new requests are enqueued as necessary @@ -1213,13 +1338,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode) { ZEN_ASSERT(IsHandled() == false); - m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } @@ -1228,13 +1355,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy { ZEN_ASSERT(IsHandled() == false); - m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } @@ -1243,17 +1372,32 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy { ZEN_ASSERT(IsHandled() == false); - m_Response = + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, ResponseString.data(), ResponseString.size()); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } +void +HttpSysServerRequest::WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) +{ + if (m_HttpTx.Server().IsAsyncResponseEnabled()) + { + ContinuationHandler(m_HttpTx.ServerRequest()); + } + else + { + m_NextCompletionHandler = new HttpAsyncWorkRequest(m_HttpTx, std::move(ContinuationHandler)); + } +} + ////////////////////////////////////////////////////////////////////////// InitialRequestHandler::InitialRequestHandler(HttpSysTransaction& InRequest) : HttpSysRequestHandler(InRequest) @@ -1452,14 +1596,14 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT HttpSysServerRequest& ThisRequest = Transaction().InvokeRequestHandler(*Service, m_PayloadBuffer); - if (!ThisRequest.IsHandled()) + if (HttpSysRequestHandler* Response = ThisRequest.m_NextCompletionHandler) { - return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv); + return Response; } - if (HttpMessageResponseRequest* Response = ThisRequest.m_Response) + if (!ThisRequest.IsHandled()) { - return Response; + return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv); } } @@ -1503,4 +1647,4 @@ HttpSysServer::RegisterService(HttpService& Service) } } // namespace zen -#endif
\ No newline at end of file +#endif diff --git a/zenhttp/httpsys.h b/zenhttp/httpsys.h index a8395b283..46ba122cc 100644 --- a/zenhttp/httpsys.h +++ b/zenhttp/httpsys.h @@ -16,6 +16,7 @@ # define _WINSOCKAPI_ # include <zencore/windows.h> # include "iothreadpool.h" +# include "workthreadpool.h" # include <http.h> @@ -35,7 +36,7 @@ class HttpSysServer : public HttpServer friend class HttpSysTransaction; public: - explicit HttpSysServer(unsigned int ThreadCount); + explicit HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount); ~HttpSysServer(); // HttpServer interface implementation @@ -45,6 +46,11 @@ public: virtual void RequestExit() override; virtual void RegisterService(HttpService& Service) override; + WorkerThreadPool& WorkPool() { return m_AsyncWorkPool; } + + inline bool IsOk() const { return m_IsOk; } + inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; } + private: void Initialize(const wchar_t* UrlPath); void Cleanup(); @@ -53,8 +59,6 @@ private: void OnHandlingRequest(); void IssueNewRequestMaybe(); - inline bool IsOk() const { return m_IsOk; } - void RegisterService(const char* Endpoint, HttpService& Service); void UnregisterService(const char* Endpoint, HttpService& Service); @@ -63,10 +67,13 @@ private: spdlog::logger& m_RequestLog; spdlog::logger& Log() { return m_Log; } - bool m_IsOk = false; - bool m_IsHttpInitialized = false; - bool m_IsRequestLoggingEnabled = false; - WinIoThreadPool m_ThreadPool; + bool m_IsOk = false; + bool m_IsHttpInitialized = false; + bool m_IsRequestLoggingEnabled = false; + bool m_IsAsyncResponseEnabled = true; + + WinIoThreadPool m_ThreadPool; + WorkerThreadPool m_AsyncWorkPool; std::wstring m_BaseUri; // http://*:nnnn/ HTTP_SERVER_SESSION_ID m_HttpSessionId = 0; diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h index 6a7dc8a70..3e6608f11 100644 --- a/zenhttp/include/zenhttp/httpserver.h +++ b/zenhttp/include/zenhttp/httpserver.h @@ -97,6 +97,8 @@ public: void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::string_view ResponseString); void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, IoBuffer Blob); + virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) = 0; + protected: enum { diff --git a/zenhttp/workthreadpool.cpp b/zenhttp/workthreadpool.cpp new file mode 100644 index 000000000..41eaaae94 --- /dev/null +++ b/zenhttp/workthreadpool.cpp @@ -0,0 +1,77 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "workthreadpool.h" + +#include <zencore/logging.h> + +namespace zen { + +namespace detail { + struct LambdaWork : IWork + { + LambdaWork(auto Work) : WorkFunction(Work) {} + virtual void Execute() override { WorkFunction(); } + + std::function<void()> WorkFunction; + }; +} // namespace detail + +WorkerThreadPool::WorkerThreadPool(int InThreadCount) +{ + for (int i = 0; i < InThreadCount; ++i) + { + m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this); + } +} + +WorkerThreadPool::~WorkerThreadPool() +{ + m_WorkQueue.CompleteAdding(); + + for (std::thread& Thread : m_WorkerThreads) + { + Thread.join(); + } + + m_WorkerThreads.clear(); +} + +void +WorkerThreadPool::ScheduleWork(Ref<IWork> Work) +{ + m_WorkQueue.Enqueue(std::move(Work)); +} + +void +WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) +{ + m_WorkQueue.Enqueue(new detail::LambdaWork(Work)); +} + +void +WorkerThreadPool::WorkerThreadFunction() +{ + do + { + Ref<IWork> Work; + if (m_WorkQueue.WaitAndDequeue(Work)) + { + try + { + Work->Execute(); + } + catch (std::exception& e) + { + Work->m_Exception = std::current_exception(); + + ZEN_WARN("Caught exception in worker thread: {}", e.what()); + } + } + else + { + return; + } + } while (true); +} + +} // namespace zen
\ No newline at end of file diff --git a/zenhttp/workthreadpool.h b/zenhttp/workthreadpool.h new file mode 100644 index 000000000..6581cc08f --- /dev/null +++ b/zenhttp/workthreadpool.h @@ -0,0 +1,47 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#include <zencore/blockingqueue.h> +#include <zencore/refcount.h> +#include <zencore/windows.h> + +#include <exception> +#include <functional> +#include <system_error> +#include <thread> +#include <vector> + +namespace zen { + +struct IWork : public RefCounted +{ + virtual void Execute() = 0; + + inline std::exception_ptr GetException() { return m_Exception; } + +private: + std::exception_ptr m_Exception; + + friend class WorkerThreadPool; +}; + +class WorkerThreadPool +{ +public: + WorkerThreadPool(int InThreadCount); + ~WorkerThreadPool(); + + void ScheduleWork(Ref<IWork> Work); + void ScheduleWork(std::function<void()>&& Work); + + void WorkerThreadFunction(); + +private: + std::vector<std::thread> m_WorkerThreads; + BlockingQueue<Ref<IWork>> m_WorkQueue; +}; + +} // namespace zen diff --git a/zenhttp/zenhttp.vcxproj b/zenhttp/zenhttp.vcxproj index 899cf4bd1..1fc64bfc2 100644 --- a/zenhttp/zenhttp.vcxproj +++ b/zenhttp/zenhttp.vcxproj @@ -100,6 +100,7 @@ <ClCompile Include="httpsys.cpp" /> <ClCompile Include="httpuws.cpp" /> <ClCompile Include="iothreadpool.cpp" /> + <ClCompile Include="workthreadpool.cpp" /> <ClCompile Include="zenhttp.cpp" /> </ItemGroup> <ItemGroup> @@ -112,6 +113,7 @@ <ClInclude Include="include\zenhttp\httpshared.h" /> <ClInclude Include="include\zenhttp\zenhttp.h" /> <ClInclude Include="iothreadpool.h" /> + <ClInclude Include="workthreadpool.h" /> </ItemGroup> <ItemGroup> <ProjectReference Include="..\zencore\zencore.vcxproj"> diff --git a/zenhttp/zenhttp.vcxproj.filters b/zenhttp/zenhttp.vcxproj.filters index 2e968055c..e57e7a712 100644 --- a/zenhttp/zenhttp.vcxproj.filters +++ b/zenhttp/zenhttp.vcxproj.filters @@ -9,6 +9,7 @@ <ClCompile Include="httpuws.cpp" /> <ClCompile Include="httpshared.cpp" /> <ClCompile Include="zenhttp.cpp" /> + <ClCompile Include="workthreadpool.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="include\zenhttp\httpclient.h" /> @@ -20,6 +21,7 @@ <ClInclude Include="httpuws.h" /> <ClInclude Include="include\zenhttp\httpcommon.h" /> <ClInclude Include="include\zenhttp\httpshared.h" /> + <ClInclude Include="workthreadpool.h" /> </ItemGroup> <ItemGroup> <None Include="xmake.lua" /> diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 27bb1c5cd..4a2a3748a 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -286,28 +286,104 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, void HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { - const ZenContentType AcceptType = Request.AcceptContentType(); + const ZenContentType AcceptType = Request.AcceptContentType(); + const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; + const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; + const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); - ZenCacheValue Value; - bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); - bool InUpstreamCache = false; + bool Success = false; + ZenCacheValue LocalCacheValue; - const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + if (m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, LocalCacheValue)) + { + Success = true; - if (QueryUpstream) + if (!SkipData && AcceptType == ZenContentType::kCbPackage) + { + CbPackage Package; + CbObjectView CacheRecord(LocalCacheValue.Value.Data()); + uint32_t AttachmentCount = 0; + uint32_t ValidCount = 0; + + if (!SkipAttachments) + { + CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + ValidCount++; + } + AttachmentCount++; + }); + + if (ValidCount != AttachmentCount) + { + Success = false; + ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType), + ValidCount, + AttachmentCount); + } + } + + Package.SetObject(LoadCompactBinaryObject(LocalCacheValue.Value)); + + BinaryWriter MemStream; + Package.Save(MemStream); + + LocalCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + LocalCacheValue.Value.SetContentType(HttpContentType::kCbPackage); + } + } + + if (Success) { - const ZenContentType CacheRecordType = AcceptType; + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(LocalCacheValue.Value.Size()), + ToString(LocalCacheValue.Value.GetContentType())); - if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); + m_CacheStats.HitCount++; + + if (SkipData) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + return Request.WriteResponse(HttpResponseCode::OK, LocalCacheValue.Value.GetContentType(), LocalCacheValue.Value); + } + } + else if (!QueryUpstream) + { + ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + m_CacheStats.MissCount++; + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + // Issue upstream query asynchronously in order to keep requests flowing without + // hogging I/O servicing threads with blocking work + + Request.WriteResponseAsync([this, AcceptType, SkipData, SkipAttachments, Ref](HttpServerRequest& AsyncRequest) { + bool Success = false; + ZenCacheValue UpstreamCacheValue; + + metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); + + if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Success) { - Value.Value = UpstreamResult.Value; - Success = true; - InUpstreamCache = true; + Success = true; + UpstreamCacheValue.Value = UpstreamResult.Value; + + UpstreamCacheValue.Value.SetContentType(AcceptType); - if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) + if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) { - if (CacheRecordType == ZenContentType::kCbObject) + if (AcceptType == ZenContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); @@ -320,7 +396,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); IndexData.EndArray(); - Value.IndexData = IndexData.Save(); + UpstreamCacheValue.IndexData = IndexData.Save(); } else { @@ -334,19 +410,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (Success) { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, UpstreamCacheValue); } } - else + else if (AcceptType == ZenContentType::kCbPackage) { - ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); - CbPackage Package; - if (Package.TryLoad(UpstreamResult.Value)) + if (Package.TryLoad(UpstreamCacheValue.Value)) { + CbObject CacheRecord = Package.GetObject(); uint32_t AttachmentCount = 0; uint32_t ValidCount = 0; - CbObject CacheRecord = Package.GetObject(); CacheRecord.IterateAttachments([this, &Package, &Ref, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) { if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) @@ -371,7 +445,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) + if (SkipAttachments) { CbPackage PackageWithoutAttachments; PackageWithoutAttachments.SetObject(CacheRecord); @@ -379,7 +453,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request BinaryWriter MemStream; PackageWithoutAttachments.Save(MemStream); - Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + UpstreamCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); } } else @@ -398,109 +472,34 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } } } - } - - if (!Success) - { - ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) - { - CbObjectView CacheRecord(Value.Value.Data()); - const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) + if (Success) { - ZEN_WARN("GET - '{}/{}' '{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); - } + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(UpstreamCacheValue.Value.Size()), + ToString(UpstreamCacheValue.Value.GetContentType())); - if ((Policy & CachePolicy::SkipData) == CachePolicy::SkipData) - { m_CacheStats.HitCount++; - return Request.WriteResponse(HttpResponseCode::OK); - } - - const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; - uint32_t AttachmentCount = 0; - uint32_t ValidCount = 0; - uint64_t AttachmentBytes = 0ull; - - CbPackage Package; - - if (!SkipAttachments) - { - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentCount, &ValidCount, &AttachmentBytes](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - AttachmentBytes += Chunk.Size(); - ValidCount++; - } - AttachmentCount++; - }); + m_CacheStats.UpstreamHitCount++; - if (ValidCount != AttachmentCount) + if (SkipData) { - ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments", - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - ValidCount, - AttachmentCount); - - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + AsyncRequest.WriteResponse(HttpResponseCode::OK); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::OK, UpstreamCacheValue.Value.GetContentType(), UpstreamCacheValue.Value); } - } - - Package.SetObject(LoadCompactBinaryObject(Value.Value)); - - ZEN_DEBUG("HIT - '{}/{}' {} '{}', {} attachments (LOCAL)", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(AttachmentBytes + Value.Value.Size()), - ToString(HttpContentType::kCbPackage), - AttachmentCount); - - BinaryWriter MemStream; - Package.Save(MemStream); - - IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - - m_CacheStats.HitCount++; - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); - } - else - { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' ({})", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Value.Value.Size()), - ToString(Value.Value.GetContentType()), - InUpstreamCache ? "UPSTREAM" : "LOCAL"); - - m_CacheStats.HitCount++; - if (InUpstreamCache) - { - m_CacheStats.UpstreamHitCount++; - } - - if ((Policy & CachePolicy::SkipData) == CachePolicy::SkipData) - { - Request.WriteResponse(HttpResponseCode::OK); } else { - Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + m_CacheStats.MissCount++; + AsyncRequest.WriteResponse(HttpResponseCode::NotFound); } - } + }); } void @@ -875,6 +874,7 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); + EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); const uint64_t HitCount = m_CacheStats.HitCount; const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 703e24ed3..ad7253f79 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -30,12 +30,12 @@ enum class CachePolicy : uint8_t; * * {BucketId}/{KeyHash} * - * Where BucketId is an alphanumeric string, and KeyHash is a 40-character hexadecimal - * sequence. The hash value may be derived in any number of ways, it's up to the - * application to pick an approach. + * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character + * hexadecimal sequence. The hash value may be derived in any number of ways, it's + * up to the application to pick an approach. * * Values may be structured or unstructured. Structured values are encoded using Unreal - * Engine's compact binary encoding + * Engine's compact binary encoding (see CbObject) * * Additionally, attachments may be addressed as: * @@ -62,8 +62,7 @@ public: ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; - - virtual void HandleRequest(zen::HttpServerRequest& Request) override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; void Flush(); void Scrub(ScrubContext& Ctx); @@ -104,6 +103,7 @@ private: std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; metrics::OperationTiming m_HttpRequests; + metrics::OperationTiming m_UpstreamGetRequestTiming; CacheStats m_CacheStats; }; diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp index 01866a63b..924546762 100644 --- a/zenserver/testing/httptest.cpp +++ b/zenserver/testing/httptest.cpp @@ -4,9 +4,12 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> +#include <zencore/timer.h> namespace zen { +using namespace fmt::literals; + HttpTestingService::HttpTestingService() { m_Router.RegisterRoute( @@ -15,6 +18,44 @@ HttpTestingService::HttpTestingService() HttpVerb::kGet); m_Router.RegisterRoute( + "hello_slow", + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest& Request) { + Stopwatch Timer; + Sleep(1000); + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kText, + "hello, took me {}"_format(NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); + }); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "hello_veryslow", + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest& Request) { + Stopwatch Timer; + Sleep(60000); + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kText, + "hello, took me {}"_format(NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); + }); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "hello_throw", + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest&) { throw std::runtime_error("intentional error"); }); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "hello_noresponse", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest&) {}); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "metrics", [this](HttpRouterRequest& Req) { metrics::OperationTiming::Scope _(m_TimingStats); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 9ed392faf..5b2629f72 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -41,7 +41,7 @@ namespace detail { , m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; - m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); + m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl); m_Client = new CloudCacheClient(Options); } |