aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/zen.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-05-11 13:05:39 +0200
committerStefan Boberg <[email protected]>2021-05-11 13:05:39 +0200
commitf8d9ac5d13dd37b8b57af0478e77ba1e75c813aa (patch)
tree1daf7621e110d48acd5e12e3073ce48ef0dd11b2 /zenserver/upstream/zen.cpp
downloadzen-f8d9ac5d13dd37b8b57af0478e77ba1e75c813aa.tar.xz
zen-f8d9ac5d13dd37b8b57af0478e77ba1e75c813aa.zip
Adding zenservice code
Diffstat (limited to 'zenserver/upstream/zen.cpp')
-rw-r--r--zenserver/upstream/zen.cpp291
1 files changed, 291 insertions, 0 deletions
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
new file mode 100644
index 000000000..7148715f2
--- /dev/null
+++ b/zenserver/upstream/zen.cpp
@@ -0,0 +1,291 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zen.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
+
+#include <spdlog/spdlog.h>
+#include <xxhash.h>
+#include <gsl/gsl-lite.hpp>
+
+namespace zen {
+
+namespace detail {
+ struct MessageHeader
+ {
+ static const uint32_t kMagic = 0x11'99'77'22;
+
+ uint32_t Magic = kMagic;
+ uint32_t Checksum = 0;
+ uint16_t MessageSize = 0; // Size *including* this field and the reserved field
+ uint16_t Reserved = 0;
+
+ void SetPayload(const void* PayloadData, uint64_t PayloadSize)
+ {
+ memcpy(Payload(), PayloadData, PayloadSize);
+ MessageSize = gsl::narrow<uint16_t>(PayloadSize + sizeof MessageSize + sizeof Reserved);
+ Checksum = ComputeChecksum();
+ }
+
+ inline CbObject GetMessage() const
+ {
+ if (IsOk())
+ {
+ MemoryView MessageView(Payload(), MessageSize - sizeof MessageSize - sizeof Reserved);
+
+ CbValidateError ValidationResult = ValidateCompactBinary(MessageView, CbValidateMode::All);
+
+ if (ValidationResult == CbValidateError::None)
+ {
+ return CbObject{SharedBuffer::MakeView(MessageView)};
+ }
+ }
+
+ return {};
+ }
+
+ uint32_t TotalSize() const { return MessageSize + sizeof Checksum + sizeof Magic; }
+ uint32_t ComputeChecksum() const { return gsl::narrow_cast<uint32_t>(XXH3_64bits(&MessageSize, MessageSize)); }
+ inline bool IsOk() const { return Magic == kMagic && Checksum == ComputeChecksum(); }
+
+ private:
+ inline void* Payload() { return &Reserved + 1; }
+ inline const void* Payload() const { return &Reserved + 1; }
+ };
+} // namespace detail
+
+// Note that currently this just implements an UDP echo service for testing purposes
+
+Mesh::Mesh(asio::io_context& IoContext) : m_IoContext(IoContext)
+{
+}
+
+Mesh::~Mesh()
+{
+ Stop();
+}
+
+void
+Mesh::Start(uint16_t Port)
+{
+ ZEN_ASSERT(Port);
+ ZEN_ASSERT(m_Port == 0);
+
+ m_Port = Port;
+ m_UdpSocket = std::make_unique<asio::ip::udp::socket>(m_IoContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), m_Port));
+ m_Thread = std::make_unique<std::thread>([this] { Run(); });
+};
+
+void
+Mesh::Stop()
+{
+ using namespace std::literals;
+
+ if (!m_Port)
+ {
+ // Never started, nothing to do here
+ return;
+ }
+
+ CbObjectWriter Msg;
+ Msg << "bye"sv << m_SessionId;
+ BroadcastPacket(Msg);
+
+ m_State = kExiting;
+
+ std::error_code Ec;
+ m_Timer.cancel(Ec);
+
+ m_UdpSocket->close(Ec);
+
+ m_IoContext.stop();
+
+ if (m_Thread)
+ {
+ m_Thread->join();
+ m_Thread.reset();
+ }
+}
+
+void
+Mesh::EnqueueTick()
+{
+ m_Timer.expires_after(std::chrono::seconds(10));
+
+ m_Timer.async_wait([&](const std::error_code& Ec) {
+ if (!Ec)
+ {
+ OnTick();
+ }
+ else
+ {
+ if (m_State != kExiting)
+ {
+ spdlog::warn("Mesh timer error: {}", Ec.message());
+ }
+ }
+ });
+}
+
+void
+Mesh::OnTick()
+{
+ using namespace std::literals;
+
+ CbObjectWriter Msg;
+
+ // Basic service information
+
+ Msg.BeginArray("s");
+ Msg << m_SessionId << m_Port << /* event sequence # */ uint32_t(0);
+ Msg.EndArray();
+
+ BroadcastPacket(Msg);
+
+ EnqueueTick();
+}
+
+void
+Mesh::BroadcastPacket(CbObjectWriter& Obj)
+{
+ std::error_code ErrorCode;
+
+ asio::ip::udp::socket BroadcastSocket(m_IoContext);
+ BroadcastSocket.open(asio::ip::udp::v4(), ErrorCode);
+
+ if (!ErrorCode)
+ {
+ BroadcastSocket.set_option(asio::ip::udp::socket::reuse_address(true));
+ BroadcastSocket.set_option(asio::socket_base::broadcast(true));
+
+ asio::ip::udp::endpoint BroadcastEndpoint(asio::ip::address_v4::broadcast(), m_Port);
+
+ uint8_t MessageBuffer[kMaxMessageSize];
+ detail::MessageHeader* Message = reinterpret_cast<detail::MessageHeader*>(MessageBuffer);
+ *Message = {};
+
+ MemoryOutStream MemOut;
+ BinaryWriter Writer(MemOut);
+
+ Obj.Save(Writer);
+
+ // TODO: check that it fits in a packet!
+
+ Message->SetPayload(MemOut.Data(), MemOut.Size());
+
+ BroadcastSocket.send_to(asio::buffer(Message, Message->TotalSize()), BroadcastEndpoint);
+ BroadcastSocket.close();
+ }
+ else
+ {
+ spdlog::warn("failed to open broadcast socket: {}", ErrorCode.message());
+ }
+}
+
+void
+Mesh::Run()
+{
+ m_State = kRunning;
+
+ EnqueueTick();
+
+ IssueReceive();
+ m_IoContext.run();
+}
+
+void
+Mesh::IssueReceive()
+{
+ using namespace std::literals;
+
+ m_UdpSocket->async_receive_from(
+ asio::buffer(m_MessageBuffer, sizeof m_MessageBuffer),
+ m_SenderEndpoint,
+ [this](std::error_code ec, size_t BytesReceived) {
+ if (!ec && BytesReceived)
+ {
+ std::error_code ErrorCode;
+ std::string Sender = m_SenderEndpoint.address().to_string(ErrorCode);
+
+ // Process message
+
+ uint32_t& Magic = *reinterpret_cast<uint32_t*>(m_MessageBuffer);
+
+ switch (Magic)
+ {
+ case detail::MessageHeader::kMagic:
+ {
+ detail::MessageHeader& Header = *reinterpret_cast<detail::MessageHeader*>(m_MessageBuffer);
+
+ if (CbObject Msg = Header.GetMessage())
+ {
+ const asio::ip::address& Ip = m_SenderEndpoint.address();
+
+ if (auto Field = Msg["s"sv])
+ {
+ // Announce
+
+ CbArrayView Ci = Field.AsArrayView();
+ auto It = Ci.CreateViewIterator();
+
+ const Oid SessionId = It->AsObjectId();
+
+ if (SessionId != Oid::Zero && SessionId != m_SessionId)
+ {
+ const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port());
+ const uint32_t Lsn = (++It)->AsUInt32();
+
+ spdlog::info("received hey from {} ({})", Sender, SessionId);
+
+ RwLock::ExclusiveLockScope _(m_SessionsLock);
+
+ PeerInfo& Info = m_KnownPeers[SessionId];
+
+ Info.LastSeen = std::time(nullptr);
+ Info.SessionId = SessionId;
+
+ if (std::find(begin(Info.SeenOnIP), end(Info.SeenOnIP), Ip) == Info.SeenOnIP.end())
+ {
+ Info.SeenOnIP.push_back(Ip);
+ }
+ }
+ }
+ else if (auto Bye = Msg["bye"sv])
+ {
+ Oid SessionId = Field.AsObjectId();
+
+ spdlog::info("received bye from {} ({})", Sender, SessionId);
+
+ // We could verify that it's sent from a known IP before erasing the
+ // session, if we want to be paranoid
+
+ RwLock::ExclusiveLockScope _(m_SessionsLock);
+
+ m_KnownPeers.erase(SessionId);
+ }
+ else
+ {
+ // Unknown message type, just ignore
+ }
+ }
+ else
+ {
+ spdlog::warn("received malformed message from {}", Sender);
+ }
+ }
+ break;
+
+ default:
+ spdlog::warn("received malformed data from {}", Sender);
+ break;
+ }
+
+ IssueReceive();
+ }
+ });
+}
+
+} // namespace zen