aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/cache/structuredcache.cpp28
-rw-r--r--zenserver/cache/structuredcache.h3
-rw-r--r--zenserver/config.cpp2
-rw-r--r--zenserver/experimental/usnjournal.cpp2
-rw-r--r--zenserver/projectstore.cpp4
-rw-r--r--zenserver/testing/httptest.cpp17
-rw-r--r--zenserver/testing/httptest.h6
-rw-r--r--zenserver/upstream/jupiter.cpp103
-rw-r--r--zenserver/upstream/jupiter.h15
-rw-r--r--zenserver/upstream/upstreamcache.cpp144
-rw-r--r--zenserver/upstream/upstreamcache.h4
11 files changed, 298 insertions, 30 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 74cee6614..dc96aecae 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -6,6 +6,7 @@
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zenhttp/httpserver.h>
@@ -193,10 +194,17 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
CacheRef Ref;
+ metrics::OperationTiming::Scope $(m_HttpRequests);
+
if (!ValidateKeyUri(Request, /* out */ Ref))
{
std::string_view Key = Request.RelativeUri();
+ if (Key.empty())
+ {
+ return HandleStatusRequest(Request);
+ }
+
if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); }))
{
// Bucket reference
@@ -262,11 +270,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
case kHead:
case kGet:
{
- HandleGetCacheRecord(Request, Ref, Policy);
if (Verb == kHead)
{
Request.SetSuppressResponseBody();
}
+ HandleGetCacheRecord(Request, Ref, Policy);
}
break;
case kPut:
@@ -829,4 +837,22 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
return true;
}
+
+void
+HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+ if (m_UpstreamCache)
+ {
+ Cbo.BeginObject("upstream");
+ m_UpstreamCache->GetStatus(Cbo);
+ Cbo.EndObject();
+ }
+
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
} // namespace zen
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 3fdaa1236..47fc173e9 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/stats.h>
#include <zenhttp/httpserver.h>
#include <memory>
@@ -78,6 +79,7 @@ private:
void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
+ void HandleStatusRequest(zen::HttpServerRequest& Request);
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
@@ -86,6 +88,7 @@ private:
zen::CidStore& m_CidStore;
std::unique_ptr<UpstreamCache> m_UpstreamCache;
uint64_t m_LastScrubTime = 0;
+ metrics::OperationTiming m_HttpRequests;
};
} // namespace zen
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 42f59b26c..254032226 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -227,7 +227,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
"",
"upstream-stats",
"Collect performance metrics for upstream endpoints",
- cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"),
+ cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("true"),
"");
try
diff --git a/zenserver/experimental/usnjournal.cpp b/zenserver/experimental/usnjournal.cpp
index 1e765fbe5..d575e1779 100644
--- a/zenserver/experimental/usnjournal.cpp
+++ b/zenserver/experimental/usnjournal.cpp
@@ -259,7 +259,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath)
}
}
- const auto ElapsedMs = Timer.getElapsedTimeMs();
+ const auto ElapsedMs = Timer.GetElapsedTimeMs();
ZEN_INFO("MFT enumeration of {} completed after {} ({})",
zen::NiceBytes(MftBytesProcessed),
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 1a9eb2c67..7870f9559 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -191,7 +191,7 @@ struct ProjectStore::OplogStorage : public RefCounted
});
ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}",
- NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
m_MaxLsn,
m_NextOpsOffset);
}
@@ -502,7 +502,7 @@ ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry
}
}
- ZEN_DEBUG("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+ ZEN_DEBUG("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
for (CbFieldView& Entry : Core["meta"sv])
diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp
index 18d63a6ef..01866a63b 100644
--- a/zenserver/testing/httptest.cpp
+++ b/zenserver/testing/httptest.cpp
@@ -15,6 +15,23 @@ HttpTestingService::HttpTestingService()
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "metrics",
+ [this](HttpRouterRequest& Req) {
+ metrics::OperationTiming::Scope _(m_TimingStats);
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "get_metrics",
+ [this](HttpRouterRequest& Req) {
+ CbObjectWriter Cbo;
+ EmitSnapshot("requests", m_TimingStats, Cbo);
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"json",
[this](HttpRouterRequest& Req) {
CbObjectWriter Obj;
diff --git a/zenserver/testing/httptest.h b/zenserver/testing/httptest.h
index f55780d05..f7ea0c31c 100644
--- a/zenserver/testing/httptest.h
+++ b/zenserver/testing/httptest.h
@@ -3,6 +3,7 @@
#pragma once
#include <zencore/logging.h>
+#include <zencore/stats.h>
#include <zenhttp/httpserver.h>
#include <atomic>
@@ -39,8 +40,9 @@ public:
};
private:
- HttpRequestRouter m_Router;
- std::atomic<uint32_t> m_Counter{0};
+ HttpRequestRouter m_Router;
+ std::atomic<uint32_t> m_Counter{0};
+ metrics::OperationTiming m_TimingStats;
RwLock m_RwLock;
std::unordered_map<uint32_t, Ref<PackageHandler>> m_HandlerMap;
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index b93635e76..0397ddaa0 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -299,13 +299,16 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key,
return PutDerivedData(BucketId, Key.ToHexString(), DerivedData);
}
-CloudCacheResult
+PutRefResult
CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
{
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ PutRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
}
IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
@@ -328,16 +331,102 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
if (Response.error)
{
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ PutRefResult Result;
+ Result.ErrorCode = static_cast<int32_t>(Response.error.code);
+ Result.Reason = std::move(Response.error.message);
+ return Result;
}
else if (!VerifyAccessToken(Response.status_code))
{
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ PutRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
}
- return {.Bytes = Response.uploaded_bytes,
- .ElapsedSeconds = Response.elapsed,
- .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ PutRefResult Result;
+ Result.Success = (Response.status_code == 200 || Response.status_code == 201);
+ Result.Bytes = Response.uploaded_bytes;
+ Result.ElapsedSeconds = Response.elapsed;
+
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+
+ return Result;
+}
+
+FinalizeRefResult
+CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ FinalizeRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
+ << Key.ToHexString() << "/finalize/" << RefHash.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
+ {"X-Jupiter-IoHash", RefHash.ToHexString()},
+ {"Content-Type", "application/x-ue-cb"}});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ FinalizeRefResult Result;
+ Result.ErrorCode = static_cast<int32_t>(Response.error.code);
+ Result.Reason = std::move(Response.error.message);
+ return Result;
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ FinalizeRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
+ }
+
+ FinalizeRefResult Result;
+ Result.Success = (Response.status_code == 200 || Response.status_code == 201);
+ Result.Bytes = Response.uploaded_bytes;
+ Result.ElapsedSeconds = Response.elapsed;
+
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+
+ return Result;
}
CloudCacheResult
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index d8844279e..9573a1631 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/iohash.h>
#include <zencore/logging.h>
#include <zencore/refcount.h>
#include <zencore/thread.h>
@@ -53,6 +54,16 @@ struct CloudCacheResult
bool Success = false;
};
+struct PutRefResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
+struct FinalizeRefResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
/**
* Context for performing Jupiter operations
*
@@ -76,11 +87,13 @@ public:
CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData);
CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData);
- CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
+ PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob);
CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob);
CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object);
+ FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
+
CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key);
CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key);
CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key);
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 0dd16cd06..03054b542 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -9,6 +9,7 @@
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/stats.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
@@ -45,6 +46,7 @@ namespace detail {
{
std::lock_guard Lock(m_Lock);
m_Queue.emplace_back(std::move(Item));
+ m_Size++;
}
m_NewItemSignal.notify_one();
@@ -64,6 +66,7 @@ namespace detail {
{
Item = std::move(m_Queue.front());
m_Queue.pop_front();
+ m_Size--;
return true;
}
@@ -80,7 +83,7 @@ namespace detail {
}
}
- std::size_t Num() const
+ std::size_t Size() const
{
std::unique_lock Lock(m_Lock);
return m_Queue.size();
@@ -91,12 +94,15 @@ namespace detail {
std::condition_variable m_NewItemSignal;
std::deque<T> m_Queue;
std::atomic_bool m_CompleteAdding{false};
+ std::atomic_uint32_t m_Size;
};
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
- JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc)
+ JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_UseLegacyDdc(Options.UseLegacyDdc)
{
using namespace fmt::literals;
m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
@@ -277,16 +283,82 @@ namespace detail {
Success = false;
for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
{
- if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
+ if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RecordValue,
+ ZenContentType::kCbObject);
Result.Success)
{
TotalBytes += Result.Bytes;
TotalElapsedSeconds += Result.ElapsedSeconds;
Success = true;
- break;
+
+ if (!Result.Needs.empty())
+ {
+ for (const IoHash& NeededHash : Result.Needs)
+ {
+ Success = false;
+
+ if (auto It =
+ std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash);
+ It != std::end(CacheRecord.PayloadIds))
+ {
+ const size_t Idx = It - std::begin(CacheRecord.PayloadIds);
+
+ if (CloudCacheResult BlobResult =
+ Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ BlobResult.Success)
+ {
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ Success = true;
+ }
+ else
+ {
+ ZEN_WARN("upload missing payload '{}/{}/{}' FAILED",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ NeededHash);
+ }
+ }
+ else
+ {
+ ZEN_WARN("needed payload '{}/{}/{}' MISSING",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ NeededHash);
+ }
+ }
+
+ const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+
+ if (FinalizeRefResult FinalizeResult =
+ Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+ FinalizeResult.Success)
+ {
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+ Success = true;
+
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
+ {
+ ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ MissingHash);
+ }
+ }
+ else
+ {
+ ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
+ Success = false;
+ }
+ }
+
+ if (Success)
+ {
+ break;
+ }
}
}
@@ -302,6 +374,9 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
@@ -515,7 +590,12 @@ struct UpstreamStats
}
UpstreamEndpointStats& Stats = Endpoint.Stats();
- if (Result.Success)
+
+ if (Result.Error)
+ {
+ Stats.ErrorCount++;
+ }
+ else if (Result.Success)
{
Stats.HitCount++;
Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
@@ -549,6 +629,10 @@ struct UpstreamStats
Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
}
+ else
+ {
+ Stats.ErrorCount++;
+ }
if (m_SampleCount++ % MaxSampleCount)
{
@@ -575,13 +659,13 @@ struct UpstreamStats
const uint64_t TotalCount = HitCount + MissCount;
const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
- Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->DisplayName(),
- HitRate,
- DownBytes,
- DownSpeed,
- UpBytes,
- UpSpeed);
+ Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
+ Ep->DisplayName(),
+ HitRate,
+ DownBytes,
+ DownSpeed,
+ UpBytes,
+ UpSpeed);
}
}
@@ -700,6 +784,36 @@ public:
return {};
}
+ virtual void GetStatus(CbObjectWriter& Status) override
+ {
+ Status << "reading" << m_Options.ReadUpstream;
+ Status << "writing" << m_Options.WriteUpstream;
+ Status << "worker_threads" << m_Options.ThreadCount;
+ Status << "queue_count" << m_UpstreamQueue.Size();
+
+ Status.BeginArray("endpoints");
+ for (const auto& Ep : m_Endpoints)
+ {
+ Status.BeginObject();
+ Status << "name" << Ep->DisplayName();
+ Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
+
+ UpstreamEndpointStats& Stats = Ep->Stats();
+ const uint64_t HitCount = Stats.HitCount;
+ const uint64_t MissCount = Stats.MissCount;
+ const uint64_t TotalCount = HitCount + MissCount;
+ const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
+
+ Status << "hit_ratio" << HitRate;
+ Status << "downloaded_mb" << Stats.DownBytes;
+ Status << "uploaded_mb" << Stats.UpBytes;
+ Status << "error_count" << Stats.ErrorCount;
+
+ Status.EndObject();
+ }
+ Status.EndArray();
+ }
+
private:
void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
{
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 0e736480b..a6b1e9784 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -12,6 +12,7 @@
namespace zen {
+class CbObjectWriter;
class CidStore;
class ZenCacheStore;
struct CloudCacheClientOptions;
@@ -86,6 +87,7 @@ struct UpstreamEndpointStats
std::atomic_uint64_t HitCount = {};
std::atomic_uint64_t MissCount = {};
std::atomic_uint64_t UpCount = {};
+ std::atomic_uint64_t ErrorCount = {};
std::atomic<double> UpBytes = {};
std::atomic<double> DownBytes = {};
std::atomic<double> SecondsUp = {};
@@ -139,6 +141,8 @@ public:
};
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+
+ virtual void GetStatus(CbObjectWriter& CbO) = 0;
};
std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);