diff options
| author | Stefan Boberg <[email protected]> | 2025-09-29 10:36:32 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-29 10:36:32 +0200 |
| commit | 2f0efec7ab0430f4f4858db87b7eecfbccc0f47c (patch) | |
| tree | 80ce35992a220260cf070fac739626f555de738a /src/zenserver/upstream/zen.cpp | |
| parent | fixed race condition in zen::logging::Get (#519) (diff) | |
| download | zen-2f0efec7ab0430f4f4858db87b7eecfbccc0f47c.tar.xz zen-2f0efec7ab0430f4f4858db87b7eecfbccc0f47c.zip | |
make cpr a HttpClient implementation detail (#517)
these changes remove cpr from anything which is not `HttpClient` internals.
The goal is to eventually replace cpr with a more direct curl interface to eliminate cpr since it's proven problematic due to their development practices which frequently breaks APIs and prevents us from updating vcpkg. But this PR is limited to refactoring existing cpr code to use `HttpClient` instead.
Diffstat (limited to 'src/zenserver/upstream/zen.cpp')
| -rw-r--r-- | src/zenserver/upstream/zen.cpp | 226 |
1 files changed, 73 insertions, 153 deletions
diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp index 7494ae379..25fd3a3bb 100644 --- a/src/zenserver/upstream/zen.cpp +++ b/src/zenserver/upstream/zen.cpp @@ -9,44 +9,18 @@ #include <zencore/session.h> #include <zencore/stream.h> #include <zenhttp/formatters.h> +#include <zenhttp/httpclient.h> #include <zenhttp/httpcommon.h> #include <zenhttp/packageformat.h> #include <zenstore/cache/structuredcachestore.h> #include "diag/logging.h" -ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> -ZEN_THIRD_PARTY_INCLUDES_END - #include <xxhash.h> #include <gsl/gsl-lite.hpp> namespace zen { -namespace detail { - struct ZenCacheSessionState - { - ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {} - ~ZenCacheSessionState() {} - - void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout) - { - Session.SetBody({}); - Session.SetHeader({}); - Session.SetConnectTimeout(ConnectTimeout); - Session.SetTimeout(Timeout); - } - - cpr::Session& GetSession() { return Session; } - - private: - ZenStructuredCacheClient& OwnerClient; - cpr::Session Session; - }; - -} // namespace detail - ////////////////////////////////////////////////////////////////////////// ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options) @@ -59,39 +33,6 @@ ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClien ZenStructuredCacheClient::~ZenStructuredCacheClient() { - RwLock::ExclusiveLockScope _(m_SessionStateLock); - for (auto& CacheEntry : m_SessionStateCache) - { - delete CacheEntry; - } -} - -detail::ZenCacheSessionState* -ZenStructuredCacheClient::AllocSessionState() -{ - detail::ZenCacheSessionState* State = nullptr; - - if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty()) - { - State = m_SessionStateCache.front(); - m_SessionStateCache.pop_front(); - } - - if (State == nullptr) - { - State = new detail::ZenCacheSessionState(*this); - } - - State->Reset(m_ConnectTimeout, m_Timeout); - - return State; -} - -void -ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) -{ - RwLock::ExclusiveLockScope _(m_SessionStateLock); - m_SessionStateCache.push_front(State); } ////////////////////////////////////////////////////////////////////////// @@ -102,59 +43,54 @@ ZenStructuredCacheSession::ZenStructuredCacheSession(Ref<ZenStructuredCacheClien : m_Log(OuterClient->Log()) , m_Client(std::move(OuterClient)) { - m_SessionState = m_Client->AllocSessionState(); } ZenStructuredCacheSession::~ZenStructuredCacheSession() { - m_Client->FreeSessionState(m_SessionState); } ZenCacheResult ZenStructuredCacheSession::CheckHealth() { - ExtendableStringBuilder<256> Uri; - Uri << m_Client->ServiceUrl() << "/health/check"; + HttpClient Http{m_Client->ServiceUrl()}; - cpr::Session& Session = m_SessionState->GetSession(); - Session.SetOption(cpr::Url{Uri.c_str()}); - cpr::Response Response = Session.Get(); + HttpClient::Response Response = Http.Get("/health/check"sv); - if (Response.error) + if (auto& Error = Response.Error; Error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } - return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.DownloadedBytes, + .ElapsedSeconds = Response.ElapsedSeconds, + .Success = Response.StatusCode == HttpResponseCode::OK}; } ZenCacheResult ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) { + HttpClient Http{m_Client->ServiceUrl()}; + ExtendableStringBuilder<256> Uri; - Uri << m_Client->ServiceUrl() << "/z$/"; + Uri << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->GetSession(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader(cpr::Header{{"Accept", std::string{MapContentTypeToString(Type)}}}); - cpr::Response Response = Session.Get(); + HttpClient::Response Response = Http.Get(Uri, {{"Accept", std::string{MapContentTypeToString(Type)}}}); ZEN_DEBUG("GET {}", Response); - if (Response.error) + if (auto& Error = Response.Error; Error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } - const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + const bool Success = Response.StatusCode == HttpResponseCode::OK; + const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } ZenCacheResult @@ -163,35 +99,28 @@ ZenStructuredCacheSession::GetCacheChunk(std::string_view Namespace, const IoHash& Key, const IoHash& ValueContentId) { + HttpClient Http{m_Client->ServiceUrl()}; + ExtendableStringBuilder<256> Uri; - Uri << m_Client->ServiceUrl() << "/z$/"; + Uri << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); - cpr::Session& Session = m_SessionState->GetSession(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); - - cpr::Response Response = Session.Get(); + HttpClient::Response Response = Http.Get(Uri, {{"Accept", "application/x-ue-comp"}}); ZEN_DEBUG("GET {}", Response); - if (Response.error) + if (auto& Error = Response.Error; Error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } - const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + const bool Success = Response.StatusCode == HttpResponseCode::OK; + const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; - return {.Response = Buffer, - .Bytes = Response.downloaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Reason = Response.reason, - .Success = Success}; + return {.Response = Buffer, .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } ZenCacheResult @@ -201,33 +130,29 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, IoBuffer Value, ZenContentType Type) { + HttpClient Http{m_Client->ServiceUrl()}; + ExtendableStringBuilder<256> Uri; - Uri << m_Client->ServiceUrl() << "/z$/"; + Uri << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->GetSession(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader(cpr::Header{{"Content-Type", - Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" - : Type == ZenContentType::kCbObject ? "application/x-ue-cb" - : "application/octet-stream"}}); - Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()}); + Value.SetContentType(Type); - cpr::Response Response = Session.Put(); + HttpClient::Response Response = Http.Put(Uri, Value); ZEN_DEBUG("PUT {}", Response); - if (Response.error) + if (auto& Error = Response.Error; Error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } - const bool Success = Response.status_code == 200 || Response.status_code == 201; - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; + const bool Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::Created; + + return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } ZenCacheResult @@ -237,94 +162,89 @@ ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, const IoHash& ValueContentId, IoBuffer Payload) { + HttpClient Http{m_Client->ServiceUrl()}; + ExtendableStringBuilder<256> Uri; - Uri << m_Client->ServiceUrl() << "/z$/"; + Uri << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); - cpr::Session& Session = m_SessionState->GetSession(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}}); - Session.SetBody(cpr::Body{static_cast<const char*>(Payload.Data()), Payload.Size()}); + Payload.SetContentType(HttpContentType::kCompressedBinary); - cpr::Response Response = Session.Put(); + HttpClient::Response Response = Http.Put(Uri, Payload); ZEN_DEBUG("PUT {}", Response); - if (Response.error) + if (auto& Error = Response.Error; Error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } - const bool Success = Response.status_code == 200 || Response.status_code == 201; - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; + const bool Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::Created; + + return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) { + HttpClient Http{m_Client->ServiceUrl()}; + ExtendableStringBuilder<256> Uri; - Uri << m_Client->ServiceUrl() << "/z$/$rpc"; + Uri << "/z$/$rpc"; - BinaryWriter Body; - Request.CopyTo(Body); + // TODO: this seems redundant, we should be able to send the data more directly, without the BinaryWriter - cpr::Session& Session = m_SessionState->GetSession(); + BinaryWriter BodyWriter; + Request.CopyTo(BodyWriter); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); - Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()}); + IoBuffer Body{IoBuffer::Wrap, BodyWriter.GetData(), BodyWriter.GetSize()}; + Body.SetContentType(HttpContentType::kCbObject); - cpr::Response Response = Session.Post(); + HttpClient::Response Response = Http.Post(Uri, Body, {{"Accept", "application/x-ue-cbpkg"}}); ZEN_DEBUG("POST {}", Response); - if (Response.error) + if (auto& Error = Response.Error; Error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } - const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + const bool Success = Response.StatusCode == HttpResponseCode::OK; + const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; return {.Response = std::move(Buffer), - .Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Reason = Response.reason, + .Bytes = Response.DownloadedBytes, + .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) { - ExtendableStringBuilder<256> Uri; - Uri << m_Client->ServiceUrl() << "/z$/$rpc"; + HttpClient Http{m_Client->ServiceUrl()}; - SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten(); - - cpr::Session& Session = m_SessionState->GetSession(); + ExtendableStringBuilder<256> Uri; + Uri << "/z$/$rpc"; - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}); - Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()}); + IoBuffer Message = FormatPackageMessageBuffer(Request).Flatten().AsIoBuffer(); + Message.SetContentType(HttpContentType::kCbPackage); - cpr::Response Response = Session.Post(); + HttpClient::Response Response = Http.Post(Uri, Message, {{"Accept", "application/x-ue-cbpkg"}}); ZEN_DEBUG("POST {}", Response); - if (Response.error) + if (auto& Error = Response.Error; Error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)}; } - const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + const bool Success = Response.StatusCode == HttpResponseCode::OK; + const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{}; return {.Response = std::move(Buffer), - .Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Reason = Response.reason, + .Bytes = Response.DownloadedBytes, + .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success}; } |