aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/cache/structuredcachestore.cpp')
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp155
1 files changed, 83 insertions, 72 deletions
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index f80670a17..001659672 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -224,11 +224,11 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
: m_Log(logging::Get("z$"))
, m_Gc(Gc)
, m_Configuration(Configuration)
-, m_PendingAsyncLogging(1)
+, m_ExitLogging(false)
{
if (m_Configuration.EnableAccessLog || m_Configuration.EnableWriteLog)
{
- m_AsyncLogging = std::make_unique<WorkerThreadPool>(1, "cache_async_log");
+ m_AsyncLoggingThread = std::thread(&ZenCacheStore::LogWorker, this);
}
CreateDirectories(m_Configuration.BasePath);
@@ -269,12 +269,62 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
ZenCacheStore::~ZenCacheStore()
{
- m_PendingAsyncLogging.CountDown();
- m_PendingAsyncLogging.Wait();
- m_AsyncLogging.reset();
+ m_ExitLogging.store(true);
+ m_LogEvent.Set();
+ if (m_AsyncLoggingThread.joinable())
+ {
+ m_AsyncLoggingThread.join();
+ }
m_Namespaces.clear();
}
+void
+ZenCacheStore::LogWorker()
+{
+ SetCurrentThreadName("ZenCacheStore::LogWorker");
+
+ std::vector<AccessLogItem> Items;
+ while (true)
+ {
+ m_LogQueueLock.WithExclusiveLock([this, &Items]() { Items.swap(m_LogQueue); });
+ for (const auto& Item : Items)
+ {
+ if (Item.Value.Value)
+ {
+ const bool IsCbObject = Item.Value.Value.GetContentType() == ZenContentType::kCbObject;
+
+ const IoHash ObjectHash = IsCbObject ? IoHash::HashBuffer(Item.Value.Value.GetView()) : Item.Value.RawHash;
+ const size_t ObjectSize = IsCbObject ? Item.Value.Value.GetSize() : Item.Value.RawSize;
+
+ ZEN_LOG_INFO(LogCacheActivity,
+ "{} [{}] {}/{}/{} -> {} {} {}",
+ Item.Op,
+ Item.Context,
+ Item.Namespace,
+ Item.Bucket,
+ Item.HashKey,
+ ObjectHash,
+ ObjectSize,
+ ToString(Item.Value.Value.GetContentType()))
+ }
+ else
+ {
+ ZEN_LOG_INFO(LogCacheActivity, "{} [{}] {}/{}/{}", Item.Op, Item.Context, Item.Namespace, Item.Bucket, Item.HashKey);
+ }
+ }
+ if (!Items.empty())
+ {
+ Items.resize(0);
+ continue;
+ }
+ if (m_ExitLogging)
+ {
+ break;
+ }
+ m_LogEvent.Wait();
+ }
+}
+
bool
ZenCacheStore::Get(const CacheRequestContext& Context,
std::string_view Namespace,
@@ -288,44 +338,21 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
if (m_Configuration.EnableAccessLog)
{
- if (Result)
- {
- if (OutValue.Value.GetContentType() == ZenContentType::kCbObject)
- {
- m_PendingAsyncLogging.AddCount(1);
- m_AsyncLogging->ScheduleWork(
- [this, OutValue, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() {
- auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); });
- const IoHash ObjectHash = IoHash::HashBuffer(OutValue.Value.GetView());
- const size_t ObjectSize = OutValue.Value.GetSize();
-
- ZEN_LOG_INFO(LogCacheActivity,
- "GET HIT [{}] {}/{}/{} -> {} {} {}",
- Context,
- Namespace,
- Bucket,
- HashKey,
- ObjectHash,
- ObjectSize,
- ToString(OutValue.Value.GetContentType()))
- });
- }
- else
- {
- ZEN_LOG_INFO(LogCacheActivity,
- "GET HIT [{}] {}/{}/{} -> {} {} {}",
- Context,
- Namespace,
- Bucket,
- HashKey,
- OutValue.RawHash,
- OutValue.RawSize,
- ToString(OutValue.Value.GetContentType()));
- }
- }
- else
+ ZEN_TRACE_CPU("Z$::Get::AccessLog");
+ bool Signal = false;
+ m_LogQueueLock.WithExclusiveLock([&]() {
+ Signal = m_LogQueue.empty();
+ m_LogQueue.emplace_back(AccessLogItem{.Op = Result ? "GET HIT " : "GET MISS",
+ .Context = Context,
+ .Namespace = std::string(Namespace),
+ .Bucket = std::string(Bucket),
+ .HashKey = HashKey,
+ .Value = OutValue /*,
+ .Result = Result*/});
+ });
+ if (Signal)
{
- ZEN_LOG_INFO(LogCacheActivity, "GET MISS [{}] {}/{}/{}", Context, Namespace, Bucket, HashKey);
+ m_LogEvent.Set();
}
}
@@ -349,37 +376,21 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
{
if (m_Configuration.EnableWriteLog)
{
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- m_PendingAsyncLogging.AddCount(1);
- m_AsyncLogging->ScheduleWork(
- [this, Value, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() {
- auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); });
- const IoHash ObjectHash = IoHash::HashBuffer(Value.Value.GetView());
- const size_t ObjectSize = Value.Value.GetSize();
-
- ZEN_LOG_INFO(LogCacheActivity,
- "PUT [{}] {}/{}/{} -> {} {} {}",
- Context,
- Namespace,
- Bucket,
- HashKey,
- ObjectHash,
- ObjectSize,
- ToString(Value.Value.GetContentType()));
- });
- }
- else
+ ZEN_TRACE_CPU("Z$::Get::WriteLog");
+ bool Signal = false;
+ m_LogQueueLock.WithExclusiveLock([&]() {
+ Signal = m_LogQueue.empty();
+ m_LogQueue.emplace_back(AccessLogItem{.Op = "PUT ",
+ .Context = Context,
+ .Namespace = std::string(Namespace),
+ .Bucket = std::string(Bucket),
+ .HashKey = HashKey,
+ .Value = Value /*,
+ .Result = true*/});
+ });
+ if (Signal)
{
- ZEN_LOG_INFO(LogCacheActivity,
- "PUT [{}] {}/{}/{} -> {} {} {}",
- Context,
- Namespace,
- Bucket,
- HashKey,
- Value.RawHash,
- Value.RawSize,
- ToString(Value.Value.GetContentType()));
+ m_LogEvent.Set();
}
}