aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-09-22 21:21:15 +0200
committerPer Larsson <[email protected]>2021-09-22 21:21:15 +0200
commitddb84cb54f7cf6777d2ccaed4338fff56b75922c (patch)
treebffa905f41526a5ed0ddbefed45573a069a2d845
parentMade icon resource path relative, as it should be (diff)
downloadzen-ddb84cb54f7cf6777d2ccaed4338fff56b75922c.tar.xz
zen-ddb84cb54f7cf6777d2ccaed4338fff56b75922c.zip
Made upstream endpoints more resilient to failures by checking health/reconnecting at regular intervals.
-rw-r--r--zenserver/upstream/jupiter.cpp30
-rw-r--r--zenserver/upstream/jupiter.h10
-rw-r--r--zenserver/upstream/upstreamcache.cpp206
-rw-r--r--zenserver/upstream/upstreamcache.h44
-rw-r--r--zenserver/upstream/zen.cpp25
-rw-r--r--zenserver/upstream/zen.h10
-rw-r--r--zenserver/zenserver.cpp4
7 files changed, 257 insertions, 72 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 2e74602db..14da8cbcc 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -90,6 +90,11 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -121,6 +126,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -144,6 +154,11 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -171,6 +186,11 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
.Success = (Response.status_code == 200 || Response.status_code == 201)};
@@ -204,6 +224,11 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
.Success = (Response.status_code == 200 || Response.status_code == 201)};
@@ -227,6 +252,11 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
.Success = (Response.status_code == 200 || Response.status_code == 201)};
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 21217387c..94e7e7680 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -42,10 +42,12 @@ private:
struct CloudCacheResult
{
- IoBuffer Response;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ IoBuffer Response;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ int32_t ErrorCode = {};
+ std::string Reason;
+ bool Success = false;
};
/**
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index d6b6d44be..a889fb984 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -23,7 +23,6 @@
#include <algorithm>
#include <atomic>
#include <deque>
-#include <limits>
#include <thread>
#include <unordered_map>
@@ -106,11 +105,23 @@ namespace detail {
virtual ~JupiterUpstreamEndpoint() = default;
- virtual bool Initialize() override
+ virtual bool IsHealthy() const override { return m_HealthOk.load(); }
+
+ virtual UpstreamEndpointHealth CheckHealth() override
{
- CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.Authenticate();
- return Result.Success;
+ try
+ {
+ CloudCacheSession Session(m_Client);
+ const CloudCacheResult Result = Session.Authenticate();
+
+ m_HealthOk = Result.ErrorCode == 0;
+
+ return {.Reason = std::move(Result.Reason), .Ok = Result.Success};
+ }
+ catch (std::exception& Err)
+ {
+ return {.Reason = Err.what(), .Ok = false};
+ }
}
virtual std::string_view DisplayName() const override { return m_DisplayName; }
@@ -143,6 +154,7 @@ namespace detail {
CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
+ Result.ErrorCode = AttachmentResult.ErrorCode;
if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
{
@@ -169,14 +181,16 @@ namespace detail {
}
}
+ m_HealthOk = Result.ErrorCode == 0;
+
return {.Value = Result.Response,
.Bytes = Result.Bytes,
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
}
}
@@ -187,14 +201,16 @@ namespace detail {
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
+ m_HealthOk = Result.ErrorCode == 0;
+
return {.Value = Result.Response,
.Bytes = Result.Bytes,
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
}
}
@@ -277,6 +293,7 @@ namespace detail {
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
+ std::atomic_bool m_HealthOk{false};
};
class ZenUpstreamEndpoint final : public UpstreamEndpoint
@@ -285,27 +302,33 @@ namespace detail {
ZenUpstreamEndpoint(std::string_view ServiceUrl)
{
using namespace fmt::literals;
- m_DisplayName = "Zen - '{}'"_format(ServiceUrl);
+ m_DisplayName = "Zen - {}"_format(ServiceUrl);
m_Client = new ZenStructuredCacheClient(ServiceUrl);
}
~ZenUpstreamEndpoint() = default;
- virtual bool Initialize() override
+ virtual bool IsHealthy() const override { return m_HealthOk; }
+
+ virtual UpstreamEndpointHealth CheckHealth() override
{
try
{
ZenStructuredCacheSession Session(*m_Client);
ZenCacheResult Result;
+
for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt)
{
Result = Session.SayHello();
}
- return Result.Success;
+
+ m_HealthOk = Result.ErrorCode == 0;
+
+ return {.Reason = std::move(Result.Reason), .Ok = m_HealthOk};
}
- catch (std::exception&)
+ catch (std::exception& Err)
{
- return false;
+ return {.Reason = Err.what(), .Ok = false};
}
}
@@ -318,14 +341,16 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
+ m_HealthOk = Result.ErrorCode == 0;
+
return {.Value = Result.Response,
.Bytes = Result.Bytes,
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
}
}
@@ -337,14 +362,16 @@ namespace detail {
const ZenCacheResult Result =
Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
+ m_HealthOk = Result.ErrorCode == 0;
+
return {.Value = Result.Response,
.Bytes = Result.Bytes,
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
}
}
@@ -390,6 +417,8 @@ namespace detail {
CacheRecord.CacheKey.Hash,
PackagePayload,
CacheRecord.Type);
+
+ m_HealthOk = Result.ErrorCode == 0;
}
TotalBytes = Result.Bytes;
@@ -406,6 +435,8 @@ namespace detail {
CacheRecord.CacheKey.Hash,
CacheRecord.PayloadIds[Idx],
Payloads[Idx]);
+
+ m_HealthOk = Result.ErrorCode == 0;
}
TotalBytes += Result.Bytes;
@@ -425,6 +456,8 @@ namespace detail {
{
Result =
Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+
+ m_HealthOk = Result.ErrorCode == 0;
}
TotalBytes += Result.Bytes;
@@ -435,6 +468,7 @@ namespace detail {
}
catch (std::exception& e)
{
+ m_HealthOk = false;
return {.Reason = std::string(e.what()), .Success = false};
}
}
@@ -442,6 +476,7 @@ namespace detail {
private:
std::string m_DisplayName;
RefPtr<ZenStructuredCacheClient> m_Client;
+ std::atomic_bool m_HealthOk{false};
};
} // namespace detail
@@ -548,27 +583,35 @@ public:
virtual bool Initialize() override
{
- auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) {
- const bool Ok = Endpoint->Initialize();
- ZEN_INFO("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED");
- return !Ok;
- });
+ for (auto& Endpoint : m_Endpoints)
+ {
+ const UpstreamEndpointHealth Health = Endpoint->CheckHealth();
+ if (Health.Ok)
+ {
+ ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName());
+ }
+ else
+ {
+ ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ }
+ }
- m_Endpoints.erase(NewEnd, std::end(m_Endpoints));
- m_IsRunning = !m_Endpoints.empty();
+ m_RunState.IsRunning = !m_Endpoints.empty();
- if (m_IsRunning)
+ if (m_RunState.IsRunning)
{
for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
{
m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
}
+
+ m_EndpointMonitorThread = std::thread(&DefaultUpstreamCache::MonitorEndpoints, this);
}
- return m_IsRunning;
+ return m_RunState.IsRunning;
}
- virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
@@ -576,10 +619,13 @@ public:
{
for (auto& Endpoint : m_Endpoints)
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ if (Endpoint->IsHealthy())
{
- m_Stats.Add(*Endpoint, Result);
- return Result;
+ if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ return Result;
+ }
}
}
}
@@ -593,10 +639,13 @@ public:
{
for (auto& Endpoint : m_Endpoints)
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ if (Endpoint->IsHealthy())
{
- m_Stats.Add(*Endpoint, Result);
- return Result;
+ if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ return Result;
+ }
}
}
}
@@ -606,7 +655,7 @@ public:
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
{
- if (m_IsRunning.load() && m_Options.WriteUpstream)
+ if (m_RunState.IsRunning && m_Options.WriteUpstream)
{
if (!m_UpstreamThreads.empty())
{
@@ -655,18 +704,21 @@ private:
for (auto& Endpoint : m_Endpoints)
{
- const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
- if (Result.Success)
+ if (Endpoint->IsHealthy())
{
- m_Stats.Add(*Endpoint, Result);
- }
- else
- {
- ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- Endpoint->DisplayName(),
- Result.Reason);
+ const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ if (Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ }
+ else
+ {
+ ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->DisplayName(),
+ Result.Reason);
+ }
}
}
}
@@ -688,33 +740,82 @@ private:
}
}
- if (!m_IsRunning.load())
+ if (!m_RunState.IsRunning)
{
break;
}
}
}
+ void MonitorEndpoints()
+ {
+ for (;;)
+ {
+ {
+ std::unique_lock lk(m_RunState.Mutex);
+ if (m_RunState.ExitSignal.wait_for(lk, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); }))
+ {
+ break;
+ }
+ }
+
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (!Endpoint->IsHealthy())
+ {
+ if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
+ {
+ ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason);
+ }
+ else
+ {
+ ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ }
+ }
+ }
+ }
+ }
+
void Shutdown()
{
- if (m_IsRunning.load())
+ if (m_RunState.Stop())
{
- m_IsRunning.store(false);
m_UpstreamQueue.CompleteAdding();
-
for (std::thread& Thread : m_UpstreamThreads)
{
Thread.join();
}
+ m_EndpointMonitorThread.join();
m_UpstreamThreads.clear();
m_Endpoints.clear();
}
}
+ spdlog::logger& Log() { return m_Log; }
+
using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
- spdlog::logger& Log() { return m_Log; }
+ struct RunState
+ {
+ std::mutex Mutex;
+ std::condition_variable ExitSignal;
+ std::atomic_bool IsRunning{false};
+
+ bool Stop()
+ {
+ bool Stopped = false;
+ {
+ std::lock_guard _(Mutex);
+ Stopped = IsRunning.exchange(false);
+ }
+ if (Stopped)
+ {
+ ExitSignal.notify_all();
+ }
+ return Stopped;
+ }
+ };
spdlog::logger& m_Log;
UpstreamCacheOptions m_Options;
@@ -724,7 +825,8 @@ private:
UpstreamStats m_Stats;
std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints;
std::vector<std::thread> m_UpstreamThreads;
- std::atomic_bool m_IsRunning{false};
+ std::thread m_EndpointMonitorThread;
+ RunState m_RunState;
};
//////////////////////////////////////////////////////////////////////////
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 142fe260f..96ee8bddc 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -6,6 +6,7 @@
#include <zencore/iohash.h>
#include <zencore/zencore.h>
+#include <chrono>
#include <memory>
namespace zen {
@@ -35,18 +36,33 @@ struct UpstreamCacheRecord
struct UpstreamCacheOptions
{
- uint32_t ThreadCount = 4;
- bool ReadUpstream = true;
- bool WriteUpstream = true;
+ std::chrono::seconds HealthCheckInterval{5};
+ uint32_t ThreadCount = 4;
+ bool ReadUpstream = true;
+ bool WriteUpstream = true;
+};
+
+enum class UpstreamStatusCode : uint8_t
+{
+ Ok,
+ Error
+};
+
+struct UpstreamError
+{
+ UpstreamStatusCode StatusCode = UpstreamStatusCode::Ok;
+ std::string Reason;
+
+ explicit operator bool() const { return StatusCode != UpstreamStatusCode::Ok; }
};
struct GetUpstreamCacheResult
{
- IoBuffer Value;
- std::string Reason;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ IoBuffer Value;
+ UpstreamError Error;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ bool Success = false;
};
struct PutUpstreamCacheResult
@@ -57,6 +73,12 @@ struct PutUpstreamCacheResult
bool Success = false;
};
+struct UpstreamEndpointHealth
+{
+ std::string Reason;
+ bool Ok = false;
+};
+
/**
* The upstream endpont is responsible for handling upload/downloading of cache records.
*/
@@ -65,7 +87,9 @@ class UpstreamEndpoint
public:
virtual ~UpstreamEndpoint() = default;
- virtual bool Initialize() = 0;
+ virtual bool IsHealthy() const = 0;
+
+ virtual UpstreamEndpointHealth CheckHealth() = 0;
virtual std::string_view DisplayName() const = 0;
@@ -88,7 +112,7 @@ public:
virtual bool Initialize() = 0;
- virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 7f689d7f3..710d381c6 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -391,6 +391,11 @@ ZenStructuredCacheSession::SayHello()
Session.SetOption(cpr::Url{Uri.c_str()});
cpr::Response Response = Session.Get();
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
@@ -411,6 +416,11 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -431,6 +441,11 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -455,6 +470,11 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
.Success = (Response.status_code == 200 || Response.status_code == 201)};
@@ -475,6 +495,11 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
.Success = (Response.status_code == 200 || Response.status_code == 201)};
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 36cfd1217..48886096d 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -91,10 +91,12 @@ namespace detail {
struct ZenCacheResult
{
- IoBuffer Response;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ IoBuffer Response;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ int32_t ErrorCode = {};
+ std::string Reason;
+ bool Success = false;
};
/** Zen Structured Cache session
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index cf24dc224..e3b61568f 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -193,7 +193,7 @@ public:
if (!UpstreamConfig.ZenConfig.Url.empty())
{
std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Url);
- UpstreamCache->AddEndpoint(std::move(ZenEndpoint));
+ UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
}
{
@@ -221,7 +221,7 @@ public:
if (!Options.ServiceUrl.empty())
{
std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
- UpstreamCache->AddEndpoint(std::move(JupiterEndpoint));
+ UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
}
}