// Copyright Epic Games, Inc. All Rights Reserved. #include "httpstructuredcache.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" #include "zenstore/scrubcontext.h" #include #include #include #include #include #include #include namespace zen { const FLLMTag& GetCacheHttpTag() { static FLLMTag CacheHttpTag("http", FLLMTag("cache")); return CacheHttpTag; } extern const FLLMTag& GetCacheRpcTag(); using namespace std::literals; ////////////////////////////////////////////////////////////////////////// CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { std::string_view PolicyText = QueryParams.GetValue("Policy"sv); return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; } namespace { static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv; static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv; static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv; static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv; static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv; struct HttpRequestData { std::optional Namespace; std::optional Bucket; std::optional HashKey; std::optional ValueContentId; }; } // namespace ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, const DiskWriteBlocker* InDiskWriteBlocker, OpenProcessCache& InOpenProcessCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) , m_DiskWriteBlocker(InDiskWriteBlocker) , m_OpenProcessCache(InOpenProcessCache) , m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); } HttpStructuredCacheService::~HttpStructuredCacheService() { ZEN_INFO("closing structured cache"); { RwLock::ExclusiveLockScope _(m_RequestRecordingLock); m_RequestRecordingEnabled.store(false); m_RequestRecorder.reset(); } m_StatsService.UnregisterHandler("z$", *this); m_StatusService.UnregisterHandler("z$", *this); } const char* HttpStructuredCacheService::BaseUri() const { return "/z$/"; } void HttpStructuredCacheService::Flush() { m_CacheStore.Flush(); } void HttpStructuredCacheService::ScrubStorage(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { return; } ZenCacheStore::Info Info = m_CacheStore.GetInfo(); ZEN_INFO("scrubbing '{}'", Info.BasePath); m_LastScrubTime = Ctx.ScrubTimestamp(); m_CidStore.ScrubStorage(Ctx); m_CacheStore.ScrubStorage(Ctx); } void HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { std::string_view Key = Request.RelativeUri(); std::vector Tokens; uint32_t TokenCount = ForEachStrTok(Key, '/', [&Tokens](std::string_view Token) { Tokens.push_back(std::string(Token)); return true; }); std::string FilterNamespace; std::string FilterBucket; std::string FilterValue; switch (TokenCount) { case 1: break; case 2: { FilterNamespace = Tokens[1]; if (FilterNamespace.empty()) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } } break; case 3: { FilterNamespace = Tokens[1]; if (FilterNamespace.empty()) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } FilterBucket = Tokens[2]; if (FilterBucket.empty()) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } } break; case 4: { FilterNamespace = Tokens[1]; if (FilterNamespace.empty()) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } FilterBucket = Tokens[2]; if (FilterBucket.empty()) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } FilterValue = Tokens[3]; if (FilterValue.empty()) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } } break; default: m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } HttpServerRequest::QueryParams Params = Request.GetQueryParams(); bool CSV = Params.GetValue("csv") == "true"; bool Details = Params.GetValue("details") == "true"; bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; std::chrono::seconds NowSeconds = std::chrono::duration_cast(GcClock::Now().time_since_epoch()); CacheValueDetails ValueDetails = m_CacheStore.GetValueDetails(FilterNamespace, FilterBucket, FilterValue); if (CSV) { ExtendableStringBuilder<4096> CSVWriter; if (AttachmentDetails) { CSVWriter << "Namespace, Bucket, Key, Cid, Size"; } else if (Details) { CSVWriter << "Namespace, Bucket, Key, Size, RawSize, RawHash, ContentType, Age, AttachmentsCount, AttachmentsSize"; } else { CSVWriter << "Namespace, Bucket, Key"; } for (const auto& NamespaceIt : ValueDetails.Namespaces) { const std::string& Namespace = NamespaceIt.first; for (const auto& BucketIt : NamespaceIt.second.Buckets) { const std::string& Bucket = BucketIt.first; for (const auto& ValueIt : BucketIt.second.Values) { if (AttachmentDetails) { for (const IoHash& Hash : ValueIt.second.Attachments) { IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString() << ", " << gsl::narrow(Payload.GetSize()); } } else if (Details) { std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast( GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << ValueIt.second.Size << "," << ValueIt.second.RawSize << "," << ValueIt.second.RawHash.ToHexString() << ", " << ToString(ValueIt.second.ContentType) << ", " << (NowSeconds.count() - LastAccessedSeconds.count()) << ", " << gsl::narrow(ValueIt.second.Attachments.size()); size_t AttachmentsSize = 0; for (const IoHash& Hash : ValueIt.second.Attachments) { IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); AttachmentsSize += Payload.GetSize(); } CSVWriter << ", " << gsl::narrow(AttachmentsSize); } else { CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString(); } } } } return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); } else { CbObjectWriter Cbo; Cbo.BeginArray("namespaces"); { for (const auto& NamespaceIt : ValueDetails.Namespaces) { const std::string& Namespace = NamespaceIt.first; Cbo.BeginObject(); { Cbo.AddString("name", Namespace); Cbo.BeginArray("buckets"); { for (const auto& BucketIt : NamespaceIt.second.Buckets) { const std::string& Bucket = BucketIt.first; Cbo.BeginObject(); { Cbo.AddString("name", Bucket); Cbo.BeginArray("values"); { for (const auto& ValueIt : BucketIt.second.Values) { std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast( GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); Cbo.BeginObject(); { Cbo.AddHash("key", ValueIt.first); if (Details) { Cbo.AddInteger("size", ValueIt.second.Size); if (ValueIt.second.Size > 0 && ValueIt.second.RawSize != 0 && ValueIt.second.RawSize != ValueIt.second.Size) { Cbo.AddInteger("rawsize", ValueIt.second.RawSize); Cbo.AddHash("rawhash", ValueIt.second.RawHash); } Cbo.AddString("contenttype", ToString(ValueIt.second.ContentType)); Cbo.AddInteger("age", gsl::narrow(NowSeconds.count() - LastAccessedSeconds.count())); if (ValueIt.second.Attachments.size() > 0) { if (AttachmentDetails) { Cbo.BeginArray("attachments"); { for (const IoHash& Hash : ValueIt.second.Attachments) { Cbo.BeginObject(); Cbo.AddHash("cid", Hash); IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); Cbo.AddInteger("size", gsl::narrow(Payload.GetSize())); Cbo.EndObject(); } } Cbo.EndArray(); } else { Cbo.AddInteger("attachmentcount", gsl::narrow(ValueIt.second.Attachments.size())); size_t AttachmentsSize = 0; for (const IoHash& Hash : ValueIt.second.Attachments) { IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); AttachmentsSize += Payload.GetSize(); } Cbo.AddInteger("attachmentssize", gsl::narrow(AttachmentsSize)); } } } } Cbo.EndObject(); } } Cbo.EndArray(); } Cbo.EndObject(); } } Cbo.EndArray(); } Cbo.EndObject(); } } Cbo.EndArray(); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } } void HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { ZEN_TRACE_CPU("z$::Http::HandleRequest"); ZEN_MEMSCOPE(GetCacheHttpTag()); metrics::OperationTiming::Scope $(m_HttpRequests); const std::string_view Key = Request.RelativeUri(); std::string_view UriNamespace; if (Key.ends_with(HttpZCacheRPCPrefix)) { const size_t RpcOffset = Key.length() - HttpZCacheRPCPrefix.length(); if (RpcOffset) { std::string_view KeyPrefix = Key.substr(0, RpcOffset); if (KeyPrefix.back() == '/') { KeyPrefix.remove_suffix(1); UriNamespace = KeyPrefix; } } return HandleRpcRequest(Request, UriNamespace); } if (Key == HttpZCacheUtilStartRecording) { HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); { RwLock::ExclusiveLockScope _(m_RequestRecordingLock); m_RequestRecordingEnabled.store(false); m_RequestRecorder.reset(); m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); m_RequestRecordingEnabled.store(true); } ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath); Request.WriteResponse(HttpResponseCode::OK); return; } if (Key == HttpZCacheUtilStopRecording) { { RwLock::ExclusiveLockScope _(m_RequestRecordingLock); m_RequestRecordingEnabled.store(false); m_RequestRecorder.reset(); } ZEN_INFO("cache RPC recording STOPPED"); Request.WriteResponse(HttpResponseCode::OK); return; } if (Key == HttpZCacheUtilReplayRecording) { CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; { RwLock::ExclusiveLockScope _(m_RequestRecordingLock); m_RequestRecordingEnabled.store(false); m_RequestRecorder.reset(); } HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); uint32_t ThreadCount = std::thread::hardware_concurrency(); if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) { if (auto Value = ParseInt(Param)) { ThreadCount = gsl::narrow(Value.value()); } } ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath); std::unique_ptr Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount); ZEN_INFO("cache RPC replay STARTED"); Request.WriteResponse(HttpResponseCode::OK); return; } if (Key.starts_with(HttpZCacheDetailsPrefix)) { HandleDetailsRequest(Request); return; } cacherequests::HttpRequestData RequestData; if (!cacherequests::HttpRequestParseRelativeUri(Key, ZenCacheStore::DefaultNamespace, RequestData)) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } if (RequestData.ValueContentId.has_value()) { ZEN_ASSERT(RequestData.Namespace.has_value()); ZEN_ASSERT(RequestData.Bucket.has_value()); ZEN_ASSERT(RequestData.HashKey.has_value()); CacheRef Ref = {.Namespace = RequestData.Namespace.value(), .BucketSegment = RequestData.Bucket.value(), .HashKey = RequestData.HashKey.value(), .ValueContentId = RequestData.ValueContentId.value()}; return HandleCacheChunkRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); } if (RequestData.HashKey.has_value()) { ZEN_ASSERT(RequestData.Namespace.has_value()); ZEN_ASSERT(RequestData.Bucket.has_value()); CacheRef Ref = {.Namespace = RequestData.Namespace.value(), .BucketSegment = RequestData.Bucket.value(), .HashKey = RequestData.HashKey.value(), .ValueContentId = IoHash::Zero}; return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); } if (RequestData.Bucket.has_value()) { ZEN_ASSERT(RequestData.Namespace.has_value()); return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value()); } if (RequestData.Namespace.has_value()) { return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value()); } return HandleCacheRequest(Request); } void HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { ZenCacheStore::Info Info = m_CacheStore.GetInfo(); CbObjectWriter ResponseWriter; ResponseWriter.BeginObject("Configuration"); { ExtendableStringBuilder<128> BasePathString; BasePathString << Info.BasePath.u8string(); ResponseWriter.AddString("BasePath"sv, BasePathString.ToView()); ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces); ResponseWriter.BeginObject("Logging"); { ResponseWriter.AddBool("EnableWriteLog", Info.Config.Logging.EnableWriteLog); ResponseWriter.AddBool("EnableAccessLog", Info.Config.Logging.EnableAccessLog); } ResponseWriter.EndObject(); } ResponseWriter.EndObject(); std::sort(begin(Info.NamespaceNames), end(Info.NamespaceNames), [](std::string_view L, std::string_view R) { return L.compare(R) < 0; }); ResponseWriter.BeginArray("Namespaces"); for (const std::string& NamespaceName : Info.NamespaceNames) { ResponseWriter.AddString(NamespaceName); } ResponseWriter.EndArray(); ResponseWriter.BeginObject("StorageSize"); { ResponseWriter.AddInteger("DiskSize", Info.StorageSize.DiskSize); ResponseWriter.AddInteger("MemorySize", Info.StorageSize.MemorySize); } ResponseWriter.EndObject(); ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount); return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); } break; default: m_CacheStats.BadRequestCount++; break; } } void HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view NamespaceName) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { std::optional Info = m_CacheStore.GetNamespaceInfo(NamespaceName); if (!Info.has_value()) { return Request.WriteResponse(HttpResponseCode::NotFound); } CbObjectWriter ResponseWriter; ResponseWriter.BeginObject("Configuration"); { ExtendableStringBuilder<128> BasePathString; BasePathString << Info->RootDir.u8string(); ResponseWriter.AddString("RootDir"sv, BasePathString.ToView()); ResponseWriter.AddInteger("MaxBlockSize"sv, Info->Config.DiskLayerConfig.BucketConfig.MaxBlockSize); ResponseWriter.AddInteger("PayloadAlignment"sv, Info->Config.DiskLayerConfig.BucketConfig.PayloadAlignment); ResponseWriter.AddInteger("MemCacheSizeThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold); ResponseWriter.AddInteger("LargeObjectThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.LargeObjectThreshold); ResponseWriter.AddInteger("MemCacheTargetFootprintBytes"sv, Info->Config.DiskLayerConfig.MemCacheTargetFootprintBytes); ResponseWriter.AddInteger("MemCacheTrimIntervalSeconds"sv, Info->Config.DiskLayerConfig.MemCacheTrimIntervalSeconds); ResponseWriter.AddInteger("MemCacheMaxAgeSeconds"sv, Info->Config.DiskLayerConfig.MemCacheMaxAgeSeconds); } ResponseWriter.EndObject(); std::sort(begin(Info->BucketNames), end(Info->BucketNames), [](std::string_view L, std::string_view R) { return L.compare(R) < 0; }); ResponseWriter.BeginArray("Buckets"sv); for (const std::string& BucketName : Info->BucketNames) { ResponseWriter.AddString(BucketName); } ResponseWriter.EndArray(); ResponseWriter.BeginObject("StorageSize"sv); { ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.StorageSize.DiskSize); ResponseWriter.AddInteger("MemorySize"sv, Info->DiskLayerInfo.StorageSize.MemorySize); } ResponseWriter.EndObject(); ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount); if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty()) { ResponseWriter.BeginObject("BucketSizes"); ResponseWriter.BeginArray("Buckets"); std::vector BucketNames; if (Buckets == "*") // Get all - empty FieldFilter equal getting all fields { BucketNames = Info.value().BucketNames; } else { ForEachStrTok(Buckets, ',', [&](std::string_view BucketName) { BucketNames.push_back(std::string(BucketName)); return true; }); } WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); std::vector AllAttachments; for (const std::string& BucketName : BucketNames) { ResponseWriter.BeginObject(); ResponseWriter << "Name" << BucketName; CacheContentStats ContentStats; bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats); if (Success) { size_t ValuesSize = 0; for (const uint64_t Size : ContentStats.ValueSizes) { ValuesSize += Size; } std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end()); ResponseWriter << "Count" << ContentStats.ValueSizes.size(); ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount; ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount; ResponseWriter << "Size" << ValuesSize; ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size(); AllAttachments.insert(AllAttachments.end(), ContentStats.Attachments.begin(), ContentStats.Attachments.end()); } ResponseWriter.EndObject(); } ResponseWriter.EndArray(); ResponseWriter.BeginObject("Attachments"); std::sort(AllAttachments.begin(), AllAttachments.end()); auto NewEnd = std::unique(AllAttachments.begin(), AllAttachments.end()); AllAttachments.erase(NewEnd, AllAttachments.end()); uint64_t AttachmentsSize = 0; m_CidStore.IterateChunks( AllAttachments, [&](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index); AttachmentsSize += Payload.GetSize(); return true; }, &WorkerPool, 8u * 1024u); ResponseWriter << "Count" << AllAttachments.size(); ResponseWriter << "Size" << AttachmentsSize; ResponseWriter.EndObject(); ResponseWriter.EndObject(); } return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); } break; case HttpVerb::kDelete: // Drop namespace { if (m_CacheStore.DropNamespace(NamespaceName)) { return Request.WriteResponse(HttpResponseCode::OK); } else { return Request.WriteResponse(HttpResponseCode::NotFound); } } break; default: break; } } void HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view NamespaceName, std::string_view BucketName) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { std::optional Info = m_CacheStore.GetBucketInfo(NamespaceName, BucketName); if (!Info.has_value()) { return Request.WriteResponse(HttpResponseCode::NotFound); } CbObjectWriter ResponseWriter; ResponseWriter.BeginObject("StorageSize"); { ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.StorageSize.DiskSize); ResponseWriter.AddInteger("MemorySize", Info->DiskLayerInfo.StorageSize.MemorySize); } ResponseWriter.EndObject(); ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true") { CacheContentStats ContentStats; bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats); if (Success) { size_t ValuesSize = 0; for (const uint64_t Size : ContentStats.ValueSizes) { ValuesSize += Size; } std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end()); ResponseWriter << "Count" << ContentStats.ValueSizes.size(); ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount; ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount; ResponseWriter << "Size" << ValuesSize; ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size(); uint64_t AttachmentsSize = 0; WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); m_CidStore.IterateChunks( ContentStats.Attachments, [&](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index); AttachmentsSize += Payload.GetSize(); return true; }, &WorkerPool, 8u * 1024u); ResponseWriter << "AttachmentsSize" << AttachmentsSize; } } return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); } break; case HttpVerb::kDelete: // Drop bucket { if (m_CacheStore.DropBucket(NamespaceName, BucketName)) { return Request.WriteResponse(HttpResponseCode::OK); } else { return Request.WriteResponse(HttpResponseCode::NotFound); } } break; default: break; } } void HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: HandleGetCacheRecord(Request, Ref, PolicyFromUrl); break; case HttpVerb::kPut: HandlePutCacheRecord(Request, Ref, PolicyFromUrl); break; default: break; } } void HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { const ZenContentType AcceptType = Request.AcceptContentType(); const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); bool Success = false; ZenCacheValue ClientResultValue; if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query)) { return Request.WriteResponse(HttpResponseCode::OK); } const bool HasUpstream = m_UpstreamCache.IsActive(); CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; Stopwatch Timer; if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { Success = true; ZenContentType ContentType = ClientResultValue.Value.GetContentType(); if (AcceptType == ZenContentType::kCbPackage) { if (ContentType == ZenContentType::kCbObject) { CbPackage Package; uint32_t MissingCount = 0; CbObjectView CacheRecord(ClientResultValue.Value.Data()); CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { if (SkipData) { if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) { MissingCount++; } } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) { Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); } else { ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash.AsHash()); MissingCount++; } } else { MissingCount++; } } }); Success = MissingCount == 0 || PartialRecord; if (Success) { Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); BinaryWriter MemStream; Package.Save(MemStream); ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); } } else { Success = false; } } else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType && AcceptType != ZenContentType::kBinary) { Success = false; } } if (Success) { ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (LOCAL) in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(ClientResultValue.Value.Size()), ToString(ClientResultValue.Value.GetContentType()), NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject) { return Request.WriteResponse(HttpResponseCode::OK); } else { // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } else if (!HasUpstream || !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) { ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType), NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } // Issue upstream query asynchronously in order to keep requests flowing without // hogging I/O servicing threads with blocking work uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs(); Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs, RequestContext](HttpServerRequest& AsyncRequest) { Stopwatch Timer; bool Success = false; const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); ZenCacheValue ClientResultValue; metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); if (GetUpstreamCacheSingleResult UpstreamResult = m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Status.Success) { Success = true; ClientResultValue.Value = UpstreamResult.Value; ClientResultValue.Value.SetContentType(AcceptType); if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) { if (AcceptType == ZenContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); if (ValidationResult != CbValidateError::None) { Success = false; ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); } // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data } if (Success && StoreLocal) { const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); ZenCacheStore::PutResult PutResult = m_CacheStore .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr); if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } } } else if (AcceptType == ZenContentType::kCbPackage) { CbPackage Package; if (Package.TryLoad(ClientResultValue.Value)) { CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; size_t NumAttachments = Package.GetAttachments().size(); std::vector ReferencedAttachments; std::vector WriteAttachmentBuffers; WriteAttachmentBuffers.reserve(NumAttachments); std::vector WriteRawHashes; WriteRawHashes.reserve(NumAttachments); CacheRecord.IterateAttachments([this, &Package, &Ref, &WriteAttachmentBuffers, &WriteRawHashes, &ReferencedAttachments, &Count, QueryLocal, StoreLocal, SkipData](CbFieldView HashView) { IoHash Hash = HashView.AsHash(); ReferencedAttachments.push_back(Hash); if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { if (StoreLocal) { const CompressedBuffer& Chunk = Attachment->AsCompressedBinary(); WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); WriteRawHashes.push_back(Attachment->GetHash()); } Count.Valid++; } else { ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", Hash, Ref.BucketSegment, Ref.HashKey); Count.Invalid++; } } else if (QueryLocal) { if (SkipData) { if (m_CidStore.ContainsChunk(Hash)) { Count.Valid++; } } else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) { Package.AddAttachment(CbAttachment(Compressed, Hash)); Count.Valid++; } else { ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'", Hash, Ref.BucketSegment, Ref.HashKey); Count.Invalid++; } } } Count.Total++; }); if ((Count.Valid == Count.Total) || PartialRecord) { ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); if (StoreLocal) { const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite, nullptr); if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) { std::vector InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (const CidStore::InsertResult& Result : InsertResults) { if (Result.New) { Count.New++; } } } WriteAttachmentBuffers = {}; WriteRawHashes = {}; } } BinaryWriter MemStream; if (SkipData) { // Save a package containing only the object. CbPackage(Package.GetObject()).Save(MemStream); } else { Package.Save(MemStream); } ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); } else { Success = false; ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); } } else { Success = false; ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); } } } if (Success) { ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (UPSTREAM) in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(ClientResultValue.Value.Size()), ToString(ClientResultValue.Value.GetContentType()), NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; if (SkipData && AcceptType == ZenContentType::kBinary) { AsyncRequest.WriteResponse(HttpResponseCode::OK); } else { // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally // metadata. AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } else { ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType), NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); m_CacheStats.MissCount++; AsyncRequest.WriteResponse(HttpResponseCode::NotFound); } }); } void HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); } if (!AreDiskWritesAllowed()) { return Request.WriteResponse(HttpResponseCode::InsufficientStorage); } auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) { ZEN_UNUSED(PutResult); HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError; switch (PutResult.Status) { case zen::PutStatus::Conflict: ResponseCode = HttpResponseCode::Conflict; break; case zen::PutStatus::Invalid: ResponseCode = HttpResponseCode::BadRequest; break; } return PutResult.Message.empty() ? Request.WriteResponse(ResponseCode) : Request.WriteResponse(ResponseCode, zen::HttpContentType::kText, PutResult.Message); }; const HttpContentType ContentType = Request.RequestContentType(); Body.SetContentType(ContentType); CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; const bool HasUpstream = m_UpstreamCache.IsActive(); Stopwatch Timer; if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) { IoHash RawHash = IoHash::Zero; uint64_t RawSize = Body.GetSize(); if (ContentType == HttpContentType::kCompressedBinary) { if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload is not a valid compressed binary"sv); } } else { RawHash = IoHash::HashBuffer(SharedBuffer(Body)); } const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, {}, Overwrite, nullptr); if (PutResult.Status != zen::PutStatus::Success) { return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); } ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType), NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); if (ValidationResult != CbValidateError::None) { ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid compact binary", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } Body.SetContentType(ZenContentType::kCbObject); CbObjectView CacheRecord(Body.Data()); std::vector ValidAttachments; std::vector ReferencedAttachments; int32_t TotalCount = 0; CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { const IoHash Hash = AttachmentHash.AsHash(); ReferencedAttachments.push_back(Hash); if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); } TotalCount++; }); const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, Overwrite, nullptr); if (PutResult.Status != zen::PutStatus::Success) { return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType), TotalCount, ValidAttachments.size(), NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); const bool IsPartialRecord = TotalCount != static_cast(ValidAttachments.size()); CachePolicy Policy = PolicyFromUrl; if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbPackage) { CbPackage Package; if (!Package.TryLoad(Body)) { ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid package", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } CachePolicy Policy = PolicyFromUrl; CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; size_t NumAttachments = Package.GetAttachments().size(); std::vector ValidAttachments; std::vector ReferencedAttachments; ValidAttachments.reserve(NumAttachments); std::vector WriteAttachmentBuffers; std::vector WriteRawHashes; WriteAttachmentBuffers.reserve(NumAttachments); WriteRawHashes.reserve(NumAttachments); CacheRecord.IterateAttachments( [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count]( CbFieldView HashView) { const IoHash Hash = HashView.AsHash(); ReferencedAttachments.push_back(Hash); if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); WriteRawHashes.push_back(Hash); ValidAttachments.emplace_back(Hash); Count.Valid++; } else { ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(HttpContentType::kCbPackage), Hash); Count.Invalid++; } } else if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); Count.Valid++; } Count.Total++; }); if (Count.Invalid > 0) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); // TODO: Propagation for rejected PUTs ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite); if (PutResult.Status != zen::PutStatus::Success) { return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) { std::vector InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (const CidStore::InsertResult& InsertResult : InsertResults) { if (InsertResult.New) { Count.New++; } } WriteAttachmentBuffers = {}; WriteRawHashes = {}; } ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.GetSize()), ToString(ContentType), Count.New, Count.Valid, Count.Total, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); const bool IsPartialRecord = Count.Valid != Count.Total; if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); } else { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv); } } void HttpStructuredCacheService::HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: HandleGetCacheChunk(Request, Ref, PolicyFromUrl); break; case HttpVerb::kPut: HandlePutCacheChunk(Request, Ref, PolicyFromUrl); break; default: break; } } void HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { Stopwatch Timer; IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); const UpstreamEndpointInfo* Source = nullptr; CachePolicy Policy = PolicyFromUrl; const bool HasUpstream = m_UpstreamCache.IsActive(); { const bool QueryUpstream = HasUpstream && !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); if (QueryUpstream) { if (GetUpstreamCacheSingleResult UpstreamResult = m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); UpstreamResult.Status.Success) { IoHash RawHash; uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize)) { if (RawHash == Ref.ValueContentId) { if (AreDiskWritesAllowed()) { m_CidStore.AddChunk(UpstreamResult.Value, RawHash); } Source = UpstreamResult.Source; } else { ZEN_WARN("got missmatching upstream cache value"); } } else { ZEN_WARN("got uncompressed upstream cache value"); } } } } if (!Value) { ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType()), NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } ZEN_DEBUG("GETCACHECHUNK HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, NiceBytes(Value.Size()), ToString(Value.GetContentType()), Source ? Source->Url : "LOCAL"sv, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; if (Source) { m_CacheStats.UpstreamHitCount++; } if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) { Request.WriteResponse(HttpResponseCode::OK); } else { Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); } } void HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored ZEN_UNUSED(PolicyFromUrl); Stopwatch Timer; IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); } if (!AreDiskWritesAllowed()) { return Request.WriteResponse(HttpResponseCode::InsufficientStorage); } Body.SetContentType(Request.RequestContentType()); IoHash RawHash; uint64_t RawSize; if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } if (RawHash != Ref.ValueContentId) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueContentId does not match attachment hash"sv); } CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, NiceBytes(Body.Size()), ToString(Body.GetContentType()), Result.New ? "NEW" : "OLD", NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; Request.WriteResponse(ResponseCode); } void HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount) { WorkerThreadPool WorkerPool(ThreadCount); uint64_t RequestCount = Replayer.GetRequestCount(); Stopwatch Timer; auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); Latch JobLatch(RequestCount); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() { IoBuffer Body; zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); if (Body) { uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetPid = 0; CbPackage RpcResult; if (m_RpcHandler.HandleRpcRequest(Context, /* UriNamespace */ {}, RequestInfo.ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult) == CacheRpcHandler::RpcResponseCode::OK) { if (AcceptMagic == kCbPkgMagic) { void* TargetProcessHandle = nullptr; FormatFlags Flags = FormatFlags::kDefault; if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { Flags |= FormatFlags::kAllowLocalReferences; if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) { Flags |= FormatFlags::kDenyPartialLocalReferences; } TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(Context.SessionId, TargetPid); } CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); } else { BinaryWriter MemStream; RpcResult.Save(MemStream); IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); ZEN_ASSERT(RpcResponseBuffer.Size() > 0); } } } JobLatch.CountDown(); }); } while (!JobLatch.Wait(10000)) { ZEN_INFO("Replayed {} of {} requests, elapsed {}", RequestCount - JobLatch.Remaining(), RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); } } void HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::string_view UriNamespace) { ZEN_MEMSCOPE(GetCacheRpcTag()); ZEN_TRACE_CPU("z$::Http::HandleRpcRequest"); const bool HasUpstream = m_UpstreamCache.IsActive(); switch (Request.RequestVerb()) { case HttpVerb::kPost: { CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || AcceptType != HttpContentType::kCbPackage) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest); } auto HandleRpc = [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType, UriNamespaceString = std::string{UriNamespace}](HttpServerRequest& AsyncRequest) mutable { if (m_RequestRecordingEnabled) { RwLock::SharedLockScope _(m_RequestRecordingLock); if (m_RequestRecorder) { m_RequestRecorder->RecordRequest( {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, Body); } } uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetProcessId = 0; CbPackage RpcResult; CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext, UriNamespaceString, ContentType, std::move(Body), /* out */ AcceptMagic, /* out */ AcceptFlags, /* out */ TargetProcessId, /* out */ RpcResult); HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode)); if (!IsHttpSuccessCode(HttpResultCode)) { return AsyncRequest.WriteResponse(HttpResultCode); } if (AcceptMagic == kCbPkgMagic) { void* TargetProcessHandle = nullptr; FormatFlags Flags = FormatFlags::kDefault; if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { Flags |= FormatFlags::kAllowLocalReferences; if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) { Flags |= FormatFlags::kDenyPartialLocalReferences; } TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId); } CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else { BinaryWriter MemStream; RpcResult.Save(MemStream); AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } }; if (HasUpstream) { ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponseAsync"); Request.WriteResponseAsync(std::move(HandleRpc)); } else { ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponse"); HandleRpc(Request); } } break; default: m_CacheStats.BadRequestCount++; Request.WriteResponse(HttpResponseCode::BadRequest); break; } } void HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) { ZEN_MEMSCOPE(GetCacheHttpTag()); CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); const uint64_t HitCount = m_CacheStats.HitCount; const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; const uint64_t MissCount = m_CacheStats.MissCount; const uint64_t WriteCount = m_CacheStats.WriteCount; const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; struct CidStoreStats StoreStats = m_CidStore.Stats(); const uint64_t ChunkHitCount = StoreStats.HitCount; const uint64_t ChunkMissCount = StoreStats.MissCount; const uint64_t ChunkWriteCount = StoreStats.WriteCount; const uint64_t TotalCount = HitCount + MissCount; const uint64_t RpcRequests = m_CacheStats.RpcRequests; const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests; const uint64_t RpcRecordBatchRequests = m_CacheStats.RpcRecordBatchRequests; const uint64_t RpcValueRequests = m_CacheStats.RpcValueRequests; const uint64_t RpcValueBatchRequests = m_CacheStats.RpcValueBatchRequests; const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests; const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests; const CidStoreSize CidSize = m_CidStore.TotalSize(); const GcStorageSize CacheSize = m_CacheStore.StorageSize(); bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true"; bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true"; CidStoreStats CidStoreStats = {}; if (ShowCidStoreStats) { CidStoreStats = m_CidStore.Stats(); } ZenCacheStore::CacheStoreStats CacheStoreStats = {}; if (ShowCacheStoreStats) { CacheStoreStats = m_CacheStore.Stats(); } Cbo.BeginObject("cache"); { Cbo << "badrequestcount" << BadRequestCount; Cbo.BeginObject("rpc"); Cbo << "count" << RpcRequests; Cbo << "ops" << RpcRecordBatchRequests + RpcValueBatchRequests + RpcChunkBatchRequests; Cbo.BeginObject("records"); Cbo << "count" << RpcRecordRequests; Cbo << "ops" << RpcRecordBatchRequests; Cbo.EndObject(); Cbo.BeginObject("values"); Cbo << "count" << RpcValueRequests; Cbo << "ops" << RpcValueBatchRequests; Cbo.EndObject(); Cbo.BeginObject("chunks"); Cbo << "count" << RpcChunkRequests; Cbo << "ops" << RpcChunkBatchRequests; Cbo.EndObject(); Cbo.EndObject(); Cbo.BeginObject("size"); { Cbo << "disk" << CacheSize.DiskSize; Cbo << "memory" << CacheSize.MemorySize; } Cbo.EndObject(); Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount; Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); if (m_UpstreamCache.IsActive()) { Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); } Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount; if (ShowCacheStoreStats) { Cbo.BeginObject("store"); Cbo << "hits" << CacheStoreStats.HitCount << "misses" << CacheStoreStats.MissCount << "writes" << CacheStoreStats.WriteCount << "rejected_writes" << CacheStoreStats.RejectedWriteCount << "rejected_reads" << CacheStoreStats.RejectedReadCount; const uint64_t StoreTotal = CacheStoreStats.HitCount + CacheStoreStats.MissCount; Cbo << "hit_ratio" << (StoreTotal > 0 ? (double(CacheStoreStats.HitCount) / double(StoreTotal)) : 0.0); EmitSnapshot("read", CacheStoreStats.GetOps, Cbo); EmitSnapshot("write", CacheStoreStats.PutOps, Cbo); if (!CacheStoreStats.NamespaceStats.empty()) { Cbo.BeginArray("namespaces"); for (const ZenCacheStore::NamedNamespaceStats& NamespaceStats : CacheStoreStats.NamespaceStats) { Cbo.BeginObject(); Cbo.AddString("namespace", NamespaceStats.NamespaceName); Cbo << "hits" << NamespaceStats.Stats.HitCount << "misses" << NamespaceStats.Stats.MissCount << "writes" << NamespaceStats.Stats.WriteCount; const uint64_t NamespaceTotal = NamespaceStats.Stats.HitCount + NamespaceStats.Stats.MissCount; Cbo << "hit_ratio" << (NamespaceTotal > 0 ? (double(NamespaceStats.Stats.HitCount) / double(NamespaceTotal)) : 0.0); EmitSnapshot("read", NamespaceStats.Stats.GetOps, Cbo); EmitSnapshot("write", NamespaceStats.Stats.PutOps, Cbo); Cbo.BeginObject("size"); { Cbo << "disk" << NamespaceStats.Stats.DiskStats.DiskSize; Cbo << "memory" << NamespaceStats.Stats.DiskStats.MemorySize; } Cbo.EndObject(); if (!NamespaceStats.Stats.DiskStats.BucketStats.empty()) { Cbo.BeginArray("buckets"); for (const ZenCacheDiskLayer::NamedBucketStats& BucketStats : NamespaceStats.Stats.DiskStats.BucketStats) { Cbo.BeginObject(); Cbo.AddString("bucket", BucketStats.BucketName); if (BucketStats.Stats.DiskSize != 0 || BucketStats.Stats.MemorySize != 0) { Cbo.BeginObject("size"); { Cbo << "disk" << BucketStats.Stats.DiskSize; Cbo << "memory" << BucketStats.Stats.MemorySize; } Cbo.EndObject(); } if (BucketStats.Stats.DiskSize == 0 && BucketStats.Stats.DiskHitCount == 0 && BucketStats.Stats.DiskMissCount == 0 && BucketStats.Stats.DiskWriteCount == 0 && BucketStats.Stats.MemoryHitCount == 0 && BucketStats.Stats.MemoryMissCount == 0 && BucketStats.Stats.MemoryWriteCount == 0) { Cbo.EndObject(); continue; } const uint64_t BucketDiskTotal = BucketStats.Stats.DiskHitCount + BucketStats.Stats.DiskMissCount; if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0) { Cbo << "hits" << BucketStats.Stats.DiskHitCount << "misses" << BucketStats.Stats.DiskMissCount << "writes" << BucketStats.Stats.DiskWriteCount; Cbo << "hit_ratio" << (BucketDiskTotal > 0 ? (double(BucketStats.Stats.DiskHitCount) / double(BucketDiskTotal)) : 0.0); } const uint64_t BucketMemoryTotal = BucketStats.Stats.MemoryHitCount + BucketStats.Stats.MemoryMissCount; if (BucketMemoryTotal != 0 || BucketStats.Stats.MemoryWriteCount != 0) { Cbo << "mem_hits" << BucketStats.Stats.MemoryHitCount << "mem_misses" << BucketStats.Stats.MemoryMissCount << "mem_writes" << BucketStats.Stats.MemoryWriteCount; Cbo << "mem_hit_ratio" << (BucketMemoryTotal > 0 ? (double(BucketStats.Stats.MemoryHitCount) / double(BucketMemoryTotal)) : 0.0); } if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0 || BucketMemoryTotal != 0 || BucketStats.Stats.MemoryWriteCount != 0) { EmitSnapshot("read", BucketStats.Stats.GetOps, Cbo); EmitSnapshot("write", BucketStats.Stats.PutOps, Cbo); } Cbo.EndObject(); } Cbo.EndArray(); } Cbo.EndObject(); } Cbo.EndArray(); } Cbo.EndObject(); } Cbo.EndObject(); } if (m_UpstreamCache.IsActive()) { EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); Cbo.BeginObject("upstream"); { m_UpstreamCache.GetStatus(Cbo); } Cbo.EndObject(); } Cbo.BeginObject("cid"); { Cbo.BeginObject("size"); { Cbo << "tiny" << CidSize.TinySize; Cbo << "small" << CidSize.SmallSize; Cbo << "large" << CidSize.LargeSize; Cbo << "total" << CidSize.TotalSize; } Cbo.EndObject(); if (ShowCidStoreStats) { Cbo.BeginObject("store"); Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount; EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo); EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo); // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo); Cbo.EndObject(); } } Cbo.EndObject(); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request) { CbObjectWriter Cbo; Cbo << "ok" << true; Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } bool HttpStructuredCacheService::AreDiskWritesAllowed() const { return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } } // namespace zen