diff options
| author | Stefan Boberg <[email protected]> | 2023-01-26 09:56:08 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2023-01-26 09:56:52 +0100 |
| commit | c3b906dec4589d0b06f4387dce26dd79a988c269 (patch) | |
| tree | 96e3947b7f76e3b255b21c2f70831ce121191492 | |
| parent | fix gc logging (#213) (diff) | |
| download | zen-c3b906dec4589d0b06f4387dce26dd79a988c269.tar.xz zen-c3b906dec4589d0b06f4387dce26dd79a988c269.zip | |
removed experimental mesh code
should be replaced with a proper implementation later
| -rw-r--r-- | xmake.lua | 8 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 31 | ||||
| -rw-r--r-- | zenserver/config.cpp | 12 | ||||
| -rw-r--r-- | zenserver/config.h | 7 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 284 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 59 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 20 |
7 files changed, 0 insertions, 421 deletions
@@ -142,14 +142,6 @@ option("exec") option_end() add_define_by_config("ZEN_WITH_EXEC_SERVICES", "exec") -option("zenmesh") - -- note: this is an old prototype and is not functional at all at the moment - set_default(false) - set_showmenu(true) - set_description("Enables Zen's mesh feature") -option_end() -add_define_by_config("ZEN_ENABLE_MESH", "zenmesh") - option("zentrace") set_default(true) set_showmenu(true) diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index ee3c5b742..c08ad8e0a 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -2492,37 +2492,6 @@ TEST_CASE(".exec.basic") } # endif // ZEN_WITH_EXEC_SERVICES -TEST_CASE("mesh.basic") -{ -// --mesh option only available with ZEN_ENABLE_MESH -# if ZEN_ENABLE_MESH - using namespace std::literals; - - const int kInstanceCount = 4; - - ZEN_INFO("spawning {} instances", kInstanceCount); - - std::unique_ptr<ZenServerInstance> Instances[kInstanceCount]; - - for (int i = 0; i < kInstanceCount; ++i) - { - auto& Instance = Instances[i]; - - Instance = std::make_unique<ZenServerInstance>(TestEnv); - Instance->SetTestDir(TestEnv.CreateNewTestDir()); - Instance->EnableMesh(); - Instance->SpawnServer(13337 + i); - } - - for (int i = 0; i < kInstanceCount; ++i) - { - auto& Instance = Instances[i]; - - Instance->WaitUntilReady(); - } -# endif -} - class ZenServerTestHelper { public: diff --git a/zenserver/config.cpp b/zenserver/config.cpp index d7233a6f4..fa4ee7f5a 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -222,15 +222,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<int>(ServerOptions.WebSocketThreads)->default_value("0"), ""); -#if ZEN_ENABLE_MESH - options.add_option("network", - "m", - "mesh", - "Enable mesh network", - cxxopts::value<bool>(ServerOptions.MeshEnabled)->default_value("false"), - ""); -#endif - #if ZEN_WITH_TRACE options.add_option("ue-trace", "", @@ -670,9 +661,6 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio } ServerOptions.BasePort = NetworkConfig->get_or<int>("port", ServerOptions.BasePort); -#if ZEN_ENABLE_MESH - ServerOptions.MeshEnabled = NetworkConfig->get_or<bool>("meshenabled", ServerOptions.MeshEnabled); -#endif } auto UpdateStringValueFromConfig = [](const sol::table& Table, std::string_view Key, std::string& OutValue) { diff --git a/zenserver/config.h b/zenserver/config.h index 4cdef0318..096a800b4 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -7,10 +7,6 @@ #include <string> #include <vector> -#ifndef ZEN_ENABLE_MESH -# define ZEN_ENABLE_MESH 0 -#endif - struct ZenUpstreamJupiterConfig { std::string Name; @@ -144,9 +140,6 @@ struct ZenServerOptions std::string TraceHost; // Host name or IP address to send trace data to std::string TraceFile; // Path of a file to write a trace #endif -#if ZEN_ENABLE_MESH - bool MeshEnabled = false; // Experimental p2p mesh discovery -#endif }; void ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions); diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 980958740..9e1212834 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -23,290 +23,6 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <gsl/gsl-lite.hpp> namespace zen { -namespace detail { - struct MessageHeader - { - static const uint32_t kMagic = 0x11'99'77'22; - - uint32_t Magic = kMagic; - uint32_t Checksum = 0; - uint16_t MessageSize = 0; // Size *including* this field and the reserved field - uint16_t Reserved = 0; - - void SetPayload(const void* PayloadData, uint64_t PayloadSize) - { - memcpy(Payload(), PayloadData, PayloadSize); - MessageSize = gsl::narrow<uint16_t>(PayloadSize + sizeof MessageSize + sizeof Reserved); - Checksum = ComputeChecksum(); - } - - inline CbObject GetMessage() const - { - if (IsOk()) - { - MemoryView MessageView(Payload(), MessageSize - sizeof MessageSize - sizeof Reserved); - - CbValidateError ValidationResult = ValidateCompactBinary(MessageView, CbValidateMode::All); - - if (ValidationResult == CbValidateError::None) - { - return CbObject{SharedBuffer::MakeView(MessageView)}; - } - } - - return {}; - } - - uint32_t TotalSize() const { return MessageSize + sizeof Checksum + sizeof Magic; } - uint32_t ComputeChecksum() const { return gsl::narrow_cast<uint32_t>(XXH3_64bits(&MessageSize, MessageSize)); } - inline bool IsOk() const { return Magic == kMagic && Checksum == ComputeChecksum(); } - - private: - inline void* Payload() { return &Reserved + 1; } - inline const void* Payload() const { return &Reserved + 1; } - }; -} // namespace detail - -// Note that currently this just implements an UDP echo service for testing purposes - -MeshTracker::MeshTracker(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(GetSessionId()) -{ -} - -MeshTracker::~MeshTracker() -{ - Stop(); -} - -void -MeshTracker::Start(uint16_t Port) -{ - ZEN_ASSERT(Port); - ZEN_ASSERT(m_Port == 0); - - m_Port = Port; - m_UdpSocket = std::make_unique<asio::ip::udp::socket>(m_IoContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), m_Port)); - m_Thread = std::make_unique<std::thread>([this] { Run(); }); -}; - -void -MeshTracker::Stop() -{ - using namespace std::literals; - - if (!m_Port) - { - // Never started, nothing to do here - return; - } - - CbObjectWriter Msg; - Msg << "bye"sv << m_SessionId; - BroadcastPacket(Msg); - - m_State = kExiting; - - std::error_code Ec; - m_Timer.cancel(Ec); - - m_UdpSocket->close(Ec); - - m_IoContext.stop(); - - if (m_Thread) - { - m_Thread->join(); - m_Thread.reset(); - } -} - -void -MeshTracker::EnqueueTick() -{ - m_Timer.expires_after(std::chrono::seconds(10)); - - m_Timer.async_wait([&](const std::error_code& Ec) { - if (!Ec) - { - OnTick(); - } - else - { - if (m_State != kExiting) - { - ZEN_WARN("Mesh timer error: {}", Ec.message()); - } - } - }); -} - -void -MeshTracker::OnTick() -{ - using namespace std::literals; - - CbObjectWriter Msg; - - // Basic service information - - Msg.BeginArray("s"); - Msg << m_SessionId << m_Port << /* event sequence # */ uint32_t(0); - Msg.EndArray(); - - BroadcastPacket(Msg); - - EnqueueTick(); -} - -void -MeshTracker::BroadcastPacket(CbObjectWriter& Obj) -{ - std::error_code ErrorCode; - - asio::ip::udp::socket BroadcastSocket(m_IoContext); - BroadcastSocket.open(asio::ip::udp::v4(), ErrorCode); - - if (!ErrorCode) - { - BroadcastSocket.set_option(asio::ip::udp::socket::reuse_address(true)); - BroadcastSocket.set_option(asio::socket_base::broadcast(true)); - - asio::ip::udp::endpoint BroadcastEndpoint(asio::ip::address_v4::broadcast(), m_Port); - - uint8_t MessageBuffer[kMaxMessageSize]; - detail::MessageHeader* Message = reinterpret_cast<detail::MessageHeader*>(MessageBuffer); - *Message = {}; - - BinaryWriter MemOut; - - Obj.Save(MemOut); - - // TODO: check that it fits in a packet! - - Message->SetPayload(MemOut.Data(), MemOut.Size()); - - BroadcastSocket.send_to(asio::buffer(Message, Message->TotalSize()), BroadcastEndpoint, 0, ErrorCode); - - if (!ErrorCode) - { - BroadcastSocket.close(ErrorCode); - } - - if (ErrorCode) - { - ZEN_WARN("packet broadcast failed: {}", ErrorCode.message()); - } - } - else - { - ZEN_WARN("failed to open broadcast socket: {}", ErrorCode.message()); - } -} - -void -MeshTracker::Run() -{ - m_State = kRunning; - - EnqueueTick(); - - IssueReceive(); - m_IoContext.run(); -} - -void -MeshTracker::IssueReceive() -{ - using namespace std::literals; - - m_UdpSocket->async_receive_from( - asio::buffer(m_MessageBuffer, sizeof m_MessageBuffer), - m_SenderEndpoint, - [this](std::error_code ec, size_t BytesReceived) { - if (!ec && BytesReceived) - { - std::error_code ErrorCode; - std::string SenderIp = m_SenderEndpoint.address().to_string(ErrorCode); - - // Process message - - uint32_t& Magic = *reinterpret_cast<uint32_t*>(m_MessageBuffer); - - switch (Magic) - { - case detail::MessageHeader::kMagic: - { - detail::MessageHeader& Header = *reinterpret_cast<detail::MessageHeader*>(m_MessageBuffer); - - if (CbObject Msg = Header.GetMessage()) - { - const asio::ip::address& Ip = m_SenderEndpoint.address(); - - if (auto Field = Msg["s"sv]) - { - // Announce - - CbArrayView Ci = Field.AsArrayView(); - auto It = Ci.CreateViewIterator(); - - const Oid SessionId = It->AsObjectId(); - - if (SessionId != Oid::Zero && SessionId != m_SessionId) - { - // const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port()); - // const uint32_t Lsn = (++It)->AsUInt32(); - - ZEN_TRACE("received hey from {} ({})", SenderIp, SessionId); - - RwLock::ExclusiveLockScope _(m_SessionsLock); - - PeerInfo& Info = m_KnownPeers[SessionId]; - - Info.LastSeen = std::time(nullptr); - Info.SessionId = SessionId; - - if (std::find(begin(Info.SeenOnIP), end(Info.SeenOnIP), Ip) == Info.SeenOnIP.end()) - { - Info.SeenOnIP.push_back(Ip); - } - } - } - else if (auto Bye = Msg["bye"sv]) - { - Oid SessionId = Field.AsObjectId(); - - ZEN_DEBUG("received bye from {} ({})", SenderIp, SessionId); - - // We could verify that it's sent from a known IP before erasing the - // session, if we want to be paranoid - - RwLock::ExclusiveLockScope _(m_SessionsLock); - - m_KnownPeers.erase(SessionId); - } - else - { - // Unknown message type, just ignore - } - } - else - { - ZEN_WARN("received malformed message from {}", SenderIp); - } - } - break; - - default: - ZEN_WARN("received malformed data from {}", SenderIp); - break; - } - } - - IssueReceive(); - }); -} - -////////////////////////////////////////////////////////////////////////// namespace detail { struct ZenCacheSessionState diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 9a082ebc9..bfba8fa98 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -31,65 +31,6 @@ class CbObjectView; class CbPackage; class ZenStructuredCacheClient; -/** Zen mesh tracker - * - * Discovers and tracks local peers - * - * NOTE: This is currently experimental, and not very useful yet - * - */ - -class MeshTracker -{ -public: - MeshTracker(asio::io_context& IoContext); - ~MeshTracker(); - - void Start(uint16_t Port); - void Stop(); - -private: - void Run(); - void IssueReceive(); - void EnqueueTick(); - void OnTick(); - void BroadcastPacket(CbObjectWriter&); - - enum State - { - kInitializing, - kRunning, - kExiting - }; - - static const int kMaxMessageSize = 2048; - static const int kMaxUpdateSize = 1400; // We'll try not to send messages larger than this - - spdlog::logger& Log() { return m_Log; } - - spdlog::logger& m_Log; - std::atomic<State> m_State = kInitializing; - asio::io_context& m_IoContext; - std::unique_ptr<asio::ip::udp::socket> m_UdpSocket; - std::unique_ptr<asio::ip::udp::socket> m_BroadcastSocket; - asio::ip::udp::endpoint m_SenderEndpoint; - std::unique_ptr<std::thread> m_Thread; - uint16_t m_Port = 0; - uint8_t m_MessageBuffer[kMaxMessageSize]; - asio::high_resolution_timer m_Timer{m_IoContext}; - Oid m_SessionId; - - struct PeerInfo - { - Oid SessionId; - std::time_t LastSeen; - std::vector<asio::ip::address> SeenOnIP; - }; - - RwLock m_SessionsLock; - tsl::robin_map<Oid, PeerInfo, Oid::Hasher> m_KnownPeers; -}; - ////////////////////////////////////////////////////////////////////////// namespace detail { diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index cf3d90324..c1e82f404 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -303,17 +303,6 @@ public: ZEN_INFO("NOT instantiating structured cache service"); } -#if ZEN_ENABLE_MESH - if (ServerOptions.MeshEnabled) - { - StartMesh(EffectiveBasePort); - } - else - { - ZEN_INFO("NOT starting mesh"); - } -#endif - m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics m_Http->RegisterService(m_TestingService); m_Http->RegisterService(m_AdminService); @@ -374,14 +363,6 @@ public: void InitializeStructuredCache(const ZenServerOptions& ServerOptions); void InitializeCompute(const ZenServerOptions& ServerOptions); -#if ZEN_ENABLE_MESH - void StartMesh(int BasePort) - { - ZEN_INFO("initializing mesh discovery"); - m_ZenMesh.Start(uint16_t(BasePort)); - } -#endif - void Run() { // This is disabled for now, awaiting better scheduling @@ -611,7 +592,6 @@ private: std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; zen::HttpAdminService m_AdminService{m_GcScheduler}; zen::HttpHealthService m_HealthService; - zen::MeshTracker m_ZenMesh{m_IoContext}; #if ZEN_WITH_EXEC_SERVICES std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService; #endif // ZEN_WITH_EXEC_SERVICES |