aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-23 15:00:55 +0200
committerGitHub <[email protected]>2023-05-23 15:00:55 +0200
commit55844c3f72866711f9a10b987f14da5622cc2d63 (patch)
treea872fdb170bf1019cdb4b5cc17c53dcc1585e3cc /src
parentuse exception when allocations fail rather than asserts (#319) (diff)
downloadzen-55844c3f72866711f9a10b987f14da5622cc2d63.tar.xz
zen-55844c3f72866711f9a10b987f14da5622cc2d63.zip
cache log sessionid (#297)
* implemented structured cache logging to be used as audit trail to help analyse potential cache pollution/corruption * added common header to all known log targets * made Oid::operator bool explicit to avoid logging/text format mishaps * HttpClient::operator bool -> explicit * changed cache logs to not rotate on start in order to retain more history * added CacheRequestContext * properly initialize request context * log session id and request id on zencacehstore get/put * changelog
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp230
-rw-r--r--src/zenserver/cache/httpstructuredcache.h39
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp128
-rw-r--r--src/zenserver/cache/structuredcachestore.h15
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp2
-rw-r--r--src/zenserver/upstream/upstreamcache.h1
-rw-r--r--src/zenutil/include/zenutil/cache/cache.h28
7 files changed, 274 insertions, 169 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 75800ad3b..03bbde10e 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -73,10 +73,11 @@ struct AttachmentCount
struct PutRequestData
{
- std::string Namespace;
- CacheKey Key;
- CbObjectView RecordObject;
- CacheRecordPolicy Policy;
+ std::string Namespace;
+ CacheKey Key;
+ CbObjectView RecordObject;
+ CacheRecordPolicy Policy;
+ CacheRequestContext Context;
};
namespace {
@@ -619,6 +620,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
}
if (Key == HttpZCacheUtilReplayRecording)
{
+ CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
+
m_RequestRecorder.reset();
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
@@ -631,7 +634,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
}
}
std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
- ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount);
+ ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount);
Request.WriteResponse(HttpResponseCode::OK);
return;
}
@@ -884,10 +887,11 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::OK);
}
- Stopwatch Timer;
+ CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
+ Stopwatch Timer;
if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) &&
- m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue))
+ m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue))
{
Success = true;
ZenContentType ContentType = ClientResultValue.Value.GetContentType();
@@ -985,7 +989,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs();
- Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs](HttpServerRequest& AsyncRequest) {
+ Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs, RequestContext](HttpServerRequest& AsyncRequest) {
Stopwatch Timer;
bool Success = false;
const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
@@ -1025,7 +1029,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (Success && StoreLocal)
{
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue);
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue);
}
}
else if (AcceptType == ZenContentType::kCbPackage)
@@ -1099,7 +1103,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (StoreLocal)
{
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
}
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
@@ -1205,6 +1209,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
Body.SetContentType(ContentType);
+ CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
+
Stopwatch Timer;
if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary)
@@ -1224,7 +1230,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
{
RawHash = IoHash::HashBuffer(SharedBuffer(Body));
}
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash});
+ m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body, .RawSize = RawSize, .RawHash = RawHash});
if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
{
@@ -1255,7 +1265,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
}
Body.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body});
CbObjectView CacheRecord(Body.Data());
std::vector<IoHash> ValidAttachments;
@@ -1354,7 +1364,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
ZenCacheValue CacheValue;
CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
@@ -1546,12 +1556,13 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
}
HttpResponseCode
-HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId,
- CbPackage& OutResultPackage)
+HttpStructuredCacheService::HandleRpcRequest(const CacheRequestContext& Context,
+ const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId,
+ CbPackage& OutResultPackage)
{
CbPackage Package;
CbObjectView Object;
@@ -1578,11 +1589,11 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
{
return HttpResponseCode::InsufficientStorage;
}
- OutResultPackage = HandleRpcPutCacheRecords(Package);
+ OutResultPackage = HandleRpcPutCacheRecords(Context, Package);
}
else if (Method == "GetCacheRecords"sv)
{
- OutResultPackage = HandleRpcGetCacheRecords(Object);
+ OutResultPackage = HandleRpcGetCacheRecords(Context, Object);
}
else if (Method == "PutCacheValues"sv)
{
@@ -1590,15 +1601,15 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
{
return HttpResponseCode::InsufficientStorage;
}
- OutResultPackage = HandleRpcPutCacheValues(Package);
+ OutResultPackage = HandleRpcPutCacheValues(Context, Package);
}
else if (Method == "GetCacheValues"sv)
{
- OutResultPackage = HandleRpcGetCacheValues(Object);
+ OutResultPackage = HandleRpcGetCacheValues(Context, Object);
}
else if (Method == "GetCacheChunks"sv)
{
- OutResultPackage = HandleRpcGetCacheChunks(Object);
+ OutResultPackage = HandleRpcGetCacheChunks(Context, Object);
}
else
{
@@ -1608,7 +1619,9 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
}
void
-HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount)
+HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Context,
+ cache::IRpcRequestReplayer& Replayer,
+ uint32_t ThreadCount)
{
WorkerThreadPool WorkerPool(ThreadCount);
uint64_t RequestCount = Replayer.GetRequestCount();
@@ -1618,7 +1631,7 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re
ZEN_INFO("Replaying {} requests", RequestCount);
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
- WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() {
+ WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() {
IoBuffer Body;
std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body);
if (Body)
@@ -1627,7 +1640,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetPid = 0;
CbPackage RpcResult;
- if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult)))
+ if (IsHttpSuccessCode(
+ HandleRpcRequest(Context, ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult)))
{
if (AcceptMagic == kCbPkgMagic)
{
@@ -1671,6 +1685,8 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
{
case HttpVerb::kPost:
{
+ CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
+
const HttpContentType ContentType = Request.RequestContentType();
const HttpContentType AcceptType = Request.AcceptContentType();
@@ -1680,58 +1696,63 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- Request.WriteResponseAsync(
- [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
- std::uint64_t RequestIndex =
- m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull;
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- int TargetProcessId = 0;
- CbPackage RpcResult;
-
- HttpResponseCode ResultCode =
- HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult);
- if (!IsHttpSuccessCode(ResultCode))
- {
- AsyncRequest.WriteResponse(ResultCode);
- return;
- }
- if (AcceptMagic == kCbPkgMagic)
+ Request.WriteResponseAsync([this, &RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](
+ HttpServerRequest& AsyncRequest) mutable {
+ std::uint64_t RequestIndex =
+ m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull;
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ int TargetProcessId = 0;
+ CbPackage RpcResult;
+
+ HttpResponseCode ResultCode = HandleRpcRequest(RequestContext,
+ ContentType,
+ std::move(Body),
+ AcceptMagic,
+ AcceptFlags,
+ TargetProcessId,
+ RpcResult);
+ if (!IsHttpSuccessCode(ResultCode))
+ {
+ AsyncRequest.WriteResponse(ResultCode);
+ return;
+ }
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
{
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
- {
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
- {
- Flags |= FormatFlags::kDenyPartialLocalReferences;
- }
- }
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId);
- if (RequestIndex != ~0ull)
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
}
- AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
- else
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId);
+ if (RequestIndex != ~0ull)
{
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
- if (RequestIndex != ~0ull)
- {
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- AsyncRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->RecordResponse(RequestIndex,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
- });
+ AsyncRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ });
}
break;
default:
@@ -1741,7 +1762,7 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
}
CbPackage
-HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest)
+HttpStructuredCacheService::HandleRpcPutCacheRecords([[maybe_unused]] const CacheRequestContext& Context, const CbPackage& BatchRequest)
{
ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
@@ -1854,7 +1875,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
+ m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
@@ -1890,7 +1911,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
}
CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
+HttpStructuredCacheService::HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView RpcRequest)
{
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
@@ -1970,7 +1991,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
ZenCacheValue RecordCacheValue;
if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) &&
- m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
+ m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
{
Request.RecordCacheValue = std::move(RecordCacheValue.Value);
if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject)
@@ -2085,7 +2106,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
}
}
- const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
@@ -2107,7 +2128,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
- m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
+ m_CacheStore.Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
}
ParseValues(Request);
Request.Source = Params.Source;
@@ -2231,7 +2252,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
}
CbPackage
-HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchRequest)
+HttpStructuredCacheService::HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest)
{
CbObjectView BatchObject = BatchRequest.GetObject();
CbObjectView Params = BatchObject["Params"sv].AsObjectView();
@@ -2284,7 +2305,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchReques
{
RawSize = Chunk.DecodeRawSize();
}
- m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash});
+ m_CacheStore.Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash});
TransferredSize = Chunk.GetCompressedSize();
}
Succeeded = true;
@@ -2298,7 +2319,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchReques
else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
ZenCacheValue ExistingValue;
- if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
+ if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
IsCompressedBinary(ExistingValue.Value.GetContentType()))
{
Succeeded = true;
@@ -2340,7 +2361,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchReques
}
CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
+HttpStructuredCacheService::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView RpcRequest)
{
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
@@ -2387,7 +2408,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
ZenCacheValue CacheValue;
if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
- if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
+ if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) &&
+ IsCompressedBinary(CacheValue.Value.GetContentType()))
{
Request.RawHash = CacheValue.RawHash;
Request.RawSize = CacheValue.RawSize;
@@ -2442,7 +2464,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
m_UpstreamCache.GetCacheValues(
*Namespace,
CacheValueRequests,
- [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) {
+ [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer, Context](
+ CacheValueGetCompleteParams&& Params) {
CacheValueRequest& ChunkRequest = Params.Request;
if (Params.RawHash != IoHash::Zero)
{
@@ -2464,7 +2487,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
if (HasData && StoreData)
{
- m_CacheStore.Put(*Namespace,
+ m_CacheStore.Put(Context,
+ *Namespace,
Request.Key.Bucket,
Request.Key.Hash,
ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash});
@@ -2568,7 +2592,7 @@ namespace cache::detail {
} // namespace cache::detail
CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest)
+HttpStructuredCacheService::HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView RpcRequest)
{
using namespace cache::detail;
@@ -2589,16 +2613,16 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest)
// For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
// have it locally, and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
+ GetLocalCacheRecords(Context, Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
// For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks);
+ GetLocalCacheValues(Context, Namespace, ValueRequests, UpstreamChunks);
// Call GetCacheChunks on the upstream for any payloads we do not have locally
- GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests);
+ GetUpstreamCacheChunks(Context, Namespace, UpstreamChunks, RequestKeys, Requests);
// Send the payload and descriptive data about each chunk to the client
- return WriteGetCacheChunksResponse(Namespace, Requests);
+ return WriteGetCacheChunksResponse(Context, Namespace, Requests);
}
bool
@@ -2715,7 +2739,8 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Nam
}
void
-HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace,
+HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext& Context,
+ std::string_view Namespace,
std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<cache::detail::ChunkRequest*>& RecordRequests,
@@ -2736,7 +2761,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
{
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
+ if (m_CacheStore.Get(Context, Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
{
Record.Exists = true;
Record.CacheValue = std::move(CacheValue.Value);
@@ -2754,7 +2779,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
if (!UpstreamRecordRequests.empty())
{
const auto OnCacheRecordGetComplete =
- [this, Namespace, &RecordKeys, &Records, &RecordRequests](CacheRecordGetCompleteParams&& Params) {
+ [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
@@ -2774,7 +2799,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
- m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
+ m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
}
};
m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
@@ -2868,7 +2893,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
}
void
-HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace,
+HttpStructuredCacheService::GetLocalCacheValues(const CacheRequestContext& Context,
+ std::string_view Namespace,
std::vector<cache::detail::ChunkRequest*>& ValueRequests,
std::vector<CacheChunkRequest*>& OutUpstreamChunks)
{
@@ -2880,7 +2906,7 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa
if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
+ if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
{
if (IsCompressedBinary(CacheValue.Value.GetContentType()))
{
@@ -2910,7 +2936,8 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa
}
void
-HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace,
+HttpStructuredCacheService::GetUpstreamCacheChunks(const CacheRequestContext& Context,
+ std::string_view Namespace,
std::vector<CacheChunkRequest*>& UpstreamChunks,
std::vector<CacheChunkRequest>& RequestKeys,
std::vector<cache::detail::ChunkRequest>& Requests)
@@ -2919,7 +2946,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
if (!UpstreamChunks.empty())
{
- const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) {
+ const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests, Context](CacheChunkGetCompleteParams&& Params) {
if (Params.RawHash == Params.RawHash.Zero)
{
return;
@@ -2947,7 +2974,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
}
else
{
- m_CacheStore.Put(Namespace,
+ m_CacheStore.Put(Context,
+ Namespace,
Key.Key.Bucket,
Key.Key.Hash,
{.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash});
@@ -2972,7 +3000,9 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
}
CbPackage
-HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests)
+HttpStructuredCacheService::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest>& Requests)
{
using namespace cache::detail;
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h
index 4f03daae4..60714d6ae 100644
--- a/src/zenserver/cache/httpstructuredcache.h
+++ b/src/zenserver/cache/httpstructuredcache.h
@@ -6,6 +6,7 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/httpstats.h>
#include <zenhttp/httpstatus.h>
+#include <zenutil/cache/cache.h>
#include <memory>
#include <vector>
@@ -113,17 +114,18 @@ private:
void HandleRpcRequest(HttpServerRequest& Request);
void HandleDetailsRequest(HttpServerRequest& Request);
- CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest);
- CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest);
- CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest);
- HttpResponseCode HandleRpcRequest(const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId,
- CbPackage& OutPackage);
+ CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ HttpResponseCode HandleRpcRequest(const CacheRequestContext& Context,
+ const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId,
+ CbPackage& OutPackage);
void HandleCacheRequest(HttpServerRequest& Request);
void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
@@ -142,22 +144,27 @@ private:
std::vector<cache::detail::ChunkRequest*>& ValueRequests,
CbObjectView RpcRequest);
/** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */
- void GetLocalCacheRecords(std::string_view Namespace,
+ void GetLocalCacheRecords(const CacheRequestContext& Context,
+ std::string_view Namespace,
std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<cache::detail::ChunkRequest*>& RecordRequests,
std::vector<CacheChunkRequest*>& OutUpstreamChunks);
/** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
- void GetLocalCacheValues(std::string_view Namespace,
+ void GetLocalCacheValues(const CacheRequestContext& Context,
+ std::string_view Namespace,
std::vector<cache::detail::ChunkRequest*>& ValueRequests,
std::vector<CacheChunkRequest*>& OutUpstreamChunks);
/** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
- void GetUpstreamCacheChunks(std::string_view Namespace,
+ void GetUpstreamCacheChunks(const CacheRequestContext& Context,
+ std::string_view Namespace,
std::vector<CacheChunkRequest*>& UpstreamChunks,
std::vector<CacheChunkRequest>& RequestKeys,
std::vector<cache::detail::ChunkRequest>& Requests);
/** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
- CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests);
+ CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest>& Requests);
bool AreDiskWritesAllowed() const;
@@ -174,7 +181,7 @@ private:
CacheStats m_CacheStats;
const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
- void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
+ void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
};
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index bc4248a8a..1847c724d 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -16,6 +16,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/cache/cache.h>
#include <future>
#include <limits>
@@ -218,7 +219,10 @@ ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$");
static constinit std::string_view UE4DDCNamespaceName = "ue4.ddc";
-ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : m_Gc(Gc), m_Configuration(Configuration)
+ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
+: m_Log(logging::Get("z$"))
+, m_Gc(Gc)
+, m_Configuration(Configuration)
{
CreateDirectories(m_Configuration.BasePath);
@@ -263,7 +267,11 @@ ZenCacheStore::~ZenCacheStore()
}
bool
-ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+ZenCacheStore::Get(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ ZenCacheValue& OutValue)
{
if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
{
@@ -279,7 +287,8 @@ ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const Io
const size_t ObjectSize = OutValue.Value.GetSize();
ZEN_LOG_INFO(LogCacheActivity,
- "GET HIT {}/{}/{} -> {} {} {}",
+ "GET HIT [{}] {}/{}/{} -> {} {} {}",
+ Context,
Namespace,
Bucket,
HashKey,
@@ -290,7 +299,8 @@ ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const Io
else
{
ZEN_LOG_INFO(LogCacheActivity,
- "GET HIT {}/{}/{} -> {} {} {}",
+ "GET HIT [{}] {}/{}/{} -> {} {} {}",
+ Context,
Namespace,
Bucket,
HashKey,
@@ -301,19 +311,27 @@ ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const Io
}
else
{
- ZEN_LOG_INFO(LogCacheActivity, "GET MISS {}/{}/{}", Namespace, Bucket, HashKey);
+ ZEN_LOG_INFO(LogCacheActivity, "GET MISS [{}] {}/{}/{}", Context, Namespace, Bucket, HashKey);
}
}
return Result;
}
- ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString());
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get [{}], bucket '{}', key '{}'",
+ Context,
+ Namespace,
+ Bucket,
+ HashKey.ToHexString());
return false;
}
void
-ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheStore::Put(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value)
{
if (m_Configuration.EnableWriteLog)
{
@@ -323,7 +341,8 @@ ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const Io
const size_t ObjectSize = Value.Value.GetSize();
ZEN_LOG_INFO(LogCacheActivity,
- "PUT {}/{}/{} -> {} {} {}",
+ "PUT [{}] {}/{}/{} -> {} {} {}",
+ Context,
Namespace,
Bucket,
HashKey,
@@ -334,7 +353,8 @@ ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const Io
else
{
ZEN_LOG_INFO(LogCacheActivity,
- "PUT {}/{}/{} -> {} {} {}",
+ "PUT [{}] {}/{}/{} -> {} {} {}",
+ Context,
Namespace,
Bucket,
HashKey,
@@ -348,7 +368,11 @@ ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const Io
{
return Store->Put(Bucket, HashKey, Value);
}
- ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString());
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'",
+ Context,
+ Namespace,
+ Bucket,
+ HashKey.ToHexString());
}
bool
@@ -1128,8 +1152,9 @@ TEST_CASE("z$.namespaces")
ScopedTemporaryDirectory TempDir;
CreateDirectories(TempDir.Path());
- IoHash Key1;
- IoHash Key2;
+ const CacheRequestContext Context;
+ IoHash Key1;
+ IoHash Key2;
{
GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false});
@@ -1144,15 +1169,15 @@ TEST_CASE("z$.namespaces")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue);
+ Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue);
ZenCacheValue GetValue;
- CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
- CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
+ CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
+ CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
// This should just be dropped as we don't allow creating of namespaces on the fly
- Zcs.Put(CustomNamespace, Bucket, Key1, PutValue);
- CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
+ Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue);
+ CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
}
{
@@ -1167,13 +1192,13 @@ TEST_CASE("z$.namespaces")
IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
Buffer2.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue2 = {.Value = Buffer2};
- Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2);
+ Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2);
ZenCacheValue GetValue;
- CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
- CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
- CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
- CHECK(Zcs.Get(CustomNamespace, Bucket, Key2, GetValue));
+ CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
+ CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
+ CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
+ CHECK(Zcs.Get(Context, CustomNamespace, Bucket, Key2, GetValue));
}
}
@@ -1193,25 +1218,26 @@ TEST_CASE("z$.drop.bucket")
ScopedTemporaryDirectory TempDir;
CreateDirectories(TempDir.Path());
- IoHash Key1;
- IoHash Key2;
+ const CacheRequestContext Context;
+ IoHash Key1;
+ IoHash Key2;
- auto PutValue =
- [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
- // Create a cache record
- IoHash Key = CreateKey(KeyIndex);
- CbObject CacheValue = CreateCacheValue(Size);
+ auto PutValue = [&CreateCacheValue,
+ &Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
+ // Create a cache record
+ IoHash Key = CreateKey(KeyIndex);
+ CbObject CacheValue = CreateCacheValue(Size);
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
- ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Namespace, Bucket, Key, PutValue);
- return Key;
- };
- auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue);
+ return Key;
+ };
+ auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
ZenCacheValue GetValue;
- Zcs.Get(Namespace, Bucket, Key, GetValue);
+ Zcs.Get(Context, Namespace, Bucket, Key, GetValue);
return GetValue;
};
WorkerThreadPool Workers(1);
@@ -1253,6 +1279,8 @@ TEST_CASE("z$.drop.namespace")
{
using namespace testutils;
+ const CacheRequestContext Context;
+
const auto CreateCacheValue = [](size_t Size) -> CbObject {
std::vector<uint8_t> Buf;
Buf.resize(Size);
@@ -1265,22 +1293,22 @@ TEST_CASE("z$.drop.namespace")
ScopedTemporaryDirectory TempDir;
CreateDirectories(TempDir.Path());
- auto PutValue =
- [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
- // Create a cache record
- IoHash Key = CreateKey(KeyIndex);
- CbObject CacheValue = CreateCacheValue(Size);
+ auto PutValue = [&CreateCacheValue,
+ &Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
+ // Create a cache record
+ IoHash Key = CreateKey(KeyIndex);
+ CbObject CacheValue = CreateCacheValue(Size);
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
- ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Namespace, Bucket, Key, PutValue);
- return Key;
- };
- auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue);
+ return Key;
+ };
+ auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
ZenCacheValue GetValue;
- Zcs.Get(Namespace, Bucket, Key, GetValue);
+ Zcs.Get(Context, Namespace, Bucket, Key, GetValue);
return GetValue;
};
WorkerThreadPool Workers(1);
diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h
index 01f6d7d82..7ce195ef5 100644
--- a/src/zenserver/cache/structuredcachestore.h
+++ b/src/zenserver/cache/structuredcachestore.h
@@ -8,6 +8,7 @@
#include <zencore/compactbinary.h>
#include <zencore/iohash.h>
#include <zenstore/gc.h>
+#include <zenutil/cache/cache.h>
#include <atomic>
#include <compare>
@@ -130,8 +131,16 @@ public:
ZenCacheStore(GcManager& Gc, const Configuration& Configuration);
~ZenCacheStore();
- bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
+ bool Get(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ ZenCacheValue& OutValue);
+ void Put(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value);
bool DropBucket(std::string_view Namespace, std::string_view Bucket);
bool DropNamespace(std::string_view Namespace);
void Flush();
@@ -154,6 +163,8 @@ private:
typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap;
+ spdlog::logger& m_Log;
+ spdlog::logger& Log() { return m_Log; }
mutable RwLock m_NamespacesLock;
NamespaceMap m_Namespaces;
std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces;
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index 4b6338f3a..9f6a0abc4 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -1901,7 +1901,7 @@ private:
ZenCacheValue CacheValue;
std::vector<IoBuffer> Payloads;
- if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue))
+ if (!m_CacheStore.Get(CacheRecord.Context, CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue))
{
ZEN_WARN("process upstream FAILED, '{}/{}/{}', cache record doesn't exist",
CacheRecord.Namespace,
diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h
index 695c06b32..8f1395509 100644
--- a/src/zenserver/upstream/upstreamcache.h
+++ b/src/zenserver/upstream/upstreamcache.h
@@ -35,6 +35,7 @@ struct UpstreamCacheRecord
std::string Namespace;
CacheKey Key;
std::vector<IoHash> ValueContentIds;
+ CacheRequestContext Context;
};
struct UpstreamCacheOptions
diff --git a/src/zenutil/include/zenutil/cache/cache.h b/src/zenutil/include/zenutil/cache/cache.h
index 1a1dd9386..5e12c488c 100644
--- a/src/zenutil/include/zenutil/cache/cache.h
+++ b/src/zenutil/include/zenutil/cache/cache.h
@@ -4,3 +4,31 @@
#include <zenutil/cache/cachekey.h>
#include <zenutil/cache/cachepolicy.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+struct CacheRequestContext
+{
+ Oid SessionId{Oid::Zero};
+ uint32_t RequestId{0};
+};
+
+} // namespace zen
+
+template<>
+struct fmt::formatter<zen::CacheRequestContext> : formatter<string_view>
+{
+ template<typename FormatContext>
+ auto format(const zen::CacheRequestContext& Context, FormatContext& ctx)
+ {
+ zen::ExtendableStringBuilder<64> String;
+ Context.SessionId.ToString(String);
+ String << ".";
+ String << Context.RequestId;
+ return formatter<string_view>::format(String.ToView(), ctx);
+ }
+};