diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/upstream/zen.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/upstream/zen.cpp')
| -rw-r--r-- | src/zenserver/upstream/zen.cpp | 326 |
1 files changed, 326 insertions, 0 deletions
diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp new file mode 100644 index 000000000..9e1212834 --- /dev/null +++ b/src/zenserver/upstream/zen.cpp @@ -0,0 +1,326 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zen.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/fmtutils.h> +#include <zencore/session.h> +#include <zencore/stream.h> +#include <zenhttp/httpcommon.h> +#include <zenhttp/httpshared.h> + +#include "cache/structuredcachestore.h" +#include "diag/formatters.h" +#include "diag/logging.h" + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <xxhash.h> +#include <gsl/gsl-lite.hpp> + +namespace zen { + +namespace detail { + struct ZenCacheSessionState + { + ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {} + ~ZenCacheSessionState() {} + + void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout) + { + Session.SetBody({}); + Session.SetHeader({}); + Session.SetConnectTimeout(ConnectTimeout); + Session.SetTimeout(Timeout); + } + + cpr::Session& GetSession() { return Session; } + + private: + ZenStructuredCacheClient& OwnerClient; + cpr::Session Session; + }; + +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options) +: m_Log(logging::Get(std::string_view("zenclient"))) +, m_ServiceUrl(Options.Url) +, m_ConnectTimeout(Options.ConnectTimeout) +, m_Timeout(Options.Timeout) +{ +} + +ZenStructuredCacheClient::~ZenStructuredCacheClient() +{ +} + +detail::ZenCacheSessionState* +ZenStructuredCacheClient::AllocSessionState() +{ + detail::ZenCacheSessionState* State = nullptr; + + if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty()) + { + State = m_SessionStateCache.front(); + m_SessionStateCache.pop_front(); + } + + if (State == nullptr) + { + State = new detail::ZenCacheSessionState(*this); + } + + State->Reset(m_ConnectTimeout, m_Timeout); + + return State; +} + +void +ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) +{ + RwLock::ExclusiveLockScope _(m_SessionStateLock); + m_SessionStateCache.push_front(State); +} + +////////////////////////////////////////////////////////////////////////// + +using namespace std::literals; + +ZenStructuredCacheSession::ZenStructuredCacheSession(Ref<ZenStructuredCacheClient>&& OuterClient) +: m_Log(OuterClient->Log()) +, m_Client(std::move(OuterClient)) +{ + m_SessionState = m_Client->AllocSessionState(); +} + +ZenStructuredCacheSession::~ZenStructuredCacheSession() +{ + m_Client->FreeSessionState(m_SessionState); +} + +ZenCacheResult +ZenStructuredCacheSession::CheckHealth() +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client->ServiceUrl() << "/health/check"; + + cpr::Session& Session = m_SessionState->GetSession(); + Session.SetOption(cpr::Url{Uri.c_str()}); + cpr::Response Response = Session.Get(); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +} + +ZenCacheResult +ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client->ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Accept", std::string{MapContentTypeToString(Type)}}}); + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +ZenCacheResult +ZenStructuredCacheSession::GetCacheChunk(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client->ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); + + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Reason = Response.reason, + .Success = Success}; +} + +ZenCacheResult +ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoBuffer Value, + ZenContentType Type) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client->ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", + Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" + : Type == ZenContentType::kCbObject ? "application/x-ue-cb" + : "application/octet-stream"}}); + Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()}); + + cpr::Response Response = Session.Put(); + ZEN_DEBUG("PUT {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; +} + +ZenCacheResult +ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId, + IoBuffer Payload) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client->ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}}); + Session.SetBody(cpr::Body{static_cast<const char*>(Payload.Data()), Payload.Size()}); + + cpr::Response Response = Session.Put(); + ZEN_DEBUG("PUT {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; +} + +ZenCacheResult +ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client->ServiceUrl() << "/z$/$rpc"; + + BinaryWriter Body; + Request.CopyTo(Body); + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); + Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = std::move(Buffer), + .Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Reason = Response.reason, + .Success = Success}; +} + +ZenCacheResult +ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client->ServiceUrl() << "/z$/$rpc"; + + SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten(); + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}); + Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = std::move(Buffer), + .Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Reason = Response.reason, + .Success = Success}; +} + +} // namespace zen |