aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/zen.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/upstream/zen.cpp')
-rw-r--r--src/zenserver/upstream/zen.cpp226
1 files changed, 73 insertions, 153 deletions
diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp
index 7494ae379..25fd3a3bb 100644
--- a/src/zenserver/upstream/zen.cpp
+++ b/src/zenserver/upstream/zen.cpp
@@ -9,44 +9,18 @@
#include <zencore/session.h>
#include <zencore/stream.h>
#include <zenhttp/formatters.h>
+#include <zenhttp/httpclient.h>
#include <zenhttp/httpcommon.h>
#include <zenhttp/packageformat.h>
#include <zenstore/cache/structuredcachestore.h>
#include "diag/logging.h"
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
#include <xxhash.h>
#include <gsl/gsl-lite.hpp>
namespace zen {
-namespace detail {
- struct ZenCacheSessionState
- {
- ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {}
- ~ZenCacheSessionState() {}
-
- void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout)
- {
- Session.SetBody({});
- Session.SetHeader({});
- Session.SetConnectTimeout(ConnectTimeout);
- Session.SetTimeout(Timeout);
- }
-
- cpr::Session& GetSession() { return Session; }
-
- private:
- ZenStructuredCacheClient& OwnerClient;
- cpr::Session Session;
- };
-
-} // namespace detail
-
//////////////////////////////////////////////////////////////////////////
ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options)
@@ -59,39 +33,6 @@ ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClien
ZenStructuredCacheClient::~ZenStructuredCacheClient()
{
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
- for (auto& CacheEntry : m_SessionStateCache)
- {
- delete CacheEntry;
- }
-}
-
-detail::ZenCacheSessionState*
-ZenStructuredCacheClient::AllocSessionState()
-{
- detail::ZenCacheSessionState* State = nullptr;
-
- if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty())
- {
- State = m_SessionStateCache.front();
- m_SessionStateCache.pop_front();
- }
-
- if (State == nullptr)
- {
- State = new detail::ZenCacheSessionState(*this);
- }
-
- State->Reset(m_ConnectTimeout, m_Timeout);
-
- return State;
-}
-
-void
-ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
-{
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
- m_SessionStateCache.push_front(State);
}
//////////////////////////////////////////////////////////////////////////
@@ -102,59 +43,54 @@ ZenStructuredCacheSession::ZenStructuredCacheSession(Ref<ZenStructuredCacheClien
: m_Log(OuterClient->Log())
, m_Client(std::move(OuterClient))
{
- m_SessionState = m_Client->AllocSessionState();
}
ZenStructuredCacheSession::~ZenStructuredCacheSession()
{
- m_Client->FreeSessionState(m_SessionState);
}
ZenCacheResult
ZenStructuredCacheSession::CheckHealth()
{
- ExtendableStringBuilder<256> Uri;
- Uri << m_Client->ServiceUrl() << "/health/check";
+ HttpClient Http{m_Client->ServiceUrl()};
- cpr::Session& Session = m_SessionState->GetSession();
- Session.SetOption(cpr::Url{Uri.c_str()});
- cpr::Response Response = Session.Get();
+ HttpClient::Response Response = Http.Get("/health/check"sv);
- if (Response.error)
+ if (auto& Error = Response.Error; Error)
{
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
}
- return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.DownloadedBytes,
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Success = Response.StatusCode == HttpResponseCode::OK};
}
ZenCacheResult
ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type)
{
+ HttpClient Http{m_Client->ServiceUrl()};
+
ExtendableStringBuilder<256> Uri;
- Uri << m_Client->ServiceUrl() << "/z$/";
+ Uri << "/z$/";
if (Namespace != ZenCacheStore::DefaultNamespace)
{
Uri << Namespace << "/";
}
Uri << BucketId << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->GetSession();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Accept", std::string{MapContentTypeToString(Type)}}});
- cpr::Response Response = Session.Get();
+ HttpClient::Response Response = Http.Get(Uri, {{"Accept", std::string{MapContentTypeToString(Type)}}});
ZEN_DEBUG("GET {}", Response);
- if (Response.error)
+ if (auto& Error = Response.Error; Error)
{
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
}
- const bool Success = Response.status_code == 200;
- const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+ const bool Success = Response.StatusCode == HttpResponseCode::OK;
+ const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{};
- return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = Buffer, .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success};
}
ZenCacheResult
@@ -163,35 +99,28 @@ ZenStructuredCacheSession::GetCacheChunk(std::string_view Namespace,
const IoHash& Key,
const IoHash& ValueContentId)
{
+ HttpClient Http{m_Client->ServiceUrl()};
+
ExtendableStringBuilder<256> Uri;
- Uri << m_Client->ServiceUrl() << "/z$/";
+ Uri << "/z$/";
if (Namespace != ZenCacheStore::DefaultNamespace)
{
Uri << Namespace << "/";
}
Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
- cpr::Session& Session = m_SessionState->GetSession();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});
-
- cpr::Response Response = Session.Get();
+ HttpClient::Response Response = Http.Get(Uri, {{"Accept", "application/x-ue-comp"}});
ZEN_DEBUG("GET {}", Response);
- if (Response.error)
+ if (auto& Error = Response.Error; Error)
{
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
}
- const bool Success = Response.status_code == 200;
- const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+ const bool Success = Response.StatusCode == HttpResponseCode::OK;
+ const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{};
- return {.Response = Buffer,
- .Bytes = Response.downloaded_bytes,
- .ElapsedSeconds = Response.elapsed,
- .Reason = Response.reason,
- .Success = Success};
+ return {.Response = Buffer, .Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success};
}
ZenCacheResult
@@ -201,33 +130,29 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace,
IoBuffer Value,
ZenContentType Type)
{
+ HttpClient Http{m_Client->ServiceUrl()};
+
ExtendableStringBuilder<256> Uri;
- Uri << m_Client->ServiceUrl() << "/z$/";
+ Uri << "/z$/";
if (Namespace != ZenCacheStore::DefaultNamespace)
{
Uri << Namespace << "/";
}
Uri << BucketId << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->GetSession();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Content-Type",
- Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
- : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
- : "application/octet-stream"}});
- Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()});
+ Value.SetContentType(Type);
- cpr::Response Response = Session.Put();
+ HttpClient::Response Response = Http.Put(Uri, Value);
ZEN_DEBUG("PUT {}", Response);
- if (Response.error)
+ if (auto& Error = Response.Error; Error)
{
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
}
- const bool Success = Response.status_code == 200 || Response.status_code == 201;
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success};
+ const bool Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::Created;
+
+ return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success};
}
ZenCacheResult
@@ -237,94 +162,89 @@ ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace,
const IoHash& ValueContentId,
IoBuffer Payload)
{
+ HttpClient Http{m_Client->ServiceUrl()};
+
ExtendableStringBuilder<256> Uri;
- Uri << m_Client->ServiceUrl() << "/z$/";
+ Uri << "/z$/";
if (Namespace != ZenCacheStore::DefaultNamespace)
{
Uri << Namespace << "/";
}
Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
- cpr::Session& Session = m_SessionState->GetSession();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}});
- Session.SetBody(cpr::Body{static_cast<const char*>(Payload.Data()), Payload.Size()});
+ Payload.SetContentType(HttpContentType::kCompressedBinary);
- cpr::Response Response = Session.Put();
+ HttpClient::Response Response = Http.Put(Uri, Payload);
ZEN_DEBUG("PUT {}", Response);
- if (Response.error)
+ if (auto& Error = Response.Error; Error)
{
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
}
- const bool Success = Response.status_code == 200 || Response.status_code == 201;
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success};
+ const bool Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::Created;
+
+ return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, .Success = Success};
}
ZenCacheResult
ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
{
+ HttpClient Http{m_Client->ServiceUrl()};
+
ExtendableStringBuilder<256> Uri;
- Uri << m_Client->ServiceUrl() << "/z$/$rpc";
+ Uri << "/z$/$rpc";
- BinaryWriter Body;
- Request.CopyTo(Body);
+ // TODO: this seems redundant, we should be able to send the data more directly, without the BinaryWriter
- cpr::Session& Session = m_SessionState->GetSession();
+ BinaryWriter BodyWriter;
+ Request.CopyTo(BodyWriter);
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
- Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()});
+ IoBuffer Body{IoBuffer::Wrap, BodyWriter.GetData(), BodyWriter.GetSize()};
+ Body.SetContentType(HttpContentType::kCbObject);
- cpr::Response Response = Session.Post();
+ HttpClient::Response Response = Http.Post(Uri, Body, {{"Accept", "application/x-ue-cbpkg"}});
ZEN_DEBUG("POST {}", Response);
- if (Response.error)
+ if (auto& Error = Response.Error; Error)
{
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
}
- const bool Success = Response.status_code == 200;
- const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+ const bool Success = Response.StatusCode == HttpResponseCode::OK;
+ const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{};
return {.Response = std::move(Buffer),
- .Bytes = Response.uploaded_bytes,
- .ElapsedSeconds = Response.elapsed,
- .Reason = Response.reason,
+ .Bytes = Response.DownloadedBytes,
+ .ElapsedSeconds = Response.ElapsedSeconds,
.Success = Success};
}
ZenCacheResult
ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request)
{
- ExtendableStringBuilder<256> Uri;
- Uri << m_Client->ServiceUrl() << "/z$/$rpc";
+ HttpClient Http{m_Client->ServiceUrl()};
- SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten();
-
- cpr::Session& Session = m_SessionState->GetSession();
+ ExtendableStringBuilder<256> Uri;
+ Uri << "/z$/$rpc";
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}});
- Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()});
+ IoBuffer Message = FormatPackageMessageBuffer(Request).Flatten().AsIoBuffer();
+ Message.SetContentType(HttpContentType::kCbPackage);
- cpr::Response Response = Session.Post();
+ HttpClient::Response Response = Http.Post(Uri, Message, {{"Accept", "application/x-ue-cbpkg"}});
ZEN_DEBUG("POST {}", Response);
- if (Response.error)
+ if (auto& Error = Response.Error; Error)
{
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ return {.ErrorCode = static_cast<int32_t>(Error->ErrorCode), .Reason = std::move(Error->ErrorMessage)};
}
- const bool Success = Response.status_code == 200;
- const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+ const bool Success = Response.StatusCode == HttpResponseCode::OK;
+ const IoBuffer Buffer = Success ? Response.ResponsePayload : IoBuffer{};
return {.Response = std::move(Buffer),
- .Bytes = Response.uploaded_bytes,
- .ElapsedSeconds = Response.elapsed,
- .Reason = Response.reason,
+ .Bytes = Response.DownloadedBytes,
+ .ElapsedSeconds = Response.ElapsedSeconds,
.Success = Success};
}