// Copyright Epic Games, Inc. All Rights Reserved. #include "zen.h" #include #include #include #include #include #include #include #include #include "cache/structuredcachestore.h" #include "diag/formatters.h" #include "diag/logging.h" ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen { namespace detail { struct MessageHeader { static const uint32_t kMagic = 0x11'99'77'22; uint32_t Magic = kMagic; uint32_t Checksum = 0; uint16_t MessageSize = 0; // Size *including* this field and the reserved field uint16_t Reserved = 0; void SetPayload(const void* PayloadData, uint64_t PayloadSize) { memcpy(Payload(), PayloadData, PayloadSize); MessageSize = gsl::narrow(PayloadSize + sizeof MessageSize + sizeof Reserved); Checksum = ComputeChecksum(); } inline CbObject GetMessage() const { if (IsOk()) { MemoryView MessageView(Payload(), MessageSize - sizeof MessageSize - sizeof Reserved); CbValidateError ValidationResult = ValidateCompactBinary(MessageView, CbValidateMode::All); if (ValidationResult == CbValidateError::None) { return CbObject{SharedBuffer::MakeView(MessageView)}; } } return {}; } uint32_t TotalSize() const { return MessageSize + sizeof Checksum + sizeof Magic; } uint32_t ComputeChecksum() const { return gsl::narrow_cast(XXH3_64bits(&MessageSize, MessageSize)); } inline bool IsOk() const { return Magic == kMagic && Checksum == ComputeChecksum(); } private: inline void* Payload() { return &Reserved + 1; } inline const void* Payload() const { return &Reserved + 1; } }; } // namespace detail // Note that currently this just implements an UDP echo service for testing purposes MeshTracker::MeshTracker(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(GetSessionId()) { } MeshTracker::~MeshTracker() { Stop(); } void MeshTracker::Start(uint16_t Port) { ZEN_ASSERT(Port); ZEN_ASSERT(m_Port == 0); m_Port = Port; m_UdpSocket = std::make_unique(m_IoContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), m_Port)); m_Thread = std::make_unique([this] { Run(); }); }; void MeshTracker::Stop() { using namespace std::literals; if (!m_Port) { // Never started, nothing to do here return; } CbObjectWriter Msg; Msg << "bye"sv << m_SessionId; BroadcastPacket(Msg); m_State = kExiting; std::error_code Ec; m_Timer.cancel(Ec); m_UdpSocket->close(Ec); m_IoContext.stop(); if (m_Thread) { m_Thread->join(); m_Thread.reset(); } } void MeshTracker::EnqueueTick() { m_Timer.expires_after(std::chrono::seconds(10)); m_Timer.async_wait([&](const std::error_code& Ec) { if (!Ec) { OnTick(); } else { if (m_State != kExiting) { ZEN_WARN("Mesh timer error: {}", Ec.message()); } } }); } void MeshTracker::OnTick() { using namespace std::literals; CbObjectWriter Msg; // Basic service information Msg.BeginArray("s"); Msg << m_SessionId << m_Port << /* event sequence # */ uint32_t(0); Msg.EndArray(); BroadcastPacket(Msg); EnqueueTick(); } void MeshTracker::BroadcastPacket(CbObjectWriter& Obj) { std::error_code ErrorCode; asio::ip::udp::socket BroadcastSocket(m_IoContext); BroadcastSocket.open(asio::ip::udp::v4(), ErrorCode); if (!ErrorCode) { BroadcastSocket.set_option(asio::ip::udp::socket::reuse_address(true)); BroadcastSocket.set_option(asio::socket_base::broadcast(true)); asio::ip::udp::endpoint BroadcastEndpoint(asio::ip::address_v4::broadcast(), m_Port); uint8_t MessageBuffer[kMaxMessageSize]; detail::MessageHeader* Message = reinterpret_cast(MessageBuffer); *Message = {}; BinaryWriter MemOut; Obj.Save(MemOut); // TODO: check that it fits in a packet! Message->SetPayload(MemOut.Data(), MemOut.Size()); BroadcastSocket.send_to(asio::buffer(Message, Message->TotalSize()), BroadcastEndpoint, 0, ErrorCode); if (!ErrorCode) { BroadcastSocket.close(ErrorCode); } if (ErrorCode) { ZEN_WARN("packet broadcast failed: {}", ErrorCode.message()); } } else { ZEN_WARN("failed to open broadcast socket: {}", ErrorCode.message()); } } void MeshTracker::Run() { m_State = kRunning; EnqueueTick(); IssueReceive(); m_IoContext.run(); } void MeshTracker::IssueReceive() { using namespace std::literals; m_UdpSocket->async_receive_from( asio::buffer(m_MessageBuffer, sizeof m_MessageBuffer), m_SenderEndpoint, [this](std::error_code ec, size_t BytesReceived) { if (!ec && BytesReceived) { std::error_code ErrorCode; std::string SenderIp = m_SenderEndpoint.address().to_string(ErrorCode); // Process message uint32_t& Magic = *reinterpret_cast(m_MessageBuffer); switch (Magic) { case detail::MessageHeader::kMagic: { detail::MessageHeader& Header = *reinterpret_cast(m_MessageBuffer); if (CbObject Msg = Header.GetMessage()) { const asio::ip::address& Ip = m_SenderEndpoint.address(); if (auto Field = Msg["s"sv]) { // Announce CbArrayView Ci = Field.AsArrayView(); auto It = Ci.CreateViewIterator(); const Oid SessionId = It->AsObjectId(); if (SessionId != Oid::Zero && SessionId != m_SessionId) { // const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port()); // const uint32_t Lsn = (++It)->AsUInt32(); ZEN_TRACE("received hey from {} ({})", SenderIp, SessionId); RwLock::ExclusiveLockScope _(m_SessionsLock); PeerInfo& Info = m_KnownPeers[SessionId]; Info.LastSeen = std::time(nullptr); Info.SessionId = SessionId; if (std::find(begin(Info.SeenOnIP), end(Info.SeenOnIP), Ip) == Info.SeenOnIP.end()) { Info.SeenOnIP.push_back(Ip); } } } else if (auto Bye = Msg["bye"sv]) { Oid SessionId = Field.AsObjectId(); ZEN_DEBUG("received bye from {} ({})", SenderIp, SessionId); // We could verify that it's sent from a known IP before erasing the // session, if we want to be paranoid RwLock::ExclusiveLockScope _(m_SessionsLock); m_KnownPeers.erase(SessionId); } else { // Unknown message type, just ignore } } else { ZEN_WARN("received malformed message from {}", SenderIp); } } break; default: ZEN_WARN("received malformed data from {}", SenderIp); break; } } IssueReceive(); }); } ////////////////////////////////////////////////////////////////////////// namespace detail { struct ZenCacheSessionState { ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {} ~ZenCacheSessionState() {} void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout) { Session.SetBody({}); Session.SetHeader({}); Session.SetConnectTimeout(ConnectTimeout); Session.SetTimeout(Timeout); } cpr::Session& GetSession() { return Session; } private: ZenStructuredCacheClient& OwnerClient; cpr::Session Session; }; } // namespace detail ////////////////////////////////////////////////////////////////////////// ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options) : m_Log(logging::Get(std::string_view("zenclient"))) , m_ServiceUrl(Options.Url) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) { } ZenStructuredCacheClient::~ZenStructuredCacheClient() { } detail::ZenCacheSessionState* ZenStructuredCacheClient::AllocSessionState() { detail::ZenCacheSessionState* State = nullptr; if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty()) { State = m_SessionStateCache.front(); m_SessionStateCache.pop_front(); } if (State == nullptr) { State = new detail::ZenCacheSessionState(*this); } State->Reset(m_ConnectTimeout, m_Timeout); return State; } void ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) { RwLock::ExclusiveLockScope _(m_SessionStateLock); m_SessionStateCache.push_front(State); } ////////////////////////////////////////////////////////////////////////// using namespace std::literals; ZenStructuredCacheSession::ZenStructuredCacheSession(Ref&& OuterClient) : m_Log(OuterClient->Log()) , m_Client(std::move(OuterClient)) { m_SessionState = m_Client->AllocSessionState(); } ZenStructuredCacheSession::~ZenStructuredCacheSession() { m_Client->FreeSessionState(m_SessionState); } ZenCacheResult ZenStructuredCacheSession::CheckHealth() { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/health/check"; cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); cpr::Response Response = Session.Get(); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } ZenCacheResult ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Accept", std::string{MapContentTypeToString(Type)}}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::GetCacheChunk(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" : Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); Session.SetBody(cpr::Body{static_cast(Value.Data()), Value.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200 || Response.status_code == 201; return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}}); Session.SetBody(cpr::Body{static_cast(Payload.Data()), Payload.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200 || Response.status_code == 201; return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/$rpc"; BinaryWriter Body; Request.CopyTo(Body); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); Session.SetBody(cpr::Body{reinterpret_cast(Body.GetData()), Body.GetSize()}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/$rpc"; SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}); Session.SetBody(cpr::Body{reinterpret_cast(Message.GetData()), Message.GetSize()}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } } // namespace zen