aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-08-17 17:16:18 +0200
committerGitHub <[email protected]>2023-08-17 17:16:18 +0200
commitd2887ce78b4fbbdb7e5e320877dfcf12c7524e69 (patch)
treedc5c7f0cc078219df49cb442f4842ea741130da3 /src/zenserver/cache
parentskip upstream logic early if we have no upstream endpoints (#359) (diff)
downloadzen-d2887ce78b4fbbdb7e5e320877dfcf12c7524e69.tar.xz
zen-d2887ce78b4fbbdb7e5e320877dfcf12c7524e69.zip
single thread async cache log (#361)
* rework cache store background jogging * correct capture for context
Diffstat (limited to 'src/zenserver/cache')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp100
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp155
-rw-r--r--src/zenserver/cache/structuredcachestore.h18
3 files changed, 149 insertions, 124 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 0120f3599..51d3d94d8 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -1703,63 +1703,63 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- Request.WriteResponseAsync([this, &RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](
- HttpServerRequest& AsyncRequest) mutable {
- std::uint64_t RequestIndex =
- m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull;
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- int TargetProcessId = 0;
- CbPackage RpcResult;
-
- HttpResponseCode ResultCode = HandleRpcRequest(RequestContext,
- ContentType,
- std::move(Body),
- AcceptMagic,
- AcceptFlags,
- TargetProcessId,
- RpcResult);
- if (!IsHttpSuccessCode(ResultCode))
- {
- AsyncRequest.WriteResponse(ResultCode);
- return;
- }
- if (AcceptMagic == kCbPkgMagic)
- {
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ Request.WriteResponseAsync(
+ [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
+ std::uint64_t RequestIndex =
+ m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull;
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ int TargetProcessId = 0;
+ CbPackage RpcResult;
+
+ HttpResponseCode ResultCode = HandleRpcRequest(RequestContext,
+ ContentType,
+ std::move(Body),
+ AcceptMagic,
+ AcceptFlags,
+ TargetProcessId,
+ RpcResult);
+ if (!IsHttpSuccessCode(ResultCode))
{
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ AsyncRequest.WriteResponse(ResultCode);
+ return;
+ }
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
{
- Flags |= FormatFlags::kDenyPartialLocalReferences;
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
}
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId);
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId);
- if (RequestIndex != ~0ull)
+ else
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
- if (RequestIndex != ~0ull)
- {
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->RecordResponse(RequestIndex,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
- AsyncRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- });
+ });
}
break;
default:
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index f80670a17..001659672 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -224,11 +224,11 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
: m_Log(logging::Get("z$"))
, m_Gc(Gc)
, m_Configuration(Configuration)
-, m_PendingAsyncLogging(1)
+, m_ExitLogging(false)
{
if (m_Configuration.EnableAccessLog || m_Configuration.EnableWriteLog)
{
- m_AsyncLogging = std::make_unique<WorkerThreadPool>(1, "cache_async_log");
+ m_AsyncLoggingThread = std::thread(&ZenCacheStore::LogWorker, this);
}
CreateDirectories(m_Configuration.BasePath);
@@ -269,12 +269,62 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
ZenCacheStore::~ZenCacheStore()
{
- m_PendingAsyncLogging.CountDown();
- m_PendingAsyncLogging.Wait();
- m_AsyncLogging.reset();
+ m_ExitLogging.store(true);
+ m_LogEvent.Set();
+ if (m_AsyncLoggingThread.joinable())
+ {
+ m_AsyncLoggingThread.join();
+ }
m_Namespaces.clear();
}
+void
+ZenCacheStore::LogWorker()
+{
+ SetCurrentThreadName("ZenCacheStore::LogWorker");
+
+ std::vector<AccessLogItem> Items;
+ while (true)
+ {
+ m_LogQueueLock.WithExclusiveLock([this, &Items]() { Items.swap(m_LogQueue); });
+ for (const auto& Item : Items)
+ {
+ if (Item.Value.Value)
+ {
+ const bool IsCbObject = Item.Value.Value.GetContentType() == ZenContentType::kCbObject;
+
+ const IoHash ObjectHash = IsCbObject ? IoHash::HashBuffer(Item.Value.Value.GetView()) : Item.Value.RawHash;
+ const size_t ObjectSize = IsCbObject ? Item.Value.Value.GetSize() : Item.Value.RawSize;
+
+ ZEN_LOG_INFO(LogCacheActivity,
+ "{} [{}] {}/{}/{} -> {} {} {}",
+ Item.Op,
+ Item.Context,
+ Item.Namespace,
+ Item.Bucket,
+ Item.HashKey,
+ ObjectHash,
+ ObjectSize,
+ ToString(Item.Value.Value.GetContentType()))
+ }
+ else
+ {
+ ZEN_LOG_INFO(LogCacheActivity, "{} [{}] {}/{}/{}", Item.Op, Item.Context, Item.Namespace, Item.Bucket, Item.HashKey);
+ }
+ }
+ if (!Items.empty())
+ {
+ Items.resize(0);
+ continue;
+ }
+ if (m_ExitLogging)
+ {
+ break;
+ }
+ m_LogEvent.Wait();
+ }
+}
+
bool
ZenCacheStore::Get(const CacheRequestContext& Context,
std::string_view Namespace,
@@ -288,44 +338,21 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
if (m_Configuration.EnableAccessLog)
{
- if (Result)
- {
- if (OutValue.Value.GetContentType() == ZenContentType::kCbObject)
- {
- m_PendingAsyncLogging.AddCount(1);
- m_AsyncLogging->ScheduleWork(
- [this, OutValue, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() {
- auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); });
- const IoHash ObjectHash = IoHash::HashBuffer(OutValue.Value.GetView());
- const size_t ObjectSize = OutValue.Value.GetSize();
-
- ZEN_LOG_INFO(LogCacheActivity,
- "GET HIT [{}] {}/{}/{} -> {} {} {}",
- Context,
- Namespace,
- Bucket,
- HashKey,
- ObjectHash,
- ObjectSize,
- ToString(OutValue.Value.GetContentType()))
- });
- }
- else
- {
- ZEN_LOG_INFO(LogCacheActivity,
- "GET HIT [{}] {}/{}/{} -> {} {} {}",
- Context,
- Namespace,
- Bucket,
- HashKey,
- OutValue.RawHash,
- OutValue.RawSize,
- ToString(OutValue.Value.GetContentType()));
- }
- }
- else
+ ZEN_TRACE_CPU("Z$::Get::AccessLog");
+ bool Signal = false;
+ m_LogQueueLock.WithExclusiveLock([&]() {
+ Signal = m_LogQueue.empty();
+ m_LogQueue.emplace_back(AccessLogItem{.Op = Result ? "GET HIT " : "GET MISS",
+ .Context = Context,
+ .Namespace = std::string(Namespace),
+ .Bucket = std::string(Bucket),
+ .HashKey = HashKey,
+ .Value = OutValue /*,
+ .Result = Result*/});
+ });
+ if (Signal)
{
- ZEN_LOG_INFO(LogCacheActivity, "GET MISS [{}] {}/{}/{}", Context, Namespace, Bucket, HashKey);
+ m_LogEvent.Set();
}
}
@@ -349,37 +376,21 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
{
if (m_Configuration.EnableWriteLog)
{
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- m_PendingAsyncLogging.AddCount(1);
- m_AsyncLogging->ScheduleWork(
- [this, Value, Context, Namespace = std::string(Namespace), Bucket = std::string(Bucket), HashKey]() {
- auto _ = MakeGuard([this]() { m_PendingAsyncLogging.CountDown(); });
- const IoHash ObjectHash = IoHash::HashBuffer(Value.Value.GetView());
- const size_t ObjectSize = Value.Value.GetSize();
-
- ZEN_LOG_INFO(LogCacheActivity,
- "PUT [{}] {}/{}/{} -> {} {} {}",
- Context,
- Namespace,
- Bucket,
- HashKey,
- ObjectHash,
- ObjectSize,
- ToString(Value.Value.GetContentType()));
- });
- }
- else
+ ZEN_TRACE_CPU("Z$::Get::WriteLog");
+ bool Signal = false;
+ m_LogQueueLock.WithExclusiveLock([&]() {
+ Signal = m_LogQueue.empty();
+ m_LogQueue.emplace_back(AccessLogItem{.Op = "PUT ",
+ .Context = Context,
+ .Namespace = std::string(Namespace),
+ .Bucket = std::string(Bucket),
+ .HashKey = HashKey,
+ .Value = Value /*,
+ .Result = true*/});
+ });
+ if (Signal)
{
- ZEN_LOG_INFO(LogCacheActivity,
- "PUT [{}] {}/{}/{} -> {} {} {}",
- Context,
- Namespace,
- Bucket,
- HashKey,
- Value.RawHash,
- Value.RawSize,
- ToString(Value.Value.GetContentType()));
+ m_LogEvent.Set();
}
}
diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h
index 040b71c95..067cfc0bf 100644
--- a/src/zenserver/cache/structuredcachestore.h
+++ b/src/zenserver/cache/structuredcachestore.h
@@ -174,8 +174,22 @@ private:
GcManager& m_Gc;
Configuration m_Configuration;
- std::unique_ptr<WorkerThreadPool> m_AsyncLogging;
- Latch m_PendingAsyncLogging;
+ struct AccessLogItem
+ {
+ const char* Op;
+ CacheRequestContext Context;
+ std::string Namespace;
+ std::string Bucket;
+ IoHash HashKey;
+ ZenCacheValue Value;
+ };
+
+ void LogWorker();
+ RwLock m_LogQueueLock;
+ std::vector<AccessLogItem> m_LogQueue;
+ std::atomic_bool m_ExitLogging;
+ Event m_LogEvent;
+ std::thread m_AsyncLoggingThread;
};
void z$_forcelink();