diff options
Diffstat (limited to 'src/zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 155 |
1 files changed, 83 insertions, 72 deletions
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index f80670a17..001659672 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -224,11 +224,11 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : m_Log(logging::Get("z$")) , m_Gc(Gc) , m_Configuration(Configuration) -, m_PendingAsyncLogging(1) +, m_ExitLogging(false) { if (m_Configuration.EnableAccessLog || m_Configuration.EnableWriteLog) { - m_AsyncLogging = std::make_unique<WorkerThreadPool>(1, "cache_async_log"); + m_AsyncLoggingThread = std::thread(&ZenCacheStore::LogWorker, this); } CreateDirectories(m_Configuration.BasePath); @@ -269,12 +269,62 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) ZenCacheStore::~ZenCacheStore() { - m_PendingAsyncLogging.CountDown(); - m_PendingAsyncLogging.Wait(); - m_AsyncLogging.reset(); + m_ExitLogging.store(true); + m_LogEvent.Set(); + if (m_AsyncLoggingThread.joinable()) + { + m_AsyncLoggingThread.join(); + } m_Namespaces.clear(); } +void +ZenCacheStore::LogWorker() +{ + SetCurrentThreadName("ZenCacheStore::LogWorker"); + + std::vector<AccessLogItem> Items; + while (true) + { + m_LogQueueLock.WithExclusiveLock([this, &Items]() { Items.swap(m_LogQueue); }); + for (const auto& Item : Items) + { + if (Item.Value.Value) + { + const bool IsCbObject = Item.Value.Value.GetContentType() == ZenContentType::kCbObject; + + const IoHash ObjectHash = IsCbObject ? IoHash::HashBuffer(Item.Value.Value.GetView()) : Item.Value.RawHash; + const size_t ObjectSize = IsCbObject ? Item.Value.Value.GetSize() : Item.Value.RawSize; + + ZEN_LOG_INFO(LogCacheActivity, + "{} [{}] {}/{}/{} -> {} {} {}", + Item.Op, + Item.Context, + Item.Namespace, + Item.Bucket, + Item.HashKey, + ObjectHash, + ObjectSize, + ToString(Item.Value.Value.GetContentType())) + } + else + { + ZEN_LOG_INFO(LogCacheActivity, "{} [{}] {}/{}/{}", Item.Op, Item.Context, Item.Namespace, Item.Bucket, Item.HashKey); + } + } + if (!Items.empty()) + { + Items.resize(0); + continue; + } + if (m_ExitLogging) + { + break; + } + m_LogEvent.Wait(); + } +} + bool ZenCacheStore::Get(const CacheRequestContext& Context, std::string_view Namespace, @@ -288,44 +338,21 @@ ZenCacheStore::Get(const CacheRequestContext& Context, if (m_Configuration.EnableAccessLog) { - if (Result) - { - if (OutValue.Value.GetContentType() == ZenContentType::kCbObject) - { - m_PendingAsyncLogging.AddCount(1); - m_AsyncLogging->ScheduleWork( - [this, OutValue, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() { - auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); }); - const IoHash ObjectHash = IoHash::HashBuffer(OutValue.Value.GetView()); - const size_t ObjectSize = OutValue.Value.GetSize(); - - ZEN_LOG_INFO(LogCacheActivity, - "GET HIT [{}] {}/{}/{} -> {} {} {}", - Context, - Namespace, - Bucket, - HashKey, - ObjectHash, - ObjectSize, - ToString(OutValue.Value.GetContentType())) - }); - } - else - { - ZEN_LOG_INFO(LogCacheActivity, - "GET HIT [{}] {}/{}/{} -> {} {} {}", - Context, - Namespace, - Bucket, - HashKey, - OutValue.RawHash, - OutValue.RawSize, - ToString(OutValue.Value.GetContentType())); - } - } - else + ZEN_TRACE_CPU("Z$::Get::AccessLog"); + bool Signal = false; + m_LogQueueLock.WithExclusiveLock([&]() { + Signal = m_LogQueue.empty(); + m_LogQueue.emplace_back(AccessLogItem{.Op = Result ? "GET HIT " : "GET MISS", + .Context = Context, + .Namespace = std::string(Namespace), + .Bucket = std::string(Bucket), + .HashKey = HashKey, + .Value = OutValue /*, + .Result = Result*/}); + }); + if (Signal) { - ZEN_LOG_INFO(LogCacheActivity, "GET MISS [{}] {}/{}/{}", Context, Namespace, Bucket, HashKey); + m_LogEvent.Set(); } } @@ -349,37 +376,21 @@ ZenCacheStore::Put(const CacheRequestContext& Context, { if (m_Configuration.EnableWriteLog) { - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - m_PendingAsyncLogging.AddCount(1); - m_AsyncLogging->ScheduleWork( - [this, Value, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() { - auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); }); - const IoHash ObjectHash = IoHash::HashBuffer(Value.Value.GetView()); - const size_t ObjectSize = Value.Value.GetSize(); - - ZEN_LOG_INFO(LogCacheActivity, - "PUT [{}] {}/{}/{} -> {} {} {}", - Context, - Namespace, - Bucket, - HashKey, - ObjectHash, - ObjectSize, - ToString(Value.Value.GetContentType())); - }); - } - else + ZEN_TRACE_CPU("Z$::Get::WriteLog"); + bool Signal = false; + m_LogQueueLock.WithExclusiveLock([&]() { + Signal = m_LogQueue.empty(); + m_LogQueue.emplace_back(AccessLogItem{.Op = "PUT ", + .Context = Context, + .Namespace = std::string(Namespace), + .Bucket = std::string(Bucket), + .HashKey = HashKey, + .Value = Value /*, + .Result = true*/}); + }); + if (Signal) { - ZEN_LOG_INFO(LogCacheActivity, - "PUT [{}] {}/{}/{} -> {} {} {}", - Context, - Namespace, - Bucket, - HashKey, - Value.RawHash, - Value.RawSize, - ToString(Value.Value.GetContentType())); + m_LogEvent.Set(); } } |