aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-01-26 09:56:08 +0100
committerStefan Boberg <[email protected]>2023-01-26 09:56:52 +0100
commitc3b906dec4589d0b06f4387dce26dd79a988c269 (patch)
tree96e3947b7f76e3b255b21c2f70831ce121191492
parentfix gc logging (#213) (diff)
downloadzen-c3b906dec4589d0b06f4387dce26dd79a988c269.tar.xz
zen-c3b906dec4589d0b06f4387dce26dd79a988c269.zip
removed experimental mesh code
should be replaced with a proper implementation later
-rw-r--r--xmake.lua8
-rw-r--r--zenserver-test/zenserver-test.cpp31
-rw-r--r--zenserver/config.cpp12
-rw-r--r--zenserver/config.h7
-rw-r--r--zenserver/upstream/zen.cpp284
-rw-r--r--zenserver/upstream/zen.h59
-rw-r--r--zenserver/zenserver.cpp20
7 files changed, 0 insertions, 421 deletions
diff --git a/xmake.lua b/xmake.lua
index 4469a3acc..30dedf0c3 100644
--- a/xmake.lua
+++ b/xmake.lua
@@ -142,14 +142,6 @@ option("exec")
option_end()
add_define_by_config("ZEN_WITH_EXEC_SERVICES", "exec")
-option("zenmesh")
- -- note: this is an old prototype and is not functional at all at the moment
- set_default(false)
- set_showmenu(true)
- set_description("Enables Zen's mesh feature")
-option_end()
-add_define_by_config("ZEN_ENABLE_MESH", "zenmesh")
-
option("zentrace")
set_default(true)
set_showmenu(true)
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index ee3c5b742..c08ad8e0a 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -2492,37 +2492,6 @@ TEST_CASE(".exec.basic")
}
# endif // ZEN_WITH_EXEC_SERVICES
-TEST_CASE("mesh.basic")
-{
-// --mesh option only available with ZEN_ENABLE_MESH
-# if ZEN_ENABLE_MESH
- using namespace std::literals;
-
- const int kInstanceCount = 4;
-
- ZEN_INFO("spawning {} instances", kInstanceCount);
-
- std::unique_ptr<ZenServerInstance> Instances[kInstanceCount];
-
- for (int i = 0; i < kInstanceCount; ++i)
- {
- auto& Instance = Instances[i];
-
- Instance = std::make_unique<ZenServerInstance>(TestEnv);
- Instance->SetTestDir(TestEnv.CreateNewTestDir());
- Instance->EnableMesh();
- Instance->SpawnServer(13337 + i);
- }
-
- for (int i = 0; i < kInstanceCount; ++i)
- {
- auto& Instance = Instances[i];
-
- Instance->WaitUntilReady();
- }
-# endif
-}
-
class ZenServerTestHelper
{
public:
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index d7233a6f4..fa4ee7f5a 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -222,15 +222,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<int>(ServerOptions.WebSocketThreads)->default_value("0"),
"");
-#if ZEN_ENABLE_MESH
- options.add_option("network",
- "m",
- "mesh",
- "Enable mesh network",
- cxxopts::value<bool>(ServerOptions.MeshEnabled)->default_value("false"),
- "");
-#endif
-
#if ZEN_WITH_TRACE
options.add_option("ue-trace",
"",
@@ -670,9 +661,6 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio
}
ServerOptions.BasePort = NetworkConfig->get_or<int>("port", ServerOptions.BasePort);
-#if ZEN_ENABLE_MESH
- ServerOptions.MeshEnabled = NetworkConfig->get_or<bool>("meshenabled", ServerOptions.MeshEnabled);
-#endif
}
auto UpdateStringValueFromConfig = [](const sol::table& Table, std::string_view Key, std::string& OutValue) {
diff --git a/zenserver/config.h b/zenserver/config.h
index 4cdef0318..096a800b4 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -7,10 +7,6 @@
#include <string>
#include <vector>
-#ifndef ZEN_ENABLE_MESH
-# define ZEN_ENABLE_MESH 0
-#endif
-
struct ZenUpstreamJupiterConfig
{
std::string Name;
@@ -144,9 +140,6 @@ struct ZenServerOptions
std::string TraceHost; // Host name or IP address to send trace data to
std::string TraceFile; // Path of a file to write a trace
#endif
-#if ZEN_ENABLE_MESH
- bool MeshEnabled = false; // Experimental p2p mesh discovery
-#endif
};
void ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions);
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 980958740..9e1212834 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -23,290 +23,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <gsl/gsl-lite.hpp>
namespace zen {
-namespace detail {
- struct MessageHeader
- {
- static const uint32_t kMagic = 0x11'99'77'22;
-
- uint32_t Magic = kMagic;
- uint32_t Checksum = 0;
- uint16_t MessageSize = 0; // Size *including* this field and the reserved field
- uint16_t Reserved = 0;
-
- void SetPayload(const void* PayloadData, uint64_t PayloadSize)
- {
- memcpy(Payload(), PayloadData, PayloadSize);
- MessageSize = gsl::narrow<uint16_t>(PayloadSize + sizeof MessageSize + sizeof Reserved);
- Checksum = ComputeChecksum();
- }
-
- inline CbObject GetMessage() const
- {
- if (IsOk())
- {
- MemoryView MessageView(Payload(), MessageSize - sizeof MessageSize - sizeof Reserved);
-
- CbValidateError ValidationResult = ValidateCompactBinary(MessageView, CbValidateMode::All);
-
- if (ValidationResult == CbValidateError::None)
- {
- return CbObject{SharedBuffer::MakeView(MessageView)};
- }
- }
-
- return {};
- }
-
- uint32_t TotalSize() const { return MessageSize + sizeof Checksum + sizeof Magic; }
- uint32_t ComputeChecksum() const { return gsl::narrow_cast<uint32_t>(XXH3_64bits(&MessageSize, MessageSize)); }
- inline bool IsOk() const { return Magic == kMagic && Checksum == ComputeChecksum(); }
-
- private:
- inline void* Payload() { return &Reserved + 1; }
- inline const void* Payload() const { return &Reserved + 1; }
- };
-} // namespace detail
-
-// Note that currently this just implements an UDP echo service for testing purposes
-
-MeshTracker::MeshTracker(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(GetSessionId())
-{
-}
-
-MeshTracker::~MeshTracker()
-{
- Stop();
-}
-
-void
-MeshTracker::Start(uint16_t Port)
-{
- ZEN_ASSERT(Port);
- ZEN_ASSERT(m_Port == 0);
-
- m_Port = Port;
- m_UdpSocket = std::make_unique<asio::ip::udp::socket>(m_IoContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), m_Port));
- m_Thread = std::make_unique<std::thread>([this] { Run(); });
-};
-
-void
-MeshTracker::Stop()
-{
- using namespace std::literals;
-
- if (!m_Port)
- {
- // Never started, nothing to do here
- return;
- }
-
- CbObjectWriter Msg;
- Msg << "bye"sv << m_SessionId;
- BroadcastPacket(Msg);
-
- m_State = kExiting;
-
- std::error_code Ec;
- m_Timer.cancel(Ec);
-
- m_UdpSocket->close(Ec);
-
- m_IoContext.stop();
-
- if (m_Thread)
- {
- m_Thread->join();
- m_Thread.reset();
- }
-}
-
-void
-MeshTracker::EnqueueTick()
-{
- m_Timer.expires_after(std::chrono::seconds(10));
-
- m_Timer.async_wait([&](const std::error_code& Ec) {
- if (!Ec)
- {
- OnTick();
- }
- else
- {
- if (m_State != kExiting)
- {
- ZEN_WARN("Mesh timer error: {}", Ec.message());
- }
- }
- });
-}
-
-void
-MeshTracker::OnTick()
-{
- using namespace std::literals;
-
- CbObjectWriter Msg;
-
- // Basic service information
-
- Msg.BeginArray("s");
- Msg << m_SessionId << m_Port << /* event sequence # */ uint32_t(0);
- Msg.EndArray();
-
- BroadcastPacket(Msg);
-
- EnqueueTick();
-}
-
-void
-MeshTracker::BroadcastPacket(CbObjectWriter& Obj)
-{
- std::error_code ErrorCode;
-
- asio::ip::udp::socket BroadcastSocket(m_IoContext);
- BroadcastSocket.open(asio::ip::udp::v4(), ErrorCode);
-
- if (!ErrorCode)
- {
- BroadcastSocket.set_option(asio::ip::udp::socket::reuse_address(true));
- BroadcastSocket.set_option(asio::socket_base::broadcast(true));
-
- asio::ip::udp::endpoint BroadcastEndpoint(asio::ip::address_v4::broadcast(), m_Port);
-
- uint8_t MessageBuffer[kMaxMessageSize];
- detail::MessageHeader* Message = reinterpret_cast<detail::MessageHeader*>(MessageBuffer);
- *Message = {};
-
- BinaryWriter MemOut;
-
- Obj.Save(MemOut);
-
- // TODO: check that it fits in a packet!
-
- Message->SetPayload(MemOut.Data(), MemOut.Size());
-
- BroadcastSocket.send_to(asio::buffer(Message, Message->TotalSize()), BroadcastEndpoint, 0, ErrorCode);
-
- if (!ErrorCode)
- {
- BroadcastSocket.close(ErrorCode);
- }
-
- if (ErrorCode)
- {
- ZEN_WARN("packet broadcast failed: {}", ErrorCode.message());
- }
- }
- else
- {
- ZEN_WARN("failed to open broadcast socket: {}", ErrorCode.message());
- }
-}
-
-void
-MeshTracker::Run()
-{
- m_State = kRunning;
-
- EnqueueTick();
-
- IssueReceive();
- m_IoContext.run();
-}
-
-void
-MeshTracker::IssueReceive()
-{
- using namespace std::literals;
-
- m_UdpSocket->async_receive_from(
- asio::buffer(m_MessageBuffer, sizeof m_MessageBuffer),
- m_SenderEndpoint,
- [this](std::error_code ec, size_t BytesReceived) {
- if (!ec && BytesReceived)
- {
- std::error_code ErrorCode;
- std::string SenderIp = m_SenderEndpoint.address().to_string(ErrorCode);
-
- // Process message
-
- uint32_t& Magic = *reinterpret_cast<uint32_t*>(m_MessageBuffer);
-
- switch (Magic)
- {
- case detail::MessageHeader::kMagic:
- {
- detail::MessageHeader& Header = *reinterpret_cast<detail::MessageHeader*>(m_MessageBuffer);
-
- if (CbObject Msg = Header.GetMessage())
- {
- const asio::ip::address& Ip = m_SenderEndpoint.address();
-
- if (auto Field = Msg["s"sv])
- {
- // Announce
-
- CbArrayView Ci = Field.AsArrayView();
- auto It = Ci.CreateViewIterator();
-
- const Oid SessionId = It->AsObjectId();
-
- if (SessionId != Oid::Zero && SessionId != m_SessionId)
- {
- // const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port());
- // const uint32_t Lsn = (++It)->AsUInt32();
-
- ZEN_TRACE("received hey from {} ({})", SenderIp, SessionId);
-
- RwLock::ExclusiveLockScope _(m_SessionsLock);
-
- PeerInfo& Info = m_KnownPeers[SessionId];
-
- Info.LastSeen = std::time(nullptr);
- Info.SessionId = SessionId;
-
- if (std::find(begin(Info.SeenOnIP), end(Info.SeenOnIP), Ip) == Info.SeenOnIP.end())
- {
- Info.SeenOnIP.push_back(Ip);
- }
- }
- }
- else if (auto Bye = Msg["bye"sv])
- {
- Oid SessionId = Field.AsObjectId();
-
- ZEN_DEBUG("received bye from {} ({})", SenderIp, SessionId);
-
- // We could verify that it's sent from a known IP before erasing the
- // session, if we want to be paranoid
-
- RwLock::ExclusiveLockScope _(m_SessionsLock);
-
- m_KnownPeers.erase(SessionId);
- }
- else
- {
- // Unknown message type, just ignore
- }
- }
- else
- {
- ZEN_WARN("received malformed message from {}", SenderIp);
- }
- }
- break;
-
- default:
- ZEN_WARN("received malformed data from {}", SenderIp);
- break;
- }
- }
-
- IssueReceive();
- });
-}
-
-//////////////////////////////////////////////////////////////////////////
namespace detail {
struct ZenCacheSessionState
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 9a082ebc9..bfba8fa98 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -31,65 +31,6 @@ class CbObjectView;
class CbPackage;
class ZenStructuredCacheClient;
-/** Zen mesh tracker
- *
- * Discovers and tracks local peers
- *
- * NOTE: This is currently experimental, and not very useful yet
- *
- */
-
-class MeshTracker
-{
-public:
- MeshTracker(asio::io_context& IoContext);
- ~MeshTracker();
-
- void Start(uint16_t Port);
- void Stop();
-
-private:
- void Run();
- void IssueReceive();
- void EnqueueTick();
- void OnTick();
- void BroadcastPacket(CbObjectWriter&);
-
- enum State
- {
- kInitializing,
- kRunning,
- kExiting
- };
-
- static const int kMaxMessageSize = 2048;
- static const int kMaxUpdateSize = 1400; // We'll try not to send messages larger than this
-
- spdlog::logger& Log() { return m_Log; }
-
- spdlog::logger& m_Log;
- std::atomic<State> m_State = kInitializing;
- asio::io_context& m_IoContext;
- std::unique_ptr<asio::ip::udp::socket> m_UdpSocket;
- std::unique_ptr<asio::ip::udp::socket> m_BroadcastSocket;
- asio::ip::udp::endpoint m_SenderEndpoint;
- std::unique_ptr<std::thread> m_Thread;
- uint16_t m_Port = 0;
- uint8_t m_MessageBuffer[kMaxMessageSize];
- asio::high_resolution_timer m_Timer{m_IoContext};
- Oid m_SessionId;
-
- struct PeerInfo
- {
- Oid SessionId;
- std::time_t LastSeen;
- std::vector<asio::ip::address> SeenOnIP;
- };
-
- RwLock m_SessionsLock;
- tsl::robin_map<Oid, PeerInfo, Oid::Hasher> m_KnownPeers;
-};
-
//////////////////////////////////////////////////////////////////////////
namespace detail {
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index cf3d90324..c1e82f404 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -303,17 +303,6 @@ public:
ZEN_INFO("NOT instantiating structured cache service");
}
-#if ZEN_ENABLE_MESH
- if (ServerOptions.MeshEnabled)
- {
- StartMesh(EffectiveBasePort);
- }
- else
- {
- ZEN_INFO("NOT starting mesh");
- }
-#endif
-
m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
m_Http->RegisterService(m_TestingService);
m_Http->RegisterService(m_AdminService);
@@ -374,14 +363,6 @@ public:
void InitializeStructuredCache(const ZenServerOptions& ServerOptions);
void InitializeCompute(const ZenServerOptions& ServerOptions);
-#if ZEN_ENABLE_MESH
- void StartMesh(int BasePort)
- {
- ZEN_INFO("initializing mesh discovery");
- m_ZenMesh.Start(uint16_t(BasePort));
- }
-#endif
-
void Run()
{
// This is disabled for now, awaiting better scheduling
@@ -611,7 +592,6 @@ private:
std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService;
zen::HttpAdminService m_AdminService{m_GcScheduler};
zen::HttpHealthService m_HealthService;
- zen::MeshTracker m_ZenMesh{m_IoContext};
#if ZEN_WITH_EXEC_SERVICES
std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService;
#endif // ZEN_WITH_EXEC_SERVICES