diff options
| author | Dan Engelbrecht <[email protected]> | 2023-08-17 09:30:40 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-08-17 09:30:40 +0200 |
| commit | 6b06cffdb84fdb9b010e6aefc642db763b6386d5 (patch) | |
| tree | 193ba716a459fa838f71cf5af19dd6483614066a /src | |
| parent | changelog (diff) | |
| download | zen-6b06cffdb84fdb9b010e6aefc642db763b6386d5.tar.xz zen-6b06cffdb84fdb9b010e6aefc642db763b6386d5.zip | |
skip upstream logic early if we have no upstream endpoints (#359)
* Skip upstream logic early if we have not upstream endpoints
* make cache store logging of CbObjects async
* changelog
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 44 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 65 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.h | 5 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.cpp | 6 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.h | 2 |
5 files changed, 85 insertions, 37 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 7c4097d4f..0120f3599 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -888,6 +888,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con return Request.WriteResponse(HttpResponseCode::OK); } + const bool HasUpstream = m_UpstreamCache.IsActive(); + CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; Stopwatch Timer; @@ -973,7 +975,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } - else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) + else if (!HasUpstream || !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) { ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", Ref.Namespace, @@ -1212,6 +1214,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; + const bool HasUpstream = m_UpstreamCache.IsActive(); + Stopwatch Timer; if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) @@ -1237,7 +1241,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}); - if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) + if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); } @@ -1294,7 +1298,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); CachePolicy Policy = PolicyFromUrl; - if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) + if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, .Namespace = Ref.Namespace, @@ -1390,7 +1394,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con const bool IsPartialRecord = Count.Valid != Count.Total; - if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) + if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, .Namespace = Ref.Namespace, @@ -1431,8 +1435,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); const UpstreamEndpointInfo* Source = nullptr; CachePolicy Policy = PolicyFromUrl; + + const bool HasUpstream = m_UpstreamCache.IsActive(); { - const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); + const bool QueryUpstream = HasUpstream && !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); if (QueryUpstream) { @@ -1835,6 +1841,8 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack ValidAttachments.reserve(NumAttachments); AttachmentsToStoreLocally.reserve(NumAttachments); + const bool HasUpstream = m_UpstreamCache.IsActive(); + Stopwatch Timer; Request.RecordObject.IterateAttachments( @@ -1901,7 +1909,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack const bool IsPartialRecord = Count.Valid != Count.Total; - if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) + if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, .Namespace = Request.Namespace, @@ -1948,6 +1956,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(const CacheRequestContext& { return CbPackage{}; } + + const bool HasUpstream = m_UpstreamCache.IsActive(); + std::vector<RecordRequestData> Requests; std::vector<size_t> UpstreamIndexes; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); @@ -2062,8 +2073,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(const CacheRequestContext& } if (!Request.Complete) { - bool NeedUpstreamRecord = - !Request.RecordObject && !FoundLocalInvalid && EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote); + bool NeedUpstreamRecord = HasUpstream && !Request.RecordObject && !FoundLocalInvalid && + EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote); if (NeedUpstreamRecord || NeedUpstreamAttachment) { UpstreamIndexes.push_back(Requests.size() - 1); @@ -2265,6 +2276,8 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(const CacheRequestContext& C { return CbPackage{}; } + const bool HasUpstream = m_UpstreamCache.IsActive(); + std::vector<bool> Results; for (CbFieldView RequestField : Params["Requests"sv]) { @@ -2329,7 +2342,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(const CacheRequestContext& C // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. - if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) + if (HasUpstream && Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key}); } @@ -2387,6 +2400,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(const CacheRequestContext& C std::vector<size_t> RemoteRequestIndexes; + const bool HasUpstream = m_UpstreamCache.IsActive(); + for (CbFieldView RequestField : Params["Requests"sv]) { Stopwatch Timer; @@ -2428,7 +2443,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(const CacheRequestContext& C NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; } - else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) + else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) { RemoteRequestIndexes.push_back(Requests.size() - 1); } @@ -2748,6 +2763,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext& std::vector<CacheChunkRequest*>& OutUpstreamChunks) { using namespace cache::detail; + const bool HasUpstream = m_UpstreamCache.IsActive(); std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) @@ -2768,7 +2784,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext& Record.CacheValue = std::move(CacheValue.Value); } } - if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote)) + if (HasUpstream && !Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote)) { RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy)); UpstreamRecordRequests.push_back(&RecordKey); @@ -2806,7 +2822,6 @@ HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext& m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } - std::vector<CacheChunkRequest*> UpstreamPayloadRequests; for (ChunkRequest* Request : RecordRequests) { Stopwatch Timer; @@ -2883,7 +2898,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext& } } } - if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) + if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) { Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy); OutUpstreamChunks.push_back(Request->Key); @@ -2900,6 +2915,7 @@ HttpStructuredCacheService::GetLocalCacheValues(const CacheRequestContext& std::vector<CacheChunkRequest*>& OutUpstreamChunks) { using namespace cache::detail; + const bool HasUpstream = m_UpstreamCache.IsActive(); for (ChunkRequest* Request : ValueRequests) { @@ -2922,7 +2938,7 @@ HttpStructuredCacheService::GetLocalCacheValues(const CacheRequestContext& } } } - if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) + if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) { diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 1847c724d..f80670a17 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -15,6 +15,7 @@ #include <zencore/thread.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <zenstore/scrubcontext.h> #include <zenutil/cache/cache.h> @@ -223,7 +224,12 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : m_Log(logging::Get("z$")) , m_Gc(Gc) , m_Configuration(Configuration) +, m_PendingAsyncLogging(1) { + if (m_Configuration.EnableAccessLog || m_Configuration.EnableWriteLog) + { + m_AsyncLogging = std::make_unique<WorkerThreadPool>(1, "cache_async_log"); + } CreateDirectories(m_Configuration.BasePath); ZEN_INFO("Initializing at '{}'", m_Configuration.BasePath); @@ -263,6 +269,9 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) ZenCacheStore::~ZenCacheStore() { + m_PendingAsyncLogging.CountDown(); + m_PendingAsyncLogging.Wait(); + m_AsyncLogging.reset(); m_Namespaces.clear(); } @@ -283,18 +292,23 @@ ZenCacheStore::Get(const CacheRequestContext& Context, { if (OutValue.Value.GetContentType() == ZenContentType::kCbObject) { - const IoHash ObjectHash = IoHash::HashBuffer(OutValue.Value.GetView()); - const size_t ObjectSize = OutValue.Value.GetSize(); - - ZEN_LOG_INFO(LogCacheActivity, - "GET HIT [{}] {}/{}/{} -> {} {} {}", - Context, - Namespace, - Bucket, - HashKey, - ObjectHash, - ObjectSize, - ToString(OutValue.Value.GetContentType())) + m_PendingAsyncLogging.AddCount(1); + m_AsyncLogging->ScheduleWork( + [this, OutValue, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() { + auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); }); + const IoHash ObjectHash = IoHash::HashBuffer(OutValue.Value.GetView()); + const size_t ObjectSize = OutValue.Value.GetSize(); + + ZEN_LOG_INFO(LogCacheActivity, + "GET HIT [{}] {}/{}/{} -> {} {} {}", + Context, + Namespace, + Bucket, + HashKey, + ObjectHash, + ObjectSize, + ToString(OutValue.Value.GetContentType())) + }); } else { @@ -337,18 +351,23 @@ ZenCacheStore::Put(const CacheRequestContext& Context, { if (Value.Value.GetContentType() == ZenContentType::kCbObject) { - const IoHash ObjectHash = IoHash::HashBuffer(Value.Value.GetView()); - const size_t ObjectSize = Value.Value.GetSize(); + m_PendingAsyncLogging.AddCount(1); + m_AsyncLogging->ScheduleWork( + [this, Value, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() { + auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); }); + const IoHash ObjectHash = IoHash::HashBuffer(Value.Value.GetView()); + const size_t ObjectSize = Value.Value.GetSize(); - ZEN_LOG_INFO(LogCacheActivity, - "PUT [{}] {}/{}/{} -> {} {} {}", - Context, - Namespace, - Bucket, - HashKey, - ObjectHash, - ObjectSize, - ToString(Value.Value.GetContentType())); + ZEN_LOG_INFO(LogCacheActivity, + "PUT [{}] {}/{}/{} -> {} {} {}", + Context, + Namespace, + Bucket, + HashKey, + ObjectHash, + ObjectSize, + ToString(Value.Value.GetContentType())); + }); } else { diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h index 7ce195ef5..040b71c95 100644 --- a/src/zenserver/cache/structuredcachestore.h +++ b/src/zenserver/cache/structuredcachestore.h @@ -36,6 +36,8 @@ namespace zen { ******************************************************************************/ +class WorkerThreadPool; + /* Z$ namespace A namespace scopes a set of buckets, and would typically be used to isolate @@ -171,6 +173,9 @@ private: GcManager& m_Gc; Configuration m_Configuration; + + std::unique_ptr<WorkerThreadPool> m_AsyncLogging; + Latch m_PendingAsyncLogging; }; void z$_forcelink(); diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index 01ba626bd..15f68f70c 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -1503,6 +1503,12 @@ public: m_RunState.IsRunning = true; } + virtual bool IsActive() override + { + std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); + return !m_Endpoints.empty(); + } + virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { ZEN_TRACE_CPU("Upstream::RegisterEndpoint"); diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h index 8f1395509..291e7e95e 100644 --- a/src/zenserver/upstream/upstreamcache.h +++ b/src/zenserver/upstream/upstreamcache.h @@ -224,6 +224,8 @@ public: virtual void Initialize() = 0; + virtual bool IsActive() = 0; + virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0; |