From 3efc2ddb02511300cd6dfe59cd89ca4338f6ec4c Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Thu, 11 Nov 2021 15:05:11 +0100 Subject: Handle batch requests asynchronously. --- zenserver/cache/structuredcache.cpp | 264 ++++++++++++++++-------------------- 1 file changed, 120 insertions(+), 144 deletions(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 632368062..10fbb3709 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -795,21 +795,22 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request) return Request.WriteResponse(HttpResponseCode::BadRequest); } - CbObject BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload()); - const std::string_view Method = BatchRequest["Method"sv].AsString(); - - if (Method == "GetCacheRecords"sv) - { - HandleBatchGetCacheRecords(Request, BatchRequest); - } - else if (Method == "GetCachePayloads"sv) - { - HandleBatchGetCachePayloads(Request, BatchRequest); - } - else - { - Request.WriteResponse(HttpResponseCode::BadRequest); - } + Request.WriteResponseAsync( + [this, BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { + const std::string_view Method = BatchRequest["Method"sv].AsString(); + if (Method == "GetCacheRecords"sv) + { + HandleBatchGetCacheRecords(AsyncRequest, BatchRequest); + } + else if (Method == "GetCachePayloads"sv) + { + HandleBatchGetCachePayloads(AsyncRequest, BatchRequest); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + } + }); } break; default: @@ -823,27 +824,24 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R { using namespace fmt::literals; - const std::string_view Method = BatchRequest["Method"sv].AsString(); - ZEN_ASSERT(Method == "GetCacheRecords"sv); + CbPackage BatchResponse; + CacheRecordPolicy Policy; + CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); + std::vector CacheKeys; + std::vector CacheValues; + std::vector UpstreamRequests; - CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); + ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetCacheRecords"sv); - CacheRecordPolicy Policy; CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy); const bool SkipAttachments = (Policy.GetRecordPolicy() & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; const bool QueryRemote = m_UpstreamCache && ((Policy.GetRecordPolicy() & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); - std::vector CacheKeys; - std::vector CacheValues; - std::vector Payloads; - std::vector UpstreamRequests; - std::vector Missing; - - for (CbFieldView QueryView : Params["CacheKeys"sv]) + for (CbFieldView KeyView : Params["CacheKeys"sv]) { - CbObjectView Query = QueryView.AsObjectView(); - CacheKeys.push_back(CacheKey::Create(Query["Bucket"sv].AsString(), Query["Hash"sv].AsHash())); + CbObjectView KeyObject = KeyView.AsObjectView(); + CacheKeys.push_back(CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash())); } if (CacheKeys.empty()) @@ -853,11 +851,6 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R CacheValues.resize(CacheKeys.size()); - if (!SkipAttachments) - { - Payloads.reserve(CacheKeys.size()); - } - for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys) { ZenCacheValue CacheValue; @@ -867,10 +860,10 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R if (!SkipAttachments) { - CacheRecord.IterateAttachments([this, &Payloads](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &BatchResponse](CbFieldView AttachmentHash) { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - Payloads.push_back(Chunk); + BatchResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); } }); } @@ -890,7 +883,8 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R } else { - Missing.push_back(KeyIndex); + ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash); + m_CacheStats.MissCount++; } ++KeyIndex; @@ -899,19 +893,19 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R if (!UpstreamRequests.empty() && m_UpstreamCache) { const auto OnCacheRecordGetComplete = - [this, &CacheKeys, &CacheValues, &Payloads, &Missing, SkipAttachments](CacheRecordGetCompleteParams&& Params) { + [this, &CacheKeys, &CacheValues, &BatchResponse, SkipAttachments](CacheRecordGetCompleteParams&& Params) { if (Params.Record) { Params.Record.IterateAttachments([&](CbFieldView AttachmentHash) { if (const CbAttachment* Attachment = Params.Package.FindAttachment(AttachmentHash.AsHash())) { - if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) + if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { - m_CidStore.AddChunk(Chunk); + m_CidStore.AddChunk(Compressed); if (!SkipAttachments) { - Payloads.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + BatchResponse.AddAttachment(CbAttachment(Compressed)); } } } @@ -924,51 +918,40 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R ToString(HttpContentType::kCbObject)); CacheValues[Params.KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(Params.Record.GetView()); + m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; } else { - Missing.push_back(Params.KeyIndex); + ZEN_DEBUG("MISS - '{}/{}'", Params.CacheKey.Bucket, Params.CacheKey.Hash); + m_CacheStats.MissCount++; } }; m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); } - for (size_t KeyIndex : Missing) - { - const CacheKey& Key = CacheKeys[KeyIndex]; - ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash); - m_CacheStats.MissCount++; - } + CbObjectWriter ResponseObject; - CbObjectWriter BatchResponse; - - BatchResponse.BeginArray("Result"sv); + ResponseObject.BeginArray("Result"sv); for (const IoBuffer& Value : CacheValues) { if (Value) { CbObjectView Record(Value.Data()); - BatchResponse << Record; + ResponseObject << Record; } else { - BatchResponse.AddNull(); + ResponseObject.AddNull(); } } - BatchResponse.EndArray(); + ResponseObject.EndArray(); - CbPackage Package; - Package.SetObject(BatchResponse.Save()); - - for (const IoBuffer& Payload : Payloads) - { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Payload)))); - } + BatchResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; - Package.Save(MemStream); + BatchResponse.Save(MemStream); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, @@ -980,84 +963,84 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest& { using namespace fmt::literals; - const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { - if (CbObjectView ValueObject = Record["Value"].AsObjectView()) - { - const Oid Id = ValueObject["Id"].AsObjectId(); - if (Id == PayloadId) - { - return ValueObject["RawHash"sv].AsHash(); - } - } - - for (CbFieldView AttachmentView : Record["Attachments"sv]) - { - CbObjectView AttachmentObject = AttachmentView.AsObjectView(); - const Oid Id = AttachmentObject["Id"].AsObjectId(); - - if (Id == PayloadId) - { - return AttachmentObject["RawHash"sv].AsHash(); - } - } - - return IoHash::Zero; - }; - - const std::string_view Method = BatchRequest["Method"sv].AsString(); - ZEN_ASSERT(Method == "GetCachePayloads"sv); - - CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); + ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetCachePayloads"sv); std::vector ChunkRequests; std::vector UpstreamRequests; - std::vector Missing; + std::vector Chunks; + CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); - for (CbFieldView ChunkView : Params["ChunkRequests"sv]) + for (CbFieldView RequestView : Params["ChunkRequests"sv]) { - CbObjectView Chunk = ChunkView.AsObjectView(); - CbObjectView CacheKeyObject = Chunk["Key"sv].AsObjectView(); - const CacheKey Key = CacheKey::Create(CacheKeyObject["Bucket"sv].AsString(), CacheKeyObject["Hash"sv].AsHash()); - const IoHash ChunkId = IoHash::Zero; - const Oid PayloadId = Chunk["PayloadId"sv].AsObjectId(); - const uint64_t RawOffset = Chunk["RawoffSet"sv].AsUInt64(); - const uint64_t RawSize = Chunk["RawSize"sv].AsUInt64(); - const uint32_t ChunkPolicy = Chunk["Policy"sv].AsUInt32(); + CbObjectView RequestObject = RequestView.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); + const IoHash ChunkId = IoHash::Zero; + const Oid PayloadId = RequestObject["PayloadId"sv].AsObjectId(); + const uint64_t RawOffset = RequestObject["RawoffSet"sv].AsUInt64(); + const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); + const uint32_t ChunkPolicy = RequestObject["Policy"sv].AsUInt32(); ChunkRequests.emplace_back(Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast(ChunkPolicy)); } - std::stable_sort(ChunkRequests.begin(), ChunkRequests.end()); + if (ChunkRequests.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } - CacheKey CurrentKey = CacheKey::Empty; - IoBuffer CurrentRecordBuffer; + Chunks.resize(ChunkRequests.size()); - for (CacheChunkRequest& ChunkRequest : ChunkRequests) + // Try to find the uncompressed raw hash from the payload ID. { - if (ChunkRequest.Key != CurrentKey) - { - CurrentKey = ChunkRequest.Key; + const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { + if (CbObjectView ValueObject = Record["Value"].AsObjectView()) + { + const Oid Id = ValueObject["Id"].AsObjectId(); + if (Id == PayloadId) + { + return ValueObject["RawHash"sv].AsHash(); + } + } - ZenCacheValue CacheValue; - if (m_CacheStore.Get(ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, CacheValue)) + for (CbFieldView AttachmentView : Record["Attachments"sv]) { - CurrentRecordBuffer = CacheValue.Value; + CbObjectView AttachmentObject = AttachmentView.AsObjectView(); + const Oid Id = AttachmentObject["Id"].AsObjectId(); + + if (Id == PayloadId) + { + return AttachmentObject["RawHash"sv].AsHash(); + } } - } - if (CurrentRecordBuffer) + return IoHash::Zero; + }; + + CacheKey CurrentKey = CacheKey::Empty; + IoBuffer CurrentRecordBuffer; + + std::stable_sort(ChunkRequests.begin(), ChunkRequests.end()); + + for (CacheChunkRequest& ChunkRequest : ChunkRequests) { - ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); - } - } + if (ChunkRequest.Key != CurrentKey) + { + CurrentKey = ChunkRequest.Key; - if (ChunkRequests.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } + ZenCacheValue CacheValue; + if (m_CacheStore.Get(ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, CacheValue)) + { + CurrentRecordBuffer = CacheValue.Value; + } + } - std::vector Chunks; - Chunks.resize(ChunkRequests.size()); + if (CurrentRecordBuffer) + { + ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); + } + } + } for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests) { @@ -1085,7 +1068,8 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest& } else { - Missing.push_back(RequestIndex); + ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); + m_CacheStats.MissCount++; } } else @@ -1098,18 +1082,16 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest& if (!UpstreamRequests.empty() && m_UpstreamCache) { - const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks, &Missing](CachePayloadGetCompleteParams&& Params) { - if (Params.Payload) + const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks](CachePayloadGetCompleteParams&& Params) { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload))) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload)); - auto InsertResult = m_CidStore.AddChunk(Compressed); + auto InsertResult = m_CidStore.AddChunk(Compressed); - ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + ZEN_DEBUG("HIT - '{}/{}/{}' {} ({})", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId, NiceBytes(Params.Payload.GetSize()), - ToString(Params.Payload.GetContentType()), "UPSTREAM"); Chunks[Params.RequestIndex] = std::move(Params.Payload); @@ -1119,43 +1101,37 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest& } else { - Missing.push_back(Params.RequestIndex); + ZEN_DEBUG("MISS - '{}/{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId); + m_CacheStats.MissCount++; } }; m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); } - for (size_t RequestIndex : Missing) - { - const CacheChunkRequest& ChunkRequest = ChunkRequests[RequestIndex]; - ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); - m_CacheStats.MissCount++; - } - - CbPackage Package; - CbObjectWriter BatchResponse; + CbPackage BatchResponse; + CbObjectWriter ResponseObject; - BatchResponse.BeginArray("Result"sv); + ResponseObject.BeginArray("Result"sv); for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ++ChunkIndex) { if (Chunks[ChunkIndex]) { - BatchResponse << ChunkRequests[ChunkIndex].ChunkId; - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex]))))); + ResponseObject << ChunkRequests[ChunkIndex].ChunkId; + BatchResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex]))))); } else { - BatchResponse << IoHash::Zero; + ResponseObject << IoHash::Zero; } } - BatchResponse.EndArray(); + ResponseObject.EndArray(); - Package.SetObject(BatchResponse.Save()); + BatchResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; - Package.Save(MemStream); + BatchResponse.Save(MemStream); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, -- cgit v1.2.3