diff options
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 181 |
1 files changed, 84 insertions, 97 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index faef3eb12..7e44b82a2 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -635,16 +635,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } void -HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { - // Note: the URL references the uncompressed payload hash - so this maintains the mapping - // from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash - // - // this is a PITA but a consequence of the fact that the client side code is not able to - // address data by compressed hash - - ZEN_UNUSED(Policy); - switch (auto Verb = Request.RequestVerb()) { using enum HttpVerb; @@ -652,112 +644,107 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request case kHead: case kGet: { - IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); - bool InUpstreamCache = false; - - if (!Payload && m_UpstreamCache) - { - if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); - UpstreamResult.Success) - { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) - { - Payload = UpstreamResult.Value; - IoHash ChunkHash = IoHash::HashBuffer(Payload); - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); - InUpstreamCache = true; - - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); - } - else - { - ZEN_WARN("got uncompressed upstream cache payload"); - } - } - } - - if (!Payload) - { - ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", - Ref.BucketSegment, - Ref.HashKey, - Ref.PayloadId, - NiceBytes(Payload.Size()), - Payload.GetContentType(), - InUpstreamCache ? "UPSTREAM" : "LOCAL"); - + HandleGetCachePayload(Request, Ref, Policy); if (Verb == kHead) { Request.SetSuppressResponseBody(); } - - return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); } break; - case kPut: + HandlePutCachePayload(Request, Ref, Policy); + break; + default: + break; + } +} + +void +HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +{ + IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + bool InUpstreamCache = false; + const bool QueryUpstream = !Payload && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + + if (QueryUpstream) + { + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); + UpstreamResult.Success) + { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { - if (IoBuffer Body = Request.ReadPayload()) - { - if (Body.Size() == 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Empty payload not permitted"); - } + Payload = UpstreamResult.Value; + IoHash ChunkHash = IoHash::HashBuffer(Payload); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); + InUpstreamCache = true; + + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + } + else + { + ZEN_WARN("got uncompressed upstream cache payload"); + } + } + } - IoHash ChunkHash = IoHash::HashBuffer(Body); + if (!Payload) + { + ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); + return Request.WriteResponse(HttpResponseCode::NotFound); + } - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); + ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", + Ref.BucketSegment, + Ref.HashKey, + Ref.PayloadId, + NiceBytes(Payload.Size()), + Payload.GetContentType(), + InUpstreamCache ? "UPSTREAM" : "LOCAL"); - if (!Compressed) - { - // All attachment payloads need to be in compressed buffer format - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Attachments must be compressed"); - } - else - { - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) - { - // the URL specified content id and content hashes don't match! - return Request.WriteResponse(HttpResponseCode::BadRequest); - } + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); +} - CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); +void +HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +{ + // Note: Individual cache payloads are not propagated upstream until a valid cache record has been stored + ZEN_UNUSED(Policy); - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + IoBuffer Body = Request.ReadPayload(); - ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", - Ref.BucketSegment, - Ref.HashKey, - Ref.PayloadId, - NiceBytes(Body.Size()), - Body.GetContentType(), - Result.New ? "NEW" : "OLD"); + if (!Body || Body.Size() == 0) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } - if (Result.New) - { - return Request.WriteResponse(HttpResponseCode::Created); - } - else - { - return Request.WriteResponse(HttpResponseCode::OK); - } - } - } - } - break; + IoHash ChunkHash = IoHash::HashBuffer(Body); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); - case kPost: - break; + if (!Compressed) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); + } - default: - break; + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv); } + + CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); + + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + + ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", + Ref.BucketSegment, + Ref.HashKey, + Ref.PayloadId, + NiceBytes(Body.Size()), + Body.GetContentType(), + Result.New ? "NEW" : "OLD"); + + const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; + + Request.WriteResponse(ResponseCode); } bool |