diff options
| author | Dan Engelbrecht <[email protected]> | 2023-08-17 17:16:18 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-08-17 17:16:18 +0200 |
| commit | d2887ce78b4fbbdb7e5e320877dfcf12c7524e69 (patch) | |
| tree | dc5c7f0cc078219df49cb442f4842ea741130da3 /src/zenserver/cache | |
| parent | skip upstream logic early if we have no upstream endpoints (#359) (diff) | |
| download | zen-d2887ce78b4fbbdb7e5e320877dfcf12c7524e69.tar.xz zen-d2887ce78b4fbbdb7e5e320877dfcf12c7524e69.zip | |
single thread async cache log (#361)
* rework cache store background jogging
* correct capture for context
Diffstat (limited to 'src/zenserver/cache')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 100 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 155 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.h | 18 |
3 files changed, 149 insertions, 124 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 0120f3599..51d3d94d8 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1703,63 +1703,63 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) return Request.WriteResponse(HttpResponseCode::BadRequest); } - Request.WriteResponseAsync([this, &RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType]( - HttpServerRequest& AsyncRequest) mutable { - std::uint64_t RequestIndex = - m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - int TargetProcessId = 0; - CbPackage RpcResult; - - HttpResponseCode ResultCode = HandleRpcRequest(RequestContext, - ContentType, - std::move(Body), - AcceptMagic, - AcceptFlags, - TargetProcessId, - RpcResult); - if (!IsHttpSuccessCode(ResultCode)) - { - AsyncRequest.WriteResponse(ResultCode); - return; - } - if (AcceptMagic == kCbPkgMagic) - { - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + Request.WriteResponseAsync( + [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { + std::uint64_t RequestIndex = + m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + int TargetProcessId = 0; + CbPackage RpcResult; + + HttpResponseCode ResultCode = HandleRpcRequest(RequestContext, + ContentType, + std::move(Body), + AcceptMagic, + AcceptFlags, + TargetProcessId, + RpcResult); + if (!IsHttpSuccessCode(ResultCode)) { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + AsyncRequest.WriteResponse(ResultCode); + return; + } + if (AcceptMagic == kCbPkgMagic) + { + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { - Flags |= FormatFlags::kDenyPartialLocalReferences; + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId); + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId); - if (RequestIndex != ~0ull) + else { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); - } - AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); + BinaryWriter MemStream; + RpcResult.Save(MemStream); - if (RequestIndex != ~0ull) - { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->RecordResponse(RequestIndex, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } - AsyncRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - }); + }); } break; default: diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index f80670a17..001659672 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -224,11 +224,11 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : m_Log(logging::Get("z$")) , m_Gc(Gc) , m_Configuration(Configuration) -, m_PendingAsyncLogging(1) +, m_ExitLogging(false) { if (m_Configuration.EnableAccessLog || m_Configuration.EnableWriteLog) { - m_AsyncLogging = std::make_unique<WorkerThreadPool>(1, "cache_async_log"); + m_AsyncLoggingThread = std::thread(&ZenCacheStore::LogWorker, this); } CreateDirectories(m_Configuration.BasePath); @@ -269,12 +269,62 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) ZenCacheStore::~ZenCacheStore() { - m_PendingAsyncLogging.CountDown(); - m_PendingAsyncLogging.Wait(); - m_AsyncLogging.reset(); + m_ExitLogging.store(true); + m_LogEvent.Set(); + if (m_AsyncLoggingThread.joinable()) + { + m_AsyncLoggingThread.join(); + } m_Namespaces.clear(); } +void +ZenCacheStore::LogWorker() +{ + SetCurrentThreadName("ZenCacheStore::LogWorker"); + + std::vector<AccessLogItem> Items; + while (true) + { + m_LogQueueLock.WithExclusiveLock([this, &Items]() { Items.swap(m_LogQueue); }); + for (const auto& Item : Items) + { + if (Item.Value.Value) + { + const bool IsCbObject = Item.Value.Value.GetContentType() == ZenContentType::kCbObject; + + const IoHash ObjectHash = IsCbObject ? IoHash::HashBuffer(Item.Value.Value.GetView()) : Item.Value.RawHash; + const size_t ObjectSize = IsCbObject ? Item.Value.Value.GetSize() : Item.Value.RawSize; + + ZEN_LOG_INFO(LogCacheActivity, + "{} [{}] {}/{}/{} -> {} {} {}", + Item.Op, + Item.Context, + Item.Namespace, + Item.Bucket, + Item.HashKey, + ObjectHash, + ObjectSize, + ToString(Item.Value.Value.GetContentType())) + } + else + { + ZEN_LOG_INFO(LogCacheActivity, "{} [{}] {}/{}/{}", Item.Op, Item.Context, Item.Namespace, Item.Bucket, Item.HashKey); + } + } + if (!Items.empty()) + { + Items.resize(0); + continue; + } + if (m_ExitLogging) + { + break; + } + m_LogEvent.Wait(); + } +} + bool ZenCacheStore::Get(const CacheRequestContext& Context, std::string_view Namespace, @@ -288,44 +338,21 @@ ZenCacheStore::Get(const CacheRequestContext& Context, if (m_Configuration.EnableAccessLog) { - if (Result) - { - if (OutValue.Value.GetContentType() == ZenContentType::kCbObject) - { - m_PendingAsyncLogging.AddCount(1); - m_AsyncLogging->ScheduleWork( - [this, OutValue, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() { - auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); }); - const IoHash ObjectHash = IoHash::HashBuffer(OutValue.Value.GetView()); - const size_t ObjectSize = OutValue.Value.GetSize(); - - ZEN_LOG_INFO(LogCacheActivity, - "GET HIT [{}] {}/{}/{} -> {} {} {}", - Context, - Namespace, - Bucket, - HashKey, - ObjectHash, - ObjectSize, - ToString(OutValue.Value.GetContentType())) - }); - } - else - { - ZEN_LOG_INFO(LogCacheActivity, - "GET HIT [{}] {}/{}/{} -> {} {} {}", - Context, - Namespace, - Bucket, - HashKey, - OutValue.RawHash, - OutValue.RawSize, - ToString(OutValue.Value.GetContentType())); - } - } - else + ZEN_TRACE_CPU("Z$::Get::AccessLog"); + bool Signal = false; + m_LogQueueLock.WithExclusiveLock([&]() { + Signal = m_LogQueue.empty(); + m_LogQueue.emplace_back(AccessLogItem{.Op = Result ? "GET HIT " : "GET MISS", + .Context = Context, + .Namespace = std::string(Namespace), + .Bucket = std::string(Bucket), + .HashKey = HashKey, + .Value = OutValue /*, + .Result = Result*/}); + }); + if (Signal) { - ZEN_LOG_INFO(LogCacheActivity, "GET MISS [{}] {}/{}/{}", Context, Namespace, Bucket, HashKey); + m_LogEvent.Set(); } } @@ -349,37 +376,21 @@ ZenCacheStore::Put(const CacheRequestContext& Context, { if (m_Configuration.EnableWriteLog) { - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - m_PendingAsyncLogging.AddCount(1); - m_AsyncLogging->ScheduleWork( - [this, Value, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() { - auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); }); - const IoHash ObjectHash = IoHash::HashBuffer(Value.Value.GetView()); - const size_t ObjectSize = Value.Value.GetSize(); - - ZEN_LOG_INFO(LogCacheActivity, - "PUT [{}] {}/{}/{} -> {} {} {}", - Context, - Namespace, - Bucket, - HashKey, - ObjectHash, - ObjectSize, - ToString(Value.Value.GetContentType())); - }); - } - else + ZEN_TRACE_CPU("Z$::Get::WriteLog"); + bool Signal = false; + m_LogQueueLock.WithExclusiveLock([&]() { + Signal = m_LogQueue.empty(); + m_LogQueue.emplace_back(AccessLogItem{.Op = "PUT ", + .Context = Context, + .Namespace = std::string(Namespace), + .Bucket = std::string(Bucket), + .HashKey = HashKey, + .Value = Value /*, + .Result = true*/}); + }); + if (Signal) { - ZEN_LOG_INFO(LogCacheActivity, - "PUT [{}] {}/{}/{} -> {} {} {}", - Context, - Namespace, - Bucket, - HashKey, - Value.RawHash, - Value.RawSize, - ToString(Value.Value.GetContentType())); + m_LogEvent.Set(); } } diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h index 040b71c95..067cfc0bf 100644 --- a/src/zenserver/cache/structuredcachestore.h +++ b/src/zenserver/cache/structuredcachestore.h @@ -174,8 +174,22 @@ private: GcManager& m_Gc; Configuration m_Configuration; - std::unique_ptr<WorkerThreadPool> m_AsyncLogging; - Latch m_PendingAsyncLogging; + struct AccessLogItem + { + const char* Op; + CacheRequestContext Context; + std::string Namespace; + std::string Bucket; + IoHash HashKey; + ZenCacheValue Value; + }; + + void LogWorker(); + RwLock m_LogQueueLock; + std::vector<AccessLogItem> m_LogQueue; + std::atomic_bool m_ExitLogging; + Event m_LogEvent; + std::thread m_AsyncLoggingThread; }; void z$_forcelink(); |