diff options
| author | Stefan Boberg <[email protected]> | 2025-10-14 11:32:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-14 11:32:16 +0200 |
| commit | ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch) | |
| tree | 005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/storage/upstream/zen.cpp | |
| parent | make asiohttp work without IPv6 (#562) (diff) | |
| download | zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip | |
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree
* move config into config/
* also move admin service into storage since it mostly has storage related functionality
* header consolidation
Diffstat (limited to 'src/zenserver/storage/upstream/zen.cpp')
| -rw-r--r-- | src/zenserver/storage/upstream/zen.cpp | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/src/zenserver/storage/upstream/zen.cpp b/src/zenserver/storage/upstream/zen.cpp new file mode 100644 index 000000000..25fd3a3bb --- /dev/null +++ b/src/zenserver/storage/upstream/zen.cpp @@ -0,0 +1,251 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zen.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/fmtutils.h> +#include <zencore/session.h> +#include <zencore/stream.h> +#include <zenhttp/formatters.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/httpcommon.h> +#include <zenhttp/packageformat.h> + +#include <zenstore/cache/structuredcachestore.h> +#include "diag/logging.h" + +#include <xxhash.h> +#include <gsl/gsl-lite.hpp> + +namespace zen { + +////////////////////////////////////////////////////////////////////////// + +ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options) +: m_Log(logging::Get(std::string_view("zenclient"))) +, m_ServiceUrl(Options.Url) +, m_ConnectTimeout(Options.ConnectTimeout) +, m_Timeout(Options.Timeout) +{ +} + +ZenStructuredCacheClient::~ZenStructuredCacheClient() +{ +} + +////////////////////////////////////////////////////////////////////////// + +using namespace std::literals; + +ZenStructuredCacheSession::ZenStructuredCacheSession(Ref<ZenStructuredCacheClient>&& OuterClient) +: m_Log(OuterClient->Log()) +, m_Client(std::move(OuterClient)) +{ +} + +ZenStructuredCacheSession::~ZenStructuredCacheSession() +{ +} + +ZenCacheResult +ZenStructuredCacheSession::CheckHealth() +{ + HttpClient Http{m_Client->ServiceUrl()}; + + HttpClient::Response Response = Http.Get("/health/check"sv); + + if (auto& Error = Response.Error; Error) + { + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; + } + + return {.Bytes = Response.DownloadedBytes, + .ElapsedSeconds = Response.ElapsedSeconds, + .Success = Response.StatusCode == HttpResponseCode::OK}; +} + +ZenCacheResult +ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) +{ + HttpClient Http{m_Client->ServiceUrl()}; + + ExtendableStringBuilder<256> Uri; + Uri << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString(); + + HttpClient::Response Response = Http.Get(Uri, {{"Accept", std::string{MapContentTypeToString(Type)}}}); + ZEN_DEBUG("GET {}", Response); + + if (auto& Error = Response.Error; Error) + { + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; + } + + const bool Success = Response.StatusCode == HttpResponseCode::OK; + const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; + + return {.Response = Buffer, .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; +} + +ZenCacheResult +ZenStructuredCacheSession::GetCacheChunk(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId) +{ + HttpClient Http{m_Client->ServiceUrl()}; + + ExtendableStringBuilder<256> Uri; + Uri << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + + HttpClient::Response Response = Http.Get(Uri, {{"Accept", "application/x-ue-comp"}}); + ZEN_DEBUG("GET {}", Response); + + if (auto& Error = Response.Error; Error) + { + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; + } + + const bool Success = Response.StatusCode == HttpResponseCode::OK; + const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; + + return {.Response = Buffer, .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; +} + +ZenCacheResult +ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoBuffer Value, + ZenContentType Type) +{ + HttpClient Http{m_Client->ServiceUrl()}; + + ExtendableStringBuilder<256> Uri; + Uri << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString(); + + Value.SetContentType(Type); + + HttpClient::Response Response = Http.Put(Uri, Value); + ZEN_DEBUG("PUT {}", Response); + + if (auto& Error = Response.Error; Error) + { + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; + } + + const bool Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::Created; + + return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; +} + +ZenCacheResult +ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId, + IoBuffer Payload) +{ + HttpClient Http{m_Client->ServiceUrl()}; + + ExtendableStringBuilder<256> Uri; + Uri << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + + Payload.SetContentType(HttpContentType::kCompressedBinary); + + HttpClient::Response Response = Http.Put(Uri, Payload); + ZEN_DEBUG("PUT {}", Response); + + if (auto& Error = Response.Error; Error) + { + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; + } + + const bool Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::Created; + + return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; +} + +ZenCacheResult +ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) +{ + HttpClient Http{m_Client->ServiceUrl()}; + + ExtendableStringBuilder<256> Uri; + Uri << "/z$/$rpc"; + + // TODO: this seems redundant, we should be able to send the data more directly, without the BinaryWriter + + BinaryWriter BodyWriter; + Request.CopyTo(BodyWriter); + + IoBuffer Body{IoBuffer::Wrap, BodyWriter.GetData(), BodyWriter.GetSize()}; + Body.SetContentType(HttpContentType::kCbObject); + + HttpClient::Response Response = Http.Post(Uri, Body, {{"Accept", "application/x-ue-cbpkg"}}); + ZEN_DEBUG("POST {}", Response); + + if (auto& Error = Response.Error; Error) + { + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; + } + + const bool Success = Response.StatusCode == HttpResponseCode::OK; + const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; + + return {.Response = std::move(Buffer), + .Bytes = Response.DownloadedBytes, + .ElapsedSeconds = Response.ElapsedSeconds, + .Success = Success}; +} + +ZenCacheResult +ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) +{ + HttpClient Http{m_Client->ServiceUrl()}; + + ExtendableStringBuilder<256> Uri; + Uri << "/z$/$rpc"; + + IoBuffer Message = FormatPackageMessageBuffer(Request).Flatten().AsIoBuffer(); + Message.SetContentType(HttpContentType::kCbPackage); + + HttpClient::Response Response = Http.Post(Uri, Message, {{"Accept", "application/x-ue-cbpkg"}}); + ZEN_DEBUG("POST {}", Response); + + if (auto& Error = Response.Error; Error) + { + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; + } + + const bool Success = Response.StatusCode == HttpResponseCode::OK; + const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; + + return {.Response = std::move(Buffer), + .Bytes = Response.DownloadedBytes, + .ElapsedSeconds = Response.ElapsedSeconds, + .Success = Success}; +} + +} // namespace zen |