aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-10-01 11:06:32 +0200
committerPer Larsson <[email protected]>2021-10-01 11:06:32 +0200
commit0d3ce5ffecfd894b3a2b0421c562f2bb06fdc1c8 (patch)
treeacedad27ed8f61c3694395d18b13240cb401c25b /zenserver/upstream/upstreamcache.cpp
parentClang format fix. (diff)
downloadzen-0d3ce5ffecfd894b3a2b0421c562f2bb06fdc1c8.tar.xz
zen-0d3ce5ffecfd894b3a2b0421c562f2bb06fdc1c8.zip
Try upload missing payloads and finalize Jupiter cache ref.
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp114
1 files changed, 99 insertions, 15 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 0dd16cd06..0329569d8 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -9,6 +9,7 @@
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/stats.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
@@ -45,6 +46,7 @@ namespace detail {
{
std::lock_guard Lock(m_Lock);
m_Queue.emplace_back(std::move(Item));
+ m_Size++;
}
m_NewItemSignal.notify_one();
@@ -64,6 +66,7 @@ namespace detail {
{
Item = std::move(m_Queue.front());
m_Queue.pop_front();
+ m_Size--;
return true;
}
@@ -80,7 +83,7 @@ namespace detail {
}
}
- std::size_t Num() const
+ std::size_t Size() const
{
std::unique_lock Lock(m_Lock);
return m_Queue.size();
@@ -91,12 +94,15 @@ namespace detail {
std::condition_variable m_NewItemSignal;
std::deque<T> m_Queue;
std::atomic_bool m_CompleteAdding{false};
+ std::atomic_uint32_t m_Size;
};
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
- JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc)
+ JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_UseLegacyDdc(Options.UseLegacyDdc)
{
using namespace fmt::literals;
m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
@@ -277,16 +283,82 @@ namespace detail {
Success = false;
for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
{
- if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
+ if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RecordValue,
+ ZenContentType::kCbObject);
Result.Success)
{
TotalBytes += Result.Bytes;
TotalElapsedSeconds += Result.ElapsedSeconds;
Success = true;
- break;
+
+ if (!Result.Needs.empty())
+ {
+ for (const IoHash& NeededHash : Result.Needs)
+ {
+ Success = false;
+
+ if (auto It =
+ std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash);
+ It != std::end(CacheRecord.PayloadIds))
+ {
+ const size_t Idx = It - std::begin(CacheRecord.PayloadIds);
+
+ if (CloudCacheResult BlobResult =
+ Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ BlobResult.Success)
+ {
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ Success = true;
+ }
+ else
+ {
+ ZEN_WARN("upload missing payload '{}/{}/{}' FAILED",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ NeededHash);
+ }
+ }
+ else
+ {
+ ZEN_WARN("needed payload '{}/{}/{}' MISSING",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ NeededHash);
+ }
+ }
+
+ const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+
+ if (FinalizeRefResult FinalizeResult =
+ Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+ FinalizeResult.Success)
+ {
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+ Success = true;
+
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
+ {
+ ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ MissingHash);
+ }
+ }
+ else
+ {
+ ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
+ Success = false;
+ }
+ }
+
+ if (Success)
+ {
+ break;
+ }
}
}
@@ -302,6 +374,9 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
@@ -515,7 +590,12 @@ struct UpstreamStats
}
UpstreamEndpointStats& Stats = Endpoint.Stats();
- if (Result.Success)
+
+ if (Result.Error)
+ {
+ Stats.ErrorCount++;
+ }
+ else if (Result.Success)
{
Stats.HitCount++;
Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
@@ -549,6 +629,10 @@ struct UpstreamStats
Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
}
+ else
+ {
+ Stats.ErrorCount++;
+ }
if (m_SampleCount++ % MaxSampleCount)
{
@@ -575,13 +659,13 @@ struct UpstreamStats
const uint64_t TotalCount = HitCount + MissCount;
const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
- Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->DisplayName(),
- HitRate,
- DownBytes,
- DownSpeed,
- UpBytes,
- UpSpeed);
+ Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
+ Ep->DisplayName(),
+ HitRate,
+ DownBytes,
+ DownSpeed,
+ UpBytes,
+ UpSpeed);
}
}