aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-09-21 15:10:39 +0200
committerPer Larsson <[email protected]>2021-09-21 15:10:39 +0200
commit9a92f2e73fb186d1fce69c5d784ba441e39fb9ec (patch)
tree1f74cd00751fe1c35f7a02446e4f61e8be727e68 /zenserver/cache/structuredcache.cpp
parentClang format fix. (diff)
downloadzen-9a92f2e73fb186d1fce69c5d784ba441e39fb9ec.tar.xz
zen-9a92f2e73fb186d1fce69c5d784ba441e39fb9ec.zip
Refactored out get/set cache record.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp644
1 files changed, 297 insertions, 347 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 3743bdc53..87ec10520 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -244,7 +244,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
}
void
-HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy)
+HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
{
switch (auto Verb = Request.RequestVerb())
{
@@ -253,425 +253,375 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
case kHead:
case kGet:
{
- const ZenContentType AcceptType = Request.AcceptContentType();
+ HandleGetCacheRecord(Request, Ref, Policy);
+ if (Verb == kHead)
+ {
+ Request.SetSuppressResponseBody();
+ }
+ }
+ break;
+ case kPut:
+ HandlePutCacheRecord(Request, Ref, Policy);
+ break;
+ default:
+ break;
+ }
+}
- ZenCacheValue Value;
- bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
- bool InUpstreamCache = false;
+void
+HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+{
+ const ZenContentType AcceptType = Request.AcceptContentType();
- const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote));
+ ZenCacheValue Value;
+ bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
+ bool InUpstreamCache = false;
- if (QueryUpstream)
- {
- const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary
- : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage
- : ZenContentType::kCbObject;
+ const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote));
- if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
- UpstreamResult.Success)
- {
- Value.Value = UpstreamResult.Value;
- Success = true;
- InUpstreamCache = true;
+ if (QueryUpstream)
+ {
+ const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary
+ : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage
+ : ZenContentType::kCbObject;
- if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject)
- {
- if (CacheRecordType == ZenContentType::kCbObject)
- {
- const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
-
- if (ValidationResult == CbValidateError::None)
- {
- CbObjectView CacheRecord(UpstreamResult.Value.Data());
-
- CbObjectWriter IndexData;
- IndexData.BeginArray("references");
- CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); });
- IndexData.EndArray();
-
- Value.IndexData = IndexData.Save();
- }
- else
- {
- Success = false;
- ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream",
- Ref.BucketSegment,
- Ref.HashKey);
- }
- }
+ if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
+ UpstreamResult.Success)
+ {
+ Value.Value = UpstreamResult.Value;
+ Success = true;
+ InUpstreamCache = true;
- if (Success)
- {
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
- }
- }
- else
- {
- ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage);
+ if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject)
+ {
+ if (CacheRecordType == ZenContentType::kCbObject)
+ {
+ const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
- CbPackage Package;
- if (Package.TryLoad(UpstreamResult.Value))
- {
- uint32_t AttachmentCount = 0;
- uint32_t FoundCount = 0;
- CbObject CacheRecord = Package.GetObject();
-
- CacheRecord.IterateAttachments(
- [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
- {
- if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
- {
- m_CidStore.AddChunk(Chunk);
- FoundCount++;
- }
- else
- {
- ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed",
- Ref.BucketSegment,
- Ref.HashKey);
- }
- }
- AttachmentCount++;
- });
-
- if (FoundCount == AttachmentCount)
- {
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
-
- if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments))
- {
- CbPackage PackageWithoutAttachments;
- PackageWithoutAttachments.SetObject(CacheRecord);
-
- MemoryOutStream MemStream;
- BinaryWriter Writer(MemStream);
- PackageWithoutAttachments.Save(Writer);
-
- Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- }
- }
- else
- {
- Success = false;
- ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package",
- Ref.BucketSegment,
- Ref.HashKey);
- }
- }
- else
- {
- Success = false;
- ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey);
- }
- }
- }
- }
+ if (ValidationResult == CbValidateError::None)
+ {
+ CbObjectView CacheRecord(UpstreamResult.Value.Data());
- if (!Success)
- {
- ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey);
+ CbObjectWriter IndexData;
+ IndexData.BeginArray("references");
+ CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); });
+ IndexData.EndArray();
- return Request.WriteResponse(HttpResponseCode::NotFound);
+ Value.IndexData = IndexData.Save();
+ }
+ else
+ {
+ Success = false;
+ ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
}
- if (Verb == kHead)
+ if (Success)
{
- Request.SetSuppressResponseBody();
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
}
+ }
+ else
+ {
+ ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage);
- if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache)
+ CbPackage Package;
+ if (Package.TryLoad(UpstreamResult.Value))
{
- CbObjectView CacheRecord(Value.Value.Data());
+ uint32_t AttachmentCount = 0;
+ uint32_t ValidCount = 0;
+ CbObject CacheRecord = Package.GetObject();
- const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All);
+ CacheRecord.IterateAttachments([this, &Package, &Ref, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
+ {
+ m_CidStore.AddChunk(Chunk);
+ ValidCount++;
+ }
+ else
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
+ }
+ AttachmentCount++;
+ });
- if (ValidationResult != CbValidateError::None)
+ if (ValidCount == AttachmentCount)
{
- ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey);
-
- return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
- }
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
- const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments);
- uint32_t AttachmentCount = 0;
- uint32_t FoundCount = 0;
- uint64_t AttachmentBytes = 0ull;
-
- CbPackage Package;
-
- if (!SkipAttachments)
- {
- CacheRecord.IterateAttachments(
- [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
- AttachmentBytes += Chunk.Size();
- FoundCount++;
- }
- AttachmentCount++;
- });
-
- if (FoundCount != AttachmentCount)
+ if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments))
{
- ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- FoundCount,
- AttachmentCount);
+ CbPackage PackageWithoutAttachments;
+ PackageWithoutAttachments.SetObject(CacheRecord);
- return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ PackageWithoutAttachments.Save(Writer);
+
+ Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
}
}
-
- Package.SetObject(LoadCompactBinaryObject(Value.Value));
-
- ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(AttachmentBytes + Value.Value.Size()),
- AttachmentCount,
- InUpstreamCache ? "UPSTREAM" : "LOCAL");
-
- MemoryOutStream MemStream;
- BinaryWriter Writer(MemStream);
- Package.Save(Writer);
-
- IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
-
- return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
+ else
+ {
+ Success = false;
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
}
else
{
- ZEN_DEBUG("HIT - '{}/{}' {} ({})",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Value.Value.Size()),
- InUpstreamCache ? "UPSTREAM" : "LOCAL");
-
- return Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ Success = false;
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey);
}
}
- break;
+ }
+ }
- case kPut:
- {
- IoBuffer Body = Request.ReadPayload();
+ if (!Success)
+ {
+ ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey);
- if (!Body || Body.Size() == 0)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
- const HttpContentType ContentType = Request.RequestContentType();
+ if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache)
+ {
+ CbObjectView CacheRecord(Value.Value.Data());
- const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote));
+ const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All);
- if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType)
- {
- // TODO: create a cache record and put value in CAS?
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()));
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey);
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
+ }
- if (StoreUpstream)
+ const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments);
+ uint32_t AttachmentCount = 0;
+ uint32_t ValidCount = 0;
+ uint64_t AttachmentBytes = 0ull;
+
+ CbPackage Package;
+
+ if (!SkipAttachments)
+ {
+ CacheRecord.IterateAttachments(
+ [this, &Ref, &Package, &AttachmentCount, &ValidCount, &AttachmentBytes](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- auto Result = m_UpstreamCache->EnqueueUpstream(
- {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}});
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ AttachmentBytes += Chunk.Size();
+ ValidCount++;
}
+ AttachmentCount++;
+ });
- return Request.WriteResponse(HttpResponseCode::Created);
- }
- else if (ContentType == HttpContentType::kCbObject)
- {
- // Validate payload before accessing it
- const CbValidateError ValidationResult =
- ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), CbValidateMode::All);
+ if (ValidCount != AttachmentCount)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ValidCount,
+ AttachmentCount);
- if (ValidationResult != CbValidateError::None)
- {
- ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary",
- Ref.BucketSegment,
- Ref.HashKey,
- Body.Size());
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ }
+ }
- // TODO: add details in response, kText || kCbObject?
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Compact binary validation failed"sv);
- }
+ Package.SetObject(LoadCompactBinaryObject(Value.Value));
- // Extract referenced payload hashes
- CbObjectView Cbo(Body.Data());
+ ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments (LOCAL)",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(AttachmentBytes + Value.Value.Size()),
+ AttachmentCount);
- std::vector<IoHash> References;
- std::vector<IoHash> MissingRefs;
- Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
- ZenCacheValue CacheValue;
- CacheValue.Value = Body;
+ IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- if (!References.empty())
- {
- CbObjectWriter Idx;
- Idx.BeginArray("references");
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
+ }
+ else
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} ({})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Value.Value.Size()),
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
- for (const IoHash& Hash : References)
- {
- Idx.AddHash(Hash);
- if (!m_CidStore.ContainsChunk(Hash))
- {
- MissingRefs.push_back(Hash);
- }
- }
+ Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ }
+}
- Idx.EndArray();
+void
+HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+{
+ IoBuffer Body = Request.ReadPayload();
- CacheValue.IndexData = Idx.Save();
- }
+ if (!Body || Body.Size() == 0)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ const HttpContentType ContentType = Request.RequestContentType();
+ const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote));
- ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(CacheValue.Value.Size()),
- MissingRefs.size(),
- References.size());
+ if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType)
+ {
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+ ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()));
- if (MissingRefs.empty() && StoreUpstream)
- {
- ZEN_ASSERT(m_UpstreamCache);
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(References)});
+ if (StoreUpstream)
+ {
+ ZEN_ASSERT(m_UpstreamCache);
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}});
+ }
- return Request.WriteResponse(HttpResponseCode::Created);
- }
- else
- {
- // TODO: Binary attachments?
- CbObjectWriter Response;
- Response.BeginArray("needs");
- for (const IoHash& MissingRef : MissingRefs)
- {
- Response.AddHash(MissingRef);
- ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef);
- }
- Response.EndArray();
+ Request.WriteResponse(HttpResponseCode::Created);
+ }
+ else if (ContentType == HttpContentType::kCbObject)
+ {
+ const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All);
- // Return Created | BadRequest?
- return Request.WriteResponse(HttpResponseCode::Created, Response.Save());
- }
- }
- else if (ContentType == HttpContentType::kCbPackage)
- {
- CbPackage Package;
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", Ref.BucketSegment, Ref.HashKey, Body.Size());
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
+ }
- if (!Package.TryLoad(Body))
- {
- ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey);
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
- }
+ CbObjectView CacheRecord(Body.Data());
+ std::vector<IoHash> ValidAttachments;
+ uint32_t AttachmentCount = 0;
- CbObject CacheRecord = Package.GetObject();
+ CacheRecord.IterateAttachments([this, &AttachmentCount, &ValidAttachments](CbFieldView AttachmentHash) {
+ const IoHash Hash = AttachmentHash.AsHash();
+ if (m_CidStore.ContainsChunk(Hash))
+ {
+ ValidAttachments.emplace_back(Hash);
+ }
+ AttachmentCount++;
+ });
- struct AttachmentInsertResult
- {
- int32_t Count = 0;
- int32_t NewCount = 0;
- uint64_t Bytes = 0;
- uint64_t NewBytes = 0;
- bool Ok = false;
- };
+ const uint32_t ValidCount = static_cast<uint32_t>(ValidAttachments.size());
+ const bool ValidCacheRecord = ValidCount == AttachmentCount;
- AttachmentInsertResult AttachmentResult{.Ok = true};
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- std::vector<IoHash> PayloadIds;
+ if (ValidCacheRecord)
+ {
+ ZEN_DEBUG("PUT - cache record '{}/{}' {}, {} attachments", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ValidCount);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- PayloadIds.reserve(Attachments.size());
+ if (StoreUpstream)
+ {
+ ZEN_ASSERT(m_UpstreamCache);
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(ValidAttachments)});
+ }
- CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentResult, &PayloadIds](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
- {
- if (Attachment->IsCompressedBinary())
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- const uint64_t ChunkSize = Chunk.GetCompressed().GetSize();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ Request.WriteResponse(HttpResponseCode::Created);
+ }
+ else
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, found {}/{} attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ValidCount,
+ AttachmentCount);
- PayloadIds.emplace_back(InsertResult.DecompressedId);
+ Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing attachments"sv);
+ }
+ }
+ else if (ContentType == HttpContentType::kCbPackage)
+ {
+ CbPackage Package;
- if (InsertResult.New)
- {
- AttachmentResult.NewBytes += ChunkSize;
- AttachmentResult.NewCount++;
- }
+ if (!Package.TryLoad(Body))
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey);
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv);
+ }
- AttachmentResult.Bytes += ChunkSize;
- AttachmentResult.Count++;
- }
- else
- {
- ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed",
- Ref.BucketSegment,
- Ref.HashKey,
- AttachmentHash.AsHash());
- AttachmentResult.Ok = false;
- }
- }
- else
- {
- ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'",
- Ref.BucketSegment,
- Ref.HashKey,
- AttachmentHash.AsHash());
- AttachmentResult.Ok = false;
- }
- });
+ CbObject CacheRecord = Package.GetObject();
- if (!AttachmentResult.Ok)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments");
- }
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ std::vector<IoHash> ValidAttachments;
+ int32_t NewAttachmentCount = 0;
+
+ ValidAttachments.reserve(Attachments.size());
- IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer();
- const uint64_t TotalPackageBytes = AttachmentResult.Bytes + CacheRecordChunk.Size();
+ CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &NewAttachmentCount](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ const uint64_t ChunkSize = Chunk.GetCompressed().GetSize();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
- ZenCacheValue CacheValue{.Value = CacheRecordChunk};
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ ValidAttachments.emplace_back(InsertResult.DecompressedId);
- if (StoreUpstream)
+ if (InsertResult.New)
{
- ZEN_ASSERT(m_UpstreamCache);
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(PayloadIds)});
+ NewAttachmentCount++;
}
-
- ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(TotalPackageBytes),
- AttachmentResult.NewCount,
- AttachmentResult.Count,
- NiceBytes(AttachmentResult.NewBytes),
- NiceBytes(AttachmentResult.Bytes));
-
- return Request.WriteResponse(HttpResponseCode::Created);
}
else
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ AttachmentHash.AsHash());
}
}
- break;
+ else
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ AttachmentHash.AsHash());
+ }
+ });
- case kPost:
- break;
+ const bool AttachmentsValid = ValidAttachments.size() == Attachments.size();
- default:
- break;
+ if (!AttachmentsValid)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv);
+ }
+
+ ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} new attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Body.GetSize()),
+ NewAttachmentCount,
+ Attachments.size());
+
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
+
+ if (StoreUpstream)
+ {
+ ZEN_ASSERT(m_UpstreamCache);
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(ValidAttachments)});
+ }
+
+ Request.WriteResponse(HttpResponseCode::Created);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::BadRequest);
}
}