aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-11 15:05:11 +0100
committerPer Larsson <[email protected]>2021-11-11 15:05:11 +0100
commit3efc2ddb02511300cd6dfe59cd89ca4338f6ec4c (patch)
tree8f71dc1901cf48bca97a1f17b43947fe02304508 /zenserver/cache/structuredcache.cpp
parentCorrect content type when invoking RPC. (diff)
downloadzen-3efc2ddb02511300cd6dfe59cd89ca4338f6ec4c.tar.xz
zen-3efc2ddb02511300cd6dfe59cd89ca4338f6ec4c.zip
Handle batch requests asynchronously.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp264
1 files changed, 120 insertions, 144 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 632368062..10fbb3709 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -795,21 +795,22 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request)
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- CbObject BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload());
- const std::string_view Method = BatchRequest["Method"sv].AsString();
-
- if (Method == "GetCacheRecords"sv)
- {
- HandleBatchGetCacheRecords(Request, BatchRequest);
- }
- else if (Method == "GetCachePayloads"sv)
- {
- HandleBatchGetCachePayloads(Request, BatchRequest);
- }
- else
- {
- Request.WriteResponse(HttpResponseCode::BadRequest);
- }
+ Request.WriteResponseAsync(
+ [this, BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) {
+ const std::string_view Method = BatchRequest["Method"sv].AsString();
+ if (Method == "GetCacheRecords"sv)
+ {
+ HandleBatchGetCacheRecords(AsyncRequest, BatchRequest);
+ }
+ else if (Method == "GetCachePayloads"sv)
+ {
+ HandleBatchGetCachePayloads(AsyncRequest, BatchRequest);
+ }
+ else
+ {
+ AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ });
}
break;
default:
@@ -823,27 +824,24 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
{
using namespace fmt::literals;
- const std::string_view Method = BatchRequest["Method"sv].AsString();
- ZEN_ASSERT(Method == "GetCacheRecords"sv);
+ CbPackage BatchResponse;
+ CacheRecordPolicy Policy;
+ CbObjectView Params = BatchRequest["Params"sv].AsObjectView();
+ std::vector<CacheKey> CacheKeys;
+ std::vector<IoBuffer> CacheValues;
+ std::vector<size_t> UpstreamRequests;
- CbObjectView Params = BatchRequest["Params"sv].AsObjectView();
+ ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetCacheRecords"sv);
- CacheRecordPolicy Policy;
CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy);
const bool SkipAttachments = (Policy.GetRecordPolicy() & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
const bool QueryRemote = m_UpstreamCache && ((Policy.GetRecordPolicy() & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
- std::vector<CacheKey> CacheKeys;
- std::vector<IoBuffer> CacheValues;
- std::vector<IoBuffer> Payloads;
- std::vector<size_t> UpstreamRequests;
- std::vector<size_t> Missing;
-
- for (CbFieldView QueryView : Params["CacheKeys"sv])
+ for (CbFieldView KeyView : Params["CacheKeys"sv])
{
- CbObjectView Query = QueryView.AsObjectView();
- CacheKeys.push_back(CacheKey::Create(Query["Bucket"sv].AsString(), Query["Hash"sv].AsHash()));
+ CbObjectView KeyObject = KeyView.AsObjectView();
+ CacheKeys.push_back(CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()));
}
if (CacheKeys.empty())
@@ -853,11 +851,6 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
CacheValues.resize(CacheKeys.size());
- if (!SkipAttachments)
- {
- Payloads.reserve(CacheKeys.size());
- }
-
for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys)
{
ZenCacheValue CacheValue;
@@ -867,10 +860,10 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
if (!SkipAttachments)
{
- CacheRecord.IterateAttachments([this, &Payloads](CbFieldView AttachmentHash) {
+ CacheRecord.IterateAttachments([this, &BatchResponse](CbFieldView AttachmentHash) {
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- Payloads.push_back(Chunk);
+ BatchResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
}
});
}
@@ -890,7 +883,8 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
}
else
{
- Missing.push_back(KeyIndex);
+ ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash);
+ m_CacheStats.MissCount++;
}
++KeyIndex;
@@ -899,19 +893,19 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
if (!UpstreamRequests.empty() && m_UpstreamCache)
{
const auto OnCacheRecordGetComplete =
- [this, &CacheKeys, &CacheValues, &Payloads, &Missing, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
+ [this, &CacheKeys, &CacheValues, &BatchResponse, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
if (Params.Record)
{
Params.Record.IterateAttachments([&](CbFieldView AttachmentHash) {
if (const CbAttachment* Attachment = Params.Package.FindAttachment(AttachmentHash.AsHash()))
{
- if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
- m_CidStore.AddChunk(Chunk);
+ m_CidStore.AddChunk(Compressed);
if (!SkipAttachments)
{
- Payloads.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ BatchResponse.AddAttachment(CbAttachment(Compressed));
}
}
}
@@ -924,51 +918,40 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
ToString(HttpContentType::kCbObject));
CacheValues[Params.KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(Params.Record.GetView());
+ m_CacheStats.HitCount++;
m_CacheStats.UpstreamHitCount++;
}
else
{
- Missing.push_back(Params.KeyIndex);
+ ZEN_DEBUG("MISS - '{}/{}'", Params.CacheKey.Bucket, Params.CacheKey.Hash);
+ m_CacheStats.MissCount++;
}
};
m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
}
- for (size_t KeyIndex : Missing)
- {
- const CacheKey& Key = CacheKeys[KeyIndex];
- ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash);
- m_CacheStats.MissCount++;
- }
+ CbObjectWriter ResponseObject;
- CbObjectWriter BatchResponse;
-
- BatchResponse.BeginArray("Result"sv);
+ ResponseObject.BeginArray("Result"sv);
for (const IoBuffer& Value : CacheValues)
{
if (Value)
{
CbObjectView Record(Value.Data());
- BatchResponse << Record;
+ ResponseObject << Record;
}
else
{
- BatchResponse.AddNull();
+ ResponseObject.AddNull();
}
}
- BatchResponse.EndArray();
+ ResponseObject.EndArray();
- CbPackage Package;
- Package.SetObject(BatchResponse.Save());
-
- for (const IoBuffer& Payload : Payloads)
- {
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Payload))));
- }
+ BatchResponse.SetObject(ResponseObject.Save());
BinaryWriter MemStream;
- Package.Save(MemStream);
+ BatchResponse.Save(MemStream);
Request.WriteResponse(HttpResponseCode::OK,
HttpContentType::kCbPackage,
@@ -980,84 +963,84 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest&
{
using namespace fmt::literals;
- const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash {
- if (CbObjectView ValueObject = Record["Value"].AsObjectView())
- {
- const Oid Id = ValueObject["Id"].AsObjectId();
- if (Id == PayloadId)
- {
- return ValueObject["RawHash"sv].AsHash();
- }
- }
-
- for (CbFieldView AttachmentView : Record["Attachments"sv])
- {
- CbObjectView AttachmentObject = AttachmentView.AsObjectView();
- const Oid Id = AttachmentObject["Id"].AsObjectId();
-
- if (Id == PayloadId)
- {
- return AttachmentObject["RawHash"sv].AsHash();
- }
- }
-
- return IoHash::Zero;
- };
-
- const std::string_view Method = BatchRequest["Method"sv].AsString();
- ZEN_ASSERT(Method == "GetCachePayloads"sv);
-
- CbObjectView Params = BatchRequest["Params"sv].AsObjectView();
+ ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetCachePayloads"sv);
std::vector<CacheChunkRequest> ChunkRequests;
std::vector<size_t> UpstreamRequests;
- std::vector<size_t> Missing;
+ std::vector<IoBuffer> Chunks;
+ CbObjectView Params = BatchRequest["Params"sv].AsObjectView();
- for (CbFieldView ChunkView : Params["ChunkRequests"sv])
+ for (CbFieldView RequestView : Params["ChunkRequests"sv])
{
- CbObjectView Chunk = ChunkView.AsObjectView();
- CbObjectView CacheKeyObject = Chunk["Key"sv].AsObjectView();
- const CacheKey Key = CacheKey::Create(CacheKeyObject["Bucket"sv].AsString(), CacheKeyObject["Hash"sv].AsHash());
- const IoHash ChunkId = IoHash::Zero;
- const Oid PayloadId = Chunk["PayloadId"sv].AsObjectId();
- const uint64_t RawOffset = Chunk["RawoffSet"sv].AsUInt64();
- const uint64_t RawSize = Chunk["RawSize"sv].AsUInt64();
- const uint32_t ChunkPolicy = Chunk["Policy"sv].AsUInt32();
+ CbObjectView RequestObject = RequestView.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
+ const IoHash ChunkId = IoHash::Zero;
+ const Oid PayloadId = RequestObject["PayloadId"sv].AsObjectId();
+ const uint64_t RawOffset = RequestObject["RawoffSet"sv].AsUInt64();
+ const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
+ const uint32_t ChunkPolicy = RequestObject["Policy"sv].AsUInt32();
ChunkRequests.emplace_back(Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy));
}
- std::stable_sort(ChunkRequests.begin(), ChunkRequests.end());
+ if (ChunkRequests.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
- CacheKey CurrentKey = CacheKey::Empty;
- IoBuffer CurrentRecordBuffer;
+ Chunks.resize(ChunkRequests.size());
- for (CacheChunkRequest& ChunkRequest : ChunkRequests)
+ // Try to find the uncompressed raw hash from the payload ID.
{
- if (ChunkRequest.Key != CurrentKey)
- {
- CurrentKey = ChunkRequest.Key;
+ const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash {
+ if (CbObjectView ValueObject = Record["Value"].AsObjectView())
+ {
+ const Oid Id = ValueObject["Id"].AsObjectId();
+ if (Id == PayloadId)
+ {
+ return ValueObject["RawHash"sv].AsHash();
+ }
+ }
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, CacheValue))
+ for (CbFieldView AttachmentView : Record["Attachments"sv])
{
- CurrentRecordBuffer = CacheValue.Value;
+ CbObjectView AttachmentObject = AttachmentView.AsObjectView();
+ const Oid Id = AttachmentObject["Id"].AsObjectId();
+
+ if (Id == PayloadId)
+ {
+ return AttachmentObject["RawHash"sv].AsHash();
+ }
}
- }
- if (CurrentRecordBuffer)
+ return IoHash::Zero;
+ };
+
+ CacheKey CurrentKey = CacheKey::Empty;
+ IoBuffer CurrentRecordBuffer;
+
+ std::stable_sort(ChunkRequests.begin(), ChunkRequests.end());
+
+ for (CacheChunkRequest& ChunkRequest : ChunkRequests)
{
- ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId);
- }
- }
+ if (ChunkRequest.Key != CurrentKey)
+ {
+ CurrentKey = ChunkRequest.Key;
- if (ChunkRequests.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, CacheValue))
+ {
+ CurrentRecordBuffer = CacheValue.Value;
+ }
+ }
- std::vector<IoBuffer> Chunks;
- Chunks.resize(ChunkRequests.size());
+ if (CurrentRecordBuffer)
+ {
+ ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId);
+ }
+ }
+ }
for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests)
{
@@ -1085,7 +1068,8 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest&
}
else
{
- Missing.push_back(RequestIndex);
+ ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId);
+ m_CacheStats.MissCount++;
}
}
else
@@ -1098,18 +1082,16 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest&
if (!UpstreamRequests.empty() && m_UpstreamCache)
{
- const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks, &Missing](CachePayloadGetCompleteParams&& Params) {
- if (Params.Payload)
+ const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks](CachePayloadGetCompleteParams&& Params) {
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload)))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload));
- auto InsertResult = m_CidStore.AddChunk(Compressed);
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
- ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} ({})",
Params.Request.Key.Bucket,
Params.Request.Key.Hash,
Params.Request.ChunkId,
NiceBytes(Params.Payload.GetSize()),
- ToString(Params.Payload.GetContentType()),
"UPSTREAM");
Chunks[Params.RequestIndex] = std::move(Params.Payload);
@@ -1119,43 +1101,37 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest&
}
else
{
- Missing.push_back(Params.RequestIndex);
+ ZEN_DEBUG("MISS - '{}/{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId);
+ m_CacheStats.MissCount++;
}
};
m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
}
- for (size_t RequestIndex : Missing)
- {
- const CacheChunkRequest& ChunkRequest = ChunkRequests[RequestIndex];
- ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId);
- m_CacheStats.MissCount++;
- }
-
- CbPackage Package;
- CbObjectWriter BatchResponse;
+ CbPackage BatchResponse;
+ CbObjectWriter ResponseObject;
- BatchResponse.BeginArray("Result"sv);
+ ResponseObject.BeginArray("Result"sv);
for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ++ChunkIndex)
{
if (Chunks[ChunkIndex])
{
- BatchResponse << ChunkRequests[ChunkIndex].ChunkId;
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex])))));
+ ResponseObject << ChunkRequests[ChunkIndex].ChunkId;
+ BatchResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex])))));
}
else
{
- BatchResponse << IoHash::Zero;
+ ResponseObject << IoHash::Zero;
}
}
- BatchResponse.EndArray();
+ ResponseObject.EndArray();
- Package.SetObject(BatchResponse.Save());
+ BatchResponse.SetObject(ResponseObject.Save());
BinaryWriter MemStream;
- Package.Save(MemStream);
+ BatchResponse.Save(MemStream);
Request.WriteResponse(HttpResponseCode::OK,
HttpContentType::kCbPackage,