// Copyright Epic Games, Inc. All Rights Reserved. #include "zen.h" #include #include #include #include #include #include #include #include #include #include #include "diag/logging.h" ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen { namespace detail { struct ZenCacheSessionState { ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {} ~ZenCacheSessionState() {} void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout) { Session.SetBody({}); Session.SetHeader({}); Session.SetConnectTimeout(ConnectTimeout); Session.SetTimeout(Timeout); } cpr::Session& GetSession() { return Session; } private: ZenStructuredCacheClient& OwnerClient; cpr::Session Session; }; } // namespace detail ////////////////////////////////////////////////////////////////////////// ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options) : m_Log(logging::Get(std::string_view("zenclient"))) , m_ServiceUrl(Options.Url) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) { } ZenStructuredCacheClient::~ZenStructuredCacheClient() { RwLock::ExclusiveLockScope _(m_SessionStateLock); for (auto& CacheEntry : m_SessionStateCache) { delete CacheEntry; } } detail::ZenCacheSessionState* ZenStructuredCacheClient::AllocSessionState() { detail::ZenCacheSessionState* State = nullptr; if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty()) { State = m_SessionStateCache.front(); m_SessionStateCache.pop_front(); } if (State == nullptr) { State = new detail::ZenCacheSessionState(*this); } State->Reset(m_ConnectTimeout, m_Timeout); return State; } void ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) { RwLock::ExclusiveLockScope _(m_SessionStateLock); m_SessionStateCache.push_front(State); } ////////////////////////////////////////////////////////////////////////// using namespace std::literals; ZenStructuredCacheSession::ZenStructuredCacheSession(Ref&& OuterClient) : m_Log(OuterClient->Log()) , m_Client(std::move(OuterClient)) { m_SessionState = m_Client->AllocSessionState(); } ZenStructuredCacheSession::~ZenStructuredCacheSession() { m_Client->FreeSessionState(m_SessionState); } ZenCacheResult ZenStructuredCacheSession::CheckHealth() { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/health/check"; cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); cpr::Response Response = Session.Get(); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } ZenCacheResult ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Accept", std::string{MapContentTypeToString(Type)}}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::GetCacheChunk(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" : Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); Session.SetBody(cpr::Body{static_cast(Value.Data()), Value.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200 || Response.status_code == 201; return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}}); Session.SetBody(cpr::Body{static_cast(Payload.Data()), Payload.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200 || Response.status_code == 201; return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/$rpc"; BinaryWriter Body; Request.CopyTo(Body); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); Session.SetBody(cpr::Body{reinterpret_cast(Body.GetData()), Body.GetSize()}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) { ExtendableStringBuilder<256> Uri; Uri << m_Client->ServiceUrl() << "/z$/$rpc"; SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}); Session.SetBody(cpr::Body{reinterpret_cast(Message.GetData()), Message.GetSize()}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = std::move(Response.error.message)}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } } // namespace zen