aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-11 13:28:52 +0100
committerPer Larsson <[email protected]>2021-11-11 13:28:52 +0100
commit7940ae2ff24e1b708d1d6a0bccf266213ea7316b (patch)
tree7d9863b3a89e8567b34d28205f25b93b5091dd2f /zenserver/upstream/upstreamcache.cpp
parentRemoved batch result. (diff)
downloadzen-7940ae2ff24e1b708d1d6a0bccf266213ea7316b.tar.xz
zen-7940ae2ff24e1b708d1d6a0bccf266213ea7316b.zip
Fixed stats.
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp211
1 files changed, 119 insertions, 92 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index d5414daf1..b486b751c 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -151,43 +151,54 @@ namespace detail {
{
ZEN_UNUSED(Policy);
- CloudCacheSession Session(m_Client);
+ CloudCacheSession Session(m_Client);
+ GetUpstreamCacheResult Result;
for (size_t Index : KeyIndex)
{
- const CacheKey& CacheKey = CacheKeys[Index];
- CloudCacheResult Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+ const CacheKey& CacheKey = CacheKeys[Index];
+ CbPackage Package;
+ CbObjectView Record;
- CbPackage Package;
- CbObjectView Record;
-
- if (Result.ErrorCode == 0)
+ if (!Result.Error)
{
- const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All);
- if (ValidationResult == CbValidateError::None)
+ CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+ AppendResult(RefResult, Result);
+
+ if (RefResult.ErrorCode == 0)
{
- Record = CbObjectView(Result.Response.GetData());
- Record.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
- if (AttachmentResult.Success)
- {
- if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All);
+ if (ValidationResult == CbValidateError::None)
+ {
+ Record = CbObjectView(RefResult.Response.GetData());
+ Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) {
+ CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ AppendResult(BlobResult, Result);
+
+ if (BlobResult.ErrorCode == 0)
{
- Package.AddAttachment(CbAttachment(Chunk));
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response)))
+ {
+ Package.AddAttachment(CbAttachment(Chunk));
+ }
}
- }
- });
+ else
+ {
+ m_HealthOk = false;
+ }
+ });
+ }
+ }
+ else
+ {
+ m_HealthOk = false;
}
- }
- else
- {
- m_HealthOk = false;
}
OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package});
}
- return {};
+ return Result;
}
virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override
@@ -221,23 +232,27 @@ namespace detail {
std::span<size_t> RequestIndex,
OnCachePayloadGetComplete&& OnComplete) override final
{
- CloudCacheSession Session(m_Client);
+ CloudCacheSession Session(m_Client);
+ GetUpstreamCacheResult Result;
for (size_t Index : RequestIndex)
{
const CacheChunkRequest& Request = CacheChunkRequests[Index];
- const CloudCacheResult Result = Session.GetCompressedBlob(Request.ChunkId);
-
- OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Result.Response});
+ IoBuffer Payload;
- if (Result.ErrorCode)
+ if (!Result.Error)
{
- m_HealthOk = false;
- break;
+ const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId);
+ Payload = BlobResult.Response;
+
+ AppendResult(BlobResult, Result);
+ m_HealthOk = BlobResult.ErrorCode == 0;
}
+
+ OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload});
}
- return {};
+ return Result;
}
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
@@ -392,6 +407,18 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
+ {
+ Out.Success &= Result.Success;
+ Out.Bytes += Result.Bytes;
+ Out.ElapsedSeconds += Result.ElapsedSeconds;
+
+ if (Result.ErrorCode)
+ {
+ Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ }
+ };
+
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
@@ -548,39 +575,39 @@ namespace detail {
}
BatchRequest.EndObject();
- CbPackage BatchResponse;
- bool Success = false;
+ CbPackage BatchResponse;
+ ZenCacheResult Result;
{
ZenStructuredCacheSession Session(*m_Client);
- ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save());
- if (Result.Success)
- {
- Success = BatchResponse.TryLoad(Result.Response);
- }
- else if (Result.ErrorCode)
- {
- Success = m_HealthOk = false;
- }
+ Result = Session.InvokeRpc(BatchRequest.Save());
}
- if (!Success)
+ if (Result.Success)
{
- for (size_t Index : KeyIndex)
+ if (BatchResponse.TryLoad(Result.Response))
{
- OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
- }
+ for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv])
+ {
+ const size_t Index = IndexMap[LocalIndex++];
+ OnComplete(
+ {.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse});
+ }
- return {};
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
+ }
+ }
+ else if (Result.ErrorCode)
+ {
+ m_HealthOk = false;
}
- for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv])
+ for (size_t Index : KeyIndex)
{
- const size_t Index = IndexMap[LocalIndex++];
- OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse});
+ OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
}
- return {};
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
@@ -650,49 +677,48 @@ namespace detail {
}
BatchRequest.EndObject();
- CbPackage BatchResponse;
- bool Success = false;
+ CbPackage BatchResponse;
+ ZenCacheResult Result;
{
ZenStructuredCacheSession Session(*m_Client);
- ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save());
- if (Result.Success)
- {
- Success = BatchResponse.TryLoad(Result.Response);
- }
- else if (Result.ErrorCode)
- {
- m_HealthOk = false;
- }
+ Result = Session.InvokeRpc(BatchRequest.Save());
}
- if (!Success)
+ if (Result.Success)
{
- for (size_t Index : RequestIndex)
+ if (BatchResponse.TryLoad(Result.Response))
{
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
- }
-
- return {};
- }
+ for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv])
+ {
+ const size_t Index = IndexMap[LocalIndex++];
+ IoBuffer Payload;
- for (int32_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv])
- {
- const size_t Index = IndexMap[LocalIndex++];
- IoBuffer Payload;
+ if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary())
+ {
+ Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ }
+ }
- if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash()))
- {
- if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary())
- {
- Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)});
}
+
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
+ }
+ else if (Result.ErrorCode)
+ {
+ m_HealthOk = false;
+ }
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)});
+ for (size_t Index : RequestIndex)
+ {
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
}
- return {};
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
@@ -1016,18 +1042,18 @@ public:
{
std::vector<size_t> Missing;
- auto EndpointResult =
- Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
- if (Params.Record)
- {
- OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
- }
- else
- {
- Missing.push_back(Params.KeyIndex);
- }
- });
+ auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
+ {
+ OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
+ }
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ });
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
MissingKeys = std::move(Missing);
}
}
@@ -1053,7 +1079,7 @@ public:
{
std::vector<size_t> Missing;
- auto EndpointResult =
+ auto Result =
Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) {
if (Params.Payload)
{
@@ -1065,6 +1091,7 @@ public:
}
});
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
MissingPayloads = std::move(Missing);
}
}