diff options
| author | Stefan Boberg <[email protected]> | 2021-05-11 13:05:39 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-05-11 13:05:39 +0200 |
| commit | f8d9ac5d13dd37b8b57af0478e77ba1e75c813aa (patch) | |
| tree | 1daf7621e110d48acd5e12e3073ce48ef0dd11b2 /zenserver/upstream | |
| download | zen-f8d9ac5d13dd37b8b57af0478e77ba1e75c813aa.tar.xz zen-f8d9ac5d13dd37b8b57af0478e77ba1e75c813aa.zip | |
Adding zenservice code
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 277 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 97 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 291 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 84 |
4 files changed, 749 insertions, 0 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp new file mode 100644 index 000000000..6b54f3d01 --- /dev/null +++ b/zenserver/upstream/jupiter.cpp @@ -0,0 +1,277 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "jupiter.h" + +#include <fmt/format.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/string.h> +#include <zencore/thread.h> + +// For some reason, these don't seem to stick, so we disable the warnings +//# define _SILENCE_CXX17_C_HEADER_DEPRECATION_WARNING 1 +//# define _SILENCE_ALL_CXX17_DEPRECATION_WARNINGS 1 +#pragma warning(push) +#pragma warning(disable : 4004) +#pragma warning(disable : 4996) +#include <cpr/cpr.h> +#pragma warning(pop) + +#if ZEN_PLATFORM_WINDOWS +# pragma comment(lib, "Crypt32.lib") +# pragma comment(lib, "Wldap32.lib") +#endif + +#include <spdlog/spdlog.h> +#include <json11.hpp> + +using namespace std::literals; +using namespace fmt::literals; + +namespace zen { + +namespace detail { + struct CloudCacheSessionState + { + CloudCacheSessionState(CloudCacheClient& Client) : OwnerClient(Client) {} + ~CloudCacheSessionState() {} + + void Reset() + { + std::string Auth; + OwnerClient.AcquireAccessToken(Auth); + + Session.SetBody({}); + Session.SetOption(cpr::Header{{"Authorization", Auth}}); + } + + CloudCacheClient& OwnerClient; + cpr::Session Session; + }; +} // namespace detail + +CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_CacheClient(OuterClient) +{ + m_SessionState = m_CacheClient->AllocSessionState(); +} + +CloudCacheSession::~CloudCacheSession() +{ + m_CacheClient->FreeSessionState(m_SessionState); +} + +#define TESTING_PREFIX "aaaaa" + +IoBuffer +CloudCacheSession::Get(std::string_view BucketId, std::string_view Key) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl(); + Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key << ".raw"; + + auto& Session = m_SessionState->Session; + Session.SetUrl(cpr::Url{Uri.c_str()}); + + cpr::Response Response = Session.Get(); + + if (!Response.error) + { + return IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + } + + return {}; +} + +void +CloudCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl(); + Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key; + + auto& Session = m_SessionState->Session; + + IoHash Hash = IoHash::HashMemory(Data.Data(), Data.Size()); + + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption( + cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); + Session.SetOption(cpr::Body{(const char*)Data.Data(), Data.Size()}); + + cpr::Response Response = Session.Put(); + + if (Response.error) + { + spdlog::warn("PUT failed: '{}'", Response.error.message); + } +} + +////////////////////////////////////////////////////////////////////////// + +std::string +CloudCacheAccessToken::GetAuthorizationHeaderValue() +{ + RwLock::SharedLockScope _(m_Lock); + + return "Bearer {}"_format(m_Token); +} + +inline void +CloudCacheAccessToken::SetToken(std::string_view Token) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_Token = Token; + ++m_Serial; +} + +////////////////////////////////////////////////////////////////////////// +// +// ServiceUrl: https://jupiter.devtools.epicgames.com +// Namespace: ue4.ddc +// OAuthClientId: 0oao91lrhqPiAlaGD0x7 +// OAuthProvider: https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token +// OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d +// + +CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, + std::string_view Namespace, + std::string_view OAuthProvider, + std::string_view OAuthClientId, + std::string_view OAuthSecret) +: m_ServiceUrl(ServiceUrl) +, m_OAuthFullUri(OAuthProvider) +, m_Namespace(Namespace) +, m_DefaultBucket("default") +, m_OAuthClientId(OAuthClientId) +, m_OAuthSecret(OAuthSecret) +{ + if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv)) + { + spdlog::warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str()); + m_IsValid = false; + + return; + } + + // Split into host and Uri substrings + + auto SchemePos = OAuthProvider.find("://"sv); + + if (SchemePos == std::string::npos) + { + spdlog::warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl); + m_IsValid = false; + + return; + } + + auto DomainEnd = OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3); + + if (DomainEnd == std::string::npos) + { + spdlog::warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl); + m_IsValid = false; + + return; + } + + m_OAuthDomain = OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com + m_OAuthUriPath = OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token +} + +CloudCacheClient::~CloudCacheClient() +{ + RwLock::ExclusiveLockScope _(m_SessionStateLock); + + for (auto State : m_SessionStateCache) + { + delete State; + } +} + +bool +CloudCacheClient::AcquireAccessToken(std::string& AuthorizationHeaderValue) +{ + // TODO: check for expiration + + if (!m_IsValid) + { + ExtendableStringBuilder<128> OAuthFormData; + OAuthFormData << "client_id=" << m_OAuthClientId + << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret; + + const uint32_t CurrentSerial = m_AccessToken.GetSerial(); + + static RwLock AuthMutex; + RwLock::ExclusiveLockScope _(AuthMutex); + + // Protect against redundant authentication operations + if (m_AccessToken.GetSerial() != CurrentSerial) + { + // TODO: this could verify that the token is actually valid and retry if not? + + return true; + } + + std::string data{OAuthFormData}; + + cpr::Response Response = + cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{data}); + + std::string Body{std::move(Response.text)}; + + // Parse JSON response + + std::string JsonError; + json11::Json JsonResponse = json11::Json::parse(Body, /* out */ JsonError); + if (!JsonError.empty()) + { + spdlog::warn("failed to parse OAuth response: '{}'", JsonError); + + return false; + } + + std::string AccessToken = JsonResponse["access_token"].string_value(); + int ExpiryTimeSeconds = JsonResponse["expires_in"].int_value(); + + m_AccessToken.SetToken(AccessToken); + + m_IsValid = true; + } + + AuthorizationHeaderValue = m_AccessToken.GetAuthorizationHeaderValue(); + + return true; +} + +detail::CloudCacheSessionState* +CloudCacheClient::AllocSessionState() +{ + detail::CloudCacheSessionState* State = nullptr; + + if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty()) + { + State = m_SessionStateCache.front(); + m_SessionStateCache.pop_front(); + } + + if (State == nullptr) + { + State = new detail::CloudCacheSessionState(*this); + } + + State->Reset(); + + return State; +} + +void +CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) +{ + RwLock::ExclusiveLockScope _(m_SessionStateLock); + m_SessionStateCache.push_front(State); +} + +} // namespace zen diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h new file mode 100644 index 000000000..dd01cfb86 --- /dev/null +++ b/zenserver/upstream/jupiter.h @@ -0,0 +1,97 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/refcount.h> +#include <zencore/thread.h> + +#include <atomic> +#include <list> +#include <memory> + +namespace zen { +namespace detail { + struct CloudCacheSessionState; +} + +class IoBuffer; +class CloudCacheClient; +struct IoHash; + +/** + * Cached access token, for use with `Authorization:` header + */ +struct CloudCacheAccessToken +{ + std::string GetAuthorizationHeaderValue(); + void SetToken(std::string_view Token); + + inline uint32_t GetSerial() const { return m_Serial.load(std::memory_order::memory_order_relaxed); } + +private: + RwLock m_Lock; + std::string m_Token; + std::atomic<uint32_t> m_Serial; +}; + +/** + * Context for performing Jupiter operations + * + * Maintains an HTTP connection so that subsequent operations don't need to go + * through the whole connection setup process + * + */ +class CloudCacheSession +{ +public: + CloudCacheSession(CloudCacheClient* OuterClient); + ~CloudCacheSession(); + + IoBuffer Get(std::string_view BucketId, std::string_view Key); + void Put(std::string_view BucketId, std::string_view Key, IoBuffer Data); + +private: + RefPtr<CloudCacheClient> m_CacheClient; + detail::CloudCacheSessionState* m_SessionState; +}; + +/** + * Jupiter upstream cache client + */ +class CloudCacheClient : public RefCounted +{ +public: + CloudCacheClient(std::string_view ServiceUrl, + std::string_view Namespace, + std::string_view OAuthProvider, + std::string_view OAuthClientId, + std::string_view OAuthSecret); + ~CloudCacheClient(); + + bool AcquireAccessToken(std::string& AuthorizationHeaderValue); + std::string_view Namespace() const { return m_Namespace; } + std::string_view DefaultBucket() const { return m_DefaultBucket; } + std::string_view ServiceUrl() const { return m_ServiceUrl; } + +private: + bool m_IsValid = false; + std::string m_ServiceUrl; + std::string m_OAuthDomain; + std::string m_OAuthUriPath; + std::string m_OAuthFullUri; + std::string m_Namespace; + std::string m_DefaultBucket; + std::string m_OAuthClientId; + std::string m_OAuthSecret; + CloudCacheAccessToken m_AccessToken; + + RwLock m_SessionStateLock; + std::list<detail::CloudCacheSessionState*> m_SessionStateCache; + + detail::CloudCacheSessionState* AllocSessionState(); + void FreeSessionState(detail::CloudCacheSessionState*); + + friend class CloudCacheSession; +}; + +} // namespace zen diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp new file mode 100644 index 000000000..7148715f2 --- /dev/null +++ b/zenserver/upstream/zen.cpp @@ -0,0 +1,291 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zen.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/fmtutils.h> +#include <zencore/stream.h> + +#include <spdlog/spdlog.h> +#include <xxhash.h> +#include <gsl/gsl-lite.hpp> + +namespace zen { + +namespace detail { + struct MessageHeader + { + static const uint32_t kMagic = 0x11'99'77'22; + + uint32_t Magic = kMagic; + uint32_t Checksum = 0; + uint16_t MessageSize = 0; // Size *including* this field and the reserved field + uint16_t Reserved = 0; + + void SetPayload(const void* PayloadData, uint64_t PayloadSize) + { + memcpy(Payload(), PayloadData, PayloadSize); + MessageSize = gsl::narrow<uint16_t>(PayloadSize + sizeof MessageSize + sizeof Reserved); + Checksum = ComputeChecksum(); + } + + inline CbObject GetMessage() const + { + if (IsOk()) + { + MemoryView MessageView(Payload(), MessageSize - sizeof MessageSize - sizeof Reserved); + + CbValidateError ValidationResult = ValidateCompactBinary(MessageView, CbValidateMode::All); + + if (ValidationResult == CbValidateError::None) + { + return CbObject{SharedBuffer::MakeView(MessageView)}; + } + } + + return {}; + } + + uint32_t TotalSize() const { return MessageSize + sizeof Checksum + sizeof Magic; } + uint32_t ComputeChecksum() const { return gsl::narrow_cast<uint32_t>(XXH3_64bits(&MessageSize, MessageSize)); } + inline bool IsOk() const { return Magic == kMagic && Checksum == ComputeChecksum(); } + + private: + inline void* Payload() { return &Reserved + 1; } + inline const void* Payload() const { return &Reserved + 1; } + }; +} // namespace detail + +// Note that currently this just implements an UDP echo service for testing purposes + +Mesh::Mesh(asio::io_context& IoContext) : m_IoContext(IoContext) +{ +} + +Mesh::~Mesh() +{ + Stop(); +} + +void +Mesh::Start(uint16_t Port) +{ + ZEN_ASSERT(Port); + ZEN_ASSERT(m_Port == 0); + + m_Port = Port; + m_UdpSocket = std::make_unique<asio::ip::udp::socket>(m_IoContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), m_Port)); + m_Thread = std::make_unique<std::thread>([this] { Run(); }); +}; + +void +Mesh::Stop() +{ + using namespace std::literals; + + if (!m_Port) + { + // Never started, nothing to do here + return; + } + + CbObjectWriter Msg; + Msg << "bye"sv << m_SessionId; + BroadcastPacket(Msg); + + m_State = kExiting; + + std::error_code Ec; + m_Timer.cancel(Ec); + + m_UdpSocket->close(Ec); + + m_IoContext.stop(); + + if (m_Thread) + { + m_Thread->join(); + m_Thread.reset(); + } +} + +void +Mesh::EnqueueTick() +{ + m_Timer.expires_after(std::chrono::seconds(10)); + + m_Timer.async_wait([&](const std::error_code& Ec) { + if (!Ec) + { + OnTick(); + } + else + { + if (m_State != kExiting) + { + spdlog::warn("Mesh timer error: {}", Ec.message()); + } + } + }); +} + +void +Mesh::OnTick() +{ + using namespace std::literals; + + CbObjectWriter Msg; + + // Basic service information + + Msg.BeginArray("s"); + Msg << m_SessionId << m_Port << /* event sequence # */ uint32_t(0); + Msg.EndArray(); + + BroadcastPacket(Msg); + + EnqueueTick(); +} + +void +Mesh::BroadcastPacket(CbObjectWriter& Obj) +{ + std::error_code ErrorCode; + + asio::ip::udp::socket BroadcastSocket(m_IoContext); + BroadcastSocket.open(asio::ip::udp::v4(), ErrorCode); + + if (!ErrorCode) + { + BroadcastSocket.set_option(asio::ip::udp::socket::reuse_address(true)); + BroadcastSocket.set_option(asio::socket_base::broadcast(true)); + + asio::ip::udp::endpoint BroadcastEndpoint(asio::ip::address_v4::broadcast(), m_Port); + + uint8_t MessageBuffer[kMaxMessageSize]; + detail::MessageHeader* Message = reinterpret_cast<detail::MessageHeader*>(MessageBuffer); + *Message = {}; + + MemoryOutStream MemOut; + BinaryWriter Writer(MemOut); + + Obj.Save(Writer); + + // TODO: check that it fits in a packet! + + Message->SetPayload(MemOut.Data(), MemOut.Size()); + + BroadcastSocket.send_to(asio::buffer(Message, Message->TotalSize()), BroadcastEndpoint); + BroadcastSocket.close(); + } + else + { + spdlog::warn("failed to open broadcast socket: {}", ErrorCode.message()); + } +} + +void +Mesh::Run() +{ + m_State = kRunning; + + EnqueueTick(); + + IssueReceive(); + m_IoContext.run(); +} + +void +Mesh::IssueReceive() +{ + using namespace std::literals; + + m_UdpSocket->async_receive_from( + asio::buffer(m_MessageBuffer, sizeof m_MessageBuffer), + m_SenderEndpoint, + [this](std::error_code ec, size_t BytesReceived) { + if (!ec && BytesReceived) + { + std::error_code ErrorCode; + std::string Sender = m_SenderEndpoint.address().to_string(ErrorCode); + + // Process message + + uint32_t& Magic = *reinterpret_cast<uint32_t*>(m_MessageBuffer); + + switch (Magic) + { + case detail::MessageHeader::kMagic: + { + detail::MessageHeader& Header = *reinterpret_cast<detail::MessageHeader*>(m_MessageBuffer); + + if (CbObject Msg = Header.GetMessage()) + { + const asio::ip::address& Ip = m_SenderEndpoint.address(); + + if (auto Field = Msg["s"sv]) + { + // Announce + + CbArrayView Ci = Field.AsArrayView(); + auto It = Ci.CreateViewIterator(); + + const Oid SessionId = It->AsObjectId(); + + if (SessionId != Oid::Zero && SessionId != m_SessionId) + { + const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port()); + const uint32_t Lsn = (++It)->AsUInt32(); + + spdlog::info("received hey from {} ({})", Sender, SessionId); + + RwLock::ExclusiveLockScope _(m_SessionsLock); + + PeerInfo& Info = m_KnownPeers[SessionId]; + + Info.LastSeen = std::time(nullptr); + Info.SessionId = SessionId; + + if (std::find(begin(Info.SeenOnIP), end(Info.SeenOnIP), Ip) == Info.SeenOnIP.end()) + { + Info.SeenOnIP.push_back(Ip); + } + } + } + else if (auto Bye = Msg["bye"sv]) + { + Oid SessionId = Field.AsObjectId(); + + spdlog::info("received bye from {} ({})", Sender, SessionId); + + // We could verify that it's sent from a known IP before erasing the + // session, if we want to be paranoid + + RwLock::ExclusiveLockScope _(m_SessionsLock); + + m_KnownPeers.erase(SessionId); + } + else + { + // Unknown message type, just ignore + } + } + else + { + spdlog::warn("received malformed message from {}", Sender); + } + } + break; + + default: + spdlog::warn("received malformed data from {}", Sender); + break; + } + + IssueReceive(); + } + }); +} + +} // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h new file mode 100644 index 000000000..75e29bf86 --- /dev/null +++ b/zenserver/upstream/zen.h @@ -0,0 +1,84 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/memory.h> +#include <zencore/thread.h> +#include <zencore/uid.h> +#include <zencore/zencore.h> + +#pragma warning(push) +#pragma warning(disable : 4127) +#include <tsl/robin_map.h> +#pragma warning(pop) + +#include <asio.hpp> + +#include <chrono> + +namespace zen { + +class CbObjectWriter; + +/** Zen mesh tracker + * + * Discovers and tracks local peers + */ +class Mesh +{ +public: + Mesh(asio::io_context& IoContext); + ~Mesh(); + + void Start(uint16_t Port); + void Stop(); + +private: + void Run(); + void IssueReceive(); + void EnqueueTick(); + void OnTick(); + void BroadcastPacket(CbObjectWriter&); + + enum State + { + kInitializing, + kRunning, + kExiting + }; + + static const int kMaxMessageSize = 2048; + static const int kMaxUpdateSize = 1400; // We'll try not to send messages larger than this + + std::atomic<State> m_State = kInitializing; + asio::io_context& m_IoContext; + std::unique_ptr<asio::ip::udp::socket> m_UdpSocket; + std::unique_ptr<asio::ip::udp::socket> m_BroadcastSocket; + asio::ip::udp::endpoint m_SenderEndpoint; + std::unique_ptr<std::thread> m_Thread; + uint16_t m_Port = 0; + uint8_t m_MessageBuffer[kMaxMessageSize]; + asio::high_resolution_timer m_Timer{m_IoContext}; + Oid m_SessionId{Oid::NewOid()}; + + struct PeerInfo + { + Oid SessionId; + std::time_t LastSeen; + std::vector<asio::ip::address> SeenOnIP; + }; + + RwLock m_SessionsLock; + tsl::robin_map<Oid, PeerInfo, Oid::Hasher> m_KnownPeers; +}; + +class ZenKvCacheClient +{ +public: + ZenKvCacheClient(); + ~ZenKvCacheClient(); + +private: +}; + +} // namespace zen |