diff options
| author | Stefan Boberg <[email protected]> | 2023-01-26 09:56:08 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2023-01-26 09:56:52 +0100 |
| commit | c3b906dec4589d0b06f4387dce26dd79a988c269 (patch) | |
| tree | 96e3947b7f76e3b255b21c2f70831ce121191492 /zenserver/upstream | |
| parent | fix gc logging (#213) (diff) | |
| download | zen-c3b906dec4589d0b06f4387dce26dd79a988c269.tar.xz zen-c3b906dec4589d0b06f4387dce26dd79a988c269.zip | |
removed experimental mesh code
should be replaced with a proper implementation later
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/zen.cpp | 284 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 59 |
2 files changed, 0 insertions, 343 deletions
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 980958740..9e1212834 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -23,290 +23,6 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <gsl/gsl-lite.hpp> namespace zen { -namespace detail { - struct MessageHeader - { - static const uint32_t kMagic = 0x11'99'77'22; - - uint32_t Magic = kMagic; - uint32_t Checksum = 0; - uint16_t MessageSize = 0; // Size *including* this field and the reserved field - uint16_t Reserved = 0; - - void SetPayload(const void* PayloadData, uint64_t PayloadSize) - { - memcpy(Payload(), PayloadData, PayloadSize); - MessageSize = gsl::narrow<uint16_t>(PayloadSize + sizeof MessageSize + sizeof Reserved); - Checksum = ComputeChecksum(); - } - - inline CbObject GetMessage() const - { - if (IsOk()) - { - MemoryView MessageView(Payload(), MessageSize - sizeof MessageSize - sizeof Reserved); - - CbValidateError ValidationResult = ValidateCompactBinary(MessageView, CbValidateMode::All); - - if (ValidationResult == CbValidateError::None) - { - return CbObject{SharedBuffer::MakeView(MessageView)}; - } - } - - return {}; - } - - uint32_t TotalSize() const { return MessageSize + sizeof Checksum + sizeof Magic; } - uint32_t ComputeChecksum() const { return gsl::narrow_cast<uint32_t>(XXH3_64bits(&MessageSize, MessageSize)); } - inline bool IsOk() const { return Magic == kMagic && Checksum == ComputeChecksum(); } - - private: - inline void* Payload() { return &Reserved + 1; } - inline const void* Payload() const { return &Reserved + 1; } - }; -} // namespace detail - -// Note that currently this just implements an UDP echo service for testing purposes - -MeshTracker::MeshTracker(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(GetSessionId()) -{ -} - -MeshTracker::~MeshTracker() -{ - Stop(); -} - -void -MeshTracker::Start(uint16_t Port) -{ - ZEN_ASSERT(Port); - ZEN_ASSERT(m_Port == 0); - - m_Port = Port; - m_UdpSocket = std::make_unique<asio::ip::udp::socket>(m_IoContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), m_Port)); - m_Thread = std::make_unique<std::thread>([this] { Run(); }); -}; - -void -MeshTracker::Stop() -{ - using namespace std::literals; - - if (!m_Port) - { - // Never started, nothing to do here - return; - } - - CbObjectWriter Msg; - Msg << "bye"sv << m_SessionId; - BroadcastPacket(Msg); - - m_State = kExiting; - - std::error_code Ec; - m_Timer.cancel(Ec); - - m_UdpSocket->close(Ec); - - m_IoContext.stop(); - - if (m_Thread) - { - m_Thread->join(); - m_Thread.reset(); - } -} - -void -MeshTracker::EnqueueTick() -{ - m_Timer.expires_after(std::chrono::seconds(10)); - - m_Timer.async_wait([&](const std::error_code& Ec) { - if (!Ec) - { - OnTick(); - } - else - { - if (m_State != kExiting) - { - ZEN_WARN("Mesh timer error: {}", Ec.message()); - } - } - }); -} - -void -MeshTracker::OnTick() -{ - using namespace std::literals; - - CbObjectWriter Msg; - - // Basic service information - - Msg.BeginArray("s"); - Msg << m_SessionId << m_Port << /* event sequence # */ uint32_t(0); - Msg.EndArray(); - - BroadcastPacket(Msg); - - EnqueueTick(); -} - -void -MeshTracker::BroadcastPacket(CbObjectWriter& Obj) -{ - std::error_code ErrorCode; - - asio::ip::udp::socket BroadcastSocket(m_IoContext); - BroadcastSocket.open(asio::ip::udp::v4(), ErrorCode); - - if (!ErrorCode) - { - BroadcastSocket.set_option(asio::ip::udp::socket::reuse_address(true)); - BroadcastSocket.set_option(asio::socket_base::broadcast(true)); - - asio::ip::udp::endpoint BroadcastEndpoint(asio::ip::address_v4::broadcast(), m_Port); - - uint8_t MessageBuffer[kMaxMessageSize]; - detail::MessageHeader* Message = reinterpret_cast<detail::MessageHeader*>(MessageBuffer); - *Message = {}; - - BinaryWriter MemOut; - - Obj.Save(MemOut); - - // TODO: check that it fits in a packet! - - Message->SetPayload(MemOut.Data(), MemOut.Size()); - - BroadcastSocket.send_to(asio::buffer(Message, Message->TotalSize()), BroadcastEndpoint, 0, ErrorCode); - - if (!ErrorCode) - { - BroadcastSocket.close(ErrorCode); - } - - if (ErrorCode) - { - ZEN_WARN("packet broadcast failed: {}", ErrorCode.message()); - } - } - else - { - ZEN_WARN("failed to open broadcast socket: {}", ErrorCode.message()); - } -} - -void -MeshTracker::Run() -{ - m_State = kRunning; - - EnqueueTick(); - - IssueReceive(); - m_IoContext.run(); -} - -void -MeshTracker::IssueReceive() -{ - using namespace std::literals; - - m_UdpSocket->async_receive_from( - asio::buffer(m_MessageBuffer, sizeof m_MessageBuffer), - m_SenderEndpoint, - [this](std::error_code ec, size_t BytesReceived) { - if (!ec && BytesReceived) - { - std::error_code ErrorCode; - std::string SenderIp = m_SenderEndpoint.address().to_string(ErrorCode); - - // Process message - - uint32_t& Magic = *reinterpret_cast<uint32_t*>(m_MessageBuffer); - - switch (Magic) - { - case detail::MessageHeader::kMagic: - { - detail::MessageHeader& Header = *reinterpret_cast<detail::MessageHeader*>(m_MessageBuffer); - - if (CbObject Msg = Header.GetMessage()) - { - const asio::ip::address& Ip = m_SenderEndpoint.address(); - - if (auto Field = Msg["s"sv]) - { - // Announce - - CbArrayView Ci = Field.AsArrayView(); - auto It = Ci.CreateViewIterator(); - - const Oid SessionId = It->AsObjectId(); - - if (SessionId != Oid::Zero && SessionId != m_SessionId) - { - // const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port()); - // const uint32_t Lsn = (++It)->AsUInt32(); - - ZEN_TRACE("received hey from {} ({})", SenderIp, SessionId); - - RwLock::ExclusiveLockScope _(m_SessionsLock); - - PeerInfo& Info = m_KnownPeers[SessionId]; - - Info.LastSeen = std::time(nullptr); - Info.SessionId = SessionId; - - if (std::find(begin(Info.SeenOnIP), end(Info.SeenOnIP), Ip) == Info.SeenOnIP.end()) - { - Info.SeenOnIP.push_back(Ip); - } - } - } - else if (auto Bye = Msg["bye"sv]) - { - Oid SessionId = Field.AsObjectId(); - - ZEN_DEBUG("received bye from {} ({})", SenderIp, SessionId); - - // We could verify that it's sent from a known IP before erasing the - // session, if we want to be paranoid - - RwLock::ExclusiveLockScope _(m_SessionsLock); - - m_KnownPeers.erase(SessionId); - } - else - { - // Unknown message type, just ignore - } - } - else - { - ZEN_WARN("received malformed message from {}", SenderIp); - } - } - break; - - default: - ZEN_WARN("received malformed data from {}", SenderIp); - break; - } - } - - IssueReceive(); - }); -} - -////////////////////////////////////////////////////////////////////////// namespace detail { struct ZenCacheSessionState diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 9a082ebc9..bfba8fa98 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -31,65 +31,6 @@ class CbObjectView; class CbPackage; class ZenStructuredCacheClient; -/** Zen mesh tracker - * - * Discovers and tracks local peers - * - * NOTE: This is currently experimental, and not very useful yet - * - */ - -class MeshTracker -{ -public: - MeshTracker(asio::io_context& IoContext); - ~MeshTracker(); - - void Start(uint16_t Port); - void Stop(); - -private: - void Run(); - void IssueReceive(); - void EnqueueTick(); - void OnTick(); - void BroadcastPacket(CbObjectWriter&); - - enum State - { - kInitializing, - kRunning, - kExiting - }; - - static const int kMaxMessageSize = 2048; - static const int kMaxUpdateSize = 1400; // We'll try not to send messages larger than this - - spdlog::logger& Log() { return m_Log; } - - spdlog::logger& m_Log; - std::atomic<State> m_State = kInitializing; - asio::io_context& m_IoContext; - std::unique_ptr<asio::ip::udp::socket> m_UdpSocket; - std::unique_ptr<asio::ip::udp::socket> m_BroadcastSocket; - asio::ip::udp::endpoint m_SenderEndpoint; - std::unique_ptr<std::thread> m_Thread; - uint16_t m_Port = 0; - uint8_t m_MessageBuffer[kMaxMessageSize]; - asio::high_resolution_timer m_Timer{m_IoContext}; - Oid m_SessionId; - - struct PeerInfo - { - Oid SessionId; - std::time_t LastSeen; - std::vector<asio::ip::address> SeenOnIP; - }; - - RwLock m_SessionsLock; - tsl::robin_map<Oid, PeerInfo, Oid::Hasher> m_KnownPeers; -}; - ////////////////////////////////////////////////////////////////////////// namespace detail { |