aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-09-21 11:06:13 +0200
committerMartin Ridgers <[email protected]>2021-09-21 11:06:13 +0200
commit68c951e0f440ffd483795dced737e88152c1a581 (patch)
tree5c0910ca2a85b45fb05dba3ce457b7d156213894 /zenserver/upstream/upstreamcache.cpp
parentMerge main into linux-mac (diff)
parentTrigger storage scrubbing pass at startup (diff)
downloadzen-68c951e0f440ffd483795dced737e88152c1a581.tar.xz
zen-68c951e0f440ffd483795dced737e88152c1a581.zip
Merged main into linux-mac
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp97
1 files changed, 58 insertions, 39 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 38d30a795..d6b6d44be 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -94,7 +94,7 @@ namespace detail {
std::atomic_bool m_CompleteAdding{false};
};
- class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint
+ class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc)
@@ -108,8 +108,9 @@ namespace detail {
virtual bool Initialize() override
{
- // TODO: Test and authenticate Jupiter client connection
- return !m_Client->ServiceUrl().empty();
+ CloudCacheSession Session(m_Client);
+ const CloudCacheResult Result = Session.Authenticate();
+ return Result.Success;
}
virtual std::string_view DisplayName() const override { return m_DisplayName; }
@@ -118,8 +119,8 @@ namespace detail {
{
try
{
- zen::CloudCacheSession Session(m_Client);
- CloudCacheResult Result;
+ CloudCacheSession Session(m_Client);
+ CloudCacheResult Result;
if (m_UseLegacyDdc && Type == ZenContentType::kBinary)
{
@@ -134,7 +135,7 @@ namespace detail {
{
CbPackage Package;
- const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All);
+ const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All);
if (Result.Success = ValidationResult == CbValidateError::None; Result.Success)
{
CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
@@ -183,7 +184,7 @@ namespace detail {
{
try
{
- zen::CloudCacheSession Session(m_Client);
+ CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
return {.Value = Result.Response,
@@ -278,7 +279,7 @@ namespace detail {
RefPtr<CloudCacheClient> m_Client;
};
- class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint
+ class ZenUpstreamEndpoint final : public UpstreamEndpoint
{
public:
ZenUpstreamEndpoint(std::string_view ServiceUrl)
@@ -292,8 +293,20 @@ namespace detail {
virtual bool Initialize() override
{
- // TODO: Test and authenticate Zen client connection
- return !m_Client->ServiceUrl().empty();
+ try
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ ZenCacheResult Result;
+ for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt)
+ {
+ Result = Session.SayHello();
+ }
+ return Result.Success;
+ }
+ catch (std::exception&)
+ {
+ return false;
+ }
}
virtual std::string_view DisplayName() const override { return m_DisplayName; }
@@ -344,14 +357,14 @@ namespace detail {
try
{
- zen::ZenStructuredCacheSession Session(*m_Client);
- ZenCacheResult Result;
- int64_t TotalBytes = 0ull;
- double TotalElapsedSeconds = 0.0;
+ ZenStructuredCacheSession Session(*m_Client);
+ ZenCacheResult Result;
+ int64_t TotalBytes = 0ull;
+ double TotalElapsedSeconds = 0.0;
if (CacheRecord.Type == ZenContentType::kCbPackage)
{
- zen::CbPackage Package;
+ CbPackage Package;
Package.SetObject(CbObject(SharedBuffer(RecordValue)));
for (const IoBuffer& Payload : Payloads)
@@ -427,8 +440,8 @@ namespace detail {
}
private:
- std::string m_DisplayName;
- RefPtr<zen::ZenStructuredCacheClient> m_Client;
+ std::string m_DisplayName;
+ RefPtr<ZenStructuredCacheClient> m_Client;
};
} // namespace detail
@@ -455,7 +468,7 @@ class UpstreamStats final
};
public:
- UpstreamStats() : m_Log(zen::logging::Get("upstream")) {}
+ UpstreamStats() : m_Log(logging::Get("upstream")) {}
void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result)
{
@@ -523,8 +536,8 @@ private:
class DefaultUpstreamCache final : public UpstreamCache
{
public:
- DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
- : m_Log(zen::logging::Get("upstream"))
+ DefaultUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
+ : m_Log(logging::Get("upstream"))
, m_Options(Options)
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
@@ -559,12 +572,15 @@ public:
virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
- for (auto& Endpoint : m_Endpoints)
+ if (m_Options.ReadUpstream)
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ for (auto& Endpoint : m_Endpoints)
{
- m_Stats.Add(*Endpoint, Result);
- return Result;
+ if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ return Result;
+ }
}
}
@@ -573,12 +589,15 @@ public:
virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
{
- for (auto& Endpoint : m_Endpoints)
+ if (m_Options.ReadUpstream)
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ for (auto& Endpoint : m_Endpoints)
{
- m_Stats.Add(*Endpoint, Result);
- return Result;
+ if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ return Result;
+ }
}
}
@@ -587,7 +606,7 @@ public:
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
{
- if (m_IsRunning.load())
+ if (m_IsRunning.load() && m_Options.WriteUpstream)
{
if (!m_UpstreamThreads.empty())
{
@@ -697,21 +716,21 @@ private:
spdlog::logger& Log() { return m_Log; }
- spdlog::logger& m_Log;
- UpstreamCacheOptions m_Options;
- ::ZenCacheStore& m_CacheStore;
- CidStore& m_CidStore;
- UpstreamQueue m_UpstreamQueue;
- UpstreamStats m_Stats;
- std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints;
- std::vector<std::thread> m_UpstreamThreads;
- std::atomic_bool m_IsRunning{false};
+ spdlog::logger& m_Log;
+ UpstreamCacheOptions m_Options;
+ ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
+ UpstreamQueue m_UpstreamQueue;
+ UpstreamStats m_Stats;
+ std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints;
+ std::vector<std::thread> m_UpstreamThreads;
+ std::atomic_bool m_IsRunning{false};
};
//////////////////////////////////////////////////////////////////////////
std::unique_ptr<UpstreamCache>
-MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
+MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
{
return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore);
}