// Copyright Epic Games, Inc. All Rights Reserved. #include "zen.h" #include #include #include #include #include #include #include #include #include #include #include #include "diag/logging.h" #include #include namespace zen { ////////////////////////////////////////////////////////////////////////// ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options) : m_Log(logging::Get(std::string_view("zenclient"))) , m_ServiceUrl(Options.Url) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) { } ZenStructuredCacheClient::~ZenStructuredCacheClient() { } ////////////////////////////////////////////////////////////////////////// using namespace std::literals; ZenStructuredCacheSession::ZenStructuredCacheSession(Ref&& OuterClient) : m_Log(OuterClient->Log()) , m_Client(std::move(OuterClient)) { } ZenStructuredCacheSession::~ZenStructuredCacheSession() { } ZenCacheResult ZenStructuredCacheSession::CheckHealth() { HttpClient Http{m_Client->ServiceUrl()}; HttpClient::Response Response = Http.Get("/health/check"sv); if (auto& Error = Response.Error; Error) { return {.ErrorCode = static_cast(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Response.StatusCode == HttpResponseCode::OK}; } ZenCacheResult ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) { HttpClient Http{m_Client->ServiceUrl()}; ExtendableStringBuilder<256> Uri; Uri << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString(); HttpClient::Response Response = Http.Get(Uri, {{"Accept", std::string{MapContentTypeToString(Type)}}}); ZEN_DEBUG("GET {}", Response); if (auto& Error = Response.Error; Error) { return {.ErrorCode = static_cast(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } const bool Success = Response.StatusCode == HttpResponseCode::OK; const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; return {.Response = Buffer, .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::GetCacheChunk(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId) { HttpClient Http{m_Client->ServiceUrl()}; ExtendableStringBuilder<256> Uri; Uri << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); HttpClient::Response Response = Http.Get(Uri, {{"Accept", "application/x-ue-comp"}}); ZEN_DEBUG("GET {}", Response); if (auto& Error = Response.Error; Error) { return {.ErrorCode = static_cast(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } const bool Success = Response.StatusCode == HttpResponseCode::OK; const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; return {.Response = Buffer, .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, CachePolicy Policy, IoBuffer Value, ZenContentType Type) { HttpClient Http{m_Client->ServiceUrl()}; ExtendableStringBuilder<256> Uri; Uri << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString(); if (Policy != CachePolicy::Default) { Uri << "?Policy="; Uri << Policy; } Value.SetContentType(Type); HttpClient::Response Response = Http.Put(Uri, Value); ZEN_DEBUG("PUT {}", Response); if (auto& Error = Response.Error; Error) { return {.ErrorCode = static_cast(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } const bool Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::Created; return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, CachePolicy Policy, const IoHash& ValueContentId, IoBuffer Payload) { HttpClient Http{m_Client->ServiceUrl()}; ExtendableStringBuilder<256> Uri; Uri << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); if (Policy != CachePolicy::Default) { Uri << "?Policy="; Uri << Policy; } Payload.SetContentType(HttpContentType::kCompressedBinary); HttpClient::Response Response = Http.Put(Uri, Payload); ZEN_DEBUG("PUT {}", Response); if (auto& Error = Response.Error; Error) { return {.ErrorCode = static_cast(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } const bool Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::Created; return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) { HttpClient Http{m_Client->ServiceUrl()}; ExtendableStringBuilder<256> Uri; Uri << "/z$/$rpc"; // TODO: this seems redundant, we should be able to send the data more directly, without the BinaryWriter BinaryWriter BodyWriter; Request.CopyTo(BodyWriter); IoBuffer Body{IoBuffer::Wrap, BodyWriter.GetData(), BodyWriter.GetSize()}; Body.SetContentType(HttpContentType::kCbObject); HttpClient::Response Response = Http.Post(Uri, Body, {{"Accept", "application/x-ue-cbpkg"}}); ZEN_DEBUG("POST {}", Response); if (auto& Error = Response.Error; Error) { return {.ErrorCode = static_cast(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } const bool Success = Response.StatusCode == HttpResponseCode::OK; const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; return {.Response = std::move(Buffer), .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) { HttpClient Http{m_Client->ServiceUrl()}; ExtendableStringBuilder<256> Uri; Uri << "/z$/$rpc"; IoBuffer Message = FormatPackageMessageBuffer(Request).Flatten().AsIoBuffer(); Message.SetContentType(HttpContentType::kCbPackage); HttpClient::Response Response = Http.Post(Uri, Message, {{"Accept", "application/x-ue-cbpkg"}}); ZEN_DEBUG("POST {}", Response); if (auto& Error = Response.Error; Error) { return {.ErrorCode = static_cast(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } const bool Success = Response.StatusCode == HttpResponseCode::OK; const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; return {.Response = std::move(Buffer), .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } } // namespace zen