diff options
| author | Stefan Boberg <[email protected]> | 2021-10-14 19:07:14 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-10-14 19:07:14 +0200 |
| commit | 2b71d6a8d57c773bc7734b253a1ffd1e47162184 (patch) | |
| tree | c0c70f9f2f8b9dc895080aac9f7de1140c56ebf0 | |
| parent | Merge branch 'main' of https://github.com/EpicGames/zen (diff) | |
| download | zen-2b71d6a8d57c773bc7734b253a1ffd1e47162184.tar.xz zen-2b71d6a8d57c773bc7734b253a1ffd1e47162184.zip | |
asio HTTP implementation (#23)
asio-based HTTP implementation
| -rw-r--r-- | vcpkg.json | 3 | ||||
| -rw-r--r-- | xmake.lua | 5 | ||||
| -rw-r--r-- | zencore/include/zencore/iobuffer.h | 1 | ||||
| -rw-r--r-- | zencore/include/zencore/string.h | 22 | ||||
| -rw-r--r-- | zencore/include/zencore/thread.h | 3 | ||||
| -rw-r--r-- | zencore/iobuffer.cpp | 83 | ||||
| -rw-r--r-- | zencore/string.cpp | 7 | ||||
| -rw-r--r-- | zencore/thread.cpp | 55 | ||||
| -rw-r--r-- | zenhttp/httpasio.cpp | 1125 | ||||
| -rw-r--r-- | zenhttp/httpasio.h (renamed from zenhttp/httpuws.h) | 19 | ||||
| -rw-r--r-- | zenhttp/httpserver.cpp | 52 | ||||
| -rw-r--r-- | zenhttp/httpsys.cpp | 12 | ||||
| -rw-r--r-- | zenhttp/httpuws.cpp | 94 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpserver.h | 24 | ||||
| -rw-r--r-- | zenhttp/xmake.lua | 2 | ||||
| -rw-r--r-- | zenhttp/zenhttp.vcxproj | 4 | ||||
| -rw-r--r-- | zenhttp/zenhttp.vcxproj.filters | 4 | ||||
| -rw-r--r-- | zenserver/config.cpp | 13 | ||||
| -rw-r--r-- | zenserver/config.h | 3 | ||||
| -rw-r--r-- | zenserver/xmake.lua | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 6 |
21 files changed, 1375 insertions, 164 deletions
diff --git a/vcpkg.json b/vcpkg.json index 45910556a..4a0c41d3b 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -21,7 +21,6 @@ "features": [ "lz4", "zstd" ] }, "sol2", - "sentry-native", - "uwebsockets" + "sentry-native" ] } @@ -17,10 +17,7 @@ add_requires( "vcpkg::curl", "vcpkg::zlib", "vcpkg::zstd", - "vcpkg::http-parser", - "vcpkg::uwebsockets", - "vcpkg::usockets", - "vcpkg::libuv") + "vcpkg::http-parser") add_rules("mode.debug", "mode.release") diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h index 5fbeaeaeb..db462e238 100644 --- a/zencore/include/zencore/iobuffer.h +++ b/zencore/include/zencore/iobuffer.h @@ -374,6 +374,7 @@ public: ZENCORE_API static IoBuffer MakeFromFile(const path_char_t* FileName, uint64_t Offset = 0, uint64_t Size = ~0ull); ZENCORE_API static IoBuffer MakeFromTemporaryFile(const std::filesystem::path& FileName); ZENCORE_API static IoBuffer MakeFromFileHandle(void* FileHandle, uint64_t Offset = 0, uint64_t Size = ~0ull); + ZENCORE_API static IoBuffer ReadFromFileMaybe(IoBuffer& InBuffer); inline static IoBuffer MakeCloneFromMemory(const void* Ptr, size_t Sz) { return IoBuffer(IoBuffer::Clone, Ptr, Sz); } }; diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h index c205b199d..a94e063a4 100644 --- a/zencore/include/zencore/string.h +++ b/zencore/include/zencore/string.h @@ -619,9 +619,23 @@ HashStringDjb2(const std::string_view& InString) { uint32_t HashValue = 5381; - for (int c : InString) + for (int CurChar : InString) { - HashValue = HashValue * 33 + c; + HashValue = HashValue * 33 + CurChar; + } + + return HashValue; +} + +constexpr uint32_t +HashStringAsLowerDjb2(const std::string_view& InString) +{ + uint32_t HashValue = 5381; + + for (int CurChar : InString) + { + CurChar -= ((CurChar - 'A') <= ('Z' - 'A')) * ('A' - 'a'); // this should be compiled into branchless logic + HashValue = HashValue * 33 + CurChar; } return HashValue; @@ -634,9 +648,9 @@ ToLower(const std::string_view& InString) { std::string Out(InString); - for (char& C : Out) + for (char& CurChar : Out) { - C = static_cast<char>(std::tolower(C)); + CurChar -= ((CurChar - 'A') <= ('Z' - 'A')) * ('A' - 'a'); // this should be compiled into branchless logic } return Out; diff --git a/zencore/include/zencore/thread.h b/zencore/include/zencore/thread.h index 410ffbd1e..9fc4c87a2 100644 --- a/zencore/include/zencore/thread.h +++ b/zencore/include/zencore/thread.h @@ -6,10 +6,13 @@ #include <shared_mutex> +#include <string_view> #include <vector> namespace zen { +void SetCurrentThreadName(std::string_view ThreadName); + /** * Reader-writer lock * diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp index 922c14f83..119e37d0b 100644 --- a/zencore/iobuffer.cpp +++ b/zencore/iobuffer.cpp @@ -144,7 +144,7 @@ IoBufferCore::MakeOwned(bool Immutable) { if (!IsOwned()) { - const void* OldDataPtr = m_DataPtr; + const void* OldDataPtr = m_DataPtr; AllocateBuffer(m_DataBytes, sizeof(void*)); memcpy(const_cast<void*>(m_DataPtr), OldDataPtr, m_DataBytes); SetIsOwnedByThis(true); @@ -188,29 +188,29 @@ IoBufferExtendedCore::~IoBufferExtendedCore() { if (m_MappedPointer) { -# if ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_WINDOWS UnmapViewOfFile(m_MappedPointer); -# else +#else uint64_t MapSize = ~uint64_t(uintptr_t(m_MmapHandle)); munmap(m_MappedPointer, MapSize); -# endif +#endif } -# if ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_WINDOWS if (m_Flags & kOwnsMmap) { CloseHandle(m_MmapHandle); } -# endif +#endif if (m_Flags & kOwnsFile) { -# if ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_WINDOWS BOOL Success = CloseHandle(m_FileHandle); -# else +#else int Fd = int(uintptr_t(m_FileHandle)); bool Success = (close(Fd) == 0); -# endif +#endif if (!Success) { @@ -244,7 +244,7 @@ IoBufferExtendedCore::Materialize() const const uint64_t MappedOffsetDisplacement = m_FileOffset - MapOffset; const uint64_t MapSize = m_DataBytes + MappedOffsetDisplacement; -# if ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_WINDOWS m_MmapHandle = CreateFileMapping(m_FileHandle, /* lpFileMappingAttributes */ nullptr, /* flProtect */ PAGE_READONLY, @@ -265,7 +265,7 @@ IoBufferExtendedCore::Materialize() const /* FileOffsetHigh */ uint32_t(MapOffset >> 32), /* FileOffsetLow */ uint32_t(MapOffset & 0xffFFffFFu), /* dwNumberOfBytesToMap */ MapSize); -# else +#else m_MmapHandle = (void*)uintptr_t(~MapSize); // ~ so it's never null (assuming MapSize >= 0) m_Flags |= kOwnsMmap; @@ -276,7 +276,7 @@ IoBufferExtendedCore::Materialize() const /* flags */ MAP_SHARED | MAP_NORESERVE, /* fd */ int(uintptr_t(m_FileHandle)), /* offset */ MapOffset); -# endif // ZEN_PLATFORM_WINDOWS +#endif // ZEN_PLATFORM_WINDOWS if (MappedBase == nullptr) { @@ -371,6 +371,41 @@ IoBuffer::GetFileReference(IoBufferFileReference& OutRef) const ////////////////////////////////////////////////////////////////////////// IoBuffer +IoBufferBuilder::ReadFromFileMaybe(IoBuffer& InBuffer) +{ + IoBufferFileReference FileRef; + if (InBuffer.GetFileReference(/* out */ FileRef)) + { + IoBuffer OutBuffer(FileRef.FileChunkSize); + +#if ZEN_PLATFORM_WINDOWS + OVERLAPPED Ovl{}; + + const uint64_t NumberOfBytesToRead = FileRef.FileChunkSize; + const uint64_t& FileOffset = FileRef.FileChunkOffset; + + Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu); + Ovl.OffsetHigh = DWORD(FileOffset >> 32); + + DWORD dwNumberOfBytesRead = 0; + BOOL Success = ::ReadFile(FileRef.FileHandle, OutBuffer.MutableData(), DWORD(NumberOfBytesToRead), &dwNumberOfBytesRead, &Ovl); + + ZEN_ASSERT(dwNumberOfBytesRead == NumberOfBytesToRead); + + // TODO: error handling + + return OutBuffer; +#else +# error Needs implementation +#endif + } + else + { + return InBuffer; + } +} + +IoBuffer IoBufferBuilder::MakeFromFileHandle(void* FileHandle, uint64_t Offset, uint64_t Size) { return IoBuffer(IoBuffer::BorrowedFile, FileHandle, Offset, Size); @@ -381,7 +416,7 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint { uint64_t FileSize; -# if ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_WINDOWS CAtlFile DataFile; HRESULT hRes = DataFile.Create(FileName, GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); @@ -392,7 +427,7 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint } DataFile.GetSize((ULONGLONG&)FileSize); -# else +#else int Fd = open(FileName, O_RDONLY); if (Fd < 0) { @@ -403,7 +438,7 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint struct stat Stat; fstat(Fd, &Stat); FileSize = Stat.st_size; -# endif // ZEN_PLATFORM_WINDOWS +#endif // ZEN_PLATFORM_WINDOWS // TODO: should validate that offset is in range @@ -422,15 +457,15 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint if (Size) { -# if ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_WINDOWS void* Fd = DataFile.Detach(); -# endif +#endif return IoBuffer(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size); } -# if !ZEN_PLATFORM_WINDOWS +#if !ZEN_PLATFORM_WINDOWS close(Fd); -# endif +#endif // For an empty file, we may as well just return an empty memory IoBuffer return IoBuffer(IoBuffer::Wrap, "", 0); @@ -442,7 +477,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName) uint64_t FileSize; void* Handle; -# if ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_WINDOWS CAtlFile DataFile; // We need to open with DELETE since this is used for the case @@ -459,7 +494,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName) DataFile.GetSize((ULONGLONG&)FileSize); Handle = DataFile.Detach(); -# else +#else int Fd = open(FileName.native().c_str(), O_RDONLY); if (Fd < 0) { @@ -472,7 +507,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName) FileSize = Stat.st_size; Handle = (void*)uintptr_t(Fd); -# endif // ZEN_PLATFORM_WINDOWS +#endif // ZEN_PLATFORM_WINDOWS IoBuffer Iob(IoBuffer::File, Handle, 0, FileSize); Iob.m_Core->SetIsWholeFile(true); @@ -489,7 +524,7 @@ HashBuffer(IoBuffer& Buffer) ////////////////////////////////////////////////////////////////////////// -# if ZEN_WITH_TESTS +#if ZEN_WITH_TESTS void iobuffer_forcelink() @@ -503,6 +538,6 @@ TEST_CASE("IoBuffer") zen::IoBuffer buffer3(buffer2, 0, buffer2.Size()); } -# endif +#endif } // namespace zen diff --git a/zencore/string.cpp b/zencore/string.cpp index 49824a910..8e7921bb6 100644 --- a/zencore/string.cpp +++ b/zencore/string.cpp @@ -929,6 +929,13 @@ TEST_CASE("string") { using namespace std::literals; + SUBCASE("hash_djb2") + { + CHECK(HashStringAsLowerDjb2("AbcdZ"sv) == HashStringDjb2("abcdz"sv)); + CHECK(HashStringAsLowerDjb2("aBCd"sv) == HashStringDjb2("abcd"sv)); + CHECK(HashStringAsLowerDjb2("aBCd"sv) == HashStringDjb2(ToLower("aBCd"sv))); + } + SUBCASE("ForEachStrTok") { const auto Tokens = "here,is,my,different,tokens"sv; diff --git a/zencore/thread.cpp b/zencore/thread.cpp index 20ab19f56..6f27ee528 100644 --- a/zencore/thread.cpp +++ b/zencore/thread.cpp @@ -14,6 +14,61 @@ namespace zen { +#if ZEN_PLATFORM_WINDOWS +// The information on how to set the thread name comes from +// a MSDN article: http://msdn2.microsoft.com/en-us/library/xcb2z8hs.aspx +const DWORD kVCThreadNameException = 0x406D1388; +typedef struct tagTHREADNAME_INFO +{ + DWORD dwType; // Must be 0x1000. + LPCSTR szName; // Pointer to name (in user addr space). + DWORD dwThreadID; // Thread ID (-1=caller thread). + DWORD dwFlags; // Reserved for future use, must be zero. +} THREADNAME_INFO; +// The SetThreadDescription API was brought in version 1607 of Windows 10. +typedef HRESULT(WINAPI* SetThreadDescription)(HANDLE hThread, PCWSTR lpThreadDescription); +// This function has try handling, so it is separated out of its caller. +void +SetNameInternal(DWORD thread_id, const char* name) +{ + THREADNAME_INFO info; + info.dwType = 0x1000; + info.szName = name; + info.dwThreadID = thread_id; + info.dwFlags = 0; + __try + { + RaiseException(kVCThreadNameException, 0, sizeof(info) / sizeof(DWORD), reinterpret_cast<DWORD_PTR*>(&info)); + } + __except (EXCEPTION_CONTINUE_EXECUTION) + { + } +} +#endif + +void +SetCurrentThreadName([[maybe_unused]] std::string_view ThreadName) +{ +#if ZEN_PLATFORM_WINDOWS + // The SetThreadDescription API works even if no debugger is attached. + static auto SetThreadDescriptionFunc = + reinterpret_cast<SetThreadDescription>(::GetProcAddress(::GetModuleHandle(L"Kernel32.dll"), "SetThreadDescription")); + + if (SetThreadDescriptionFunc) + { + SetThreadDescriptionFunc(::GetCurrentThread(), Utf8ToWide(ThreadName).c_str()); + } + // The debugger needs to be around to catch the name in the exception. If + // there isn't a debugger, we are just needlessly throwing an exception. + if (!::IsDebuggerPresent()) + return; + + std::string ThreadNameZ{ThreadName}; + SetNameInternal(GetCurrentThreadId(), ThreadNameZ.c_str()); +#else +#endif +} // namespace zen + void RwLock::AcquireShared() { diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp new file mode 100644 index 000000000..015d32633 --- /dev/null +++ b/zenhttp/httpasio.cpp @@ -0,0 +1,1125 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpasio.h" + +#include <zenhttp/httpserver.h> + +#include <conio.h> +#include <zencore/logging.h> + +#include <http_parser.h> +#include <asio.hpp> +#include <deque> + +namespace zen::asio_http { + +using namespace std::literals; +using namespace fmt::literals; + +struct HttpAcceptor; +struct HttpRequest; +struct HttpResponse; +struct HttpServerConnection; + +static constinit uint32_t HashContentLength = HashStringAsLowerDjb2("Content-Length"sv); +static constinit uint32_t HashContentType = HashStringAsLowerDjb2("Content-Type"sv); +static constinit uint32_t HashAccept = HashStringAsLowerDjb2("Accept"sv); +static constinit uint32_t HashSession = HashStringAsLowerDjb2("UE-Session"sv); +static constinit uint32_t HashRequest = HashStringAsLowerDjb2("UE-Request"sv); + +inline spdlog::logger& +InitLogger() +{ + spdlog::logger& Logger = logging::Get("asio"); +// Logger.set_level(spdlog::level::trace); + return Logger; +} + +inline spdlog::logger& +Log() +{ + static spdlog::logger& g_Logger = InitLogger(); + return g_Logger; +} + +////////////////////////////////////////////////////////////////////////// + +struct HttpAsioServerImpl +{ +public: + HttpAsioServerImpl(); + ~HttpAsioServerImpl(); + + void Start(uint16_t Port, int ThreadCount); + void Stop(); + void RegisterService(const char* UrlPath, HttpService& Service); + HttpService* RouteRequest(std::string_view Url); + + asio::io_service m_IoService; + asio::io_service::work m_Work{m_IoService}; + std::unique_ptr<asio_http::HttpAcceptor> m_Acceptor; + std::vector<std::thread> m_ThreadPool; + + struct ServiceEntry + { + std::string ServiceUrlPath; + HttpService* Service; + }; + + RwLock m_Lock; + std::vector<ServiceEntry> m_UriHandlers; +}; + +/** + * This is the class which request handlers use to interact with the server instance + */ + +class HttpAsioServerRequest : public HttpServerRequest +{ +public: + HttpAsioServerRequest(asio_http::HttpRequest& Request, HttpService& Service, IoBuffer PayloadBuffer); + ~HttpAsioServerRequest(); + + virtual Oid ParseSessionId() const override; + virtual uint32_t ParseRequestId() const override; + + virtual IoBuffer ReadPayload() override; + virtual void WriteResponse(HttpResponseCode ResponseCode) override; + virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span<IoBuffer> Blobs) override; + virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) override; + virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) override; + + using HttpServerRequest::WriteResponse; + + HttpAsioServerRequest(const HttpAsioServerRequest&) = delete; + HttpAsioServerRequest& operator=(const HttpAsioServerRequest&) = delete; + + asio_http::HttpRequest& m_Request; + IoBuffer m_PayloadBuffer; + std::string_view m_QueryString; + std::unique_ptr<HttpResponse> m_Response; +}; + +struct HttpRequest +{ + explicit HttpRequest(HttpServerConnection& Connection) : m_Connection(Connection) {} + + void Initialize(); + size_t ConsumeData(const char* InputData, size_t DataSize); + + void ResetState(); + + HttpVerb RequestVerb() const { return m_RequestVerb; } + bool IsKeepAlive() const { return m_KeepAlive; } + std::string_view Url() const { return std::string_view(m_Url, m_UrlLength); } + IoBuffer Body() { return m_BodyBuffer; } + + inline HttpContentType ContentType() + { + if (m_ContentTypeHeaderIndex < 0) + { + return HttpContentType::kUnknownContentType; + } + + return ParseContentType(m_Headers[m_ContentTypeHeaderIndex].Value); + } + + inline HttpContentType AcceptType() + { + if (m_AcceptHeaderIndex < 0) + { + return HttpContentType::kUnknownContentType; + } + + return ParseContentType(m_Headers[m_AcceptHeaderIndex].Value); + } + + Oid SessionId() const { return m_SessionId; } + int RequestId() const { return m_RequestId; } + +private: + struct HeaderEntry + { + std::string_view Name; + std::string_view Value; + }; + + HttpServerConnection& m_Connection; + char m_HeaderBuffer[512]; + char* m_HeaderCursor = m_HeaderBuffer; + char* m_Url = nullptr; + size_t m_UrlLength = 0; + char* m_CurrentHeaderName = nullptr; // Used while parsing headers + size_t m_CurrentHeaderNameLength = 0; + char* m_CurrentHeaderValue = nullptr; // Used while parsing headers + size_t m_CurrentHeaderValueLength = 0; + std::vector<HeaderEntry> m_Headers; + int8_t m_ContentLengthHeaderIndex; + int8_t m_AcceptHeaderIndex; + int8_t m_ContentTypeHeaderIndex; + int m_RequestId = -1; + Oid m_SessionId{}; + IoBuffer m_BodyBuffer; + uint64_t m_BodyPosition; + http_parser m_Parser; + HttpVerb m_RequestVerb; + bool m_KeepAlive = false; + + void AppendInputBytes(const char* Data, size_t Bytes); + void AppendCurrentHeader(); + + int OnMessageBegin(); + int OnUrl(const char* Data, size_t Bytes); + int OnHeader(const char* Data, size_t Bytes); + int OnHeaderValue(const char* Data, size_t Bytes); + int OnHeadersComplete(); + int OnBody(const char* Data, size_t Bytes); + int OnMessageComplete(); + + static HttpRequest* GetThis(http_parser* Parser) { return reinterpret_cast<HttpRequest*>(Parser->data); } + static http_parser_settings s_ParserSettings; + + void TerminateConnection(); +}; + +struct HttpResponse +{ +public: + HttpResponse() = default; + explicit HttpResponse(HttpContentType ContentType) : m_ContentType(ContentType) {} + + void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList) + { + m_ResponseCode = ResponseCode; + const uint32_t ChunkCount = gsl::narrow<uint32_t>(BlobList.size()); + + m_DataBuffers.reserve(ChunkCount); + + for (IoBuffer& Buffer : BlobList) + { +#if 1 + m_DataBuffers.emplace_back(std::move(Buffer)).MakeOwned(); +#else + IoBuffer TempBuffer = std::move(Buffer); + TempBuffer.MakeOwned(); + m_DataBuffers.emplace_back(IoBufferBuilder::ReadFromFileMaybe(TempBuffer)); +#endif + } + + uint64_t LocalDataSize = 0; + + m_AsioBuffers.push_back({}); // Placeholder for header + + for (IoBuffer& Buffer : m_DataBuffers) + { + uint64_t BufferDataSize = Buffer.Size(); + + ZEN_ASSERT(BufferDataSize); + + LocalDataSize += BufferDataSize; + + IoBufferFileReference FileRef; + if (Buffer.GetFileReference(/* out */ FileRef)) + { + // TODO: Use direct file transfer, via TransmitFile/sendfile + // + // this looks like it requires some custom asio plumbing however + + m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()}); + } + else + { + // Send from memory + + m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()}); + } + } + m_ContentLength = LocalDataSize; + + auto Headers = GetHeaders(); + m_AsioBuffers[0] = asio::const_buffer(Headers.data(), Headers.size()); + } + + uint16_t ResponseCode() const { return m_ResponseCode; } + uint64_t ContentLength() const { return m_ContentLength; } + + const std::vector<asio::const_buffer>& AsioBuffers() const { return m_AsioBuffers; } + + std::string_view GetHeaders() + { + m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n" + << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n" + << "Content-Length: " << ContentLength() << "\r\n"; + + if (!m_IsKeepAlive) + { + m_Headers << "Connection: close\r\n"; + } + + m_Headers << "Date: Mon, 11 Oct 2021 15:06:32 GMT\r\n\r\n"; // TODO: produce more believable data + + return m_Headers; + } + + void SuppressPayload() { m_AsioBuffers.resize(1); } + +private: + uint16_t m_ResponseCode = 0; + bool m_IsKeepAlive = true; + HttpContentType m_ContentType = HttpContentType::kBinary; + uint64_t m_ContentLength = 0; + std::vector<IoBuffer> m_DataBuffers; + std::vector<asio::const_buffer> m_AsioBuffers; + ExtendableStringBuilder<160> m_Headers; +}; + +////////////////////////////////////////////////////////////////////////// + +struct HttpServerConnection +{ + HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket); + ~HttpServerConnection(); + + void HandleNewRequest(); + void TerminateConnection(); + void HandleRequest(); + +private: + enum class RequestState + { + kInitialState, + kInitialRead, + kReadingMore, + kWriting, + kWritingFinal, + kDone, + kTerminated + }; + + RequestState m_RequestState = RequestState::kInitialState; + HttpRequest m_RequestData{*this}; + + void EnqueueRead(); + void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount); + void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop = false); + void OnError(); + + HttpAsioServerImpl& m_Server; + asio::streambuf m_RequestBuffer; + std::unique_ptr<asio::ip::tcp::socket> m_Socket; + std::atomic<uint32_t> m_RequestCounter{0}; + uint32_t m_ConnectionId = 0; + + RwLock m_ResponsesLock; + std::deque<std::unique_ptr<HttpResponse>> m_Responses; +}; + +std::atomic<uint32_t> g_ConnectionIdCounter{0}; + +HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket) +: m_Server(Server) +, m_Socket(std::move(Socket)) +, m_ConnectionId(g_ConnectionIdCounter.fetch_add(1)) +{ + ZEN_TRACE("new connection #{}", m_ConnectionId); +} + +HttpServerConnection::~HttpServerConnection() +{ +} + +void +HttpServerConnection::HandleNewRequest() +{ + m_RequestData.Initialize(); + + EnqueueRead(); +} + +void +HttpServerConnection::TerminateConnection() +{ + m_RequestState = RequestState::kTerminated; + + std::error_code Ec; + m_Socket->close(Ec); +} + +void +HttpServerConnection::EnqueueRead() +{ + if (m_RequestState == RequestState::kInitialRead) + { + m_RequestState = RequestState::kReadingMore; + } + else + { + m_RequestState = RequestState::kInitialRead; + } + + m_RequestBuffer.prepare(64 * 1024); + + asio::async_read(*m_Socket.get(), + m_RequestBuffer, + asio::transfer_at_least(16), + [this](const asio::error_code& Ec, std::size_t ByteCount) { + if (Ec) + { + if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kInitialRead) + { + // Expected, just silently handle the condition + // + // if we get an I/O error on the boundary between two messages + // it should be fine to just not say anything + + ZEN_TRACE("(expected) socket read error: conn#{} '{}'", m_ConnectionId, Ec.message()); + } + else + { + ZEN_WARN("unexpected socket read error: conn#{} {}", m_ConnectionId, Ec.message()); + } + + delete this; + } + else + { + ZEN_TRACE("read: conn#:{} seq#:{} t:{} bytes:{}", + m_ConnectionId, + m_RequestCounter.load(std::memory_order_relaxed), + GetCurrentThreadId(), + ByteCount); + + OnDataReceived(Ec, ByteCount); + } + }); +} + +void +HttpServerConnection::OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount) +{ + ZEN_UNUSED(ByteCount); + + if (Ec) + { + ZEN_ERROR("OnDataReceived Error: {}", Ec.message()); + + return OnError(); + } + + while (m_RequestBuffer.size()) + { + const asio::const_buffer& InputBuffer = m_RequestBuffer.data(); + + size_t Result = m_RequestData.ConsumeData((const char*)InputBuffer.data(), InputBuffer.size()); + + m_RequestBuffer.consume(Result); + } + + switch (m_RequestState) + { + case RequestState::kDone: + case RequestState::kWritingFinal: + case RequestState::kTerminated: + break; + + default: + EnqueueRead(); + break; + } +} + +void +HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop) +{ + ZEN_UNUSED(ByteCount); + if (Ec) + { + ZEN_ERROR("OnResponseDataSent Error: {}", Ec.message()); + return OnError(); + } + else + { + if (!m_RequestData.IsKeepAlive()) + { + m_RequestState = RequestState::kDone; + + m_Socket->close(); + } + else if (Pop) + { + RwLock::ExclusiveLockScope _(m_ResponsesLock); + m_Responses.pop_front(); + } + } +} + +void +HttpServerConnection::OnError() +{ + m_Socket->close(); +} + +void +HttpServerConnection::HandleRequest() +{ + if (!m_RequestData.IsKeepAlive()) + { + m_RequestState = RequestState::kWritingFinal; + + std::error_code Ec; + m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec); + + if (Ec) + { + ZEN_WARN("socket shutdown reported error: {}", Ec.message()); + } + } + else + { + m_RequestState = RequestState::kWriting; + } + + const uint32_t RequestNum = m_RequestCounter.load(std::memory_order_relaxed); + m_RequestCounter.fetch_add(1); + + if (HttpService* Service = m_Server.RouteRequest(m_RequestData.Url())) + { + HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body()); + + ZEN_TRACE("Handling request: Conn#{} Req#{}", m_ConnectionId, RequestNum); + + Service->HandleRequest(Request); + + if (std::unique_ptr<HttpResponse> Response = std::move(Request.m_Response)) + { + // Transmit the response + + if (m_RequestData.RequestVerb() == HttpVerb::kHead) + { + Response->SuppressPayload(); + } + + auto ResponseBuffers = Response->AsioBuffers(); + + uint64_t ResponseLength = 0; + + for (auto& Buffer : ResponseBuffers) + { + ResponseLength += Buffer.size(); + } + + { + RwLock::ExclusiveLockScope _(m_ResponsesLock); + m_Responses.push_back(std::move(Response)); + } + + // TODO: should cork/uncork for Linux? + + asio::async_write(*m_Socket.get(), + ResponseBuffers, + asio::transfer_exactly(ResponseLength), + [this, RequestNum](const asio::error_code& Ec, std::size_t ByteCount) { + ZEN_TRACE("Response sent: Conn#{} Req#{} ({})", m_ConnectionId, RequestNum, NiceBytes(ByteCount)); + OnResponseDataSent(Ec, ByteCount, true); + }); + + return; + } + } + + if (m_RequestData.RequestVerb() == HttpVerb::kHead) + { + std::string_view Response = + "HTTP/1.1 404 NOT FOUND\r\n" + "Date: Mon, 11 Oct 2021 15:06:32 GMT\r\n\r\n"sv; + + if (!m_RequestData.IsKeepAlive()) + { + Response = + "HTTP/1.1 404 NOT FOUND\r\n" + "Date: Mon, 11 Oct 2021 15:06:32 GMT\r\nConnection: close\r\n\r\n"sv; + } + + asio::async_write(*m_Socket.get(), asio::buffer(Response), [this](const asio::error_code& Ec, std::size_t ByteCount) { + OnResponseDataSent(Ec, ByteCount); + }); + } + else + { + std::string_view Response = + "HTTP/1.1 404 NOT FOUND\r\n" + "Content-Length: 23\r\nContent-Type: text/plain\r\nDate: Mon, 11 Oct 2021 15:06:32 GMT\r\n\r\nNo suitable route found"sv; + + if (!m_RequestData.IsKeepAlive()) + { + Response = + "HTTP/1.1 404 NOT FOUND\r\n" + "Content-Length: 23\r\nContent-Type: text/plain\r\nDate: Mon, 11 Oct 2021 15:06:32 GMT\r\nConnection: close\r\n\r\nNo suitable route found"sv; + } + + asio::async_write(*m_Socket.get(), asio::buffer(Response), [this](const asio::error_code& Ec, std::size_t ByteCount) { + OnResponseDataSent(Ec, ByteCount); + }); + } +} + +////////////////////////////////////////////////////////////////////////// +// +// HttpRequest +// + +http_parser_settings HttpRequest::s_ParserSettings{ + .on_message_begin = [](http_parser* p) { return GetThis(p)->OnMessageBegin(); }, + .on_url = [](http_parser* p, const char* Data, size_t ByteCount) { return GetThis(p)->OnUrl(Data, ByteCount); }, + .on_status = + [](http_parser* p, const char* Data, size_t ByteCount) { + ZEN_UNUSED(p, Data, ByteCount); + return 0; + }, + .on_header_field = [](http_parser* p, const char* Data, size_t ByteCount) { return GetThis(p)->OnHeader(Data, ByteCount); }, + .on_header_value = [](http_parser* p, const char* Data, size_t ByteCount) { return GetThis(p)->OnHeaderValue(Data, ByteCount); }, + .on_headers_complete = [](http_parser* p) { return GetThis(p)->OnHeadersComplete(); }, + .on_body = [](http_parser* p, const char* Data, size_t ByteCount) { return GetThis(p)->OnBody(Data, ByteCount); }, + .on_message_complete = [](http_parser* p) { return GetThis(p)->OnMessageComplete(); }, + .on_chunk_header{}, + .on_chunk_complete{}}; + +void +HttpRequest::Initialize() +{ + http_parser_init(&m_Parser, HTTP_REQUEST); + m_Parser.data = this; + + ResetState(); +} + +size_t +HttpRequest::ConsumeData(const char* InputData, size_t DataSize) +{ + const size_t ConsumedBytes = http_parser_execute(&m_Parser, &s_ParserSettings, InputData, DataSize); + + if (m_Parser.http_errno) + { + ZEN_WARN("HTTP parser error {}", (uint32_t)m_Parser.http_errno); + + // TODO: we need to kill the connection since we're most likely + // out of sync and can't make progress + } + + return ConsumedBytes; +} + +void +HttpRequest::AppendInputBytes(const char* Data, size_t Bytes) +{ + const size_t RemainingBufferSpace = sizeof m_HeaderBuffer + m_HeaderBuffer - m_HeaderCursor; + + if (RemainingBufferSpace >= Bytes) + { + memcpy(m_HeaderCursor, Data, Bytes); + m_HeaderCursor += Bytes; + + return; + } + + // Terribad, but better than buffer overflow + TerminateConnection(); +} + +int +HttpRequest::OnUrl(const char* Data, size_t Bytes) +{ + if (!m_Url) + { + ZEN_ASSERT_SLOW(m_UrlLength == 0); + m_Url = m_HeaderCursor; + } + + AppendInputBytes(Data, Bytes); + m_UrlLength += Bytes; + + return 0; +} + +int +HttpRequest::OnHeader(const char* Data, size_t Bytes) +{ + if (m_CurrentHeaderValueLength) + { + AppendCurrentHeader(); + + m_CurrentHeaderNameLength = 0; + m_CurrentHeaderValueLength = 0; + m_CurrentHeaderName = m_HeaderCursor; + } + else if (m_CurrentHeaderName == nullptr) + { + m_CurrentHeaderName = m_HeaderCursor; + } + + memcpy(m_HeaderCursor, Data, Bytes); + m_HeaderCursor += Bytes; + m_CurrentHeaderNameLength += Bytes; + + return 0; +} + +void +HttpRequest::AppendCurrentHeader() +{ + std::string_view HeaderName(m_CurrentHeaderName, m_CurrentHeaderNameLength); + std::string_view HeaderValue(m_CurrentHeaderValue, m_CurrentHeaderValueLength); + + const uint32_t HeaderHash = HashStringAsLowerDjb2(HeaderName); + + if (HeaderHash == HashContentLength) + { + m_ContentLengthHeaderIndex = (int8_t)m_Headers.size(); + } + else if (HeaderHash == HashAccept) + { + m_AcceptHeaderIndex = (int8_t)m_Headers.size(); + } + else if (HeaderHash == HashContentType) + { + m_ContentTypeHeaderIndex = (int8_t)m_Headers.size(); + } + else if (HeaderHash == HashSession) + { + m_SessionId = Oid::FromHexString(HeaderValue); + } + else if (HeaderHash == HashRequest) + { + std::from_chars(HeaderValue.data(), HeaderValue.data() + HeaderValue.size(), m_RequestId); + } + + m_Headers.emplace_back(HeaderName, HeaderValue); +} + +int +HttpRequest::OnHeaderValue(const char* Data, size_t Bytes) +{ + if (m_CurrentHeaderValueLength == 0) + { + m_CurrentHeaderValue = m_HeaderCursor; + } + + memcpy(m_HeaderCursor, Data, Bytes); + m_HeaderCursor += Bytes; + m_CurrentHeaderValueLength += Bytes; + + return 0; +} + +void +HttpRequest::TerminateConnection() +{ + m_Connection.TerminateConnection(); +} + +int +HttpRequest::OnHeadersComplete() +{ + if (m_CurrentHeaderValueLength) + { + AppendCurrentHeader(); + } + + if (m_ContentLengthHeaderIndex >= 0) + { + std::string_view& Value = m_Headers[m_ContentLengthHeaderIndex].Value; + uint64_t ContentLength = 0; + std::from_chars(Value.data(), Value.data() + Value.size(), ContentLength); + + if (ContentLength) + { + m_BodyBuffer = IoBuffer(ContentLength); + } + else + { + m_BodyBuffer = {}; + } + + m_BodyBuffer.SetContentType(ContentType()); + + m_BodyPosition = 0; + } + + m_KeepAlive = !!http_should_keep_alive(&m_Parser); + + return 0; +} + +int +HttpRequest::OnBody(const char* Data, size_t Bytes) +{ + memcpy(reinterpret_cast<uint8_t*>(m_BodyBuffer.MutableData()) + m_BodyPosition, Data, Bytes); + m_BodyPosition += Bytes; + + if (http_body_is_final(&m_Parser)) + { + if (m_BodyPosition != m_BodyBuffer.Size()) + { + ZEN_WARN("Body mismatch! {} != {}", m_BodyPosition, m_BodyBuffer.Size()); + } + } + + return 0; +} + +void +HttpRequest::ResetState() +{ + m_HeaderCursor = m_HeaderBuffer; + m_CurrentHeaderName = nullptr; + m_CurrentHeaderNameLength = 0; + m_CurrentHeaderValue = nullptr; + m_CurrentHeaderValueLength = 0; + m_CurrentHeaderName = nullptr; + m_Url = nullptr; + m_UrlLength = 0; + m_ContentLengthHeaderIndex = -1; + m_AcceptHeaderIndex = -1; + m_ContentTypeHeaderIndex = -1; + + m_BodyBuffer = {}; + m_BodyPosition = 0; + + m_Headers.clear(); +} + +int +HttpRequest::OnMessageBegin() +{ + return 0; +} + +int +HttpRequest::OnMessageComplete() +{ + switch (m_Parser.method) + { + case HTTP_GET: + m_RequestVerb = HttpVerb::kGet; + break; + + case HTTP_POST: + m_RequestVerb = HttpVerb::kPost; + break; + + case HTTP_PUT: + m_RequestVerb = HttpVerb::kPut; + break; + + case HTTP_DELETE: + m_RequestVerb = HttpVerb::kDelete; + break; + + case HTTP_HEAD: + m_RequestVerb = HttpVerb::kHead; + break; + + case HTTP_COPY: + m_RequestVerb = HttpVerb::kCopy; + break; + + case HTTP_OPTIONS: + m_RequestVerb = HttpVerb::kOptions; + break; + } + + m_Connection.HandleRequest(); + + ResetState(); + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +struct HttpAcceptor +{ + HttpAcceptor(HttpAsioServerImpl& Server, asio::io_service& IoService, uint16_t Port) + : m_Server(Server) + , m_IoService(IoService) + , m_Acceptor(m_IoService, asio::ip::tcp::endpoint(asio::ip::address_v4::any(), Port)) + { + } + + void Start() + { + m_Acceptor.listen(); + InitAccept(); + } + + void Stop() { m_IsStopped = true; } + + void InitAccept() + { + auto SocketPtr = std::make_unique<asio::ip::tcp::socket>(m_IoService); + asio::ip::tcp::socket& SocketRef = *SocketPtr.get(); + + m_Acceptor.async_accept(SocketRef, [this, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable { + if (Ec) + { + // TODO: Error condition - please handle and report properly + } + else + { + // New connection established, pass socket ownership into connection object + // and initiate request handling loop + + Socket->set_option(asio::ip::tcp::no_delay(true)); + + HttpServerConnection* Conn = new HttpServerConnection(m_Server, std::move(Socket)); + Conn->HandleNewRequest(); + + // note: the connection object is responsible for deleting itself + } + + if (!m_IsStopped.load()) + { + InitAccept(); + } + else + { + m_Acceptor.close(); + } + }); + } + +private: + HttpAsioServerImpl& m_Server; + asio::io_service& m_IoService; + asio::ip::tcp::acceptor m_Acceptor; + std::atomic<bool> m_IsStopped{false}; +}; + +////////////////////////////////////////////////////////////////////////// + +HttpAsioServerRequest::HttpAsioServerRequest(asio_http::HttpRequest& Request, HttpService& Service, IoBuffer PayloadBuffer) +: m_Request(Request) +, m_PayloadBuffer(std::move(PayloadBuffer)) +{ + const int PrefixLength = Service.UriPrefixLength(); + + std::string_view Uri = Request.Url(); + Uri.remove_prefix(PrefixLength); + m_Uri = Uri; + + m_Verb = Request.RequestVerb(); + m_ContentLength = Request.Body().Size(); + m_ContentType = Request.ContentType(); + + // It an explicit content type extension was specified then we'll use that over any + // Accept: header value that may be present + // + // TODO! + + m_AcceptType = Request.AcceptType(); +} + +HttpAsioServerRequest::~HttpAsioServerRequest() +{ +} + +Oid +HttpAsioServerRequest::ParseSessionId() const +{ + return m_Request.SessionId(); +} + +uint32_t +HttpAsioServerRequest::ParseRequestId() const +{ + return m_Request.RequestId(); +} + +IoBuffer +HttpAsioServerRequest::ReadPayload() +{ + return m_PayloadBuffer; +} + +void +HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode) +{ + ZEN_ASSERT(!m_Response); + + m_Response.reset(new HttpResponse(HttpContentType::kBinary)); + std::array<IoBuffer, 0> Empty; + + m_Response->InitializeForPayload((UINT16)ResponseCode, Empty); +} + +void +HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span<IoBuffer> Blobs) +{ + ZEN_ASSERT(!m_Response); + + m_Response.reset(new HttpResponse(ContentType)); + m_Response->InitializeForPayload((UINT16)ResponseCode, Blobs); +} + +void +HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) +{ + ZEN_ASSERT(!m_Response); + m_Response.reset(new HttpResponse(ContentType)); + + IoBuffer MessageBuffer(IoBuffer::Wrap, ResponseString.data(), ResponseString.size()); + std::array<IoBuffer, 1> SingleBufferList({MessageBuffer}); + + m_Response->InitializeForPayload((uint16_t)ResponseCode, SingleBufferList); +} + +void +HttpAsioServerRequest::WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) +{ + ZEN_ASSERT(!m_Response); + + // Not one bit async, innit + ContinuationHandler(*this); +} + +////////////////////////////////////////////////////////////////////////// + +HttpAsioServerImpl::HttpAsioServerImpl() +{ +} + +HttpAsioServerImpl::~HttpAsioServerImpl() +{ +} + +void +HttpAsioServerImpl::Start(uint16_t Port, int ThreadCount) +{ + ZEN_ASSERT(ThreadCount > 0); + + m_Acceptor.reset(new asio_http::HttpAcceptor(*this, m_IoService, Port)); + m_Acceptor->Start(); + + for (int i = 0; i < ThreadCount; ++i) + { + m_ThreadPool.emplace_back([this, Index = i + 1] { + SetCurrentThreadName("asio worker {}"_format(Index)); + + try + { + m_IoService.run(); + } + catch (std::exception& e) + { + ZEN_ERROR("Exception caught in asio event loop: '{}'", e.what()); + } + }); + } +} + +void +HttpAsioServerImpl::Stop() +{ + m_Acceptor->Stop(); + m_IoService.stop(); + for (auto& Thread : m_ThreadPool) + { + Thread.join(); + } +} + +void +HttpAsioServerImpl::RegisterService(const char* InUrlPath, HttpService& Service) +{ + std::string_view UrlPath(InUrlPath); + Service.SetUriPrefixLength(UrlPath.size()); + + RwLock::ExclusiveLockScope _(m_Lock); + m_UriHandlers.push_back({std::string(UrlPath), &Service}); +} + +HttpService* +HttpAsioServerImpl::RouteRequest(std::string_view Url) +{ + RwLock::SharedLockScope _(m_Lock); + + for (const ServiceEntry& SvcEntry : m_UriHandlers) + { + const std::string& SvcUrl = SvcEntry.ServiceUrlPath; + if (Url.compare(0, SvcUrl.size(), SvcUrl) == 0) + { + return SvcEntry.Service; + } + } + + return nullptr; +} + +} // namespace zen::asio_http + +////////////////////////////////////////////////////////////////////////// + +namespace zen { +HttpAsioServer::HttpAsioServer() : m_Impl(std::make_unique<asio_http::HttpAsioServerImpl>()) +{ +} + +HttpAsioServer::~HttpAsioServer() +{ + m_Impl->Stop(); +} + +void +HttpAsioServer::RegisterService(HttpService& Service) +{ + m_Impl->RegisterService(Service.BaseUri(), Service); +} + +void +HttpAsioServer::Initialize(int BasePort) +{ + m_BasePort = BasePort; + + m_Impl->Start(gsl::narrow<uint16_t>(m_BasePort), Max(std::thread::hardware_concurrency(), 8u)); +} + +void +HttpAsioServer::Run(bool IsInteractive) +{ + const bool TestMode = !IsInteractive; + + if (TestMode == false) + { + zen::logging::ConsoleLog().info("Zen Server running (asio HTTP). Press ESC or Q to quit"); + } + + do + { + int WaitTimeout = -1; + + if (!TestMode) + { + WaitTimeout = 1000; + } + + if (!TestMode && _kbhit() != 0) + { + char c = (char)_getch(); + + if (c == 27 || c == 'Q' || c == 'q') + { + RequestApplicationExit(0); + } + } + + m_ShutdownEvent.Wait(WaitTimeout); + } while (!IsApplicationExitRequested()); +} + +void +HttpAsioServer::RequestExit() +{ + m_ShutdownEvent.Set(); +} + +} // namespace zen diff --git a/zenhttp/httpuws.h b/zenhttp/httpasio.h index 5e300202f..08834ba21 100644 --- a/zenhttp/httpuws.h +++ b/zenhttp/httpasio.h @@ -2,17 +2,24 @@ #pragma once +#include <zencore/thread.h> #include <zenhttp/httpserver.h> -#include <zencore/thread.h> +#include <memory> namespace zen { -class HttpUwsServer : public HttpServer +namespace asio_http { + struct HttpServerConnection; + struct HttpAcceptor; + struct HttpAsioServerImpl; +} // namespace asio_http + +class HttpAsioServer : public HttpServer { public: - HttpUwsServer(); - ~HttpUwsServer(); + HttpAsioServer(); + ~HttpAsioServer(); virtual void RegisterService(HttpService& Service) override; virtual void Initialize(int BasePort) override; @@ -22,6 +29,8 @@ public: private: Event m_ShutdownEvent; int m_BasePort = 0; + + std::unique_ptr<asio_http::HttpAsioServerImpl> m_Impl; }; -} // namespace zen
\ No newline at end of file +} // namespace zen diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp index 69974ca06..150054c30 100644 --- a/zenhttp/httpserver.cpp +++ b/zenhttp/httpserver.cpp @@ -2,9 +2,9 @@ #include <zenhttp/httpserver.h> +#include "httpasio.h" #include "httpnull.h" #include "httpsys.h" -#include "httpuws.h" #include <zencore/compactbinary.h> #include <zencore/compactbinarypackage.h> @@ -565,16 +565,56 @@ HttpRequestRouter::HandleRequest(zen::HttpServerRequest& Request) ////////////////////////////////////////////////////////////////////////// +enum class HttpServerClass +{ + kHttpAsio, + kHttpSys, + kHttpNull +}; + Ref<HttpServer> -CreateHttpServer() +CreateHttpServer(std::string_view ServerClass) { -#if 0 - return new HttpUwsServer; + using namespace std::literals; + +#if 1 + HttpServerClass Class = HttpServerClass::kHttpAsio; #elif ZEN_WITH_HTTPSYS - return new HttpSysServer(std::thread::hardware_concurrency(), /* background worker threads */ 16); + HttpServerClass Class = HttpServerClass::kHttpSys; #else - return new HttpNullServer; + HttpServerClass Class = HttpServerClass::kHttpNull; +#endif + + if (ServerClass == "asio"sv) + { + Class = HttpServerClass::kHttpAsio; + } + else if (ServerClass == "httpsys"sv) + { + Class = HttpServerClass::kHttpSys; + } + else if (ServerClass == "null"sv) + { + Class = HttpServerClass::kHttpNull; + } + + switch (Class) + { + default: + case HttpServerClass::kHttpAsio: + ZEN_INFO("using asio HTTP server implementation"); + return new HttpAsioServer(); + +#if ZEN_WITH_HTTPSYS + case HttpServerClass::kHttpSys: + ZEN_INFO("using http.sys server implementation"); + return new HttpSysServer(std::thread::hardware_concurrency(), /* background worker threads */ 16); #endif + + case HttpServerClass::kHttpNull: + ZEN_INFO("using null HTTP server implementation"); + return new HttpNullServer; + } } ////////////////////////////////////////////////////////////////////////// diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index f550f1f4c..0a9341d0c 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -194,9 +194,11 @@ public: HttpSysServerRequest(const HttpSysServerRequest&) = delete; HttpSysServerRequest& operator=(const HttpSysServerRequest&) = delete; - HttpSysTransaction& m_HttpTx; - HttpSysRequestHandler* m_NextCompletionHandler = nullptr; - IoBuffer m_PayloadBuffer; + HttpSysTransaction& m_HttpTx; + HttpSysRequestHandler* m_NextCompletionHandler = nullptr; + IoBuffer m_PayloadBuffer; + ExtendableStringBuilder<128> m_UriUtf8; + ExtendableStringBuilder<128> m_QueryStringUtf8; }; /** HTTP transaction @@ -1252,6 +1254,8 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService& m_UriUtf8.Reset(); } + m_Uri = std::string_view(m_UriUtf8); + if (uint16_t QueryStringLength = HttpRequestPtr->CookedUrl.QueryStringLength) { --QueryStringLength; // We skip the leading question mark @@ -1263,6 +1267,8 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService& m_QueryStringUtf8.Reset(); } + m_QueryString = std::string_view(m_QueryStringUtf8); + m_Verb = TranslateHttpVerb(HttpRequestPtr->Verb); m_ContentLength = GetContentLength(HttpRequestPtr); m_ContentType = GetContentType(HttpRequestPtr); diff --git a/zenhttp/httpuws.cpp b/zenhttp/httpuws.cpp deleted file mode 100644 index 2a6950532..000000000 --- a/zenhttp/httpuws.cpp +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "httpuws.h" - -#pragma warning(push) -#pragma warning(disable : 4244 4324 4267 4458 4706) -#include <uwebsockets/App.h> -#pragma warning(pop) - -#include <conio.h> -#include <zencore/logging.h> - -#if ZEN_PLATFORM_WINDOWS -# pragma comment(lib, "Iphlpapi.lib") -# pragma comment(lib, "userenv.lib") -#endif - -namespace zen { - -HttpUwsServer::HttpUwsServer() -{ -} - -HttpUwsServer::~HttpUwsServer() -{ -} - -void -HttpUwsServer::RegisterService(HttpService& Service) -{ - ZEN_UNUSED(Service); -} - -void -HttpUwsServer::Initialize(int BasePort) -{ - m_BasePort = BasePort; -} - -void -HttpUwsServer::Run(bool IsInteractive) -{ - const bool TestMode = !IsInteractive; - - if (TestMode == false) - { - zen::logging::ConsoleLog().info("Zen Server running (uWS HTTP). Press ESC or Q to quit"); - } - - ::uWS::App() - .any("/*", - [](uWS::HttpResponse<false>* res, uWS::HttpRequest* req) { - res->onData([=](std::string_view Data, bool fin) { - ZEN_UNUSED(Data); - if (fin) - res->end("Hello world!"); - }); - - res->onAborted([&] {}); - ZEN_UNUSED(req); - }) - .listen(m_BasePort, [](auto* listen_socket) { ZEN_UNUSED(listen_socket); }) - .run(); - - do - { - int WaitTimeout = -1; - - if (!TestMode) - { - WaitTimeout = 1000; - } - - if (!TestMode && _kbhit() != 0) - { - char c = (char)_getch(); - - if (c == 27 || c == 'Q' || c == 'q') - { - RequestApplicationExit(0); - } - } - - m_ShutdownEvent.Wait(WaitTimeout); - } while (!IsApplicationExitRequested()); -} - -void -HttpUwsServer::RequestExit() -{ - m_ShutdownEvent.Set(); -} - -} // namespace zen diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h index 3e6608f11..55bd08a8e 100644 --- a/zenhttp/include/zenhttp/httpserver.h +++ b/zenhttp/include/zenhttp/httpserver.h @@ -32,8 +32,8 @@ public: // Synchronous operations - [[nodiscard]] inline std::string_view RelativeUri() const { return m_UriUtf8; } // Returns URI without service prefix - [[nodiscard]] inline std::string_view QueryString() const { return m_QueryStringUtf8; } + [[nodiscard]] inline std::string_view RelativeUri() const { return m_Uri; } // Returns URI without service prefix + [[nodiscard]] inline std::string_view QueryString() const { return m_QueryString; } struct QueryParams { @@ -108,15 +108,15 @@ protected: kHaveSessionId = 1 << 3, }; - mutable uint32_t m_Flags = 0; - HttpVerb m_Verb = HttpVerb::kGet; - HttpContentType m_ContentType = HttpContentType::kBinary; - HttpContentType m_AcceptType = HttpContentType::kUnknownContentType; - uint64_t m_ContentLength = ~0ull; - ExtendableStringBuilder<128> m_UriUtf8; - ExtendableStringBuilder<128> m_QueryStringUtf8; - mutable uint32_t m_RequestId = ~uint32_t(0); - mutable Oid m_SessionId = Oid::Zero; + mutable uint32_t m_Flags = 0; + HttpVerb m_Verb = HttpVerb::kGet; + HttpContentType m_ContentType = HttpContentType::kBinary; + HttpContentType m_AcceptType = HttpContentType::kUnknownContentType; + uint64_t m_ContentLength = ~0ull; + std::string_view m_Uri; + std::string_view m_QueryString; + mutable uint32_t m_RequestId = ~uint32_t(0); + mutable Oid m_SessionId = Oid::Zero; inline void SetIsHandled() { m_Flags |= kIsHandled; } @@ -173,7 +173,7 @@ public: virtual void RequestExit() = 0; }; -Ref<HttpServer> CreateHttpServer(); +Ref<HttpServer> CreateHttpServer(std::string_view ServerClass); ////////////////////////////////////////////////////////////////////////// diff --git a/zenhttp/xmake.lua b/zenhttp/xmake.lua index 65d5f08ea..fff4fb526 100644 --- a/zenhttp/xmake.lua +++ b/zenhttp/xmake.lua @@ -3,5 +3,5 @@ target('zenhttp') add_files("**.cpp") add_includedirs("include", {public=true}) add_deps("zencore") - add_packages("vcpkg::gsl-lite", "vcpkg::uwebsockets", "vcpkg::usockets", "vcpkg::libuv") + add_packages("vcpkg::gsl-lite") add_options("httpsys")
\ No newline at end of file diff --git a/zenhttp/zenhttp.vcxproj b/zenhttp/zenhttp.vcxproj index 1fc64bfc2..c86a41283 100644 --- a/zenhttp/zenhttp.vcxproj +++ b/zenhttp/zenhttp.vcxproj @@ -93,20 +93,20 @@ </Link> </ItemDefinitionGroup> <ItemGroup> + <ClCompile Include="httpasio.cpp" /> <ClCompile Include="httpclient.cpp" /> <ClCompile Include="httpnull.cpp" /> <ClCompile Include="httpserver.cpp" /> <ClCompile Include="httpshared.cpp" /> <ClCompile Include="httpsys.cpp" /> - <ClCompile Include="httpuws.cpp" /> <ClCompile Include="iothreadpool.cpp" /> <ClCompile Include="workthreadpool.cpp" /> <ClCompile Include="zenhttp.cpp" /> </ItemGroup> <ItemGroup> + <ClInclude Include="httpasio.h" /> <ClInclude Include="httpnull.h" /> <ClInclude Include="httpsys.h" /> - <ClInclude Include="httpuws.h" /> <ClInclude Include="include\zenhttp\httpclient.h" /> <ClInclude Include="include\zenhttp\httpcommon.h" /> <ClInclude Include="include\zenhttp\httpserver.h" /> diff --git a/zenhttp/zenhttp.vcxproj.filters b/zenhttp/zenhttp.vcxproj.filters index e57e7a712..ffc98b1c6 100644 --- a/zenhttp/zenhttp.vcxproj.filters +++ b/zenhttp/zenhttp.vcxproj.filters @@ -6,10 +6,10 @@ <ClCompile Include="httpsys.cpp" /> <ClCompile Include="iothreadpool.cpp" /> <ClCompile Include="httpnull.cpp" /> - <ClCompile Include="httpuws.cpp" /> <ClCompile Include="httpshared.cpp" /> <ClCompile Include="zenhttp.cpp" /> <ClCompile Include="workthreadpool.cpp" /> + <ClCompile Include="httpasio.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="include\zenhttp\httpclient.h" /> @@ -18,10 +18,10 @@ <ClInclude Include="iothreadpool.h" /> <ClInclude Include="include\zenhttp\zenhttp.h" /> <ClInclude Include="httpnull.h" /> - <ClInclude Include="httpuws.h" /> <ClInclude Include="include\zenhttp\httpcommon.h" /> <ClInclude Include="include\zenhttp\httpshared.h" /> <ClInclude Include="workthreadpool.h" /> + <ClInclude Include="httpasio.h" /> </ItemGroup> <ItemGroup> <None Include="xmake.lua" /> diff --git a/zenserver/config.cpp b/zenserver/config.cpp index df3259542..8d7254ae1 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -81,6 +81,12 @@ ParseUpstreamCachePolicy(std::string_view Options) void ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig) { +#if ZEN_WITH_HTTPSYS + const char* DefaultHttp = "httpsys"; +#else + const char* DefaultHttp = "asio"; +#endif + cxxopts::Options options("zenserver", "Zen Server"); options.add_options()("dedicated", "Enable dedicated server mode", @@ -118,6 +124,13 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z #endif options.add_option("network", + "", + "http", + "Select HTTP server implementation (asio|httpsys|null)", + cxxopts::value<std::string>(GlobalOptions.HttpServerClass)->default_value(DefaultHttp), + "<http class>"); + + options.add_option("network", "p", "port", "Select HTTP port", diff --git a/zenserver/config.h b/zenserver/config.h index 405e22739..eaafc31bb 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -18,7 +18,8 @@ struct ZenServerOptions std::string LogId; // Id for tagging log output std::filesystem::path DataDir; // Root directory for state (used for testing) std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) - std::filesystem::path AbsLogFile; + std::string HttpServerClass; // Choice of HTTP server implementation + std::filesystem::path AbsLogFile; // Absolute path to main log file }; struct ZenUpstreamJupiterConfig diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua index fb1ba651d..bba9b6ba5 100644 --- a/zenserver/xmake.lua +++ b/zenserver/xmake.lua @@ -26,7 +26,7 @@ target("zenserver") "vcpkg::lua", "vcpkg::asio", "vcpkg::json11", - "vcpkg::uwebsockets", "vcpkg::usockets", "vcpkg::libuv" + "vcpkg::http-parser" ) add_packages( diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 18c59636d..1fddec437 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -108,7 +108,7 @@ using namespace std::literals; class ZenServer : public IHttpStatusProvider { public: - void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry) + void Initialize(ZenServiceConfig& ServiceConfig, std::string_view HttpServerClass, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry) { using namespace fmt::literals; @@ -146,7 +146,7 @@ public: // Ok so now we're configured, let's kick things off - m_Http = zen::CreateHttpServer(); + m_Http = zen::CreateHttpServer(HttpServerClass); m_Http->Initialize(BasePort); m_Http->RegisterService(m_HealthService); m_Http->RegisterService(m_StatsService); @@ -636,7 +636,7 @@ ZenWindowsService::Run() Server.SetContentRoot(GlobalOptions.ContentDir); Server.SetTestMode(GlobalOptions.IsTest); Server.SetDedicatedMode(GlobalOptions.IsDedicated); - Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry); + Server.Initialize(ServiceConfig, GlobalOptions.HttpServerClass, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry); // Monitor shutdown signals |