diff options
| -rw-r--r-- | zencore/include/zencore/stats.h | 62 | ||||
| -rw-r--r-- | zencore/stats.cpp | 71 | ||||
| -rw-r--r-- | zenhttp/httpserver.cpp | 2 | ||||
| -rw-r--r-- | zenhttp/httpsys.cpp | 248 | ||||
| -rw-r--r-- | zenhttp/httpsys.h | 21 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpserver.h | 2 | ||||
| -rw-r--r-- | zenhttp/workthreadpool.cpp | 77 | ||||
| -rw-r--r-- | zenhttp/workthreadpool.h | 47 | ||||
| -rw-r--r-- | zenhttp/zenhttp.vcxproj | 2 | ||||
| -rw-r--r-- | zenhttp/zenhttp.vcxproj.filters | 2 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 228 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 12 | ||||
| -rw-r--r-- | zenserver/testing/httptest.cpp | 41 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 2 |
14 files changed, 627 insertions, 190 deletions
diff --git a/zencore/include/zencore/stats.h b/zencore/include/zencore/stats.h index 2e567d614..884bb53f6 100644 --- a/zencore/include/zencore/stats.h +++ b/zencore/include/zencore/stats.h @@ -221,10 +221,70 @@ private: Histogram m_Histogram; }; +/** Metrics for network requests + + Aggregates tracking of duration, payload sizes into a single + class + + */ +class RequestStats +{ +public: + RequestStats(int32_t SampleCount = 514); + ~RequestStats(); + + void Update(int64_t Duration, int64_t Bytes); + uint64_t Count() const; + + // Timing + + int64_t MaxDuration() const { return m_BytesHistogram.Max(); } + int64_t MinDuration() const { return m_BytesHistogram.Min(); } + double MeanDuration() const { return m_BytesHistogram.Mean(); } + SampleSnapshot DurationSnapshot() const { return m_RequestTimeHistogram.Snapshot(); } + double Rate1() { return m_RequestMeter.Rate1(); } + double Rate5() { return m_RequestMeter.Rate5(); } + double Rate15() { return m_RequestMeter.Rate15(); } + double MeanRate() const { return m_RequestMeter.MeanRate(); } + + // Bytes + + int64_t MaxBytes() const { return m_BytesHistogram.Max(); } + int64_t MinBytes() const { return m_BytesHistogram.Min(); } + double MeanBytes() const { return m_BytesHistogram.Mean(); } + SampleSnapshot BytesSnapshot() const { return m_BytesHistogram.Snapshot(); } + double ByteRate1() { return m_BytesMeter.Rate1(); } + double ByteRate5() { return m_BytesMeter.Rate5(); } + double ByteRate15() { return m_BytesMeter.Rate15(); } + double ByteMeanRate() const { return m_BytesMeter.MeanRate(); } + + struct Scope + { + Scope(OperationTiming& Outer); + ~Scope(); + + void Cancel(); + + private: + OperationTiming& m_Outer; + uint64_t m_StartTick; + }; + + void EmitSnapshot(std::string_view Tag, CbObjectWriter& Cbo); + +private: + Meter m_RequestMeter; + Meter m_BytesMeter; + Histogram m_RequestTimeHistogram; + Histogram m_BytesHistogram; +}; + void EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo); -void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo); +void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor); void EmitSnapshot(std::string_view Tag, Meter& Stat, CbObjectWriter& Cbo); +void EmitSnapshot(const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor); + } // namespace zen::metrics namespace zen { diff --git a/zencore/stats.cpp b/zencore/stats.cpp index 6afd217e4..0c0647999 100644 --- a/zencore/stats.cpp +++ b/zencore/stats.cpp @@ -419,6 +419,58 @@ OperationTiming::Scope::Cancel() ////////////////////////////////////////////////////////////////////////// +RequestStats::RequestStats(int32_t SampleCount) : m_RequestTimeHistogram{SampleCount}, m_BytesHistogram{SampleCount} +{ +} + +RequestStats::~RequestStats() +{ +} + +void +RequestStats::Update(int64_t Duration, int64_t Bytes) +{ + m_RequestMeter.Mark(1); + m_RequestTimeHistogram.Update(Duration); + + m_BytesMeter.Mark(Bytes); + m_BytesHistogram.Update(Bytes); +} + +uint64_t +RequestStats::Count() const +{ + return m_RequestMeter.Count(); +} + +////////////////////////////////////////////////////////////////////////// + +void +EmitSnapshot(Meter& Stat, CbObjectWriter& Cbo) +{ + Cbo << "count" << Stat.Count(); + Cbo << "rate_mean" << Stat.MeanRate(); + Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15(); +} + +void +RequestStats::EmitSnapshot(std::string_view Tag, CbObjectWriter& Cbo) +{ + Cbo.BeginObject(Tag); + + Cbo.BeginObject("requests"); + metrics::EmitSnapshot(m_RequestMeter, Cbo); + metrics::EmitSnapshot(m_RequestTimeHistogram, Cbo, GetHifreqTimerToSeconds()); + Cbo.EndObject(); + + Cbo.BeginObject("bytes"); + metrics::EmitSnapshot(m_BytesMeter, Cbo); + metrics::EmitSnapshot(m_BytesHistogram, Cbo, 1.0); + Cbo.EndObject(); + + Cbo.EndObject(); +} + void EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo) { @@ -427,7 +479,6 @@ EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo) SampleSnapshot Snap = Stat.Snapshot(); Cbo << "count" << Stat.Count(); - Cbo << "rate_mean" << Stat.MeanRate(); Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15(); @@ -442,18 +493,22 @@ EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo) } void -EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo) +EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor) { Cbo.BeginObject(Tag); + EmitSnapshot(Stat, Cbo, ConversionFactor); + Cbo.EndObject(); +} +void +EmitSnapshot(const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor) +{ SampleSnapshot Snap = Stat.Snapshot(); - Cbo << "count" << Stat.Count() << "avg" << Stat.Mean(); - Cbo << "min" << Stat.Min() << "max" << Stat.Max(); - Cbo << "p75" << Snap.Get75Percentile() << "p95" << Snap.Get95Percentile() << "p99" << Snap.Get99Percentile() << "p999" - << Snap.Get999Percentile(); - - Cbo.EndObject(); + Cbo << "count" << Stat.Count() * ConversionFactor << "avg" << Stat.Mean() * ConversionFactor; + Cbo << "min" << Stat.Min() * ConversionFactor << "max" << Stat.Max() * ConversionFactor; + Cbo << "p75" << Snap.Get75Percentile() * ConversionFactor << "p95" << Snap.Get95Percentile() * ConversionFactor << "p99" + << Snap.Get99Percentile() * ConversionFactor << "p999" << Snap.Get999Percentile() * ConversionFactor; } void diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp index 193426ed2..69974ca06 100644 --- a/zenhttp/httpserver.cpp +++ b/zenhttp/httpserver.cpp @@ -571,7 +571,7 @@ CreateHttpServer() #if 0 return new HttpUwsServer; #elif ZEN_WITH_HTTPSYS - return new HttpSysServer{std::thread::hardware_concurrency()}; + return new HttpSysServer(std::thread::hardware_concurrency(), /* background worker threads */ 16); #else return new HttpNullServer; #endif diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index de3069bb8..f88563097 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -129,16 +129,18 @@ GetAcceptType(const HTTP_REQUEST* HttpRequest) class HttpSysRequestHandler { public: - explicit HttpSysRequestHandler(HttpSysTransaction& InRequest) : m_Request(InRequest) {} + explicit HttpSysRequestHandler(HttpSysTransaction& Transaction) : m_Transaction(Transaction) {} virtual ~HttpSysRequestHandler() = default; virtual void IssueRequest(std::error_code& ErrorCode) = 0; virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) = 0; + HttpSysTransaction& Transaction() { return m_Transaction; } - HttpSysTransaction& Transaction() { return m_Request; } + HttpSysRequestHandler(const HttpSysRequestHandler&) = delete; + HttpSysRequestHandler& operator=(const HttpSysRequestHandler&) = delete; private: - HttpSysTransaction& m_Request; // Related HTTP transaction object + HttpSysTransaction& m_Transaction; }; /** @@ -184,12 +186,16 @@ public: virtual void WriteResponse(HttpResponseCode ResponseCode) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span<IoBuffer> Blobs) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) override; + virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) override; using HttpServerRequest::WriteResponse; - HttpSysTransaction& m_HttpTx; - HttpMessageResponseRequest* m_Response = nullptr; // TODO: make this more general - IoBuffer m_PayloadBuffer; + HttpSysServerRequest(const HttpSysServerRequest&) = delete; + HttpSysServerRequest& operator=(const HttpSysServerRequest&) = delete; + + HttpSysTransaction& m_HttpTx; + HttpSysRequestHandler* m_NextCompletionHandler = nullptr; + IoBuffer m_PayloadBuffer; }; /** HTTP transaction @@ -218,7 +224,9 @@ public: ULONG_PTR NumberOfBytesTransferred, PTP_IO Io); - void IssueInitialRequest(std::error_code& ErrorCode); + void IssueInitialRequest(std::error_code& ErrorCode); + bool IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler); + PTP_IO Iocp(); HANDLE RequestQueueHandle(); inline OVERLAPPED* Overlapped() { return &m_HttpOverlapped; } @@ -227,6 +235,8 @@ public: HttpSysServerRequest& InvokeRequestHandler(HttpService& Service, IoBuffer Payload); + HttpSysServerRequest& ServerRequest() { return m_HandlerRequest.value(); } + private: OVERLAPPED m_HttpOverlapped{}; HttpSysServer& m_HttpServer; @@ -239,8 +249,6 @@ private: Ref<IHttpPackageHandler> m_PackageHandler; }; -////////////////////////////////////////////////////////////////////////// - /** * @brief HTTP request response I/O request handler * @@ -588,6 +596,108 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) } } +/** HTTP completion handler for async work + + This is used to allow work to be taken off the request handler threads + and to support posting responses asynchronously. + */ + +class HttpAsyncWorkRequest : public HttpSysRequestHandler +{ +public: + HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response); + ~HttpAsyncWorkRequest(); + + virtual void IssueRequest(std::error_code& ErrorCode) override final; + virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) override; + +private: + struct AsyncWorkItem : public IWork + { + virtual void Execute() override; + + AsyncWorkItem(HttpSysTransaction& InTx, std::function<void(HttpServerRequest&)>&& InHandler) + : Tx(InTx) + , Handler(std::move(InHandler)) + { + } + + HttpSysTransaction& Tx; + std::function<void(HttpServerRequest&)> Handler; + }; + + Ref<AsyncWorkItem> m_WorkItem; +}; + +HttpAsyncWorkRequest::HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response) +: HttpSysRequestHandler(Tx) +{ + m_WorkItem = new AsyncWorkItem(Tx, std::move(Response)); +} + +HttpAsyncWorkRequest::~HttpAsyncWorkRequest() +{ +} + +void +HttpAsyncWorkRequest::IssueRequest(std::error_code& ErrorCode) +{ + ErrorCode.clear(); + + Transaction().Server().WorkPool().ScheduleWork(m_WorkItem); +} + +HttpSysRequestHandler* +HttpAsyncWorkRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) +{ + // This ought to not be called since there should be no outstanding I/O request + // when this completion handler is active + + ZEN_UNUSED(IoResult, NumberOfBytesTransferred); + + ZEN_WARN("Unexpected I/O completion during async work! IoResult: {}, NumberOfBytesTransferred: {}", IoResult, NumberOfBytesTransferred); + + return this; +} + +void +HttpAsyncWorkRequest::AsyncWorkItem::Execute() +{ + using namespace fmt::literals; + + try + { + HttpSysServerRequest& ThisRequest = Tx.ServerRequest(); + + ThisRequest.m_NextCompletionHandler = nullptr; + + Handler(ThisRequest); + + // TODO: should Handler be destroyed at this point to ensure there + // are no outstanding references into state which could be + // deleted asynchronously as a result of issuing the response? + + if (HttpSysRequestHandler* NextHandler = ThisRequest.m_NextCompletionHandler) + { + return (void)Tx.IssueNextRequest(NextHandler); + } + else if (!ThisRequest.IsHandled()) + { + return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 404, "Not found"sv)); + } + else + { + // "Handled" but no request handler? Shouldn't ever happen + return (void)Tx.IssueNextRequest( + new HttpMessageResponseRequest(Tx, 500, "Response generated but no request handler scheduled"sv)); + } + } + catch (std::exception& Ex) + { + return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 500, "Exception thrown in async work: '{}'"_format(Ex.what()))); + } +} + /** _________ / _____/ ______________ __ ___________ @@ -597,10 +707,11 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) \/ \/ \/ */ -HttpSysServer::HttpSysServer(unsigned int ThreadCount) +HttpSysServer::HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount) : m_Log(logging::Get("http")) , m_RequestLog(logging::Get("http_requests")) , m_ThreadPool(ThreadCount) +, m_AsyncWorkPool(AsyncWorkThreadCount) { ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr); @@ -611,6 +722,8 @@ HttpSysServer::HttpSysServer(unsigned int ThreadCount) m_IsHttpInitialized = true; m_IsOk = true; + + ZEN_INFO("http.sys server started, using {} I/O threads and {} async worker threads", ThreadCount, AsyncWorkThreadCount); } HttpSysServer::~HttpSysServer() @@ -915,6 +1028,47 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance, } } +bool +HttpSysTransaction::IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler) +{ + HttpSysRequestHandler* CurrentHandler = m_CompletionHandler; + m_CompletionHandler = NewCompletionHandler; + + auto _ = MakeGuard([this, CurrentHandler] { + if ((CurrentHandler != &m_InitialHttpHandler) && (CurrentHandler != m_CompletionHandler)) + { + delete CurrentHandler; + } + }); + + if (NewCompletionHandler == nullptr) + { + return false; + } + + try + { + std::error_code ErrorCode; + m_CompletionHandler->IssueRequest(ErrorCode); + + if (!ErrorCode) + { + return true; + } + + ZEN_ERROR("IssueRequest() failed: '{}'", ErrorCode.message()); + } + catch (std::exception& Ex) + { + ZEN_ERROR("exception caught in IssueNextRequest(): '{}'", Ex.what()); + } + + // something went wrong, no request is pending + m_CompletionHandler = nullptr; + + return false; +} + HttpSysTransaction::Status HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) { @@ -934,38 +1088,9 @@ HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTran m_HttpServer.OnHandlingRequest(); } - m_CompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred); + auto NewCompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred); - if (m_CompletionHandler) - { - try - { - std::error_code ErrorCode; - m_CompletionHandler->IssueRequest(ErrorCode); - - if (ErrorCode) - { - ZEN_ERROR("IssueRequest() failed {}", ErrorCode.message()); - } - else - { - IsRequestPending = true; - } - } - catch (std::exception& Ex) - { - ZEN_ERROR("exception caught from IssueRequest(): {}", Ex.what()); - - // something went wrong, no request is pending - } - } - else - { - if (CurrentHandler != &m_InitialHttpHandler) - { - delete CurrentHandler; - } - } + IsRequestPending = IssueNextRequest(NewCompletionHandler); } // Ensure new requests are enqueued as necessary @@ -1213,13 +1338,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode) { ZEN_ASSERT(IsHandled() == false); - m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } @@ -1228,13 +1355,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy { ZEN_ASSERT(IsHandled() == false); - m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } @@ -1243,17 +1372,32 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy { ZEN_ASSERT(IsHandled() == false); - m_Response = + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, ResponseString.data(), ResponseString.size()); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } +void +HttpSysServerRequest::WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) +{ + if (m_HttpTx.Server().IsAsyncResponseEnabled()) + { + ContinuationHandler(m_HttpTx.ServerRequest()); + } + else + { + m_NextCompletionHandler = new HttpAsyncWorkRequest(m_HttpTx, std::move(ContinuationHandler)); + } +} + ////////////////////////////////////////////////////////////////////////// InitialRequestHandler::InitialRequestHandler(HttpSysTransaction& InRequest) : HttpSysRequestHandler(InRequest) @@ -1452,14 +1596,14 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT HttpSysServerRequest& ThisRequest = Transaction().InvokeRequestHandler(*Service, m_PayloadBuffer); - if (!ThisRequest.IsHandled()) + if (HttpSysRequestHandler* Response = ThisRequest.m_NextCompletionHandler) { - return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv); + return Response; } - if (HttpMessageResponseRequest* Response = ThisRequest.m_Response) + if (!ThisRequest.IsHandled()) { - return Response; + return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv); } } @@ -1503,4 +1647,4 @@ HttpSysServer::RegisterService(HttpService& Service) } } // namespace zen -#endif
\ No newline at end of file +#endif diff --git a/zenhttp/httpsys.h b/zenhttp/httpsys.h index a8395b283..46ba122cc 100644 --- a/zenhttp/httpsys.h +++ b/zenhttp/httpsys.h @@ -16,6 +16,7 @@ # define _WINSOCKAPI_ # include <zencore/windows.h> # include "iothreadpool.h" +# include "workthreadpool.h" # include <http.h> @@ -35,7 +36,7 @@ class HttpSysServer : public HttpServer friend class HttpSysTransaction; public: - explicit HttpSysServer(unsigned int ThreadCount); + explicit HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount); ~HttpSysServer(); // HttpServer interface implementation @@ -45,6 +46,11 @@ public: virtual void RequestExit() override; virtual void RegisterService(HttpService& Service) override; + WorkerThreadPool& WorkPool() { return m_AsyncWorkPool; } + + inline bool IsOk() const { return m_IsOk; } + inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; } + private: void Initialize(const wchar_t* UrlPath); void Cleanup(); @@ -53,8 +59,6 @@ private: void OnHandlingRequest(); void IssueNewRequestMaybe(); - inline bool IsOk() const { return m_IsOk; } - void RegisterService(const char* Endpoint, HttpService& Service); void UnregisterService(const char* Endpoint, HttpService& Service); @@ -63,10 +67,13 @@ private: spdlog::logger& m_RequestLog; spdlog::logger& Log() { return m_Log; } - bool m_IsOk = false; - bool m_IsHttpInitialized = false; - bool m_IsRequestLoggingEnabled = false; - WinIoThreadPool m_ThreadPool; + bool m_IsOk = false; + bool m_IsHttpInitialized = false; + bool m_IsRequestLoggingEnabled = false; + bool m_IsAsyncResponseEnabled = true; + + WinIoThreadPool m_ThreadPool; + WorkerThreadPool m_AsyncWorkPool; std::wstring m_BaseUri; // http://*:nnnn/ HTTP_SERVER_SESSION_ID m_HttpSessionId = 0; diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h index 6a7dc8a70..3e6608f11 100644 --- a/zenhttp/include/zenhttp/httpserver.h +++ b/zenhttp/include/zenhttp/httpserver.h @@ -97,6 +97,8 @@ public: void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::string_view ResponseString); void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, IoBuffer Blob); + virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) = 0; + protected: enum { diff --git a/zenhttp/workthreadpool.cpp b/zenhttp/workthreadpool.cpp new file mode 100644 index 000000000..41eaaae94 --- /dev/null +++ b/zenhttp/workthreadpool.cpp @@ -0,0 +1,77 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "workthreadpool.h" + +#include <zencore/logging.h> + +namespace zen { + +namespace detail { + struct LambdaWork : IWork + { + LambdaWork(auto Work) : WorkFunction(Work) {} + virtual void Execute() override { WorkFunction(); } + + std::function<void()> WorkFunction; + }; +} // namespace detail + +WorkerThreadPool::WorkerThreadPool(int InThreadCount) +{ + for (int i = 0; i < InThreadCount; ++i) + { + m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this); + } +} + +WorkerThreadPool::~WorkerThreadPool() +{ + m_WorkQueue.CompleteAdding(); + + for (std::thread& Thread : m_WorkerThreads) + { + Thread.join(); + } + + m_WorkerThreads.clear(); +} + +void +WorkerThreadPool::ScheduleWork(Ref<IWork> Work) +{ + m_WorkQueue.Enqueue(std::move(Work)); +} + +void +WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) +{ + m_WorkQueue.Enqueue(new detail::LambdaWork(Work)); +} + +void +WorkerThreadPool::WorkerThreadFunction() +{ + do + { + Ref<IWork> Work; + if (m_WorkQueue.WaitAndDequeue(Work)) + { + try + { + Work->Execute(); + } + catch (std::exception& e) + { + Work->m_Exception = std::current_exception(); + + ZEN_WARN("Caught exception in worker thread: {}", e.what()); + } + } + else + { + return; + } + } while (true); +} + +} // namespace zen
\ No newline at end of file diff --git a/zenhttp/workthreadpool.h b/zenhttp/workthreadpool.h new file mode 100644 index 000000000..6581cc08f --- /dev/null +++ b/zenhttp/workthreadpool.h @@ -0,0 +1,47 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#include <zencore/blockingqueue.h> +#include <zencore/refcount.h> +#include <zencore/windows.h> + +#include <exception> +#include <functional> +#include <system_error> +#include <thread> +#include <vector> + +namespace zen { + +struct IWork : public RefCounted +{ + virtual void Execute() = 0; + + inline std::exception_ptr GetException() { return m_Exception; } + +private: + std::exception_ptr m_Exception; + + friend class WorkerThreadPool; +}; + +class WorkerThreadPool +{ +public: + WorkerThreadPool(int InThreadCount); + ~WorkerThreadPool(); + + void ScheduleWork(Ref<IWork> Work); + void ScheduleWork(std::function<void()>&& Work); + + void WorkerThreadFunction(); + +private: + std::vector<std::thread> m_WorkerThreads; + BlockingQueue<Ref<IWork>> m_WorkQueue; +}; + +} // namespace zen diff --git a/zenhttp/zenhttp.vcxproj b/zenhttp/zenhttp.vcxproj index 899cf4bd1..1fc64bfc2 100644 --- a/zenhttp/zenhttp.vcxproj +++ b/zenhttp/zenhttp.vcxproj @@ -100,6 +100,7 @@ <ClCompile Include="httpsys.cpp" /> <ClCompile Include="httpuws.cpp" /> <ClCompile Include="iothreadpool.cpp" /> + <ClCompile Include="workthreadpool.cpp" /> <ClCompile Include="zenhttp.cpp" /> </ItemGroup> <ItemGroup> @@ -112,6 +113,7 @@ <ClInclude Include="include\zenhttp\httpshared.h" /> <ClInclude Include="include\zenhttp\zenhttp.h" /> <ClInclude Include="iothreadpool.h" /> + <ClInclude Include="workthreadpool.h" /> </ItemGroup> <ItemGroup> <ProjectReference Include="..\zencore\zencore.vcxproj"> diff --git a/zenhttp/zenhttp.vcxproj.filters b/zenhttp/zenhttp.vcxproj.filters index 2e968055c..e57e7a712 100644 --- a/zenhttp/zenhttp.vcxproj.filters +++ b/zenhttp/zenhttp.vcxproj.filters @@ -9,6 +9,7 @@ <ClCompile Include="httpuws.cpp" /> <ClCompile Include="httpshared.cpp" /> <ClCompile Include="zenhttp.cpp" /> + <ClCompile Include="workthreadpool.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="include\zenhttp\httpclient.h" /> @@ -20,6 +21,7 @@ <ClInclude Include="httpuws.h" /> <ClInclude Include="include\zenhttp\httpcommon.h" /> <ClInclude Include="include\zenhttp\httpshared.h" /> + <ClInclude Include="workthreadpool.h" /> </ItemGroup> <ItemGroup> <None Include="xmake.lua" /> diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 27bb1c5cd..4a2a3748a 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -286,28 +286,104 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, void HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { - const ZenContentType AcceptType = Request.AcceptContentType(); + const ZenContentType AcceptType = Request.AcceptContentType(); + const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; + const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; + const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); - ZenCacheValue Value; - bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); - bool InUpstreamCache = false; + bool Success = false; + ZenCacheValue LocalCacheValue; - const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + if (m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, LocalCacheValue)) + { + Success = true; - if (QueryUpstream) + if (!SkipData && AcceptType == ZenContentType::kCbPackage) + { + CbPackage Package; + CbObjectView CacheRecord(LocalCacheValue.Value.Data()); + uint32_t AttachmentCount = 0; + uint32_t ValidCount = 0; + + if (!SkipAttachments) + { + CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + ValidCount++; + } + AttachmentCount++; + }); + + if (ValidCount != AttachmentCount) + { + Success = false; + ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType), + ValidCount, + AttachmentCount); + } + } + + Package.SetObject(LoadCompactBinaryObject(LocalCacheValue.Value)); + + BinaryWriter MemStream; + Package.Save(MemStream); + + LocalCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + LocalCacheValue.Value.SetContentType(HttpContentType::kCbPackage); + } + } + + if (Success) { - const ZenContentType CacheRecordType = AcceptType; + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(LocalCacheValue.Value.Size()), + ToString(LocalCacheValue.Value.GetContentType())); - if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); + m_CacheStats.HitCount++; + + if (SkipData) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + return Request.WriteResponse(HttpResponseCode::OK, LocalCacheValue.Value.GetContentType(), LocalCacheValue.Value); + } + } + else if (!QueryUpstream) + { + ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + m_CacheStats.MissCount++; + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + // Issue upstream query asynchronously in order to keep requests flowing without + // hogging I/O servicing threads with blocking work + + Request.WriteResponseAsync([this, AcceptType, SkipData, SkipAttachments, Ref](HttpServerRequest& AsyncRequest) { + bool Success = false; + ZenCacheValue UpstreamCacheValue; + + metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); + + if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Success) { - Value.Value = UpstreamResult.Value; - Success = true; - InUpstreamCache = true; + Success = true; + UpstreamCacheValue.Value = UpstreamResult.Value; + + UpstreamCacheValue.Value.SetContentType(AcceptType); - if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) + if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) { - if (CacheRecordType == ZenContentType::kCbObject) + if (AcceptType == ZenContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); @@ -320,7 +396,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); IndexData.EndArray(); - Value.IndexData = IndexData.Save(); + UpstreamCacheValue.IndexData = IndexData.Save(); } else { @@ -334,19 +410,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (Success) { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, UpstreamCacheValue); } } - else + else if (AcceptType == ZenContentType::kCbPackage) { - ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); - CbPackage Package; - if (Package.TryLoad(UpstreamResult.Value)) + if (Package.TryLoad(UpstreamCacheValue.Value)) { + CbObject CacheRecord = Package.GetObject(); uint32_t AttachmentCount = 0; uint32_t ValidCount = 0; - CbObject CacheRecord = Package.GetObject(); CacheRecord.IterateAttachments([this, &Package, &Ref, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) { if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) @@ -371,7 +445,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) + if (SkipAttachments) { CbPackage PackageWithoutAttachments; PackageWithoutAttachments.SetObject(CacheRecord); @@ -379,7 +453,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request BinaryWriter MemStream; PackageWithoutAttachments.Save(MemStream); - Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + UpstreamCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); } } else @@ -398,109 +472,34 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } } } - } - - if (!Success) - { - ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) - { - CbObjectView CacheRecord(Value.Value.Data()); - const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) + if (Success) { - ZEN_WARN("GET - '{}/{}' '{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); - } + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(UpstreamCacheValue.Value.Size()), + ToString(UpstreamCacheValue.Value.GetContentType())); - if ((Policy & CachePolicy::SkipData) == CachePolicy::SkipData) - { m_CacheStats.HitCount++; - return Request.WriteResponse(HttpResponseCode::OK); - } - - const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; - uint32_t AttachmentCount = 0; - uint32_t ValidCount = 0; - uint64_t AttachmentBytes = 0ull; - - CbPackage Package; - - if (!SkipAttachments) - { - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentCount, &ValidCount, &AttachmentBytes](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - AttachmentBytes += Chunk.Size(); - ValidCount++; - } - AttachmentCount++; - }); + m_CacheStats.UpstreamHitCount++; - if (ValidCount != AttachmentCount) + if (SkipData) { - ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments", - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - ValidCount, - AttachmentCount); - - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + AsyncRequest.WriteResponse(HttpResponseCode::OK); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::OK, UpstreamCacheValue.Value.GetContentType(), UpstreamCacheValue.Value); } - } - - Package.SetObject(LoadCompactBinaryObject(Value.Value)); - - ZEN_DEBUG("HIT - '{}/{}' {} '{}', {} attachments (LOCAL)", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(AttachmentBytes + Value.Value.Size()), - ToString(HttpContentType::kCbPackage), - AttachmentCount); - - BinaryWriter MemStream; - Package.Save(MemStream); - - IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - - m_CacheStats.HitCount++; - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); - } - else - { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' ({})", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Value.Value.Size()), - ToString(Value.Value.GetContentType()), - InUpstreamCache ? "UPSTREAM" : "LOCAL"); - - m_CacheStats.HitCount++; - if (InUpstreamCache) - { - m_CacheStats.UpstreamHitCount++; - } - - if ((Policy & CachePolicy::SkipData) == CachePolicy::SkipData) - { - Request.WriteResponse(HttpResponseCode::OK); } else { - Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + m_CacheStats.MissCount++; + AsyncRequest.WriteResponse(HttpResponseCode::NotFound); } - } + }); } void @@ -875,6 +874,7 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); + EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); const uint64_t HitCount = m_CacheStats.HitCount; const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 703e24ed3..ad7253f79 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -30,12 +30,12 @@ enum class CachePolicy : uint8_t; * * {BucketId}/{KeyHash} * - * Where BucketId is an alphanumeric string, and KeyHash is a 40-character hexadecimal - * sequence. The hash value may be derived in any number of ways, it's up to the - * application to pick an approach. + * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character + * hexadecimal sequence. The hash value may be derived in any number of ways, it's + * up to the application to pick an approach. * * Values may be structured or unstructured. Structured values are encoded using Unreal - * Engine's compact binary encoding + * Engine's compact binary encoding (see CbObject) * * Additionally, attachments may be addressed as: * @@ -62,8 +62,7 @@ public: ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; - - virtual void HandleRequest(zen::HttpServerRequest& Request) override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; void Flush(); void Scrub(ScrubContext& Ctx); @@ -104,6 +103,7 @@ private: std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; metrics::OperationTiming m_HttpRequests; + metrics::OperationTiming m_UpstreamGetRequestTiming; CacheStats m_CacheStats; }; diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp index 01866a63b..924546762 100644 --- a/zenserver/testing/httptest.cpp +++ b/zenserver/testing/httptest.cpp @@ -4,9 +4,12 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> +#include <zencore/timer.h> namespace zen { +using namespace fmt::literals; + HttpTestingService::HttpTestingService() { m_Router.RegisterRoute( @@ -15,6 +18,44 @@ HttpTestingService::HttpTestingService() HttpVerb::kGet); m_Router.RegisterRoute( + "hello_slow", + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest& Request) { + Stopwatch Timer; + Sleep(1000); + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kText, + "hello, took me {}"_format(NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); + }); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "hello_veryslow", + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest& Request) { + Stopwatch Timer; + Sleep(60000); + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kText, + "hello, took me {}"_format(NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); + }); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "hello_throw", + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest&) { throw std::runtime_error("intentional error"); }); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "hello_noresponse", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest&) {}); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "metrics", [this](HttpRouterRequest& Req) { metrics::OperationTiming::Scope _(m_TimingStats); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 9ed392faf..5b2629f72 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -41,7 +41,7 @@ namespace detail { , m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; - m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); + m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl); m_Client = new CloudCacheClient(Options); } |