aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/storage/upstream/zen.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-10-14 11:32:16 +0200
committerGitHub Enterprise <[email protected]>2025-10-14 11:32:16 +0200
commitca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch)
tree005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/storage/upstream/zen.cpp
parentmake asiohttp work without IPv6 (#562) (diff)
downloadzen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz
zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree * move config into config/ * also move admin service into storage since it mostly has storage related functionality * header consolidation
Diffstat (limited to 'src/zenserver/storage/upstream/zen.cpp')
-rw-r--r--src/zenserver/storage/upstream/zen.cpp251
1 files changed, 251 insertions, 0 deletions
diff --git a/src/zenserver/storage/upstream/zen.cpp b/src/zenserver/storage/upstream/zen.cpp
new file mode 100644
index 000000000..25fd3a3bb
--- /dev/null
+++ b/src/zenserver/storage/upstream/zen.cpp
@@ -0,0 +1,251 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zen.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/fmtutils.h>
+#include <zencore/session.h>
+#include <zencore/stream.h>
+#include <zenhttp/formatters.h>
+#include <zenhttp/httpclient.h>
+#include <zenhttp/httpcommon.h>
+#include <zenhttp/packageformat.h>
+
+#include <zenstore/cache/structuredcachestore.h>
+#include "diag/logging.h"
+
+#include <xxhash.h>
+#include <gsl/gsl-lite.hpp>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options)
+: m_Log(logging::Get(std::string_view("zenclient")))
+, m_ServiceUrl(Options.Url)
+, m_ConnectTimeout(Options.ConnectTimeout)
+, m_Timeout(Options.Timeout)
+{
+}
+
+ZenStructuredCacheClient::~ZenStructuredCacheClient()
+{
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+using namespace std::literals;
+
+ZenStructuredCacheSession::ZenStructuredCacheSession(Ref<ZenStructuredCacheClient>&& OuterClient)
+: m_Log(OuterClient->Log())
+, m_Client(std::move(OuterClient))
+{
+}
+
+ZenStructuredCacheSession::~ZenStructuredCacheSession()
+{
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::CheckHealth()
+{
+ HttpClient Http{m_Client->ServiceUrl()};
+
+ HttpClient::Response Response = Http.Get("/health/check"sv);
+
+ if (auto& Error = Response.Error; Error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
+ }
+
+ return {.Bytes = Response.DownloadedBytes,
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Success = Response.StatusCode == HttpResponseCode::OK};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type)
+{
+ HttpClient Http{m_Client->ServiceUrl()};
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << "/z$/";
+ if (Namespace != ZenCacheStore::DefaultNamespace)
+ {
+ Uri << Namespace << "/";
+ }
+ Uri << BucketId << "/" << Key.ToHexString();
+
+ HttpClient::Response Response = Http.Get(Uri, {{"Accept", std::string{MapContentTypeToString(Type)}}});
+ ZEN_DEBUG("GET {}", Response);
+
+ if (auto& Error = Response.Error; Error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
+ }
+
+ const bool Success = Response.StatusCode == HttpResponseCode::OK;
+ const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{};
+
+ return {.Response = Buffer, .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::GetCacheChunk(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ const IoHash& ValueContentId)
+{
+ HttpClient Http{m_Client->ServiceUrl()};
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << "/z$/";
+ if (Namespace != ZenCacheStore::DefaultNamespace)
+ {
+ Uri << Namespace << "/";
+ }
+ Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
+
+ HttpClient::Response Response = Http.Get(Uri, {{"Accept", "application/x-ue-comp"}});
+ ZEN_DEBUG("GET {}", Response);
+
+ if (auto& Error = Response.Error; Error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
+ }
+
+ const bool Success = Response.StatusCode == HttpResponseCode::OK;
+ const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{};
+
+ return {.Response = Buffer, .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoBuffer Value,
+ ZenContentType Type)
+{
+ HttpClient Http{m_Client->ServiceUrl()};
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << "/z$/";
+ if (Namespace != ZenCacheStore::DefaultNamespace)
+ {
+ Uri << Namespace << "/";
+ }
+ Uri << BucketId << "/" << Key.ToHexString();
+
+ Value.SetContentType(Type);
+
+ HttpClient::Response Response = Http.Put(Uri, Value);
+ ZEN_DEBUG("PUT {}", Response);
+
+ if (auto& Error = Response.Error; Error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
+ }
+
+ const bool Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::Created;
+
+ return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ const IoHash& ValueContentId,
+ IoBuffer Payload)
+{
+ HttpClient Http{m_Client->ServiceUrl()};
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << "/z$/";
+ if (Namespace != ZenCacheStore::DefaultNamespace)
+ {
+ Uri << Namespace << "/";
+ }
+ Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
+
+ Payload.SetContentType(HttpContentType::kCompressedBinary);
+
+ HttpClient::Response Response = Http.Put(Uri, Payload);
+ ZEN_DEBUG("PUT {}", Response);
+
+ if (auto& Error = Response.Error; Error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
+ }
+
+ const bool Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::Created;
+
+ return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
+{
+ HttpClient Http{m_Client->ServiceUrl()};
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << "/z$/$rpc";
+
+ // TODO: this seems redundant, we should be able to send the data more directly, without the BinaryWriter
+
+ BinaryWriter BodyWriter;
+ Request.CopyTo(BodyWriter);
+
+ IoBuffer Body{IoBuffer::Wrap, BodyWriter.GetData(), BodyWriter.GetSize()};
+ Body.SetContentType(HttpContentType::kCbObject);
+
+ HttpClient::Response Response = Http.Post(Uri, Body, {{"Accept", "application/x-ue-cbpkg"}});
+ ZEN_DEBUG("POST {}", Response);
+
+ if (auto& Error = Response.Error; Error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
+ }
+
+ const bool Success = Response.StatusCode == HttpResponseCode::OK;
+ const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{};
+
+ return {.Response = std::move(Buffer),
+ .Bytes = Response.DownloadedBytes,
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Success = Success};
+}
+
+ZenCacheResult
+ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request)
+{
+ HttpClient Http{m_Client->ServiceUrl()};
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << "/z$/$rpc";
+
+ IoBuffer Message = FormatPackageMessageBuffer(Request).Flatten().AsIoBuffer();
+ Message.SetContentType(HttpContentType::kCbPackage);
+
+ HttpClient::Response Response = Http.Post(Uri, Message, {{"Accept", "application/x-ue-cbpkg"}});
+ ZEN_DEBUG("POST {}", Response);
+
+ if (auto& Error = Response.Error; Error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
+ }
+
+ const bool Success = Response.StatusCode == HttpResponseCode::OK;
+ const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{};
+
+ return {.Response = std::move(Buffer),
+ .Bytes = Response.DownloadedBytes,
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Success = Success};
+}
+
+} // namespace zen