aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/zen.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/upstream/zen.cpp')
-rw-r--r--zenserver/upstream/zen.cpp58
1 files changed, 49 insertions, 9 deletions
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 14333f45a..cd7f48334 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -311,8 +311,17 @@ namespace detail {
ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {}
~ZenCacheSessionState() {}
- void Reset() {}
+ void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout)
+ {
+ Session.SetBody({});
+ Session.SetHeader({});
+ Session.SetConnectTimeout(ConnectTimeout);
+ Session.SetTimeout(Timeout);
+ }
+
+ cpr::Session& GetSession() { return Session; }
+ private:
ZenStructuredCacheClient& OwnerClient;
cpr::Session Session;
};
@@ -321,9 +330,11 @@ namespace detail {
//////////////////////////////////////////////////////////////////////////
-ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl)
+ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options)
: m_Log(logging::Get(std::string_view("zenclient")))
-, m_ServiceUrl(ServiceUrl)
+, m_ServiceUrl(Options.Url)
+, m_ConnectTimeout(Options.ConnectTimeout)
+, m_Timeout(Options.Timeout)
{
}
@@ -347,7 +358,7 @@ ZenStructuredCacheClient::AllocSessionState()
State = new detail::ZenCacheSessionState(*this);
}
- State->Reset();
+ State->Reset(m_ConnectTimeout, m_Timeout);
return State;
}
@@ -381,7 +392,7 @@ ZenStructuredCacheSession::CheckHealth()
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/health/check";
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
cpr::Response Response = Session.Get();
@@ -399,7 +410,7 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Accept",
@@ -427,7 +438,7 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});
@@ -452,7 +463,7 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Content-Type",
@@ -480,7 +491,7 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}});
@@ -499,4 +510,33 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
.Success = (Response.status_code == 200 || Response.status_code == 201)};
}
+ZenCacheResult
+ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/$rpc";
+
+ BinaryWriter Body;
+ Request.CopyTo(Body);
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
} // namespace zen