aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp678
1 files changed, 537 insertions, 141 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 168449d05..e2dc09872 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -40,13 +40,15 @@ namespace detail {
: m_Log(zen::logging::Get("upstream"))
, m_UseLegacyDdc(Options.UseLegacyDdc)
{
- using namespace fmt::literals;
- m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl);
- m_Client = new CloudCacheClient(Options);
+ m_Info.Name = "Horde"sv;
+ m_Info.Url = Options.ServiceUrl;
+ m_Client = new CloudCacheClient(Options);
}
virtual ~JupiterUpstreamEndpoint() = default;
+ virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; }
+
virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
virtual bool IsHealthy() const override { return m_HealthOk.load(); }
@@ -58,7 +60,7 @@ namespace detail {
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.Authenticate();
- m_HealthOk = Result.ErrorCode == 0;
+ m_HealthOk = Result.Success && Result.ErrorCode == 0;
return {.Reason = std::move(Result.Reason), .Ok = Result.Success};
}
@@ -68,9 +70,7 @@ namespace detail {
}
}
- virtual std::string_view DisplayName() const override { return m_DisplayName; }
-
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
try
{
@@ -144,12 +144,69 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) override
+ {
+ ZEN_UNUSED(Policy);
+
+ CloudCacheSession Session(m_Client);
+ GetUpstreamCacheResult Result;
+
+ for (size_t Index : KeyIndex)
+ {
+ const CacheKey& CacheKey = CacheKeys[Index];
+ CbPackage Package;
+ CbObject Record;
+
+ if (!Result.Error)
+ {
+ CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+ AppendResult(RefResult, Result);
+
+ if (RefResult.ErrorCode == 0)
+ {
+ const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All);
+ if (ValidationResult == CbValidateError::None)
+ {
+ Record = LoadCompactBinaryObject(RefResult.Response);
+ Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) {
+ CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ AppendResult(BlobResult, Result);
+
+ if (BlobResult.ErrorCode == 0)
+ {
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response)))
+ {
+ Package.AddAttachment(CbAttachment(Chunk));
+ }
+ }
+ else
+ {
+ m_HealthOk = false;
+ }
+ });
+ }
+ }
+ else
+ {
+ m_HealthOk = false;
+ }
+ }
+
+ OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package});
+ }
+
+ return Result;
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override
{
try
{
CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
+ const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId);
if (Result.ErrorCode == 0)
{
@@ -171,12 +228,41 @@ namespace detail {
}
}
+ virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) override final
+ {
+ CloudCacheSession Session(m_Client);
+ GetUpstreamCacheResult Result;
+
+ for (size_t Index : RequestIndex)
+ {
+ const CacheChunkRequest& Request = CacheChunkRequests[Index];
+ IoBuffer Payload;
+
+ if (!Result.Error)
+ {
+ const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId);
+ Payload = BlobResult.Response;
+
+ AppendResult(BlobResult, Result);
+ m_HealthOk = BlobResult.ErrorCode == 0;
+ }
+
+ OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload});
+ }
+
+ return Result;
+ }
+
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
std::span<IoBuffer const> Payloads) override
{
+ using namespace fmt::literals;
+
ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
- const uint32_t MaxAttempts = 3;
+ const int32_t MaxAttempts = 3;
try
{
@@ -200,125 +286,132 @@ namespace detail {
}
}
- return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success};
+ m_HealthOk = Result.ErrorCode == 0;
+
+ return {.Reason = std::move(Result.Reason),
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
else
{
- bool Success = false;
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
- {
- Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
+ const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool {
+ for (const IoHash& PayloadId : PayloadIds)
{
- if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
- Result.Success)
+ const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId);
+
+ if (It == std::end(CacheRecord.PayloadIds))
{
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
- Success = true;
- break;
+ OutReason = "payload '{}' MISSING from local cache"_format(PayloadId);
+ return false;
}
- }
- if (!Success)
- {
- return {.Reason = "Failed to upload payload",
- .Bytes = TotalBytes,
- .ElapsedSeconds = TotalElapsedSeconds,
- .Success = false};
+ const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It);
+
+ CloudCacheResult BlobResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
+ {
+ BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ }
+
+ m_HealthOk = BlobResult.ErrorCode == 0;
+
+ if (!BlobResult.Success)
+ {
+ OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason);
+ return false;
+ }
+
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
}
+
+ return true;
+ };
+
+ PutRefResult RefResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
+ {
+ RefResult =
+ Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject);
}
- Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
+ m_HealthOk = RefResult.ErrorCode == 0;
+
+ if (!RefResult.Success)
{
- if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
- Result.Success)
- {
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
- Success = true;
+ return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RefResult.Reason),
+ .Success = false};
+ }
- if (!Result.Needs.empty())
- {
- for (const IoHash& NeededHash : Result.Needs)
- {
- Success = false;
+ TotalBytes += RefResult.Bytes;
+ TotalElapsedSeconds += RefResult.ElapsedSeconds;
- if (auto It =
- std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash);
- It != std::end(CacheRecord.PayloadIds))
- {
- const size_t Idx = It - std::begin(CacheRecord.PayloadIds);
-
- if (CloudCacheResult BlobResult =
- Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
- BlobResult.Success)
- {
- TotalBytes += BlobResult.Bytes;
- TotalElapsedSeconds += BlobResult.ElapsedSeconds;
- Success = true;
- }
- else
- {
- ZEN_WARN("upload missing payload '{}/{}/{}' FAILED",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- NeededHash);
- }
- }
- else
- {
- ZEN_WARN("needed payload '{}/{}/{}' MISSING",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- NeededHash);
- }
- }
+ std::string Reason;
+ if (!PutBlobs(RefResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
- const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+ const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+ FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+ m_HealthOk = FinalizeResult.ErrorCode == 0;
- if (FinalizeRefResult FinalizeResult =
- Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
- FinalizeResult.Success)
- {
- TotalBytes += FinalizeResult.Bytes;
- TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
- Success = true;
+ if (!FinalizeResult.Success)
+ {
+ return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ FinalizeResult.Reason),
+ .Success = false};
+ }
- for (const IoHash& MissingHash : FinalizeResult.Needs)
- {
- ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- MissingHash);
- }
- }
- else
- {
- ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
- Success = false;
- }
- }
+ if (!FinalizeResult.Needs.empty())
+ {
+ if (!PutBlobs(FinalizeResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
- if (Success)
+ FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+ m_HealthOk = FinalizeResult.ErrorCode == 0;
+
+ if (!FinalizeResult.Success)
+ {
+ return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ FinalizeResult.Reason),
+ .Success = false};
+ }
+
+ if (!FinalizeResult.Needs.empty())
+ {
+ ExtendableStringBuilder<256> Sb;
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
{
- break;
+ Sb << MissingHash.ToHexString() << ",";
}
+
+ return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Sb.ToString()),
+ .Success = false};
}
}
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success};
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true};
}
}
catch (std::exception& Err)
{
+ m_HealthOk = false;
return {.Reason = std::string(Err.what()), .Success = false};
}
}
@@ -326,9 +419,22 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
+ {
+ Out.Success &= Result.Success;
+ Out.Bytes += Result.Bytes;
+ Out.ElapsedSeconds += Result.ElapsedSeconds;
+
+ if (Result.ErrorCode)
+ {
+ Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ }
+ };
+
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
+ UpstreamEndpointInfo m_Info;
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
@@ -349,9 +455,13 @@ namespace detail {
};
public:
- ZenUpstreamEndpoint(std::span<std::string const> Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN")
+ ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_Info({.Name = std::string("Zen")})
+ , m_ConnectTimeout(Options.ConnectTimeout)
+ , m_Timeout(Options.Timeout)
{
- for (const auto& Url : Urls)
+ for (const auto& Url : Options.Urls)
{
m_Endpoints.push_back({.Url = Url});
}
@@ -359,6 +469,8 @@ namespace detail {
~ZenUpstreamEndpoint() = default;
+ virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; }
+
virtual UpstreamEndpointHealth Initialize() override
{
using namespace fmt::literals;
@@ -366,9 +478,8 @@ namespace detail {
const ZenEndpoint& Ep = GetEndpoint();
if (Ep.Ok)
{
- m_ServiceUrl = Ep.Url;
- m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
- m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+ m_Info.Url = Ep.Url;
+ m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
m_HealthOk = true;
return {.Ok = true};
@@ -391,9 +502,9 @@ namespace detail {
const ZenEndpoint& Ep = GetEndpoint();
if (Ep.Ok)
{
- m_ServiceUrl = Ep.Url;
- m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
- m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+ m_Info.Url = Ep.Url;
+ m_Client =
+ new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
m_HealthOk = true;
return {.Ok = true};
@@ -420,9 +531,7 @@ namespace detail {
}
}
- virtual std::string_view DisplayName() const override { return m_DisplayName; }
-
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
try
{
@@ -449,13 +558,80 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) override
+ {
+ std::vector<size_t> IndexMap;
+ IndexMap.reserve(KeyIndex.size());
+
+ CbObjectWriter BatchRequest;
+ BatchRequest << "Method"sv
+ << "GetCacheRecords";
+
+ BatchRequest.BeginObject("Params"sv);
+ {
+ BatchRequest.BeginArray("CacheKeys"sv);
+ for (size_t Index : KeyIndex)
+ {
+ const CacheKey& Key = CacheKeys[Index];
+ IndexMap.push_back(Index);
+
+ BatchRequest.BeginObject();
+ BatchRequest << "Bucket"sv << Key.Bucket;
+ BatchRequest << "Hash"sv << Key.Hash;
+ BatchRequest.EndObject();
+ }
+ BatchRequest.EndArray();
+
+ BatchRequest.BeginObject("Policy"sv);
+ CacheRecordPolicy::Save(Policy, BatchRequest);
+ BatchRequest.EndObject();
+ }
+ BatchRequest.EndObject();
+
+ CbPackage BatchResponse;
+ ZenCacheResult Result;
+
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ Result = Session.InvokeRpc(BatchRequest.Save());
+ }
+
+ if (Result.Success)
+ {
+ if (BatchResponse.TryLoad(Result.Response))
+ {
+ for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv])
+ {
+ const size_t Index = IndexMap[LocalIndex++];
+ OnComplete(
+ {.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse});
+ }
+
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
+ }
+ }
+ else if (Result.ErrorCode)
+ {
+ m_HealthOk = false;
+ }
+
+ for (size_t Index : KeyIndex)
+ {
+ OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
+ }
+
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
{
try
{
ZenStructuredCacheSession Session(*m_Client);
- const ZenCacheResult Result =
- Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
+ const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId);
if (Result.ErrorCode == 0)
{
@@ -477,12 +653,96 @@ namespace detail {
}
}
+ virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) override final
+ {
+ std::vector<size_t> IndexMap;
+ IndexMap.reserve(RequestIndex.size());
+
+ CbObjectWriter BatchRequest;
+ BatchRequest << "Method"sv
+ << "GetCachePayloads";
+
+ BatchRequest.BeginObject("Params"sv);
+ {
+ BatchRequest.BeginArray("ChunkRequests"sv);
+ {
+ for (size_t Index : RequestIndex)
+ {
+ const CacheChunkRequest& Request = CacheChunkRequests[Index];
+ IndexMap.push_back(Index);
+
+ BatchRequest.BeginObject();
+ {
+ BatchRequest.BeginObject("Key"sv);
+ BatchRequest << "Bucket"sv << Request.Key.Bucket;
+ BatchRequest << "Hash"sv << Request.Key.Hash;
+ BatchRequest.EndObject();
+
+ BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId);
+ BatchRequest << "ChunkId"sv << Request.ChunkId;
+ BatchRequest << "RawOffset"sv << Request.RawOffset;
+ BatchRequest << "RawSize"sv << Request.RawSize;
+ BatchRequest << "Policy"sv << static_cast<uint32_t>(Request.Policy);
+ }
+ BatchRequest.EndObject();
+ }
+ }
+ BatchRequest.EndArray();
+ }
+ BatchRequest.EndObject();
+
+ CbPackage BatchResponse;
+ ZenCacheResult Result;
+
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ Result = Session.InvokeRpc(BatchRequest.Save());
+ }
+
+ if (Result.Success)
+ {
+ if (BatchResponse.TryLoad(Result.Response))
+ {
+ for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv])
+ {
+ const size_t Index = IndexMap[LocalIndex++];
+ IoBuffer Payload;
+
+ if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary())
+ {
+ Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ }
+ }
+
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)});
+ }
+
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
+ }
+ }
+ else if (Result.ErrorCode)
+ {
+ m_HealthOk = false;
+ }
+
+ for (size_t Index : RequestIndex)
+ {
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ }
+
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
+
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
std::span<IoBuffer const> Payloads) override
{
ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
- const uint32_t MaxAttempts = 3;
+ const int32_t MaxAttempts = 3;
try
{
@@ -565,12 +825,15 @@ namespace detail {
TotalElapsedSeconds += Result.ElapsedSeconds;
}
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
+ return {.Reason = std::move(Result.Reason),
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
m_HealthOk = false;
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Reason = std::string(Err.what()), .Success = false};
}
}
@@ -581,7 +844,7 @@ namespace detail {
{
for (ZenEndpoint& Ep : m_Endpoints)
{
- ZenStructuredCacheClient Client(Ep.Url);
+ ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)});
ZenStructuredCacheSession Session(Client);
const int32_t SampleCount = 2;
@@ -602,7 +865,7 @@ namespace detail {
for (const auto& Ep : m_Endpoints)
{
- ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason);
+ ZEN_INFO("ping 'Zen' endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason);
}
return m_Endpoints.front();
@@ -611,9 +874,10 @@ namespace detail {
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- std::string m_ServiceUrl;
+ UpstreamEndpointInfo m_Info;
std::vector<ZenEndpoint> m_Endpoints;
- std::string m_DisplayName;
+ std::chrono::milliseconds m_ConnectTimeout;
+ std::chrono::milliseconds m_Timeout;
RefPtr<ZenStructuredCacheClient> m_Client;
UpstreamEndpointStats m_Stats;
std::atomic_bool m_HealthOk{false};
@@ -700,7 +964,7 @@ struct UpstreamStats
const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0;
Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->DisplayName(),
+ Ep->GetEndpointInfo().Name,
HitRate,
DownBytes,
DownSpeed,
@@ -734,13 +998,15 @@ public:
for (auto& Endpoint : m_Endpoints)
{
const UpstreamEndpointHealth Health = Endpoint->Initialize();
+ const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo();
+
if (Health.Ok)
{
- ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName());
+ ZEN_INFO("'{}' endpoint '{}' OK", Info.Name, Info.Url);
}
else
{
- ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ ZEN_WARN("'{}' endpoint '{}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason);
}
}
@@ -761,7 +1027,7 @@ public:
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
if (m_Options.ReadUpstream)
{
@@ -776,6 +1042,14 @@ public:
{
return Result;
}
+
+ if (Result.Error)
+ {
+ ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
+ }
}
}
}
@@ -783,7 +1057,99 @@ public:
return {};
}
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) override final
+ {
+ std::vector<size_t> MissingKeys(KeyIndex.begin(), KeyIndex.end());
+
+ if (m_Options.ReadUpstream)
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy() && !MissingKeys.empty())
+ {
+ std::vector<size_t> Missing;
+
+ auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
+ {
+ OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
+ }
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ });
+
+ if (Result.Error)
+ {
+ ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
+ }
+
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ MissingKeys = std::move(Missing);
+ }
+ }
+ }
+
+ for (size_t Index : MissingKeys)
+ {
+ OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
+ }
+ }
+
+ virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) override final
+ {
+ std::vector<size_t> MissingPayloads(RequestIndex.begin(), RequestIndex.end());
+
+ if (m_Options.ReadUpstream)
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy() && !MissingPayloads.empty())
+ {
+ std::vector<size_t> Missing;
+
+ auto Result =
+ Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) {
+ if (Params.Payload)
+ {
+ OnComplete(std::forward<CachePayloadGetCompleteParams>(Params));
+ }
+ else
+ {
+ Missing.push_back(Params.RequestIndex);
+ }
+ });
+
+ if (Result.Error)
+ {
+ ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
+ }
+
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ MissingPayloads = std::move(Missing);
+ }
+ }
+ }
+
+ for (size_t Index : MissingPayloads)
+ {
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ }
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
{
if (m_Options.ReadUpstream)
{
@@ -791,13 +1157,21 @@ public:
{
if (Endpoint->IsHealthy())
{
- const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey);
+ const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId);
m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
if (Result.Success)
{
return Result;
}
+
+ if (Result.Error)
+ {
+ ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
+ }
}
}
}
@@ -834,8 +1208,10 @@ public:
Status.BeginArray("endpoints");
for (const auto& Ep : m_Endpoints)
{
+ const UpstreamEndpointInfo& Info = Ep->GetEndpointInfo();
Status.BeginObject();
- Status << "name" << Ep->DisplayName();
+ Status << "name" << Info.Name;
+ Status << "url" << Info.Url;
Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
UpstreamEndpointStats& Stats = Ep->Stats();
@@ -890,6 +1266,15 @@ private:
{
const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+
+ if (!Result.Success)
+ {
+ ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->GetEndpointInfo().Url,
+ Result.Reason);
+ }
}
}
}
@@ -905,9 +1290,12 @@ private:
{
ProcessCacheRecord(std::move(CacheRecord));
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- ZEN_WARN("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what());
+ ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Err.what());
}
}
@@ -930,20 +1318,28 @@ private:
}
}
- for (auto& Endpoint : m_Endpoints)
+ try
{
- if (!Endpoint->IsHealthy())
+ for (auto& Endpoint : m_Endpoints)
{
- if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
- {
- ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason);
- }
- else
+ if (!Endpoint->IsHealthy())
{
- ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo();
+ if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
+ {
+ ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url, Health.Reason);
+ }
+ else
+ {
+ ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason);
+ }
}
}
}
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", Err.what());
+ }
}
}
@@ -1015,9 +1411,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
}
std::unique_ptr<UpstreamEndpoint>
-MakeZenUpstreamEndpoint(std::span<std::string const> Urls)
+MakeZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options)
{
- return std::make_unique<detail::ZenUpstreamEndpoint>(Urls);
+ return std::make_unique<detail::ZenUpstreamEndpoint>(Options);
}
} // namespace zen