diff options
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 581 |
1 files changed, 581 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 11a23242f..dfed42aa5 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1535,6 +1535,237 @@ namespace impl { return HttpResponseCode::OK; } + HttpResponseCode GetCacheChunks(ZenCacheStoreBase& CacheStore, + CidStoreBase& CidStore, + UpstreamCache& UpstreamCache, + std::atomic_uint64_t&, + std::atomic_uint64_t&, + std::atomic_uint64_t&, + cacherequests::GetCacheChunksRequest& Request, + const cacherequests::ChunksRequestPolicy& Policy, + cacherequests::GetCacheChunksResult& OutResult) + { + auto GetChunkIdsFromCacheRecord = [&Request](size_t RequestIndex, const ZenCacheValue& RecordCacheValue) -> size_t { + if (RecordCacheValue.Value.GetContentType() != ZenContentType::kCbObject) + { + ZEN_WARN("local record {}/{}/{} is not a structured object, skipping.", + Request.Namespace, + Request.Requests[RequestIndex].Key.Bucket, + Request.Requests[RequestIndex].Key.Hash); + return 0; + } + cacherequests::CacheRecord CacheRecord; + CbObject Record = LoadCompactBinaryObject(RecordCacheValue.Value); + if (!CacheRecord.Parse(Record)) + { + ZEN_WARN("local record {}/{}/{} is corrupt, skipping", + Request.Namespace, + Request.Requests[RequestIndex].Key.Bucket, + Request.Requests[RequestIndex].Key.Hash); + return 0; + } + size_t ChunkCount = 0; + while (CacheRecord.Key == Request.Requests[RequestIndex + ChunkCount].Key) + { + cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex + ChunkCount]; + auto FindIt = std::find_if( + CacheRecord.Values.begin(), + CacheRecord.Values.end(), + [&ChunkRequest](const cacherequests::CacheRecordValue& Value) { return Value.Id == ChunkRequest.ValueId; }); + if (FindIt != CacheRecord.Values.end()) + { + ChunkRequest.ChunkId = FindIt->RawHash; + ChunkRequest.RawSize = FindIt->RawSize; + } + ChunkCount++; + } + return ChunkCount; + }; + + // Figure out which records we need, locally we just get them, upstream we request them + std::vector<size_t> UpstreamRecordRequestIndexes; + size_t RequestCount = Request.Requests.size(); + { + size_t RequestIndex = 0; + while (RequestIndex < RequestCount) + { + cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex]; + // cacherequests::CacheValueResult& ChunkResult = OutResult.Results[RequestIndex]; + CachePolicy ChunkPolicy = cacherequests::GetEffectiveChunkPolicy(Policy, RequestIndex); + const bool HasValueId = ChunkRequest.ValueId != Oid::Zero; + const bool HasChunkId = ChunkRequest.ChunkId != IoHash::Zero; + const bool QueryLocal = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryLocal); + // const bool SkipData = EnumHasAnyFlags(ChunkPolicy, CachePolicy::SkipData); + const bool QueryRemote = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryRemote); + const bool GetInlineCacheValue = !HasValueId && !HasChunkId; + + if (GetInlineCacheValue) + { + RequestIndex++; + continue; + } + if (HasChunkId) + { + RequestIndex++; + continue; + } + ZenCacheValue RecordCacheValue; + if (QueryLocal && CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) + { + size_t ResolvedChunkCount = GetChunkIdsFromCacheRecord(RequestIndex, RecordCacheValue); + if (ResolvedChunkCount != 0) + { + RequestIndex += ResolvedChunkCount; + continue; + } + } + if (QueryRemote) + { + UpstreamRecordRequestIndexes.push_back(RequestIndex); + size_t ChunkCount = 1; + while (Request.Requests[RequestIndex].Key == Request.Requests[RequestIndex + ChunkCount].Key) + { + ChunkCount++; + } + RequestIndex += ChunkCount; + continue; + } + } + } + + if (!UpstreamRecordRequestIndexes.empty()) + { + // We need to do GetCacheValue for the actual record - we don't want to fetch cache records... + cacherequests::GetCacheRecordsRequest RecordsRequest = {.Namespace = Request.Namespace}; + RecordsRequest.Requests.reserve(UpstreamRecordRequestIndexes.size()); + + for (size_t RequestIndex : UpstreamRecordRequestIndexes) + { + cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex]; + RecordsRequest.Requests.push_back(ChunkRequest.Key); + } + + cacherequests::RecordsRequestPolicy RecordsRequestPolicy = {.DefaultPolicy = Policy.DefaultPolicy | CachePolicy::SkipData | + CachePolicy::SkipMeta}; + cacherequests::GetCacheRecordsResult RecordsResult; + RecordsRequestPolicy.RecordPolicies.resize(UpstreamRecordRequestIndexes.size()); + UpstreamCache.GetCacheRecords(RecordsRequest, RecordsRequestPolicy, RecordsResult, {}); + + size_t ResultIndex = 0; + for (size_t RequestIndex : UpstreamRecordRequestIndexes) + { + const std::optional<cacherequests::GetCacheRecordResult>& RecordResult = RecordsResult.Results[ResultIndex++]; + if (!RecordResult) + { + continue; + } + ZEN_ASSERT(RecordResult->Key == Request.Requests[RequestIndex].Key); + + while (RecordResult->Key == Request.Requests[RequestIndex].Key) + { + cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex]; + auto FindIt = std::find_if(RecordResult->Values.begin(), + RecordResult->Values.end(), + [&ChunkRequest](const cacherequests::GetCacheRecordResultValue& Value) { + return Value.Id == ChunkRequest.ValueId; + }); + if (FindIt != RecordResult->Values.end()) + { + ChunkRequest.ChunkId = FindIt->RawHash; + ChunkRequest.RawSize = FindIt->RawSize; + } + RequestIndex++; + } + } + } + + // Now we should have any ChunkId we can get for all requests + + std::vector<size_t> UpstreamChunkIndexes; + { + for (size_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) + { + cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex]; + cacherequests::CacheValueResult& ChunkResult = OutResult.Results[RequestIndex]; + CachePolicy ChunkPolicy = cacherequests::GetEffectiveChunkPolicy(Policy, RequestIndex); + const bool HasValueId = ChunkRequest.ValueId != Oid::Zero; + const bool HasChunkId = ChunkRequest.ChunkId != IoHash::Zero; + const bool QueryLocal = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryLocal); + const bool SkipData = EnumHasAnyFlags(ChunkPolicy, CachePolicy::SkipData); + const bool QueryRemote = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryRemote); + const bool GetInlineCacheValue = !HasValueId && !HasChunkId; + + if (GetInlineCacheValue) + { + if (QueryLocal) + { + ZenCacheValue RecordCacheValue; + if (CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value)); + if (!Compressed) + { + Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value)); + } + if (!SkipData) + { + ChunkResult.Body = Compressed; + } + ChunkResult.RawSize = Compressed.GetRawSize(); + ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + continue; + } + } + if (QueryRemote) + { + UpstreamChunkIndexes.push_back(RequestIndex); + } + continue; + } + if (!HasChunkId) + { + // Miss + continue; + } + if (QueryLocal) + { + if (SkipData && ChunkRequest.RawSize != ~uint64_t(0)) + { + if (CidStore.ContainsChunk(ChunkRequest.ChunkId)) + { + // Hit + ChunkResult.RawSize = ChunkRequest.RawSize; + ChunkResult.RawHash = ChunkRequest.ChunkId; + continue; + } + } + ZenCacheValue RecordCacheValue; + if (IoBuffer Chunk = CidStore.FindChunkByCid(ChunkRequest.ChunkId); Chunk) + { + // Hit + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + if (!Compressed) + { + Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value)); + } + if (!SkipData) + { + ChunkResult.Body = Compressed; + } + ChunkResult.RawSize = Compressed.GetRawSize(); + ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + continue; + } + } + if (QueryRemote) + { + UpstreamChunkIndexes.push_back(RequestIndex); + } + } + } + return HttpResponseCode::OK; + } + } // namespace impl void @@ -1919,9 +2150,359 @@ namespace cache::detail { } // namespace cache::detail +static bool UseHandleRpcGetCacheChunksNew = true; + +void +HttpStructuredCacheService::HandleRpcGetCacheChunksNew(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) +{ + using namespace cache::detail; + + ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); + + cacherequests::GetCacheChunksRequest Request; + cacherequests::ChunksRequestPolicy Policy; + if (!Request.Parse(RpcRequest, Policy)) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } + cacherequests::GetCacheChunksResult Result; + Result.Results.resize(Request.Requests.size()); + + impl::GetCacheChunks(m_CacheStore, + m_CidStore, + m_UpstreamCache, + m_CacheStats.HitCount, + m_CacheStats.UpstreamHitCount, + m_CacheStats.MissCount, + Request, + Policy, + Result); + +#if 0 + + std::vector<size_t> UpstreamChunkIndexes; + UpstreamChunkIndexes.reserve(Request.Requests.size()); + std::vector<CacheKey> UpstreamRecords; + std::vector<size_t> MissingChunksIndexes; + UpstreamRecords.reserve(Request.Requests.size()); + cacherequests::CacheRecord CurrentCacheRecord; + + // Requests should be sorted by Key.Hash - it currently does not care about bucket, but is that an issue? + + // Should we just start by checking for which records we need and handle that first and the run through the cache values themselves? + + for (size_t RequestIndex = 0; RequestIndex < Request.Requests.size(); ++RequestIndex) + { + cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex]; + cacherequests::CacheValueResult& ChunkResult = Result.Results[RequestIndex]; + CachePolicy ChunkPolicy = cacherequests::GetEffectiveChunkPolicy(Policy, RequestIndex); + const bool HasValueId = ChunkRequest.ValueId != Oid::Zero; + const bool HasChunkId = ChunkRequest.ChunkId != IoHash::Zero; + const bool QueryLocal = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryLocal); + const bool SkipData = EnumHasAnyFlags(ChunkPolicy, CachePolicy::SkipData); + const bool QueryRemote = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryRemote); + const bool GetInlineCacheValue = !HasValueId && !HasChunkId; + if (GetInlineCacheValue) + { + if (QueryLocal) + { + ZenCacheValue RecordCacheValue; + if (m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value)); + if (!Compressed) + { + Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value)); + } + if (!SkipData) + { + ChunkResult.Body = Compressed; + } + ChunkResult.RawSize = Compressed.GetRawSize(); + ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + continue; + } + } + if (QueryRemote) + { + UpstreamChunkIndexes.push_back(RequestIndex); + } + continue; + } + + if (!HasChunkId) + { + ZEN_ASSERT_SLOW(HasValueId); + if (CurrentCacheRecord.Key != ChunkRequest.Key) + { + CurrentCacheRecord.Values.clear(); + + ZenCacheValue RecordCacheValue; + if (QueryLocal && m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) + { + if (RecordCacheValue.Value.GetContentType() != ZenContentType::kCbObject) + { + ZEN_WARN("local record {}/{}/{} is not a structured object, skipping.", + Request.Namespace, + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash); + // continue; + } + CbObject Record = LoadCompactBinaryObject(RecordCacheValue.Value); + if (!CurrentCacheRecord.Parse(Record)) + { + ZEN_WARN("local record {}/{}/{} is corrupt, skipping", + Request.Namespace, + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash); + // continue; + } + CurrentCacheRecord.Key = ChunkRequest.Key; + } + else + { + if (QueryRemote && (UpstreamRecords.empty() || UpstreamRecords.back() != ChunkRequest.Key)) + { + UpstreamRecords.push_back(ChunkRequest.Key); + } + } + } + + auto FindIt = + std::find_if(CurrentCacheRecord.Values.begin(), + CurrentCacheRecord.Values.end(), + [&ChunkRequest](const cacherequests::CacheRecordValue& Value) { return Value.Id == ChunkRequest.ValueId; }); + if (FindIt == CurrentCacheRecord.Values.end()) + { + MissingChunksIndexes.push_back(RequestIndex); + continue; + } + ZEN_ASSERT(FindIt->RawHash != IoHash::Zero); + ChunkRequest.ChunkId = FindIt->RawHash; + ChunkRequest.RawSize = FindIt->RawSize; + } + if (QueryLocal) + { + ZenCacheValue RecordCacheValue; + if (m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) + { + // Parse + + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value)); + if (!Compressed) + { + Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value)); + } + if (!SkipData) + { + ChunkResult.Body = Compressed; + } + ChunkResult.RawSize = Compressed.GetRawSize(); + ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + continue; + } + } + if (QueryRemote) + { + UpstreamChunkIndexes.push_back(RequestIndex); + } + } + + if (!UpstreamRecords.empty()) + { + // We need to do GetCacheValue for the actual record - we don't want to fetch cache records... + cacherequests::GetCacheRecordsRequest RecordsRequest = {.Namespace = Request.Namespace, .Requests = UpstreamRecords}; + cacherequests::RecordsRequestPolicy RecordsRequestPolicy = {.DefaultPolicy = Policy.DefaultPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta}; cacherequests::GetCacheRecordsResult RecordsResult; + RecordsRequestPolicy.RecordPolicies.resize(UpstreamRecords.size()); + m_UpstreamCache.GetCacheRecords(RecordsRequest, RecordsRequestPolicy, RecordsResult, {}); + size_t MissingChunkOffset = 0; + for (const auto& RecordResult : RecordsResult.Results) + { + if (RecordResult) + { + while (MissingChunkOffset < MissingChunksIndexes.size()) + { + cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[MissingChunksIndexes[MissingChunkOffset]]; + if (ChunkRequest.Key != RecordResult->Key) + { + auto FindIt = std::find_if( + CurrentCacheRecord.Values.begin(), + CurrentCacheRecord.Values.end(), + [&ChunkRequest](const cacherequests::CacheRecordValue& Value) { return Value.Id == ChunkRequest.ValueId; }); + if (FindIt == CurrentCacheRecord.Values.end()) + { + // The ValueId is not part of the the Record, ignore it + continue; + } + ZEN_ASSERT(FindIt->RawHash != IoHash::Zero); + ChunkRequest.ChunkId = FindIt->RawHash; + ChunkRequest.RawSize = FindIt->RawSize; + break; + } + MissingChunkOffset++; + } + } + } + // Fill in + for (size_t Index = 0; Index < Policy.ChunkPolicies.size(); ++Index) + { + + } + RecordsRequest.Requests = UpstreamRecords; + } + + // Try to fetch records locally + // Fill in RawHash for any chunk request matching the found cache record + // Add any RecordKeys to UpstreamRecords for any records that does not exist locally + // Request all UpstreamRecords from upstream + // Fill in RawHash for any chunk request matching the found cache record + // Run through all requests and fetch the chunks we need where we have filled in RawHash + // If chunk is not found locally, add to UpstreamChunkIndexes and try to fetch from upstream + +# if 0 + { + { + // Try to get local record, if we can't find it add the chunk request to fetch from upstream - if we do, will me miss + // opportunity to find payload locally? + if (QueryLocal) + { + ZenCacheValue RecordCacheValue; + if (m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) + { + // Parse + + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value)); + if (!Compressed) + { + Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value)); + } + if (!SkipData) + { + ChunkResult.Body = Compressed; + } + ChunkResult.RawSize = Compressed.GetRawSize(); + ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + continue; + } + } + + if (QueryRemote) + { + // We need the record to be able to fill this request + if (UpstreamRecords.empty() || UpstreamRecords.back() != ChunkRequest.Key) + { + UpstreamRecords.push_back(ChunkRequest.Key); + } + } + continue; + } + } +# endif +# if 0 + { + if (QueryLocal) + { + if (SkipData && Result.Results[RequestIndex].RawSize != 0) + { + if (m_CidStore.ContainsChunk(ChunkRequest.ChunkId)) + { + continue; + } + else + { + // Fail! Not found! + Result.Results[RequestIndex].RawSize = 0; + Result.Results[RequestIndex].RawHash = IoHash::Zero; + } + } + IoBuffer Value = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId); + if (Value) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value)); + if (!Compressed) + { + Compressed = CompressedBuffer::Compress(SharedBuffer(Value)); + } + if (Result.Results[RequestIndex].RawHash != IoHash::FromBLAKE3(Compressed.GetRawHash())) + { + // Fail! Not found! + Result.Results[RequestIndex].RawSize = 0; + Result.Results[RequestIndex].RawHash = IoHash::Zero; + continue; + } + Result.Results[RequestIndex].RawSize = Compressed.GetRawSize(); + if (!SkipData) + { + Result.Results[RequestIndex].Body = Compressed; + } + continue; + } + } + if (QueryRemote) + { + UpstreamChunkIndexes.push_back(RequestIndex); + } + } +# endif + + if (!UpstreamRecords.empty()) + { + // We need to do GetCacheValue for the actual record - we don't want to fetch cache records... + // cacherequests::GetCacheRecordsRequest RecordsRequest = {.Namespace = Request.Namespace, .Requests = UpstreamRecords}; + // cacherequests::RecordsRequestPolicy RecordsRequestPolicy = {.DefaultPolicy = Policy.DefaultPolicy | CachePolicy::SkipData + //| CachePolicy::SkipMeta}; cacherequests::GetCacheRecordsResult RecordsResult; + // RecordsRequestPolicy.RecordPolicies.resize(UpstreamRecords.size()); + // m_UpstreamCache.GetCacheRecords(RecordsRequest, RecordsRequestPolicy, RecordsResult, {}); + // // Fill in + // for (size_t Index = 0; Index < Policy.ChunkPolicies.size(); ++Index) + // { + // + // } + // RecordsRequest.Requests = UpstreamRecords; + } + + if (!UpstreamChunkIndexes.empty()) + { +// m_UpstreamCache.GetCacheChunks(Request.Namespace, Request, Policy, UpstreamChunkIndexes); + } +#endif + std::string Namespace; + std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream + std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests + std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream + std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest + std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key + std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key + std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream + + // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests + if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } + + // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we + // have it locally, and otherwise append a request for the payload to UpstreamChunks + GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); + + // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks + GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks); + + // Call GetCacheChunks on the upstream for any payloads we do not have locally + GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); + + // Send the payload and descriptive data about each chunk to the client + WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest); +} + void HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { + if (UseHandleRpcGetCacheChunksNew) + { + HandleRpcGetCacheChunksNew(HttpRequest, RpcRequest); + return; + } using namespace cache::detail; ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); |