aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/zen.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/upstream/zen.cpp
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenserver/upstream/zen.cpp')
-rw-r--r--src/zenserver/upstream/zen.cpp326
1 files changed, 326 insertions, 0 deletions
diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp
new file mode 100644
index 000000000..9e1212834
--- /dev/null
+++ b/src/zenserver/upstream/zen.cpp
@@ -0,0 +1,326 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zen.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/fmtutils.h>
+#include <zencore/session.h>
+#include <zencore/stream.h>
+#include <zenhttp/httpcommon.h>
+#include <zenhttp/httpshared.h>
+
+#include "cache/structuredcachestore.h"
+#include "diag/formatters.h"
+#include "diag/logging.h"
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <xxhash.h>
+#include <gsl/gsl-lite.hpp>
+
+namespace zen {
+
+namespace detail {
+ struct ZenCacheSessionState
+ {
+ ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {}
+ ~ZenCacheSessionState() {}
+
+ void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout)
+ {
+ Session.SetBody({});
+ Session.SetHeader({});
+ Session.SetConnectTimeout(ConnectTimeout);
+ Session.SetTimeout(Timeout);
+ }
+
+ cpr::Session& GetSession() { return Session; }
+
+ private:
+ ZenStructuredCacheClient& OwnerClient;
+ cpr::Session Session;
+ };
+
+} // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options)
+: m_Log(logging::Get(std::string_view("zenclient")))
+, m_ServiceUrl(Options.Url)
+, m_ConnectTimeout(Options.ConnectTimeout)
+, m_Timeout(Options.Timeout)
+{
+}
+
+ZenStructuredCacheClient::~ZenStructuredCacheClient()
+{
+}
+
+detail::ZenCacheSessionState*
+ZenStructuredCacheClient::AllocSessionState()
+{
+ detail::ZenCacheSessionState* State = nullptr;
+
+ if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty())
+ {
+ State = m_SessionStateCache.front();
+ m_SessionStateCache.pop_front();
+ }
+
+ if (State == nullptr)
+ {
+ State = new detail::ZenCacheSessionState(*this);
+ }
+
+ State->Reset(m_ConnectTimeout, m_Timeout);
+
+ return State;
+}
+
+void
+ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
+{
+ RwLock::ExclusiveLockScope _(m_SessionStateLock);
+ m_SessionStateCache.push_front(State);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+using namespace std::literals;
+
+ZenStructuredCacheSession::ZenStructuredCacheSession(Ref<ZenStructuredCacheClient>&& OuterClient)
+: m_Log(OuterClient->Log())
+, m_Client(std::move(OuterClient))
+{
+ m_SessionState = m_Client->AllocSessionState();
+}
+
+ZenStructuredCacheSession::~ZenStructuredCacheSession()
+{
+ m_Client->FreeSessionState(m_SessionState);
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::CheckHealth()
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client->ServiceUrl() << "/health/check";
+
+ cpr::Session& Session = m_SessionState->GetSession();
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ cpr::Response Response = Session.Get();
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client->ServiceUrl() << "/z$/";
+ if (Namespace != ZenCacheStore::DefaultNamespace)
+ {
+ Uri << Namespace << "/";
+ }
+ Uri << BucketId << "/" << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Accept", std::string{MapContentTypeToString(Type)}}});
+ cpr::Response Response = Session.Get();
+ ZEN_DEBUG("GET {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::GetCacheChunk(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ const IoHash& ValueContentId)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client->ServiceUrl() << "/z$/";
+ if (Namespace != ZenCacheStore::DefaultNamespace)
+ {
+ Uri << Namespace << "/";
+ }
+ Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});
+
+ cpr::Response Response = Session.Get();
+ ZEN_DEBUG("GET {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = Buffer,
+ .Bytes = Response.downloaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Reason = Response.reason,
+ .Success = Success};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoBuffer Value,
+ ZenContentType Type)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client->ServiceUrl() << "/z$/";
+ if (Namespace != ZenCacheStore::DefaultNamespace)
+ {
+ Uri << Namespace << "/";
+ }
+ Uri << BucketId << "/" << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type",
+ Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
+ : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
+ : "application/octet-stream"}});
+ Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()});
+
+ cpr::Response Response = Session.Put();
+ ZEN_DEBUG("PUT {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200 || Response.status_code == 201;
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ const IoHash& ValueContentId,
+ IoBuffer Payload)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client->ServiceUrl() << "/z$/";
+ if (Namespace != ZenCacheStore::DefaultNamespace)
+ {
+ Uri << Namespace << "/";
+ }
+ Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}});
+ Session.SetBody(cpr::Body{static_cast<const char*>(Payload.Data()), Payload.Size()});
+
+ cpr::Response Response = Session.Put();
+ ZEN_DEBUG("PUT {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200 || Response.status_code == 201;
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client->ServiceUrl() << "/z$/$rpc";
+
+ BinaryWriter Body;
+ Request.CopyTo(Body);
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = std::move(Buffer),
+ .Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Reason = Response.reason,
+ .Success = Success};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client->ServiceUrl() << "/z$/$rpc";
+
+ SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten();
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = std::move(Buffer),
+ .Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Reason = Response.reason,
+ .Success = Success};
+}
+
+} // namespace zen