From dfee496be1e1b1ca1ee5bdf080f4bc9e3f171fb4 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Tue, 23 Nov 2021 09:10:58 +0000 Subject: Prevent destroying ASIO server connection until callbacks complete --- zenhttp/httpasio.cpp | 115 ++++++++++++++++++++++++--------------------------- 1 file changed, 54 insertions(+), 61 deletions(-) diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp index ea84e7e87..4e4646e3b 100644 --- a/zenhttp/httpasio.cpp +++ b/zenhttp/httpasio.cpp @@ -6,6 +6,7 @@ #include #include +#include #include ZEN_THIRD_PARTY_INCLUDES_START @@ -169,7 +170,7 @@ private: IoBuffer m_BodyBuffer; uint64_t m_BodyPosition = 0; http_parser m_Parser; - char m_HeaderBuffer[512]; + char m_HeaderBuffer[1024]; void AppendInputBytes(const char* Data, size_t Bytes); void AppendCurrentHeader(); @@ -281,7 +282,7 @@ private: ////////////////////////////////////////////////////////////////////////// -struct HttpServerConnection +struct HttpServerConnection : std::enable_shared_from_this { HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr&& Socket); ~HttpServerConnection(); @@ -290,6 +291,8 @@ struct HttpServerConnection void TerminateConnection(); void HandleRequest(); + std::shared_ptr AsSharedPtr() { return shared_from_this(); } + private: enum class RequestState { @@ -369,50 +372,32 @@ HttpServerConnection::EnqueueRead() asio::async_read(*m_Socket.get(), m_RequestBuffer, asio::transfer_at_least(16), - [this](const asio::error_code& Ec, std::size_t ByteCount) { - if (Ec) - { - if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kInitialRead) - { - // Expected, just silently handle the condition - // - // if we get an I/O error on the boundary between two messages - // it should be fine to just not say anything - - ZEN_TRACE("(expected) socket read error: conn#{} '{}'", m_ConnectionId, Ec.message()); - } - else - { - ZEN_WARN("unexpected socket read error: conn#{} {}", m_ConnectionId, Ec.message()); - } - - delete this; - } - else - { - ZEN_TRACE("read: conn#:{} seq#:{} t:{} bytes:{}", - m_ConnectionId, - m_RequestCounter.load(std::memory_order_relaxed), - GetCurrentThreadId(), - ByteCount); - - OnDataReceived(Ec, ByteCount); - } - }); + [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnDataReceived(Ec, ByteCount); }); } void HttpServerConnection::OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount) { - ZEN_UNUSED(ByteCount); - if (Ec) { - ZEN_ERROR("OnDataReceived Error: {}", Ec.message()); - - return OnError(); + if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kInitialRead) + { + ZEN_TRACE("on data received ERROR (EXPECTED), connection '{}' reason '{}'", m_ConnectionId, Ec.message()); + return; + } + else + { + ZEN_ERROR("on data received ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message()); + return OnError(); + } } + ZEN_TRACE("on data received, connection '{}', request '{}', thread '{}', bytes '{}'", + m_ConnectionId, + m_RequestCounter.load(std::memory_order_relaxed), + GetCurrentThreadId(), + NiceBytes(ByteCount)); + while (m_RequestBuffer.size()) { const asio::const_buffer& InputBuffer = m_RequestBuffer.data(); @@ -438,24 +423,34 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, std::size_t Byt void HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop) { - ZEN_UNUSED(ByteCount); if (Ec) { - ZEN_ERROR("OnResponseDataSent Error: {}", Ec.message()); - return OnError(); + ZEN_ERROR("on data sent ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message()); + OnError(); } else { + ZEN_TRACE("on data sent, connection '{}', request '{}', thread '{}', bytes '{}'", + m_ConnectionId, + m_RequestCounter.load(std::memory_order_relaxed), + GetCurrentThreadId(), + NiceBytes(ByteCount)); + if (!m_RequestData.IsKeepAlive()) { m_RequestState = RequestState::kDone; m_Socket->close(); } - else if (Pop) + else { - RwLock::ExclusiveLockScope _(m_ResponsesLock); - m_Responses.pop_front(); + if (Pop) + { + RwLock::ExclusiveLockScope _(m_ResponsesLock); + m_Responses.pop_front(); + } + + m_RequestCounter.fetch_add(1); } } } @@ -478,7 +473,7 @@ HttpServerConnection::HandleRequest() if (Ec) { - ZEN_WARN("socket shutdown reported error: {}", Ec.message()); + ZEN_WARN("socket shutdown ERROR, reason '{}'", Ec.message()); } } else @@ -486,14 +481,11 @@ HttpServerConnection::HandleRequest() m_RequestState = RequestState::kWriting; } - const uint32_t RequestNum = m_RequestCounter.load(std::memory_order_relaxed); - m_RequestCounter.fetch_add(1); - if (HttpService* Service = m_Server.RouteRequest(m_RequestData.Url())) { HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body()); - ZEN_TRACE("Handling request: Conn#{} Req#{}", m_ConnectionId, RequestNum); + ZEN_TRACE("handle request, connection '{}' request '{}'", m_ConnectionId, m_RequestCounter.load(std::memory_order_relaxed)); Service->HandleRequest(Request); @@ -525,9 +517,8 @@ HttpServerConnection::HandleRequest() asio::async_write(*m_Socket.get(), ResponseBuffers, asio::transfer_exactly(ResponseLength), - [this, RequestNum](const asio::error_code& Ec, std::size_t ByteCount) { - ZEN_TRACE("Response sent: Conn#{} Req#{} ({})", m_ConnectionId, RequestNum, NiceBytes(ByteCount)); - OnResponseDataSent(Ec, ByteCount, true); + [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, true); }); return; @@ -548,9 +539,10 @@ HttpServerConnection::HandleRequest() "\r\n"sv; } - asio::async_write(*m_Socket.get(), asio::buffer(Response), [this](const asio::error_code& Ec, std::size_t ByteCount) { - OnResponseDataSent(Ec, ByteCount); - }); + asio::async_write( + *m_Socket.get(), + asio::buffer(Response), + [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); }); } else { @@ -572,9 +564,10 @@ HttpServerConnection::HandleRequest() "No suitable route found"sv; } - asio::async_write(*m_Socket.get(), asio::buffer(Response), [this](const asio::error_code& Ec, std::size_t ByteCount) { - OnResponseDataSent(Ec, ByteCount); - }); + asio::async_write( + *m_Socket.get(), + asio::buffer(Response), + [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); }); } } @@ -900,14 +893,14 @@ struct HttpAcceptor else { // New connection established, pass socket ownership into connection object - // and initiate request handling loop + // and initiate request handling loop. The connection lifetime is + // managed by the async read/write loop by passing the shared + // reference to the callbacks. Socket->set_option(asio::ip::tcp::no_delay(true)); - HttpServerConnection* Conn = new HttpServerConnection(m_Server, std::move(Socket)); + auto Conn = std::make_shared(m_Server, std::move(Socket)); Conn->HandleNewRequest(); - - // note: the connection object is responsible for deleting itself } if (!m_IsStopped.load()) -- cgit v1.2.3 From 4d35f96deba30a9785e2bb67767139c94ec20608 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Tue, 23 Nov 2021 15:02:34 +0100 Subject: Removed unused variable. --- zencore/compress.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/zencore/compress.cpp b/zencore/compress.cpp index 61f1effe4..35a5acb3a 100644 --- a/zencore/compress.cpp +++ b/zencore/compress.cpp @@ -1205,7 +1205,6 @@ TEST_CASE("CompressedBuffer") SUBCASE("copy uncompressed range") { - const uint64_t BlockSize = 64 * sizeof(uint64_t); const uint64_t N = 1000; std::vector ExpectedValues = GenerateData(N); -- cgit v1.2.3 From 6fdaf7cf4d5eff0d7d33467540a95d2afa3ab556 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Tue, 23 Nov 2021 15:43:07 +0100 Subject: Fixed unused variable in integration tests. --- zenserver-test/zenserver-test.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 8f38cc1be..0878678d8 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1743,6 +1743,7 @@ TEST_CASE("zcache.policy") zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Body); + CHECK(Ok); CbObject CacheRecord = Package.GetObject(); std::vector AttachmentKeys; @@ -1762,6 +1763,7 @@ TEST_CASE("zcache.policy") zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Body); + CHECK(Ok); CHECK(Package.GetAttachments().size() != 0); } @@ -1801,6 +1803,7 @@ TEST_CASE("zcache.policy") zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Body); + CHECK(Ok); CbObject CacheRecord = Package.GetObject(); std::vector AttachmentKeys; @@ -1820,6 +1823,7 @@ TEST_CASE("zcache.policy") zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Body); + CHECK(Ok); CHECK(Package.GetAttachments().size() != 0); } -- cgit v1.2.3 From dfff6364d4533ebedf1426d2dfc0222c8856b30d Mon Sep 17 00:00:00 2001 From: Zousar Shaker Date: Wed, 24 Nov 2021 00:24:29 -0700 Subject: Changed the asio acceptor initialization to allow dual stack IPV6 connections. --- zenhttp/httpasio.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp index 4e4646e3b..d5fe9adbb 100644 --- a/zenhttp/httpasio.cpp +++ b/zenhttp/httpasio.cpp @@ -868,8 +868,13 @@ struct HttpAcceptor HttpAcceptor(HttpAsioServerImpl& Server, asio::io_service& IoService, uint16_t Port) : m_Server(Server) , m_IoService(IoService) - , m_Acceptor(m_IoService, asio::ip::tcp::endpoint(asio::ip::address_v4::any(), Port)) + , m_Acceptor(m_IoService, asio::ip::tcp::v6()) { + m_Acceptor.set_option(asio::ip::v6_only(false)); + m_Acceptor.set_option(asio::socket_base::reuse_address(true)); + m_Acceptor.set_option(asio::ip::tcp::no_delay(true)); + m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), Port)); + m_Acceptor.listen(); } void Start() -- cgit v1.2.3 From 893f9e180abab61fe001ade04a6a8b1e8f519c3a Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Wed, 24 Nov 2021 09:03:10 +0100 Subject: Set empty body in Jupiter client. --- zenserver/upstream/jupiter.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 9223ea0f4..1f82f4a04 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -673,6 +673,7 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); @@ -708,6 +709,7 @@ CloudCacheSession::GetObjectTree(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -772,6 +774,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); @@ -810,6 +813,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set Date: Wed, 24 Nov 2021 10:18:59 +0100 Subject: Updated deploy script to set platform, architecture and configuration. --- scripts/deploybuild.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/scripts/deploybuild.py b/scripts/deploybuild.py index c81235a8c..d137c3da1 100644 --- a/scripts/deploybuild.py +++ b/scripts/deploybuild.py @@ -50,20 +50,20 @@ while not os.path.exists(os.path.join(zenroot, "zen.sln")): zenroot = os.path.dirname(zenroot) jazz_print("Zen root:", zenroot) - # Build fresh binaries -if use_xmake: - build_cmd = ["xmake", "-b", "zenserver"] - build_output_dir = r'build\windows\x64\release' -else: - vs_path = vswhere.get_latest_path() # can also specify prerelease=True - jazz_print("BUILDING CODE", f"using VS root: {vs_path}") - devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com") - build_cmd = [devenv_path, "/build", "Release", "zen.sln"] - build_output_dir = r'x64\Release' - try: + if use_xmake: + subprocess.run(["xmake.exe", "config", "-p", "windows", "-a", "x64", "-m", "release"], check=True) + build_cmd = ["xmake.exe", "build", "--rebuild", "zenserver"] + build_output_dir = r'build\windows\x64\release' + else: + vs_path = vswhere.get_latest_path() # can also specify prerelease=True + jazz_print("BUILDING CODE", f"using VS root: {vs_path}") + devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com") + build_cmd = [devenv_path, "/build", "Release", "zen.sln"] + build_output_dir = r'x64\Release' + subprocess.run(build_cmd, check=True) except: jazz_fail("Build failed!") -- cgit v1.2.3 From 096e46e281a718e719d7c5687f08a71586f031a1 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Wed, 24 Nov 2021 18:03:14 +0100 Subject: Added bundle xmake task. --- .gitignore | 1 + xmake.lua | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/.gitignore b/.gitignore index bf237a7b3..5433b71f8 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ x64/ x86/ bld/ +build/ [Bb]in/ [Oo]bj/ [Ll]og/ diff --git a/xmake.lua b/xmake.lua index ced2b55ce..49989615c 100644 --- a/xmake.lua +++ b/xmake.lua @@ -73,3 +73,53 @@ includes("zenstore", "zenstore-test") includes("zenutil") includes("zenserver", "zenserver-test") includes("zen") + +task("bundle") + on_run(function() + import("detect.tools.find_zip") + import("detect.tools.find_7z") + + -- copy files + local dirs = { + binaries = "./build/windows/x64/release", + bundles = "./build/bundles", + bundle = "./build/bundles/zenzerver-win64" + } + + local files = { + dirs.binaries .. "/zenserver.exe", + dirs.binaries .. "/zenserver.pdb", + "./vcpkg_installed/x64-windows-static/tools/sentry-native/crashpad_handler.exe" + } + + os.mkdir(dirs.bundles) + os.mkdir(dirs.bundle) + + for _,file in ipairs(files) do + printf("copy '%s' -> '%s'\n", file, dirs.bundle) + os.cp(file, dirs.bundle) + end + + -- create archive + local bundle_name = "zenserver-win64.zip" + + local zip_cmd = find_7z() + assert(zip_cmd) + + local zip_args = {} + table.insert(zip_args, "a") + table.insert(zip_args, dirs.bundles .. "/" .. bundle_name) + table.insert(zip_args, dirs.bundle .. "/*.*") + + printf("creating bundle '%s'...", dirs.bundles .. "/" .. bundle_name) + os.runv(zip_cmd, zip_args) + os.rm(dirs.bundle) + + printf(" Ok!") + end) + + set_menu { + usage = "xmake bundle", + description = "Create zip bundle from binaries", + options = {} + } -- cgit v1.2.3