aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-08-17 09:30:40 +0200
committerGitHub <[email protected]>2023-08-17 09:30:40 +0200
commit6b06cffdb84fdb9b010e6aefc642db763b6386d5 (patch)
tree193ba716a459fa838f71cf5af19dd6483614066a /src
parentchangelog (diff)
downloadzen-6b06cffdb84fdb9b010e6aefc642db763b6386d5.tar.xz
zen-6b06cffdb84fdb9b010e6aefc642db763b6386d5.zip
skip upstream logic early if we have no upstream endpoints (#359)
* Skip upstream logic early if we have not upstream endpoints * make cache store logging of CbObjects async * changelog
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp44
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp65
-rw-r--r--src/zenserver/cache/structuredcachestore.h5
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp6
-rw-r--r--src/zenserver/upstream/upstreamcache.h2
5 files changed, 85 insertions, 37 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 7c4097d4f..0120f3599 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -888,6 +888,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::OK);
}
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
Stopwatch Timer;
@@ -973,7 +975,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
}
}
- else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote))
+ else if (!HasUpstream || !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote))
{
ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
Ref.Namespace,
@@ -1212,6 +1214,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
Stopwatch Timer;
if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary)
@@ -1237,7 +1241,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
Ref.HashKey,
{.Value = Body, .RawSize = RawSize, .RawHash = RawHash});
- if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
+ if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
{
m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}});
}
@@ -1294,7 +1298,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
CachePolicy Policy = PolicyFromUrl;
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
+ if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
.Namespace = Ref.Namespace,
@@ -1390,7 +1394,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
const bool IsPartialRecord = Count.Valid != Count.Total;
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
+ if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
.Namespace = Ref.Namespace,
@@ -1431,8 +1435,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons
IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
const UpstreamEndpointInfo* Source = nullptr;
CachePolicy Policy = PolicyFromUrl;
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
{
- const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
+ const bool QueryUpstream = HasUpstream && !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
if (QueryUpstream)
{
@@ -1835,6 +1841,8 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
ValidAttachments.reserve(NumAttachments);
AttachmentsToStoreLocally.reserve(NumAttachments);
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
Stopwatch Timer;
Request.RecordObject.IterateAttachments(
@@ -1901,7 +1909,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
const bool IsPartialRecord = Count.Valid != Count.Total;
- if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
+ if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
.Namespace = Request.Namespace,
@@ -1948,6 +1956,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(const CacheRequestContext&
{
return CbPackage{};
}
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
std::vector<RecordRequestData> Requests;
std::vector<size_t> UpstreamIndexes;
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
@@ -2062,8 +2073,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(const CacheRequestContext&
}
if (!Request.Complete)
{
- bool NeedUpstreamRecord =
- !Request.RecordObject && !FoundLocalInvalid && EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
+ bool NeedUpstreamRecord = HasUpstream && !Request.RecordObject && !FoundLocalInvalid &&
+ EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
if (NeedUpstreamRecord || NeedUpstreamAttachment)
{
UpstreamIndexes.push_back(Requests.size() - 1);
@@ -2265,6 +2276,8 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(const CacheRequestContext& C
{
return CbPackage{};
}
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
std::vector<bool> Results;
for (CbFieldView RequestField : Params["Requests"sv])
{
@@ -2329,7 +2342,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(const CacheRequestContext& C
// We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
// If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
- if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ if (HasUpstream && Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
}
@@ -2387,6 +2400,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(const CacheRequestContext& C
std::vector<size_t> RemoteRequestIndexes;
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
for (CbFieldView RequestField : Params["Requests"sv])
{
Stopwatch Timer;
@@ -2428,7 +2443,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(const CacheRequestContext& C
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.HitCount++;
}
- else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
+ else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
{
RemoteRequestIndexes.push_back(Requests.size() - 1);
}
@@ -2748,6 +2763,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext&
std::vector<CacheChunkRequest*>& OutUpstreamChunks)
{
using namespace cache::detail;
+ const bool HasUpstream = m_UpstreamCache.IsActive();
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
@@ -2768,7 +2784,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext&
Record.CacheValue = std::move(CacheValue.Value);
}
}
- if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
+ if (HasUpstream && !Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
{
RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy));
UpstreamRecordRequests.push_back(&RecordKey);
@@ -2806,7 +2822,6 @@ HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext&
m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
- std::vector<CacheChunkRequest*> UpstreamPayloadRequests;
for (ChunkRequest* Request : RecordRequests)
{
Stopwatch Timer;
@@ -2883,7 +2898,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext&
}
}
}
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
{
Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy);
OutUpstreamChunks.push_back(Request->Key);
@@ -2900,6 +2915,7 @@ HttpStructuredCacheService::GetLocalCacheValues(const CacheRequestContext&
std::vector<CacheChunkRequest*>& OutUpstreamChunks)
{
using namespace cache::detail;
+ const bool HasUpstream = m_UpstreamCache.IsActive();
for (ChunkRequest* Request : ValueRequests)
{
@@ -2922,7 +2938,7 @@ HttpStructuredCacheService::GetLocalCacheValues(const CacheRequestContext&
}
}
}
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
{
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 1847c724d..f80670a17 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -15,6 +15,7 @@
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/cache/cache.h>
@@ -223,7 +224,12 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
: m_Log(logging::Get("z$"))
, m_Gc(Gc)
, m_Configuration(Configuration)
+, m_PendingAsyncLogging(1)
{
+ if (m_Configuration.EnableAccessLog || m_Configuration.EnableWriteLog)
+ {
+ m_AsyncLogging = std::make_unique<WorkerThreadPool>(1, "cache_async_log");
+ }
CreateDirectories(m_Configuration.BasePath);
ZEN_INFO("Initializing at '{}'", m_Configuration.BasePath);
@@ -263,6 +269,9 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
ZenCacheStore::~ZenCacheStore()
{
+ m_PendingAsyncLogging.CountDown();
+ m_PendingAsyncLogging.Wait();
+ m_AsyncLogging.reset();
m_Namespaces.clear();
}
@@ -283,18 +292,23 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
{
if (OutValue.Value.GetContentType() == ZenContentType::kCbObject)
{
- const IoHash ObjectHash = IoHash::HashBuffer(OutValue.Value.GetView());
- const size_t ObjectSize = OutValue.Value.GetSize();
-
- ZEN_LOG_INFO(LogCacheActivity,
- "GET HIT [{}] {}/{}/{} -> {} {} {}",
- Context,
- Namespace,
- Bucket,
- HashKey,
- ObjectHash,
- ObjectSize,
- ToString(OutValue.Value.GetContentType()))
+ m_PendingAsyncLogging.AddCount(1);
+ m_AsyncLogging->ScheduleWork(
+ [this, OutValue, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() {
+ auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); });
+ const IoHash ObjectHash = IoHash::HashBuffer(OutValue.Value.GetView());
+ const size_t ObjectSize = OutValue.Value.GetSize();
+
+ ZEN_LOG_INFO(LogCacheActivity,
+ "GET HIT [{}] {}/{}/{} -> {} {} {}",
+ Context,
+ Namespace,
+ Bucket,
+ HashKey,
+ ObjectHash,
+ ObjectSize,
+ ToString(OutValue.Value.GetContentType()))
+ });
}
else
{
@@ -337,18 +351,23 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
{
if (Value.Value.GetContentType() == ZenContentType::kCbObject)
{
- const IoHash ObjectHash = IoHash::HashBuffer(Value.Value.GetView());
- const size_t ObjectSize = Value.Value.GetSize();
+ m_PendingAsyncLogging.AddCount(1);
+ m_AsyncLogging->ScheduleWork(
+ [this, Value, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() {
+ auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); });
+ const IoHash ObjectHash = IoHash::HashBuffer(Value.Value.GetView());
+ const size_t ObjectSize = Value.Value.GetSize();
- ZEN_LOG_INFO(LogCacheActivity,
- "PUT [{}] {}/{}/{} -> {} {} {}",
- Context,
- Namespace,
- Bucket,
- HashKey,
- ObjectHash,
- ObjectSize,
- ToString(Value.Value.GetContentType()));
+ ZEN_LOG_INFO(LogCacheActivity,
+ "PUT [{}] {}/{}/{} -> {} {} {}",
+ Context,
+ Namespace,
+ Bucket,
+ HashKey,
+ ObjectHash,
+ ObjectSize,
+ ToString(Value.Value.GetContentType()));
+ });
}
else
{
diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h
index 7ce195ef5..040b71c95 100644
--- a/src/zenserver/cache/structuredcachestore.h
+++ b/src/zenserver/cache/structuredcachestore.h
@@ -36,6 +36,8 @@ namespace zen {
******************************************************************************/
+class WorkerThreadPool;
+
/* Z$ namespace
A namespace scopes a set of buckets, and would typically be used to isolate
@@ -171,6 +173,9 @@ private:
GcManager& m_Gc;
Configuration m_Configuration;
+
+ std::unique_ptr<WorkerThreadPool> m_AsyncLogging;
+ Latch m_PendingAsyncLogging;
};
void z$_forcelink();
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index 01ba626bd..15f68f70c 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -1503,6 +1503,12 @@ public:
m_RunState.IsRunning = true;
}
+ virtual bool IsActive() override
+ {
+ std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
+ return !m_Endpoints.empty();
+ }
+
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override
{
ZEN_TRACE_CPU("Upstream::RegisterEndpoint");
diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h
index 8f1395509..291e7e95e 100644
--- a/src/zenserver/upstream/upstreamcache.h
+++ b/src/zenserver/upstream/upstreamcache.h
@@ -224,6 +224,8 @@ public:
virtual void Initialize() = 0;
+ virtual bool IsActive() = 0;
+
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0;