diff options
| author | Per Larsson <[email protected]> | 2022-02-18 14:48:41 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2022-02-18 14:48:41 +0100 |
| commit | 08cc02bf1b5cad75ed7b97c0dc7cfc082da537c4 (patch) | |
| tree | 8fd8e7189c9807280003eabb3040cb35db2e5cd4 | |
| parent | Web socket client is shared between I/O thead and client. (diff) | |
| download | zen-08cc02bf1b5cad75ed7b97c0dc7cfc082da537c4.tar.xz zen-08cc02bf1b5cad75ed7b97c0dc7cfc082da537c4.zip | |
Basic websocket service and test.
| -rw-r--r-- | zenhttp/include/zenhttp/websocket.h | 55 | ||||
| -rw-r--r-- | zenhttp/websocketasio.cpp | 212 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 32 | ||||
| -rw-r--r-- | zenserver/testing/httptest.cpp | 27 | ||||
| -rw-r--r-- | zenserver/testing/httptest.h | 4 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 5 |
6 files changed, 270 insertions, 65 deletions
diff --git a/zenhttp/include/zenhttp/websocket.h b/zenhttp/include/zenhttp/websocket.h index 908b24636..1ab9b4804 100644 --- a/zenhttp/include/zenhttp/websocket.h +++ b/zenhttp/include/zenhttp/websocket.h @@ -2,9 +2,12 @@ #include <zencore/memory.h> +#include <compare> #include <functional> #include <memory> +#pragma once + namespace asio { class io_context; } @@ -12,6 +15,47 @@ class io_context; namespace zen { class CbPackage; +class CbObject; +class BinaryWriter; + +class WebSocketId +{ + static std::atomic_uint32_t NextId; + +public: + WebSocketId() = default; + + uint32_t Value() const { return m_Value; } + + auto operator<=>(const WebSocketId&) const = default; + + static WebSocketId New() { return WebSocketId(NextId.fetch_add(1)); } + +private: + WebSocketId(uint32_t Value) : m_Value(Value) {} + + uint32_t m_Value{}; +}; + +class WebSocketServer; + +class WebSocketService +{ +public: + virtual ~WebSocketService() = default; + + virtual bool HandleMessage(WebSocketId Id, const CbPackage& Msg) = 0; + void Configure(WebSocketServer& Server); + +protected: + WebSocketService() = default; + + void PublishMessage(WebSocketId Id, CbPackage&& Msg); + void PublishMessage(WebSocketId Id, CbObject&& Msg); + +private: + WebSocketServer* m_Server = nullptr; +}; struct WebSocketServerOptions { @@ -24,8 +68,10 @@ class WebSocketServer public: virtual ~WebSocketServer() = default; - virtual bool Run(const WebSocketServerOptions& Options) = 0; - virtual void Shutdown() = 0; + virtual void RegisterService(WebSocketService& Service) = 0; + virtual bool Run(const WebSocketServerOptions& Options) = 0; + virtual void Shutdown() = 0; + virtual void PublishMessage(WebSocketId Id, CbPackage&& Msg) = 0; static std::unique_ptr<WebSocketServer> Create(); }; @@ -41,7 +87,6 @@ enum class WebSocketState : uint32_t enum class WebSocketEvent : uint32_t { - kNone, kConnected, kDisconnected, kError @@ -68,6 +113,8 @@ public: virtual void Disconnect() = 0; virtual bool IsConnected() const = 0; virtual WebSocketState State() const = 0; + virtual void SendMsg(CbObject&& Msg) = 0; + virtual void SendMsg(CbPackage&& Msg) = 0; virtual void On(WebSocketEvent Evt, EventCallback&& Cb) = 0; virtual void OnMessage(MessageCallback&& Cb) = 0; @@ -86,6 +133,8 @@ struct WebSocketMessageHeader bool IsValid() const; static bool Read(MemoryView Memory, WebSocketMessageHeader& OutHeader); + + static void Write(BinaryWriter& Writer, const CbPackage& Msg); }; } // namespace zen diff --git a/zenhttp/websocketasio.cpp b/zenhttp/websocketasio.cpp index 1952c97a2..f6f58f38c 100644 --- a/zenhttp/websocketasio.cpp +++ b/zenhttp/websocketasio.cpp @@ -13,7 +13,6 @@ #include <zencore/string.h> #include <chrono> -#include <compare> #include <optional> #include <shared_mutex> #include <span> @@ -381,7 +380,6 @@ private: virtual ParseMessageResult OnParseMessage(MemoryView Msg) override; virtual void OnReset() override; - SimpleBinaryWriter m_HeaderStream; WebSocketMessageHeader m_Header; }; @@ -390,20 +388,20 @@ WebSocketMessageParser::OnParseMessage(MemoryView Msg) { const uint64_t PrevOffset = m_Stream.CurrentOffset(); - if (m_HeaderStream.CurrentOffset() < sizeof(WebSocketMessageHeader)) + if (m_Stream.CurrentOffset() < sizeof(WebSocketMessageHeader)) { - const uint64_t RemaingHeaderSize = sizeof(WebSocketMessageHeader) - m_HeaderStream.CurrentOffset(); + const uint64_t RemaingHeaderSize = sizeof(WebSocketMessageHeader) - m_Stream.CurrentOffset(); - m_HeaderStream.Write(Msg.Left(RemaingHeaderSize)); + m_Stream.Write(Msg.Left(RemaingHeaderSize)); Msg.RightChopInline(RemaingHeaderSize); - if (m_HeaderStream.CurrentOffset() < sizeof(WebSocketMessageHeader)) + if (m_Stream.CurrentOffset() < sizeof(WebSocketMessageHeader)) { return {.Status = ParseMessageStatus::kContinue, .ByteCount = m_Stream.CurrentOffset() - PrevOffset}; } - const bool IsValidHeader = WebSocketMessageHeader::Read(m_HeaderStream.GetView(), m_Header); + const bool IsValidHeader = WebSocketMessageHeader::Read(m_Stream.GetView(), m_Header); if (IsValidHeader == false) { @@ -411,16 +409,21 @@ WebSocketMessageParser::OnParseMessage(MemoryView Msg) } } - if (Msg.GetSize() == 0) + ZEN_ASSERT(m_Stream.CurrentOffset() >= sizeof(WebSocketMessageHeader)); + + if (Msg.IsEmpty()) { return {.Status = ParseMessageStatus::kContinue, .ByteCount = m_Stream.CurrentOffset() - PrevOffset}; } - const uint64_t RemaingContentSize = m_Header.ContentLength - m_HeaderStream.CurrentOffset(); + const uint64_t RemaingContentSize = + Min(m_Header.ContentLength - (m_Stream.CurrentOffset() - sizeof(WebSocketMessageHeader)), Msg.GetSize()); m_Stream.Write(Msg.Left(RemaingContentSize)); - const auto Status = m_Stream.CurrentOffset() == m_Header.ContentLength ? ParseMessageStatus::kDone : ParseMessageStatus::kContinue; + const auto Status = (m_Stream.CurrentOffset() - sizeof(WebSocketMessageHeader)) == m_Header.ContentLength + ? ParseMessageStatus::kDone + : ParseMessageStatus::kContinue; return {.Status = Status, .ByteCount = m_Stream.CurrentOffset() - PrevOffset}; } @@ -428,18 +431,17 @@ WebSocketMessageParser::OnParseMessage(MemoryView Msg) void WebSocketMessageParser::OnReset() { - m_HeaderStream.Clear(); m_Header = {}; } bool WebSocketMessageParser::TryLoadMessage(CbPackage& OutMsg) { - const bool IsParsed = m_Header.IsValid() && m_Stream.CurrentOffset() == m_Header.ContentLength; + const bool IsParsed = m_Header.IsValid() && m_Stream.CurrentOffset() == m_Header.ContentLength + sizeof(WebSocketMessageHeader); if (IsParsed) { - BinaryReader Reader(m_Stream.Data(), m_Stream.Size()); + BinaryReader Reader(m_Stream.GetView().RightChop(sizeof(WebSocketMessageHeader))); return OutMsg.TryLoad(Reader); } @@ -448,32 +450,10 @@ WebSocketMessageParser::TryLoadMessage(CbPackage& OutMsg) } /////////////////////////////////////////////////////////////////////////////// -class WsConnectionId -{ - static std::atomic_uint32_t WsConnectionCounter; - -public: - WsConnectionId() = default; - - uint32_t Value() const { return m_Value; } - - auto operator<=>(const WsConnectionId& RHS) const = default; - - static WsConnectionId New() { return WsConnectionId(WsConnectionCounter.fetch_add(1)); } - -private: - WsConnectionId(uint32_t Value) : m_Value(Value) {} - - uint32_t m_Value{}; -}; - -std::atomic_uint32_t WsConnectionId::WsConnectionCounter{1}; - -/////////////////////////////////////////////////////////////////////////////// class WsConnection : public std::enable_shared_from_this<WsConnection> { public: - WsConnection(WsConnectionId Id, std::unique_ptr<asio::ip::tcp::socket> Socket) + WsConnection(WebSocketId Id, std::unique_ptr<asio::ip::tcp::socket> Socket) : m_Id(Id) , m_Socket(std::move(Socket)) , m_StartTime(Clock::now()) @@ -485,7 +465,7 @@ public: std::shared_ptr<WsConnection> AsShared() { return shared_from_this(); } - WsConnectionId Id() const { return m_Id; } + WebSocketId Id() const { return m_Id; } asio::ip::tcp::socket& Socket() { return *m_Socket; } TimePoint StartTime() const { return m_StartTime; } WebSocketState State() const { return static_cast<WebSocketState>(m_State.load(std::memory_order_relaxed)); } @@ -497,7 +477,7 @@ public: void SetParser(std::unique_ptr<MessageParser>&& Parser) { m_MsgParser = std::move(Parser); } private: - WsConnectionId m_Id; + WebSocketId m_Id; std::unique_ptr<asio::ip::tcp::socket> m_Socket; TimePoint m_StartTime; std::atomic_uint32_t m_State; @@ -594,8 +574,10 @@ public: WsServer() = default; virtual ~WsServer() { Shutdown(); } + virtual void RegisterService(WebSocketService& Service) override; virtual bool Run(const WebSocketServerOptions& Options) override; virtual void Shutdown() override; + virtual void PublishMessage(WebSocketId Id, CbPackage&& Msg) override; private: friend class WsConnection; @@ -608,19 +590,28 @@ private: struct IdHasher { - size_t operator()(WsConnectionId Id) const { return size_t(Id.Value()); } + size_t operator()(WebSocketId Id) const { return size_t(Id.Value()); } }; - using ConnectionMap = std::unordered_map<WsConnectionId, std::shared_ptr<WsConnection>, IdHasher>; + using ConnectionMap = std::unordered_map<WebSocketId, std::shared_ptr<WsConnection>, IdHasher>; asio::io_service m_IoSvc; std::unique_ptr<asio::ip::tcp::acceptor> m_Acceptor; std::unique_ptr<WsThreadPool> m_ThreadPool; ConnectionMap m_Connections; std::shared_mutex m_ConnMutex; + std::vector<WebSocketService*> m_Services; std::atomic_bool m_Running{}; }; +void +WsServer::RegisterService(WebSocketService& Service) +{ + m_Services.push_back(&Service); + + Service.Configure(*this); +} + bool WsServer::Run(const WebSocketServerOptions& Options) { @@ -673,6 +664,37 @@ WsServer::Shutdown() } void +WsServer::PublishMessage(WebSocketId Id, CbPackage&& Msg) +{ + std::shared_ptr<WsConnection> Connection; + + { + std::unique_lock _(m_ConnMutex); + + if (auto It = m_Connections.find(Id); It != m_Connections.end()) + { + Connection = It->second; + } + } + + BinaryWriter Writer; + WebSocketMessageHeader::Write(Writer, Msg); + + IoBuffer Buffer = IoBufferBuilder::MakeCloneFromMemory(Writer.Data(), Writer.Size()); + + ZEN_LOG_WARN(LogWebSocket, "sending message {}B to '#{} {}' ", Buffer.Size(), Connection->Id().Value(), Connection->RemoteAddr()); + + async_write(Connection->Socket(), + asio::buffer(Buffer.Data(), Buffer.Size()), + [this, Connection, Buffer](const asio::error_code& Ec, std::size_t) { + if (Ec) + { + CloseConnection(Connection, Ec); + } + }); +} + +void WsServer::AcceptConnection() { auto Socket = std::make_unique<asio::ip::tcp::socket>(m_IoSvc); @@ -685,7 +707,7 @@ WsServer::AcceptConnection() } else { - auto Connection = std::make_shared<WsConnection>(WsConnectionId::New(), std::move(ConnectedSocket)); + auto Connection = std::make_shared<WsConnection>(WebSocketId::New(), std::move(ConnectedSocket)); ZEN_LOG_DEBUG(LogWebSocket, "accept connection '#{} {}' OK", Connection->Id().Value(), Connection->RemoteAddr()); @@ -726,7 +748,7 @@ WsServer::CloseConnection(std::shared_ptr<WsConnection> Connection, const std::e } } - const WsConnectionId Id = Connection->Id(); + const WebSocketId Id = Connection->Id(); { std::unique_lock _(m_ConnMutex); @@ -763,6 +785,8 @@ WsServer::ReadMessage(std::shared_ptr<WsConnection> Connection) ParseMessageResult Result = Parser.ParseMessage(MemoryView(Buffer.data(), Buffer.size())); + Connection->ReadBuffer().consume(Result.ByteCount); + if (Result.Status == ParseMessageStatus::kContinue) { return ReadMessage(Connection); @@ -770,10 +794,10 @@ WsServer::ReadMessage(std::shared_ptr<WsConnection> Connection) if (Result.Status == ParseMessageStatus::kError) { - ZEN_LOG_DEBUG(LogWebSocket, - "handshake with connection '#{} {}' FAILED, reason 'HTTP parse error'", - Connection->Id().Value(), - Connection->RemoteAddr()); + ZEN_LOG_WARN(LogWebSocket, + "handshake with connection '#{} {}' FAILED, reason 'HTTP parse error'", + Connection->Id().Value(), + Connection->RemoteAddr()); return CloseConnection(Connection, std::error_code()); } @@ -877,6 +901,8 @@ WsServer::ReadMessage(std::shared_ptr<WsConnection> Connection) Connection->SetParser(std::make_unique<WebSocketMessageParser>()); Connection->SetState(kConnected); + + ReadMessage(Connection); }); } break; @@ -935,15 +961,24 @@ WsServer::ReadMessage(std::shared_ptr<WsConnection> Connection) void WsServer::RouteMessage(std::shared_ptr<WsConnection> Connection, const CbPackage& Msg) { - ZEN_UNUSED(Connection, Msg); ZEN_LOG_DEBUG(LogWebSocket, "routing message"); + + for (auto Server : m_Services) + { + if (Server->HandleMessage(Connection->Id(), Msg)) + { + return; + } + } + + ZEN_LOG_WARN(LogWebSocket, "unhandled message"); } /////////////////////////////////////////////////////////////////////////////// class WsClient final : public WebSocketClient, public std::enable_shared_from_this<WsClient> { public: - WsClient(asio::io_context& IoCtx) : m_IoCtx(IoCtx), m_Id(WsConnectionId::New()) {} + WsClient(asio::io_context& IoCtx) : m_IoCtx(IoCtx), m_Id(WebSocketId::New()) {} virtual ~WsClient() { Disconnect(); } @@ -953,6 +988,8 @@ public: virtual void Disconnect() override; virtual bool IsConnected() const { return false; } virtual WebSocketState State() const { return static_cast<WebSocketState>(m_State.load()); } + virtual void SendMsg(CbPackage&& Msg) override; + virtual void SendMsg(CbObject&& Msg) override; virtual void On(WebSocketEvent Evt, EventCallback&& Cb) override; virtual void OnMessage(MessageCallback&& Cb) override; @@ -967,7 +1004,7 @@ private: void RouteMessage(CbPackage&& Msg); asio::io_context& m_IoCtx; - WsConnectionId m_Id; + WebSocketId m_Id; std::unique_ptr<asio::ip::tcp::socket> m_Socket; std::unique_ptr<MessageParser> m_MsgParser; asio::streambuf m_ReadBuffer; @@ -1077,6 +1114,35 @@ WsClient::Disconnect() } void +WsClient::SendMsg(CbPackage&& Msg) +{ + BinaryWriter Writer; + WebSocketMessageHeader::Write(Writer, Msg); + + IoBuffer Buffer = IoBufferBuilder::MakeCloneFromMemory(Writer.Data(), Writer.Size()); + + ZEN_LOG_DEBUG(LogWsClient, "sending message {}B", Buffer.Size()); + + async_write(*m_Socket, asio::buffer(Buffer.Data(), Buffer.Size()), [Self = AsShared()](const asio::error_code& Ec, std::size_t) { + if (Ec) + { + ZEN_LOG_ERROR(LogWsClient, "send messge FAILED, reason '{}'", Ec.message()); + + Self->Disconnect(); + } + }); +} + +void +WsClient::SendMsg(CbObject&& Msg) +{ + CbPackage Pkg; + Pkg.SetObject(std::move(Msg)); + + WsClient::SendMsg(std::move(Pkg)); +} + +void WsClient::On(WebSocketEvent Evt, WebSocketClient::EventCallback&& Cb) { m_EventCallbacks[static_cast<uint32_t>(Evt)] = std::move(Cb); @@ -1157,9 +1223,8 @@ WsClient::ReadMessage() Self->SetParser(std::make_unique<WebSocketMessageParser>()); Self->SetState(WebSocketState::kConnected); - Self->TriggerEvent(WebSocketEvent::kConnected); - Self->ReadMessage(); + Self->TriggerEvent(WebSocketEvent::kConnected); } break; @@ -1225,6 +1290,35 @@ WsClient::RouteMessage(CbPackage&& Msg) namespace zen { +std::atomic_uint32_t WebSocketId::NextId{1}; + +void +WebSocketService::Configure(WebSocketServer& Server) +{ + ZEN_ASSERT(m_Server == nullptr); + + m_Server = &Server; +} + +void +WebSocketService::PublishMessage(WebSocketId Id, CbPackage&& Msg) +{ + ZEN_ASSERT(m_Server != nullptr); + + m_Server->PublishMessage(Id, std::move(Msg)); +} + +void +WebSocketService::PublishMessage(WebSocketId Id, CbObject&& Msg) +{ + ZEN_ASSERT(m_Server != nullptr); + + CbPackage Pkg; + Pkg.SetObject(std::move(Msg)); + + m_Server->PublishMessage(Id, std::move(Pkg)); +} + bool WebSocketMessageHeader::IsValid() const { @@ -1245,6 +1339,22 @@ WebSocketMessageHeader::Read(MemoryView Memory, WebSocketMessageHeader& OutHeade return OutHeader.IsValid(); } +void +WebSocketMessageHeader::Write(BinaryWriter& Writer, const CbPackage& Msg) +{ + WebSocketMessageHeader Header; + + Writer.Write(&Header, sizeof(WebSocketMessageHeader)); + Msg.Save(Writer); + + Header.Magic = WebSocketMessageHeader::ExpectedMagic; + Header.ContentLength = Writer.CurrentOffset() - sizeof(WebSocketMessageHeader); + Header.Crc32 = WebSocketMessageHeader::ExpectedMagic; // TODO + + MutableMemoryView HeaderView(const_cast<uint8_t*>(Writer.Data()), sizeof(WebSocketMessageHeader)); + HeaderView.CopyFrom(MemoryView(&Header, sizeof(WebSocketMessageHeader))); +} + std::unique_ptr<WebSocketServer> WebSocketServer::Create() { diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index b7d157d5a..73048b504 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -2095,31 +2095,47 @@ TEST_CASE("websocket.basic") IoDispatcher IoDispatcher(IoCtx); auto WebSocket = WebSocketClient::Create(IoCtx); - std::atomic_bool Signaled{false}; + std::atomic_bool Done{false}; WebSocketEvent Event; WebSocket->On(WebSocketEvent::kConnected, [&]() { - Event = WebSocketEvent::kConnected; - Signaled = true; + CbObjectWriter Req; + Req.BeginObject("Header"); + Req << "Method"sv + << "TestHelloZen"sv; + Req << "CorrelationId" << uint64_t(1); + Req.EndObject(); + + WebSocket->SendMsg(Req.Save()); }); WebSocket->On(WebSocketEvent::kDisconnected, [&]() { - Event = WebSocketEvent::kDisconnected; - Signaled = true; + Event = WebSocketEvent::kDisconnected; + Done = true; + }); + + CbPackage Response; + WebSocket->OnMessage([&](const CbPackage& Msg) { + Response = Msg; + Done = true; }); WebSocket->Connect({.Host = "127.0.0.1", .Port = 8848, .Endpoint = "/zen"}); IoDispatcher.Run(); - while (Signaled == false && IoDispatcher.IsRunning()) + while (Done == false && IoDispatcher.IsRunning()) { - std::this_thread::sleep_for(std::chrono::seconds(5)); + std::this_thread::sleep_for(std::chrono::seconds(2)); }; - CHECK(Event == WebSocketEvent::kConnected); + CbObject ResponseObject = Response.GetObject(); + std::string_view Message = ResponseObject["Result"].AsString(); + CHECK(Message == "Hello Friend!!"sv); WebSocket->Disconnect(); + + IoDispatcher.Stop(); } # if 0 diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp index 230d5d6c5..41a4f064b 100644 --- a/zenserver/testing/httptest.cpp +++ b/zenserver/testing/httptest.cpp @@ -136,6 +136,33 @@ HttpTestingService::HandlePackageRequest(HttpServerRequest& HttpServiceRequest) return (InsertResult.first->second = new PackageHandler(*this, RequestId)).Get(); } +bool +HttpTestingService::HandleMessage(WebSocketId SocketId, const CbPackage& Msg) +{ + using namespace std::literals; + + CbObject Request = Msg.GetObject(); + + CbObjectView Header = Request["Header"sv].AsObjectView(); + std::string_view Method = Header["Method"].AsString(); + const uint64_t CorrelationId = Header["CorrelationId"].AsUInt64(); + + if (Method != "TestHelloZen"sv) + { + return false; + } + + CbObjectWriter Response; + Response.BeginObject("Header"); + Response << "CorrelationId"sv << CorrelationId; + Response.EndObject(); + Response.AddString("Result"sv, "Hello Friend!!"); + + PublishMessage(SocketId, Response.Save()); + + return true; +} + ////////////////////////////////////////////////////////////////////////// HttpTestingService::PackageHandler::PackageHandler(HttpTestingService& Svc, uint32_t RequestId) : m_Svc(Svc), m_RequestId(RequestId) diff --git a/zenserver/testing/httptest.h b/zenserver/testing/httptest.h index f7ea0c31c..267d59b36 100644 --- a/zenserver/testing/httptest.h +++ b/zenserver/testing/httptest.h @@ -5,6 +5,7 @@ #include <zencore/logging.h> #include <zencore/stats.h> #include <zenhttp/httpserver.h> +#include <zenhttp/websocket.h> #include <atomic> @@ -13,7 +14,7 @@ namespace zen { /** * Test service to facilitate testing the HTTP framework and client interactions */ -class HttpTestingService : public HttpService +class HttpTestingService : public HttpService, public WebSocketService { public: HttpTestingService(); @@ -22,6 +23,7 @@ public: virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; virtual Ref<IHttpPackageHandler> HandlePackageRequest(HttpServerRequest& HttpServiceRequest) override; + virtual bool HandleMessage(WebSocketId SocketId, const CbPackage& Msg) override; class PackageHandler : public IHttpPackageHandler { diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 1b067ac70..08abfdecd 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -305,6 +305,7 @@ public: m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics m_Http->RegisterService(m_TestingService); + m_WebSocket->RegisterService(m_TestingService); m_Http->RegisterService(m_AdminService); if (m_HttpProjectService) @@ -398,9 +399,9 @@ public: SetNewState(kRunning); OnReady(); - + m_WebSocket->Run({.Port = 8848}); - + m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); |