aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-09-27 16:25:15 +0200
committerGitHub <[email protected]>2023-09-27 16:25:15 +0200
commitc9e43e1ad5ea3f41e1f7df876460ede167d76351 (patch)
treee554bfd12d1ac21f33d5cbfcf99cc24cd441c93d /src
parentchache upstream stats improved (#426) (diff)
downloadzen-c9e43e1ad5ea3f41e1f7df876460ede167d76351.tar.xz
zen-c9e43e1ad5ea3f41e1f7df876460ede167d76351.zip
prefer to handle cache RPC requests synchronously (#428)
* only handle RPC requests in a worker thread if we have an upstream. we may as well handle the request inline on the http_io thread if we're only dealing with local data since the response times should be pretty consistent in that case * http.sys: don't create async worker thread pool until it's needed (typically only if we have an upstream)
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/httpsys.cpp23
-rw-r--r--src/zenhttp/httpsys.h8
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp21
3 files changed, 44 insertions, 8 deletions
diff --git a/src/zenhttp/httpsys.cpp b/src/zenhttp/httpsys.cpp
index f95a31914..886d53143 100644
--- a/src/zenhttp/httpsys.cpp
+++ b/src/zenhttp/httpsys.cpp
@@ -747,7 +747,7 @@ HttpSysServer::HttpSysServer(const Config& Config)
, m_IsRequestLoggingEnabled(Config.IsRequestLoggingEnabled)
, m_IsAsyncResponseEnabled(Config.IsAsyncResponseEnabled)
, m_ThreadPool(Config.ThreadCount != 0 ? Config.ThreadCount : std::thread::hardware_concurrency())
-, m_AsyncWorkPool(Config.AsyncWorkThreadCount != 0 ? Config.AsyncWorkThreadCount : 16, "http_async")
+, m_InitialConfig(Config)
{
ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr);
@@ -768,6 +768,9 @@ HttpSysServer::~HttpSysServer()
{
ZEN_ERROR("~HttpSysServer() called without calling Close() first");
}
+
+ delete m_AsyncWorkPool;
+ m_AsyncWorkPool = nullptr;
}
void
@@ -977,6 +980,24 @@ HttpSysServer::Cleanup()
}
}
+WorkerThreadPool&
+HttpSysServer::WorkPool()
+{
+ if (!m_AsyncWorkPool)
+ {
+ RwLock::ExclusiveLockScope _(m_AsyncWorkPoolInitLock);
+
+ if (!m_AsyncWorkPool)
+ {
+ unsigned int WorkerThreadCount = m_InitialConfig.AsyncWorkThreadCount != 0 ? m_InitialConfig.AsyncWorkThreadCount : 16;
+
+ m_AsyncWorkPool = new WorkerThreadPool(WorkerThreadCount, "http_async");
+ }
+ }
+
+ return *m_AsyncWorkPool;
+}
+
void
HttpSysServer::StartServer()
{
diff --git a/src/zenhttp/httpsys.h b/src/zenhttp/httpsys.h
index 3a2a6065d..d79206082 100644
--- a/src/zenhttp/httpsys.h
+++ b/src/zenhttp/httpsys.h
@@ -54,7 +54,7 @@ public:
virtual void RegisterService(HttpService& Service) override;
virtual void Close() override;
- WorkerThreadPool& WorkPool() { return m_AsyncWorkPool; }
+ WorkerThreadPool& WorkPool();
inline bool IsOk() const { return m_IsOk; }
inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; }
@@ -80,8 +80,9 @@ private:
bool m_IsRequestLoggingEnabled = false;
bool m_IsAsyncResponseEnabled = true;
- WinIoThreadPool m_ThreadPool;
- WorkerThreadPool m_AsyncWorkPool;
+ WinIoThreadPool m_ThreadPool;
+ RwLock m_AsyncWorkPoolInitLock;
+ WorkerThreadPool* m_AsyncWorkPool = nullptr;
std::vector<std::wstring> m_BaseUris; // eg: http://*:nnnn/
HTTP_SERVER_SESSION_ID m_HttpSessionId = 0;
@@ -92,6 +93,7 @@ private:
int32_t m_MinPendingRequests = 16;
int32_t m_MaxPendingRequests = 128;
Event m_ShutdownEvent;
+ const Config m_InitialConfig;
};
} // namespace zen
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 11ac81dcb..4a2fdd96b 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -1729,6 +1729,9 @@ void
HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
{
ZEN_TRACE_CPU("z$::Http::HandleRpcRequest");
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
switch (Request.RequestVerb())
{
case HttpVerb::kPost:
@@ -1745,7 +1748,7 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- Request.WriteResponseAsync(
+ auto HandleRpc =
[this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponseAsync");
std::uint64_t RequestIndex =
@@ -1764,9 +1767,9 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
RpcResult);
if (!IsHttpSuccessCode(ResultCode))
{
- AsyncRequest.WriteResponse(ResultCode);
- return;
+ return AsyncRequest.WriteResponse(ResultCode);
}
+
if (AcceptMagic == kCbPkgMagic)
{
void* TargetProcessHandle = nullptr;
@@ -1804,9 +1807,19 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
HttpContentType::kCbPackage,
IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
- });
+ };
+
+ if (HasUpstream)
+ {
+ Request.WriteResponseAsync(std::move(HandleRpc));
+ }
+ else
+ {
+ HandleRpc(Request);
+ }
}
break;
+
default:
m_CacheStats.BadRequestCount++;
Request.WriteResponse(HttpResponseCode::BadRequest);