aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp242
1 files changed, 187 insertions, 55 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 0dd16cd06..b1966e299 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -9,6 +9,7 @@
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/stats.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
@@ -45,6 +46,7 @@ namespace detail {
{
std::lock_guard Lock(m_Lock);
m_Queue.emplace_back(std::move(Item));
+ m_Size++;
}
m_NewItemSignal.notify_one();
@@ -64,6 +66,7 @@ namespace detail {
{
Item = std::move(m_Queue.front());
m_Queue.pop_front();
+ m_Size--;
return true;
}
@@ -80,7 +83,7 @@ namespace detail {
}
}
- std::size_t Num() const
+ std::size_t Size() const
{
std::unique_lock Lock(m_Lock);
return m_Queue.size();
@@ -91,12 +94,15 @@ namespace detail {
std::condition_variable m_NewItemSignal;
std::deque<T> m_Queue;
std::atomic_bool m_CompleteAdding{false};
+ std::atomic_uint32_t m_Size;
};
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
- JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc)
+ JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_UseLegacyDdc(Options.UseLegacyDdc)
{
using namespace fmt::literals;
m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
@@ -180,16 +186,23 @@ namespace detail {
}
}
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -200,16 +213,23 @@ namespace detail {
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -277,16 +297,82 @@ namespace detail {
Success = false;
for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
{
- if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
+ if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RecordValue,
+ ZenContentType::kCbObject);
Result.Success)
{
TotalBytes += Result.Bytes;
TotalElapsedSeconds += Result.ElapsedSeconds;
Success = true;
- break;
+
+ if (!Result.Needs.empty())
+ {
+ for (const IoHash& NeededHash : Result.Needs)
+ {
+ Success = false;
+
+ if (auto It =
+ std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash);
+ It != std::end(CacheRecord.PayloadIds))
+ {
+ const size_t Idx = It - std::begin(CacheRecord.PayloadIds);
+
+ if (CloudCacheResult BlobResult =
+ Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ BlobResult.Success)
+ {
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ Success = true;
+ }
+ else
+ {
+ ZEN_WARN("upload missing payload '{}/{}/{}' FAILED",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ NeededHash);
+ }
+ }
+ else
+ {
+ ZEN_WARN("needed payload '{}/{}/{}' MISSING",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ NeededHash);
+ }
+ }
+
+ const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+
+ if (FinalizeRefResult FinalizeResult =
+ Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+ FinalizeResult.Success)
+ {
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+ Success = true;
+
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
+ {
+ ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ MissingHash);
+ }
+ }
+ else
+ {
+ ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
+ Success = false;
+ }
+ }
+
+ if (Success)
+ {
+ break;
+ }
}
}
@@ -302,6 +388,9 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
@@ -354,16 +443,23 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -375,16 +471,23 @@ namespace detail {
const ZenCacheResult Result =
Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -500,7 +603,7 @@ namespace detail {
struct UpstreamStats
{
- static constexpr uint64_t MaxSampleCount = 100ull;
+ static constexpr uint64_t MaxSampleCount = 1000ull;
UpstreamStats(bool Enabled) : m_Enabled(Enabled) {}
@@ -509,13 +612,13 @@ struct UpstreamStats
const GetUpstreamCacheResult& Result,
const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- if (!m_Enabled)
+ UpstreamEndpointStats& Stats = Endpoint.Stats();
+
+ if (Result.Error)
{
- return;
+ Stats.ErrorCount++;
}
-
- UpstreamEndpointStats& Stats = Endpoint.Stats();
- if (Result.Success)
+ else if (Result.Success)
{
Stats.HitCount++;
Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
@@ -526,7 +629,7 @@ struct UpstreamStats
Stats.MissCount++;
}
- if (m_SampleCount++ % MaxSampleCount)
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
{
Dump(Logger, Endpoints);
}
@@ -537,11 +640,6 @@ struct UpstreamStats
const PutUpstreamCacheResult& Result,
const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- if (!m_Enabled)
- {
- return;
- }
-
UpstreamEndpointStats& Stats = Endpoint.Stats();
if (Result.Success)
{
@@ -549,8 +647,12 @@ struct UpstreamStats
Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
}
+ else
+ {
+ Stats.ErrorCount++;
+ }
- if (m_SampleCount++ % MaxSampleCount)
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
{
Dump(Logger, Endpoints);
}
@@ -575,13 +677,13 @@ struct UpstreamStats
const uint64_t TotalCount = HitCount + MissCount;
const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
- Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->DisplayName(),
- HitRate,
- DownBytes,
- DownSpeed,
- UpBytes,
- UpSpeed);
+ Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
+ Ep->DisplayName(),
+ HitRate,
+ DownBytes,
+ DownSpeed,
+ UpBytes,
+ UpSpeed);
}
}
@@ -700,6 +802,36 @@ public:
return {};
}
+ virtual void GetStatus(CbObjectWriter& Status) override
+ {
+ Status << "reading" << m_Options.ReadUpstream;
+ Status << "writing" << m_Options.WriteUpstream;
+ Status << "worker_threads" << m_Options.ThreadCount;
+ Status << "queue_count" << m_UpstreamQueue.Size();
+
+ Status.BeginArray("endpoints");
+ for (const auto& Ep : m_Endpoints)
+ {
+ Status.BeginObject();
+ Status << "name" << Ep->DisplayName();
+ Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
+
+ UpstreamEndpointStats& Stats = Ep->Stats();
+ const uint64_t HitCount = Stats.HitCount;
+ const uint64_t MissCount = Stats.MissCount;
+ const uint64_t TotalCount = HitCount + MissCount;
+ const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
+
+ Status << "hit_ratio" << HitRate;
+ Status << "downloaded_mb" << Stats.DownBytes;
+ Status << "uploaded_mb" << Stats.UpBytes;
+ Status << "error_count" << Stats.ErrorCount;
+
+ Status.EndObject();
+ }
+ Status.EndArray();
+ }
+
private:
void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
{