diff options
| author | Wladimir J. van der Laan <[email protected]> | 2015-09-28 16:03:41 +0200 |
|---|---|---|
| committer | Wladimir J. van der Laan <[email protected]> | 2015-09-28 16:03:51 +0200 |
| commit | 1a9f19a78daa392baf2e062bddff597ce0ce30b6 (patch) | |
| tree | b80a79a74bdd6a0160a2fd76c0bff2d382b85cd7 /src/httpserver.cpp | |
| parent | Merge pull request #6724 (diff) | |
| parent | http: Force-exit event loop after predefined time (diff) | |
| download | discoin-1a9f19a78daa392baf2e062bddff597ce0ce30b6.tar.xz discoin-1a9f19a78daa392baf2e062bddff597ce0ce30b6.zip | |
Merge pull request #6719
ec908d5 http: Force-exit event loop after predefined time (Wladimir J. van der Laan)
de9de2d http: Wait for worker threads to exit (Wladimir J. van der Laan)
5e0c221 Make HTTP server shutdown more graceful (Wladimir J. van der Laan)
Diffstat (limited to 'src/httpserver.cpp')
| -rw-r--r-- | src/httpserver.cpp | 74 |
1 files changed, 65 insertions, 9 deletions
diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 600e57b7c..0a7f903e9 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -72,13 +72,35 @@ private: std::deque<WorkItem*> queue; bool running; size_t maxDepth; + int numThreads; + + /** RAII object to keep track of number of running worker threads */ + class ThreadCounter + { + public: + WorkQueue &wq; + ThreadCounter(WorkQueue &w): wq(w) + { + boost::lock_guard<boost::mutex> lock(wq.cs); + wq.numThreads += 1; + } + ~ThreadCounter() + { + boost::lock_guard<boost::mutex> lock(wq.cs); + wq.numThreads -= 1; + wq.cond.notify_all(); + } + }; public: WorkQueue(size_t maxDepth) : running(true), - maxDepth(maxDepth) + maxDepth(maxDepth), + numThreads(0) { } - /* Precondition: worker threads have all stopped */ + /*( Precondition: worker threads have all stopped + * (call WaitExit) + */ ~WorkQueue() { while (!queue.empty()) { @@ -100,6 +122,7 @@ public: /** Thread function */ void Run() { + ThreadCounter count(*this); while (running) { WorkItem* i = 0; { @@ -122,6 +145,13 @@ public: running = false; cond.notify_all(); } + /** Wait for worker threads to exit */ + void WaitExit() + { + boost::unique_lock<boost::mutex> lock(cs); + while (numThreads > 0) + cond.wait(lock); + } /** Return current depth of queue */ size_t Depth() @@ -155,6 +185,8 @@ static std::vector<CSubNet> rpc_allow_subnets; static WorkQueue<HTTPClosure>* workQueue = 0; //! Handlers for (sub)paths std::vector<HTTPPathHandler> pathHandlers; +//! Bound listening sockets +std::vector<evhttp_bound_socket *> boundSockets; /** Check if a network address is allowed to access the HTTP server */ static bool ClientAllowed(const CNetAddr& netaddr) @@ -264,6 +296,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg) } } +/** Callback to reject HTTP requests after shutdown. */ +static void http_reject_request_cb(struct evhttp_request* req, void*) +{ + LogPrint("http", "Rejecting request while shutting down\n"); + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); +} + /** Event dispatcher thread */ static void ThreadHTTP(struct event_base* base, struct evhttp* http) { @@ -278,7 +317,6 @@ static void ThreadHTTP(struct event_base* base, struct evhttp* http) static bool HTTPBindAddresses(struct evhttp* http) { int defaultPort = GetArg("-rpcport", BaseParams().RPCPort()); - int nBound = 0; std::vector<std::pair<std::string, uint16_t> > endpoints; // Determine what addresses to bind to @@ -304,13 +342,14 @@ static bool HTTPBindAddresses(struct evhttp* http) // Bind addresses for (std::vector<std::pair<std::string, uint16_t> >::iterator i = endpoints.begin(); i != endpoints.end(); ++i) { LogPrint("http", "Binding RPC on address %s port %i\n", i->first, i->second); - if (evhttp_bind_socket(http, i->first.empty() ? NULL : i->first.c_str(), i->second) == 0) { - nBound += 1; + evhttp_bound_socket *bind_handle = evhttp_bind_socket_with_handle(http, i->first.empty() ? NULL : i->first.c_str(), i->second); + if (bind_handle) { + boundSockets.push_back(bind_handle); } else { LogPrintf("Binding RPC on address %s port %i failed.\n", i->first, i->second); } } - return nBound > 0; + return !boundSockets.empty(); } /** Simple wrapper to set thread name and run work queue */ @@ -410,8 +449,21 @@ bool StartHTTPServer(boost::thread_group& threadGroup) void InterruptHTTPServer() { LogPrint("http", "Interrupting HTTP server\n"); - if (eventBase) - event_base_loopbreak(eventBase); + if (eventHTTP) { + // Unlisten sockets + BOOST_FOREACH (evhttp_bound_socket *socket, boundSockets) { + evhttp_del_accept_socket(eventHTTP, socket); + } + // Reject requests on current connections + evhttp_set_gencb(eventHTTP, http_reject_request_cb, NULL); + } + if (eventBase) { + // Force-exit event loop after predefined time + struct timeval tv; + tv.tv_sec = 10; + tv.tv_usec = 0; + event_base_loopexit(eventBase, &tv); + } if (workQueue) workQueue->Interrupt(); } @@ -419,7 +471,11 @@ void InterruptHTTPServer() void StopHTTPServer() { LogPrint("http", "Stopping HTTP server\n"); - delete workQueue; + if (workQueue) { + LogPrint("http", "Waiting for HTTP worker threads to exit\n"); + workQueue->WaitExit(); + delete workQueue; + } if (eventHTTP) { evhttp_free(eventHTTP); eventHTTP = 0; |