diff options
| author | Stefan Boberg <[email protected]> | 2023-05-23 15:00:55 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-23 15:00:55 +0200 |
| commit | 55844c3f72866711f9a10b987f14da5622cc2d63 (patch) | |
| tree | a872fdb170bf1019cdb4b5cc17c53dcc1585e3cc /src | |
| parent | use exception when allocations fail rather than asserts (#319) (diff) | |
| download | zen-55844c3f72866711f9a10b987f14da5622cc2d63.tar.xz zen-55844c3f72866711f9a10b987f14da5622cc2d63.zip | |
cache log sessionid (#297)
* implemented structured cache logging to be used as audit trail to help analyse potential cache pollution/corruption
* added common header to all known log targets
* made Oid::operator bool explicit to avoid logging/text format mishaps
* HttpClient::operator bool -> explicit
* changed cache logs to not rotate on start in order to retain more history
* added CacheRequestContext
* properly initialize request context
* log session id and request id on zencacehstore get/put
* changelog
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 230 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.h | 39 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 128 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.h | 15 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.h | 1 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/cache.h | 28 |
7 files changed, 274 insertions, 169 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 75800ad3b..03bbde10e 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -73,10 +73,11 @@ struct AttachmentCount struct PutRequestData { - std::string Namespace; - CacheKey Key; - CbObjectView RecordObject; - CacheRecordPolicy Policy; + std::string Namespace; + CacheKey Key; + CbObjectView RecordObject; + CacheRecordPolicy Policy; + CacheRequestContext Context; }; namespace { @@ -619,6 +620,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) } if (Key == HttpZCacheUtilReplayRecording) { + CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; + m_RequestRecorder.reset(); HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); @@ -631,7 +634,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) } } std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); - ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount); + ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount); Request.WriteResponse(HttpResponseCode::OK); return; } @@ -884,10 +887,11 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con return Request.WriteResponse(HttpResponseCode::OK); } - Stopwatch Timer; + CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; + Stopwatch Timer; if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && - m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) + m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { Success = true; ZenContentType ContentType = ClientResultValue.Value.GetContentType(); @@ -985,7 +989,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs(); - Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs](HttpServerRequest& AsyncRequest) { + Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs, RequestContext](HttpServerRequest& AsyncRequest) { Stopwatch Timer; bool Success = false; const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); @@ -1025,7 +1029,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (Success && StoreLocal) { - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue); + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue); } } else if (AcceptType == ZenContentType::kCbPackage) @@ -1099,7 +1103,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (StoreLocal) { - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); } for (const CbAttachment* Attachment : AttachmentsToStoreLocally) @@ -1205,6 +1209,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con Body.SetContentType(ContentType); + CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; + Stopwatch Timer; if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) @@ -1224,7 +1230,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con { RawHash = IoHash::HashBuffer(SharedBuffer(Body)); } - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}); + m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}); if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { @@ -1255,7 +1265,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con } Body.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); CbObjectView CacheRecord(Body.Data()); std::vector<IoHash> ValidAttachments; @@ -1354,7 +1364,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { @@ -1546,12 +1556,13 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons } HttpResponseCode -HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, - IoBuffer&& Body, - uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId, - CbPackage& OutResultPackage) +HttpStructuredCacheService::HandleRpcRequest(const CacheRequestContext& Context, + const ZenContentType ContentType, + IoBuffer&& Body, + uint32_t& OutAcceptMagic, + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId, + CbPackage& OutResultPackage) { CbPackage Package; CbObjectView Object; @@ -1578,11 +1589,11 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, { return HttpResponseCode::InsufficientStorage; } - OutResultPackage = HandleRpcPutCacheRecords(Package); + OutResultPackage = HandleRpcPutCacheRecords(Context, Package); } else if (Method == "GetCacheRecords"sv) { - OutResultPackage = HandleRpcGetCacheRecords(Object); + OutResultPackage = HandleRpcGetCacheRecords(Context, Object); } else if (Method == "PutCacheValues"sv) { @@ -1590,15 +1601,15 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, { return HttpResponseCode::InsufficientStorage; } - OutResultPackage = HandleRpcPutCacheValues(Package); + OutResultPackage = HandleRpcPutCacheValues(Context, Package); } else if (Method == "GetCacheValues"sv) { - OutResultPackage = HandleRpcGetCacheValues(Object); + OutResultPackage = HandleRpcGetCacheValues(Context, Object); } else if (Method == "GetCacheChunks"sv) { - OutResultPackage = HandleRpcGetCacheChunks(Object); + OutResultPackage = HandleRpcGetCacheChunks(Context, Object); } else { @@ -1608,7 +1619,9 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, } void -HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount) +HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Context, + cache::IRpcRequestReplayer& Replayer, + uint32_t ThreadCount) { WorkerThreadPool WorkerPool(ThreadCount); uint64_t RequestCount = Replayer.GetRequestCount(); @@ -1618,7 +1631,7 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { - WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() { + WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() { IoBuffer Body; std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body); if (Body) @@ -1627,7 +1640,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetPid = 0; CbPackage RpcResult; - if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult))) + if (IsHttpSuccessCode( + HandleRpcRequest(Context, ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult))) { if (AcceptMagic == kCbPkgMagic) { @@ -1671,6 +1685,8 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) { case HttpVerb::kPost: { + CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; + const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); @@ -1680,58 +1696,63 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) return Request.WriteResponse(HttpResponseCode::BadRequest); } - Request.WriteResponseAsync( - [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { - std::uint64_t RequestIndex = - m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - int TargetProcessId = 0; - CbPackage RpcResult; - - HttpResponseCode ResultCode = - HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult); - if (!IsHttpSuccessCode(ResultCode)) - { - AsyncRequest.WriteResponse(ResultCode); - return; - } - if (AcceptMagic == kCbPkgMagic) + Request.WriteResponseAsync([this, &RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType]( + HttpServerRequest& AsyncRequest) mutable { + std::uint64_t RequestIndex = + m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + int TargetProcessId = 0; + CbPackage RpcResult; + + HttpResponseCode ResultCode = HandleRpcRequest(RequestContext, + ContentType, + std::move(Body), + AcceptMagic, + AcceptFlags, + TargetProcessId, + RpcResult); + if (!IsHttpSuccessCode(ResultCode)) + { + AsyncRequest.WriteResponse(ResultCode); + return; + } + if (AcceptMagic == kCbPkgMagic) + { + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) - { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId); - if (RequestIndex != ~0ull) + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + Flags |= FormatFlags::kDenyPartialLocalReferences; } - AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } - else + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId); + if (RequestIndex != ~0ull) { - BinaryWriter MemStream; - RpcResult.Save(MemStream); + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); - if (RequestIndex != ~0ull) - { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - AsyncRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->RecordResponse(RequestIndex, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } - }); + AsyncRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + }); } break; default: @@ -1741,7 +1762,7 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) } CbPackage -HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest) +HttpStructuredCacheService::HandleRpcPutCacheRecords([[maybe_unused]] const CacheRequestContext& Context, const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); @@ -1854,7 +1875,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); + m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { @@ -1890,7 +1911,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack } CbPackage -HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) +HttpStructuredCacheService::HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); @@ -1970,7 +1991,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) ZenCacheValue RecordCacheValue; if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && - m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) + m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) { Request.RecordCacheValue = std::move(RecordCacheValue.Value); if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject) @@ -2085,7 +2106,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) } } - const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -2107,7 +2128,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { - m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); + m_CacheStore.Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); } ParseValues(Request); Request.Source = Params.Source; @@ -2231,7 +2252,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) } CbPackage -HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchRequest) +HttpStructuredCacheService::HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest) { CbObjectView BatchObject = BatchRequest.GetObject(); CbObjectView Params = BatchObject["Params"sv].AsObjectView(); @@ -2284,7 +2305,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchReques { RawSize = Chunk.DecodeRawSize(); } - m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}); + m_CacheStore.Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}); TransferredSize = Chunk.GetCompressedSize(); } Succeeded = true; @@ -2298,7 +2319,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchReques else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { ZenCacheValue ExistingValue; - if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) && + if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { Succeeded = true; @@ -2340,7 +2361,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchReques } CbPackage -HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) +HttpStructuredCacheService::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView RpcRequest) { ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); @@ -2387,7 +2408,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { - if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) + if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) && + IsCompressedBinary(CacheValue.Value.GetContentType())) { Request.RawHash = CacheValue.RawHash; Request.RawSize = CacheValue.RawSize; @@ -2442,7 +2464,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) m_UpstreamCache.GetCacheValues( *Namespace, CacheValueRequests, - [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { + [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer, Context]( + CacheValueGetCompleteParams&& Params) { CacheValueRequest& ChunkRequest = Params.Request; if (Params.RawHash != IoHash::Zero) { @@ -2464,7 +2487,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) if (HasData && StoreData) { - m_CacheStore.Put(*Namespace, + m_CacheStore.Put(Context, + *Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}); @@ -2568,7 +2592,7 @@ namespace cache::detail { } // namespace cache::detail CbPackage -HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest) +HttpStructuredCacheService::HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView RpcRequest) { using namespace cache::detail; @@ -2589,16 +2613,16 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest) // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we // have it locally, and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); + GetLocalCacheRecords(Context, Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks); + GetLocalCacheValues(Context, Namespace, ValueRequests, UpstreamChunks); // Call GetCacheChunks on the upstream for any payloads we do not have locally - GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); + GetUpstreamCacheChunks(Context, Namespace, UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client - return WriteGetCacheChunksResponse(Namespace, Requests); + return WriteGetCacheChunksResponse(Context, Namespace, Requests); } bool @@ -2715,7 +2739,8 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Nam } void -HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace, +HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext& Context, + std::string_view Namespace, std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<cache::detail::ChunkRequest*>& RecordRequests, @@ -2736,7 +2761,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; - if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) + if (m_CacheStore.Get(Context, Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) { Record.Exists = true; Record.CacheValue = std::move(CacheValue.Value); @@ -2754,7 +2779,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac if (!UpstreamRecordRequests.empty()) { const auto OnCacheRecordGetComplete = - [this, Namespace, &RecordKeys, &Records, &RecordRequests](CacheRecordGetCompleteParams&& Params) { + [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -2774,7 +2799,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { - m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); + m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } }; m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); @@ -2868,7 +2893,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac } void -HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace, +HttpStructuredCacheService::GetLocalCacheValues(const CacheRequestContext& Context, + std::string_view Namespace, std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks) { @@ -2880,7 +2906,7 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; - if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) + if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { @@ -2910,7 +2936,8 @@ HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespa } void -HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace, +HttpStructuredCacheService::GetUpstreamCacheChunks(const CacheRequestContext& Context, + std::string_view Namespace, std::vector<CacheChunkRequest*>& UpstreamChunks, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests) @@ -2919,7 +2946,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names if (!UpstreamChunks.empty()) { - const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) { + const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests, Context](CacheChunkGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; @@ -2947,7 +2974,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names } else { - m_CacheStore.Put(Namespace, + m_CacheStore.Put(Context, + Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}); @@ -2972,7 +3000,9 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names } CbPackage -HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests) +HttpStructuredCacheService::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequestContext& Context, + std::string_view Namespace, + std::vector<cache::detail::ChunkRequest>& Requests) { using namespace cache::detail; diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h index 4f03daae4..60714d6ae 100644 --- a/src/zenserver/cache/httpstructuredcache.h +++ b/src/zenserver/cache/httpstructuredcache.h @@ -6,6 +6,7 @@ #include <zenhttp/httpserver.h> #include <zenhttp/httpstats.h> #include <zenhttp/httpstatus.h> +#include <zenutil/cache/cache.h> #include <memory> #include <vector> @@ -113,17 +114,18 @@ private: void HandleRpcRequest(HttpServerRequest& Request); void HandleDetailsRequest(HttpServerRequest& Request); - CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest); - CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest); - CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest); - HttpResponseCode HandleRpcRequest(const ZenContentType ContentType, - IoBuffer&& Body, - uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId, - CbPackage& OutPackage); + CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest); + CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); + CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest); + HttpResponseCode HandleRpcRequest(const CacheRequestContext& Context, + const ZenContentType ContentType, + IoBuffer&& Body, + uint32_t& OutAcceptMagic, + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId, + CbPackage& OutPackage); void HandleCacheRequest(HttpServerRequest& Request); void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); @@ -142,22 +144,27 @@ private: std::vector<cache::detail::ChunkRequest*>& ValueRequests, CbObjectView RpcRequest); /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ - void GetLocalCacheRecords(std::string_view Namespace, + void GetLocalCacheRecords(const CacheRequestContext& Context, + std::string_view Namespace, std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<cache::detail::ChunkRequest*>& RecordRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks); /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ - void GetLocalCacheValues(std::string_view Namespace, + void GetLocalCacheValues(const CacheRequestContext& Context, + std::string_view Namespace, std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks); /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ - void GetUpstreamCacheChunks(std::string_view Namespace, + void GetUpstreamCacheChunks(const CacheRequestContext& Context, + std::string_view Namespace, std::vector<CacheChunkRequest*>& UpstreamChunks, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests); /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ - CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests); + CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context, + std::string_view Namespace, + std::vector<cache::detail::ChunkRequest>& Requests); bool AreDiskWritesAllowed() const; @@ -174,7 +181,7 @@ private: CacheStats m_CacheStats; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; - void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); + void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder; }; diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index bc4248a8a..1847c724d 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -16,6 +16,7 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include <zenstore/scrubcontext.h> +#include <zenutil/cache/cache.h> #include <future> #include <limits> @@ -218,7 +219,10 @@ ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$"); static constinit std::string_view UE4DDCNamespaceName = "ue4.ddc"; -ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : m_Gc(Gc), m_Configuration(Configuration) +ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) +: m_Log(logging::Get("z$")) +, m_Gc(Gc) +, m_Configuration(Configuration) { CreateDirectories(m_Configuration.BasePath); @@ -263,7 +267,11 @@ ZenCacheStore::~ZenCacheStore() } bool -ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheStore::Get(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + ZenCacheValue& OutValue) { if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { @@ -279,7 +287,8 @@ ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const Io const size_t ObjectSize = OutValue.Value.GetSize(); ZEN_LOG_INFO(LogCacheActivity, - "GET HIT {}/{}/{} -> {} {} {}", + "GET HIT [{}] {}/{}/{} -> {} {} {}", + Context, Namespace, Bucket, HashKey, @@ -290,7 +299,8 @@ ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const Io else { ZEN_LOG_INFO(LogCacheActivity, - "GET HIT {}/{}/{} -> {} {} {}", + "GET HIT [{}] {}/{}/{} -> {} {} {}", + Context, Namespace, Bucket, HashKey, @@ -301,19 +311,27 @@ ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const Io } else { - ZEN_LOG_INFO(LogCacheActivity, "GET MISS {}/{}/{}", Namespace, Bucket, HashKey); + ZEN_LOG_INFO(LogCacheActivity, "GET MISS [{}] {}/{}/{}", Context, Namespace, Bucket, HashKey); } } return Result; } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get [{}], bucket '{}', key '{}'", + Context, + Namespace, + Bucket, + HashKey.ToHexString()); return false; } void -ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheStore::Put(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value) { if (m_Configuration.EnableWriteLog) { @@ -323,7 +341,8 @@ ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const Io const size_t ObjectSize = Value.Value.GetSize(); ZEN_LOG_INFO(LogCacheActivity, - "PUT {}/{}/{} -> {} {} {}", + "PUT [{}] {}/{}/{} -> {} {} {}", + Context, Namespace, Bucket, HashKey, @@ -334,7 +353,8 @@ ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const Io else { ZEN_LOG_INFO(LogCacheActivity, - "PUT {}/{}/{} -> {} {} {}", + "PUT [{}] {}/{}/{} -> {} {} {}", + Context, Namespace, Bucket, HashKey, @@ -348,7 +368,11 @@ ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const Io { return Store->Put(Bucket, HashKey, Value); } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'", + Context, + Namespace, + Bucket, + HashKey.ToHexString()); } bool @@ -1128,8 +1152,9 @@ TEST_CASE("z$.namespaces") ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path()); - IoHash Key1; - IoHash Key2; + const CacheRequestContext Context; + IoHash Key1; + IoHash Key2; { GcManager Gc; ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}); @@ -1144,15 +1169,15 @@ TEST_CASE("z$.namespaces") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue); + Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue); ZenCacheValue GetValue; - CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); + CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); + CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); // This should just be dropped as we don't allow creating of namespaces on the fly - Zcs.Put(CustomNamespace, Bucket, Key1, PutValue); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); + Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue); + CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); } { @@ -1167,13 +1192,13 @@ TEST_CASE("z$.namespaces") IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); Buffer2.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue2 = {.Value = Buffer2}; - Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2); + Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2); ZenCacheValue GetValue; - CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); - CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); - CHECK(Zcs.Get(CustomNamespace, Bucket, Key2, GetValue)); + CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); + CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); + CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); + CHECK(Zcs.Get(Context, CustomNamespace, Bucket, Key2, GetValue)); } } @@ -1193,25 +1218,26 @@ TEST_CASE("z$.drop.bucket") ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path()); - IoHash Key1; - IoHash Key2; + const CacheRequestContext Context; + IoHash Key1; + IoHash Key2; - auto PutValue = - [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { - // Create a cache record - IoHash Key = CreateKey(KeyIndex); - CbObject CacheValue = CreateCacheValue(Size); + auto PutValue = [&CreateCacheValue, + &Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { + // Create a cache record + IoHash Key = CreateKey(KeyIndex); + CbObject CacheValue = CreateCacheValue(Size); - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); - ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Namespace, Bucket, Key, PutValue); - return Key; - }; - auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { + ZenCacheValue PutValue = {.Value = Buffer}; + Zcs.Put(Context, Namespace, Bucket, Key, PutValue); + return Key; + }; + auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { ZenCacheValue GetValue; - Zcs.Get(Namespace, Bucket, Key, GetValue); + Zcs.Get(Context, Namespace, Bucket, Key, GetValue); return GetValue; }; WorkerThreadPool Workers(1); @@ -1253,6 +1279,8 @@ TEST_CASE("z$.drop.namespace") { using namespace testutils; + const CacheRequestContext Context; + const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector<uint8_t> Buf; Buf.resize(Size); @@ -1265,22 +1293,22 @@ TEST_CASE("z$.drop.namespace") ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path()); - auto PutValue = - [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { - // Create a cache record - IoHash Key = CreateKey(KeyIndex); - CbObject CacheValue = CreateCacheValue(Size); + auto PutValue = [&CreateCacheValue, + &Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { + // Create a cache record + IoHash Key = CreateKey(KeyIndex); + CbObject CacheValue = CreateCacheValue(Size); - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); - ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Namespace, Bucket, Key, PutValue); - return Key; - }; - auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { + ZenCacheValue PutValue = {.Value = Buffer}; + Zcs.Put(Context, Namespace, Bucket, Key, PutValue); + return Key; + }; + auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { ZenCacheValue GetValue; - Zcs.Get(Namespace, Bucket, Key, GetValue); + Zcs.Get(Context, Namespace, Bucket, Key, GetValue); return GetValue; }; WorkerThreadPool Workers(1); diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h index 01f6d7d82..7ce195ef5 100644 --- a/src/zenserver/cache/structuredcachestore.h +++ b/src/zenserver/cache/structuredcachestore.h @@ -8,6 +8,7 @@ #include <zencore/compactbinary.h> #include <zencore/iohash.h> #include <zenstore/gc.h> +#include <zenutil/cache/cache.h> #include <atomic> #include <compare> @@ -130,8 +131,16 @@ public: ZenCacheStore(GcManager& Gc, const Configuration& Configuration); ~ZenCacheStore(); - bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool Get(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + ZenCacheValue& OutValue); + void Put(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value); bool DropBucket(std::string_view Namespace, std::string_view Bucket); bool DropNamespace(std::string_view Namespace); void Flush(); @@ -154,6 +163,8 @@ private: typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap; + spdlog::logger& m_Log; + spdlog::logger& Log() { return m_Log; } mutable RwLock m_NamespacesLock; NamespaceMap m_Namespaces; std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces; diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index 4b6338f3a..9f6a0abc4 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -1901,7 +1901,7 @@ private: ZenCacheValue CacheValue; std::vector<IoBuffer> Payloads; - if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get(CacheRecord.Context, CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}/{}', cache record doesn't exist", CacheRecord.Namespace, diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h index 695c06b32..8f1395509 100644 --- a/src/zenserver/upstream/upstreamcache.h +++ b/src/zenserver/upstream/upstreamcache.h @@ -35,6 +35,7 @@ struct UpstreamCacheRecord std::string Namespace; CacheKey Key; std::vector<IoHash> ValueContentIds; + CacheRequestContext Context; }; struct UpstreamCacheOptions diff --git a/src/zenutil/include/zenutil/cache/cache.h b/src/zenutil/include/zenutil/cache/cache.h index 1a1dd9386..5e12c488c 100644 --- a/src/zenutil/include/zenutil/cache/cache.h +++ b/src/zenutil/include/zenutil/cache/cache.h @@ -4,3 +4,31 @@ #include <zenutil/cache/cachekey.h> #include <zenutil/cache/cachepolicy.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +struct CacheRequestContext +{ + Oid SessionId{Oid::Zero}; + uint32_t RequestId{0}; +}; + +} // namespace zen + +template<> +struct fmt::formatter<zen::CacheRequestContext> : formatter<string_view> +{ + template<typename FormatContext> + auto format(const zen::CacheRequestContext& Context, FormatContext& ctx) + { + zen::ExtendableStringBuilder<64> String; + Context.SessionId.ToString(String); + String << "."; + String << Context.RequestId; + return formatter<string_view>::format(String.ToView(), ctx); + } +}; |