aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-01-26 09:56:08 +0100
committerStefan Boberg <[email protected]>2023-01-26 09:56:52 +0100
commitc3b906dec4589d0b06f4387dce26dd79a988c269 (patch)
tree96e3947b7f76e3b255b21c2f70831ce121191492 /zenserver/upstream
parentfix gc logging (#213) (diff)
downloadzen-c3b906dec4589d0b06f4387dce26dd79a988c269.tar.xz
zen-c3b906dec4589d0b06f4387dce26dd79a988c269.zip
removed experimental mesh code
should be replaced with a proper implementation later
Diffstat (limited to 'zenserver/upstream')
-rw-r--r--zenserver/upstream/zen.cpp284
-rw-r--r--zenserver/upstream/zen.h59
2 files changed, 0 insertions, 343 deletions
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 980958740..9e1212834 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -23,290 +23,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <gsl/gsl-lite.hpp>
namespace zen {
-namespace detail {
- struct MessageHeader
- {
- static const uint32_t kMagic = 0x11'99'77'22;
-
- uint32_t Magic = kMagic;
- uint32_t Checksum = 0;
- uint16_t MessageSize = 0; // Size *including* this field and the reserved field
- uint16_t Reserved = 0;
-
- void SetPayload(const void* PayloadData, uint64_t PayloadSize)
- {
- memcpy(Payload(), PayloadData, PayloadSize);
- MessageSize = gsl::narrow<uint16_t>(PayloadSize + sizeof MessageSize + sizeof Reserved);
- Checksum = ComputeChecksum();
- }
-
- inline CbObject GetMessage() const
- {
- if (IsOk())
- {
- MemoryView MessageView(Payload(), MessageSize - sizeof MessageSize - sizeof Reserved);
-
- CbValidateError ValidationResult = ValidateCompactBinary(MessageView, CbValidateMode::All);
-
- if (ValidationResult == CbValidateError::None)
- {
- return CbObject{SharedBuffer::MakeView(MessageView)};
- }
- }
-
- return {};
- }
-
- uint32_t TotalSize() const { return MessageSize + sizeof Checksum + sizeof Magic; }
- uint32_t ComputeChecksum() const { return gsl::narrow_cast<uint32_t>(XXH3_64bits(&MessageSize, MessageSize)); }
- inline bool IsOk() const { return Magic == kMagic && Checksum == ComputeChecksum(); }
-
- private:
- inline void* Payload() { return &Reserved + 1; }
- inline const void* Payload() const { return &Reserved + 1; }
- };
-} // namespace detail
-
-// Note that currently this just implements an UDP echo service for testing purposes
-
-MeshTracker::MeshTracker(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(GetSessionId())
-{
-}
-
-MeshTracker::~MeshTracker()
-{
- Stop();
-}
-
-void
-MeshTracker::Start(uint16_t Port)
-{
- ZEN_ASSERT(Port);
- ZEN_ASSERT(m_Port == 0);
-
- m_Port = Port;
- m_UdpSocket = std::make_unique<asio::ip::udp::socket>(m_IoContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), m_Port));
- m_Thread = std::make_unique<std::thread>([this] { Run(); });
-};
-
-void
-MeshTracker::Stop()
-{
- using namespace std::literals;
-
- if (!m_Port)
- {
- // Never started, nothing to do here
- return;
- }
-
- CbObjectWriter Msg;
- Msg << "bye"sv << m_SessionId;
- BroadcastPacket(Msg);
-
- m_State = kExiting;
-
- std::error_code Ec;
- m_Timer.cancel(Ec);
-
- m_UdpSocket->close(Ec);
-
- m_IoContext.stop();
-
- if (m_Thread)
- {
- m_Thread->join();
- m_Thread.reset();
- }
-}
-
-void
-MeshTracker::EnqueueTick()
-{
- m_Timer.expires_after(std::chrono::seconds(10));
-
- m_Timer.async_wait([&](const std::error_code& Ec) {
- if (!Ec)
- {
- OnTick();
- }
- else
- {
- if (m_State != kExiting)
- {
- ZEN_WARN("Mesh timer error: {}", Ec.message());
- }
- }
- });
-}
-
-void
-MeshTracker::OnTick()
-{
- using namespace std::literals;
-
- CbObjectWriter Msg;
-
- // Basic service information
-
- Msg.BeginArray("s");
- Msg << m_SessionId << m_Port << /* event sequence # */ uint32_t(0);
- Msg.EndArray();
-
- BroadcastPacket(Msg);
-
- EnqueueTick();
-}
-
-void
-MeshTracker::BroadcastPacket(CbObjectWriter& Obj)
-{
- std::error_code ErrorCode;
-
- asio::ip::udp::socket BroadcastSocket(m_IoContext);
- BroadcastSocket.open(asio::ip::udp::v4(), ErrorCode);
-
- if (!ErrorCode)
- {
- BroadcastSocket.set_option(asio::ip::udp::socket::reuse_address(true));
- BroadcastSocket.set_option(asio::socket_base::broadcast(true));
-
- asio::ip::udp::endpoint BroadcastEndpoint(asio::ip::address_v4::broadcast(), m_Port);
-
- uint8_t MessageBuffer[kMaxMessageSize];
- detail::MessageHeader* Message = reinterpret_cast<detail::MessageHeader*>(MessageBuffer);
- *Message = {};
-
- BinaryWriter MemOut;
-
- Obj.Save(MemOut);
-
- // TODO: check that it fits in a packet!
-
- Message->SetPayload(MemOut.Data(), MemOut.Size());
-
- BroadcastSocket.send_to(asio::buffer(Message, Message->TotalSize()), BroadcastEndpoint, 0, ErrorCode);
-
- if (!ErrorCode)
- {
- BroadcastSocket.close(ErrorCode);
- }
-
- if (ErrorCode)
- {
- ZEN_WARN("packet broadcast failed: {}", ErrorCode.message());
- }
- }
- else
- {
- ZEN_WARN("failed to open broadcast socket: {}", ErrorCode.message());
- }
-}
-
-void
-MeshTracker::Run()
-{
- m_State = kRunning;
-
- EnqueueTick();
-
- IssueReceive();
- m_IoContext.run();
-}
-
-void
-MeshTracker::IssueReceive()
-{
- using namespace std::literals;
-
- m_UdpSocket->async_receive_from(
- asio::buffer(m_MessageBuffer, sizeof m_MessageBuffer),
- m_SenderEndpoint,
- [this](std::error_code ec, size_t BytesReceived) {
- if (!ec && BytesReceived)
- {
- std::error_code ErrorCode;
- std::string SenderIp = m_SenderEndpoint.address().to_string(ErrorCode);
-
- // Process message
-
- uint32_t& Magic = *reinterpret_cast<uint32_t*>(m_MessageBuffer);
-
- switch (Magic)
- {
- case detail::MessageHeader::kMagic:
- {
- detail::MessageHeader& Header = *reinterpret_cast<detail::MessageHeader*>(m_MessageBuffer);
-
- if (CbObject Msg = Header.GetMessage())
- {
- const asio::ip::address& Ip = m_SenderEndpoint.address();
-
- if (auto Field = Msg["s"sv])
- {
- // Announce
-
- CbArrayView Ci = Field.AsArrayView();
- auto It = Ci.CreateViewIterator();
-
- const Oid SessionId = It->AsObjectId();
-
- if (SessionId != Oid::Zero && SessionId != m_SessionId)
- {
- // const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port());
- // const uint32_t Lsn = (++It)->AsUInt32();
-
- ZEN_TRACE("received hey from {} ({})", SenderIp, SessionId);
-
- RwLock::ExclusiveLockScope _(m_SessionsLock);
-
- PeerInfo& Info = m_KnownPeers[SessionId];
-
- Info.LastSeen = std::time(nullptr);
- Info.SessionId = SessionId;
-
- if (std::find(begin(Info.SeenOnIP), end(Info.SeenOnIP), Ip) == Info.SeenOnIP.end())
- {
- Info.SeenOnIP.push_back(Ip);
- }
- }
- }
- else if (auto Bye = Msg["bye"sv])
- {
- Oid SessionId = Field.AsObjectId();
-
- ZEN_DEBUG("received bye from {} ({})", SenderIp, SessionId);
-
- // We could verify that it's sent from a known IP before erasing the
- // session, if we want to be paranoid
-
- RwLock::ExclusiveLockScope _(m_SessionsLock);
-
- m_KnownPeers.erase(SessionId);
- }
- else
- {
- // Unknown message type, just ignore
- }
- }
- else
- {
- ZEN_WARN("received malformed message from {}", SenderIp);
- }
- }
- break;
-
- default:
- ZEN_WARN("received malformed data from {}", SenderIp);
- break;
- }
- }
-
- IssueReceive();
- });
-}
-
-//////////////////////////////////////////////////////////////////////////
namespace detail {
struct ZenCacheSessionState
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 9a082ebc9..bfba8fa98 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -31,65 +31,6 @@ class CbObjectView;
class CbPackage;
class ZenStructuredCacheClient;
-/** Zen mesh tracker
- *
- * Discovers and tracks local peers
- *
- * NOTE: This is currently experimental, and not very useful yet
- *
- */
-
-class MeshTracker
-{
-public:
- MeshTracker(asio::io_context& IoContext);
- ~MeshTracker();
-
- void Start(uint16_t Port);
- void Stop();
-
-private:
- void Run();
- void IssueReceive();
- void EnqueueTick();
- void OnTick();
- void BroadcastPacket(CbObjectWriter&);
-
- enum State
- {
- kInitializing,
- kRunning,
- kExiting
- };
-
- static const int kMaxMessageSize = 2048;
- static const int kMaxUpdateSize = 1400; // We'll try not to send messages larger than this
-
- spdlog::logger& Log() { return m_Log; }
-
- spdlog::logger& m_Log;
- std::atomic<State> m_State = kInitializing;
- asio::io_context& m_IoContext;
- std::unique_ptr<asio::ip::udp::socket> m_UdpSocket;
- std::unique_ptr<asio::ip::udp::socket> m_BroadcastSocket;
- asio::ip::udp::endpoint m_SenderEndpoint;
- std::unique_ptr<std::thread> m_Thread;
- uint16_t m_Port = 0;
- uint8_t m_MessageBuffer[kMaxMessageSize];
- asio::high_resolution_timer m_Timer{m_IoContext};
- Oid m_SessionId;
-
- struct PeerInfo
- {
- Oid SessionId;
- std::time_t LastSeen;
- std::vector<asio::ip::address> SeenOnIP;
- };
-
- RwLock m_SessionsLock;
- tsl::robin_map<Oid, PeerInfo, Oid::Hasher> m_KnownPeers;
-};
-
//////////////////////////////////////////////////////////////////////////
namespace detail {