aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-02 10:50:18 +0100
committerPer Larsson <[email protected]>2021-11-02 10:50:18 +0100
commit4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9 (patch)
treed1e6fb1c71a0c8c5e12b2814f309d09d7093c9b3
parentMerge branch 'main' into zcache-batch (diff)
downloadzen-4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9.tar.xz
zen-4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9.zip
Added upstream batch API.
-rw-r--r--zencore/include/zencore/iobuffer.h1
-rw-r--r--zenserver-test/zenserver-test.cpp156
-rw-r--r--zenserver/cache/cachekey.cpp105
-rw-r--r--zenserver/cache/cachekey.h52
-rw-r--r--zenserver/cache/structuredcache.cpp225
-rw-r--r--zenserver/upstream/upstreamcache.cpp41
-rw-r--r--zenserver/upstream/upstreamcache.h14
-rw-r--r--zenserver/zenserver.vcxproj2
-rw-r--r--zenserver/zenserver.vcxproj.filters6
9 files changed, 462 insertions, 140 deletions
diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h
index 88a72cbba..f3e194bbd 100644
--- a/zencore/include/zencore/iobuffer.h
+++ b/zencore/include/zencore/iobuffer.h
@@ -382,6 +382,7 @@ public:
ZENCORE_API static IoBuffer MakeFromFileHandle(void* FileHandle, uint64_t Offset = 0, uint64_t Size = ~0ull);
ZENCORE_API static IoBuffer ReadFromFileMaybe(IoBuffer& InBuffer);
inline static IoBuffer MakeCloneFromMemory(const void* Ptr, size_t Sz) { return IoBuffer(IoBuffer::Clone, Ptr, Sz); }
+ inline static IoBuffer MakeCloneFromMemory(MemoryView Memory) { return IoBuffer(IoBuffer::Clone, Memory.GetData(), Memory.GetSize()); }
};
IoHash HashBuffer(IoBuffer& Buffer);
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 876979914..70515770c 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -464,7 +464,9 @@ using namespace std::literals;
class full_test_formatter final : public spdlog::formatter
{
public:
- full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) {}
+ full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId)
+ {
+ }
virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_test_formatter>(m_LogId, m_Epoch); }
@@ -1703,8 +1705,8 @@ TEST_CASE("zcache.policy")
LocalCfg.Spawn(LocalInst);
- zen::IoHash Key;
- zen::IoHash PayloadId;
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
// Store package locally
{
@@ -1761,8 +1763,8 @@ TEST_CASE("zcache.policy")
UpstreamCfg.Spawn(UpstreamInst);
LocalCfg.Spawn(LocalInst);
- zen::IoHash Key;
- zen::IoHash PayloadId;
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
// Store package upstream
{
@@ -1927,10 +1929,10 @@ TEST_CASE("zcache.batch")
Inst.SpawnServer(PortNumber);
Inst.WaitUntilReady();
- const std::string_view Bucket = "mybucket"sv;
+ const std::string_view Bucket = "mastodon"sv;
const uint32_t BatchCount = 128;
- // Create cachereords
+ // Create cach records
{
for (uint32_t Key = 1; Key <= BatchCount; ++Key)
{
@@ -1948,20 +1950,27 @@ TEST_CASE("zcache.batch")
// Get all as a batch
{
- CbObjectWriter BatchQuery;
+ CbObjectWriter BatchRequest;
+
+ BatchRequest << "method"
+ << "getcacherecords";
+
+ BatchRequest.BeginObject("params");
+ BatchRequest.BeginArray("cachekeys"sv);
- BatchQuery.BeginArray("records"sv);
for (uint32_t Key = 1; Key <= BatchCount; ++Key)
{
const IoHash CacheKey = CreateCacheKey(Key);
- BatchQuery.BeginObject();
- BatchQuery << "bucket"sv << Bucket << "key" << CacheKey;
- BatchQuery.EndObject();
+ BatchRequest.BeginObject();
+ BatchRequest << "bucket"sv << Bucket << "key" << CacheKey;
+ BatchRequest.EndObject();
}
- BatchQuery.EndArray();
+
+ BatchRequest.EndArray();
+ BatchRequest.EndObject();
zen::BinaryWriter Payload;
- BatchQuery.Save(Payload);
+ BatchRequest.Save(Payload);
cpr::Response Result = cpr::Get(cpr::Url{"{}/$batch"_format(BaseUri)},
cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
@@ -1972,16 +1981,127 @@ TEST_CASE("zcache.batch")
zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
zen::CbPackage Package;
const bool Ok = Package.TryLoad(Response);
+ CHECK(Ok);
+
+ size_t ReturnCount = 0;
CbObjectView BatchResponse = Package.GetObject();
- for (uint32_t ExpectedKey = 1; CbFieldView ResponseView : BatchResponse["records"])
+ for (uint32_t ExpectedKey = 1; CbFieldView ResponseView : BatchResponse["result"])
{
- CbObjectView Response = ResponseView.AsObjectView();
- const uint32_t Key = Response["key"sv].AsUInt32();
- const IoHash Attachment = Response["data"sv].AsHash();
+ CbObjectView Response = ResponseView.AsObjectView();
+ const uint32_t Key = Response["key"sv].AsUInt32();
+ const IoHash AttachmentHash = Response["data"sv].AsHash();
+ const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash);
+
CHECK(Key == ExpectedKey);
+ CHECK(Attachment != nullptr);
+
ExpectedKey++;
+ ReturnCount++;
+ }
+
+ CHECK(ReturnCount == BatchCount);
+ }
+ }
+
+ SUBCASE("get missing cache records")
+ {
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const uint16_t PortNumber = 13337;
+ const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber);
+
+ ZenServerInstance Inst(TestEnv);
+ Inst.SetTestDir(TestDir);
+ Inst.SpawnServer(PortNumber);
+ Inst.WaitUntilReady();
+
+ const std::string_view Bucket = "mastodon"sv;
+
+ // Create some cache records
+ {
+ for (uint32_t Key = 1; Key <= 5; ++Key)
+ {
+ const IoHash CacheKey = CreateCacheKey(Key);
+ CbPackage CacheRecord = CreateCacheRecord(Key, 4096);
+ IoBuffer Payload = ToIoBuffer(CacheRecord);
+
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, CacheKey)},
+ cpr::Body{(const char*)Payload.Data(), Payload.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.status_code == 201);
+ }
+ }
+
+ {
+ CbObjectWriter BatchRequest;
+
+ BatchRequest << "method"
+ << "getcacherecords";
+
+ BatchRequest.BeginObject("params");
+ BatchRequest.BeginArray("cachekeys"sv);
+
+ BatchRequest.BeginObject();
+ BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(1);
+ BatchRequest.EndObject();
+ BatchRequest.BeginObject();
+ BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(11); // Missing
+ BatchRequest.EndObject();
+ BatchRequest.BeginObject();
+ BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(2);
+ BatchRequest.EndObject();
+ BatchRequest.BeginObject();
+ BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(22); // Missing
+ BatchRequest.EndObject();
+
+ BatchRequest.EndArray();
+ BatchRequest.EndObject();
+
+ zen::BinaryWriter Body;
+ BatchRequest.Save(Body);
+
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/$batch"_format(BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+
+ zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Response);
+ CHECK(Ok);
+
+ CbObjectView BatchResponse = Package.GetObject();
+ CbArrayView CacheRecords = BatchResponse["result"sv].AsArrayView();
+ auto It = CacheRecords.CreateViewIterator();
+
+ {
+ CbObjectView CacheRecord = It.AsObjectView();
+ const uint32_t Key = CacheRecord["key"sv].AsUInt32();
+ const IoHash AttachmentHash = CacheRecord["data"sv].AsHash();
+ const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash);
+
+ CHECK(Key == 1);
+ CHECK(Attachment != nullptr);
}
+
+ It++;
+ CHECK(It.IsNull());
+
+ It++;
+ {
+ CbObjectView CacheRecord = It.AsObjectView();
+ const uint32_t Key = CacheRecord["key"sv].AsUInt32();
+ const IoHash AttachmentHash = CacheRecord["data"sv].AsHash();
+ const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash);
+
+ CHECK(Key == 2);
+ CHECK(Attachment != nullptr);
+ }
+
+ It++;
+ CHECK(It.IsNull());
}
}
}
diff --git a/zenserver/cache/cachekey.cpp b/zenserver/cache/cachekey.cpp
new file mode 100644
index 000000000..94ef7fd12
--- /dev/null
+++ b/zenserver/cache/cachekey.cpp
@@ -0,0 +1,105 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "cachekey.h"
+
+#include <zencore/string.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace detail { namespace cacheopt {
+ constexpr std::string_view Local = "local"sv;
+ constexpr std::string_view Remote = "remote"sv;
+ constexpr std::string_view Data = "data"sv;
+ constexpr std::string_view Meta = "meta"sv;
+ constexpr std::string_view Value = "value"sv;
+ constexpr std::string_view Attachments = "attachments"sv;
+}} // namespace detail::cacheopt
+
+CachePolicy
+ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default)
+{
+ if (QueryPolicy.empty())
+ {
+ return Default;
+ }
+
+ CachePolicy Result = CachePolicy::None;
+
+ ForEachStrTok(QueryPolicy, ',', [&Result](const std::string_view& Token) {
+ if (Token == detail::cacheopt::Local)
+ {
+ Result |= CachePolicy::QueryLocal;
+ }
+ if (Token == detail::cacheopt::Remote)
+ {
+ Result |= CachePolicy::QueryRemote;
+ }
+ return true;
+ });
+
+ return Result;
+}
+
+CachePolicy
+ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default)
+{
+ if (StorePolicy.empty())
+ {
+ return Default;
+ }
+
+ CachePolicy Result = CachePolicy::None;
+
+ ForEachStrTok(StorePolicy, ',', [&Result](const std::string_view& Token) {
+ if (Token == detail::cacheopt::Local)
+ {
+ Result |= CachePolicy::StoreLocal;
+ }
+ if (Token == detail::cacheopt::Remote)
+ {
+ Result |= CachePolicy::StoreRemote;
+ }
+ return true;
+ });
+
+ return Result;
+}
+
+CachePolicy
+ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default)
+{
+ if (SkipPolicy.empty())
+ {
+ return Default;
+ }
+
+ CachePolicy Result = CachePolicy::None;
+
+ ForEachStrTok(SkipPolicy, ',', [&Result](const std::string_view& Token) {
+ if (Token == detail::cacheopt::Meta)
+ {
+ Result |= CachePolicy::SkipMeta;
+ }
+ if (Token == detail::cacheopt::Value)
+ {
+ Result |= CachePolicy::SkipValue;
+ }
+ if (Token == detail::cacheopt::Attachments)
+ {
+ Result |= CachePolicy::SkipAttachments;
+ }
+ if (Token == detail::cacheopt::Data)
+ {
+ Result |= CachePolicy::SkipData;
+ }
+ return true;
+ });
+
+ return Result;
+}
+
+CacheKey CacheKey::None = CacheKey{.Bucket = std::string(), .Hash = IoHash()};
+
+} // namespace zen
diff --git a/zenserver/cache/cachekey.h b/zenserver/cache/cachekey.h
new file mode 100644
index 000000000..eba063699
--- /dev/null
+++ b/zenserver/cache/cachekey.h
@@ -0,0 +1,52 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+#include <zencore/string.h>
+#include <gsl/gsl-lite.hpp>
+
+namespace zen {
+
+enum class CachePolicy : uint8_t
+{
+ None = 0,
+ QueryLocal = 1 << 0,
+ QueryRemote = 1 << 1,
+ Query = QueryLocal | QueryRemote,
+ StoreLocal = 1 << 2,
+ StoreRemote = 1 << 3,
+ Store = StoreLocal | StoreRemote,
+ SkipMeta = 1 << 4,
+ SkipValue = 1 << 5,
+ SkipAttachments = 1 << 6,
+ SkipData = SkipMeta | SkipValue | SkipAttachments,
+ SkipLocalCopy = 1 << 7,
+ Local = QueryLocal | StoreLocal,
+ Remote = QueryRemote | StoreRemote,
+ Default = Query | Store,
+ Disable = None,
+};
+
+gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy);
+
+CachePolicy ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default = CachePolicy::Query);
+
+CachePolicy ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default = CachePolicy::Store);
+
+CachePolicy ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default = CachePolicy::None);
+
+struct CacheKey
+{
+ std::string Bucket;
+ IoHash Hash;
+
+ static CacheKey Create(std::string_view Bucket, const IoHash& Hash)
+ {
+ return {.Bucket = ToLower(Bucket), .Hash = Hash};
+ }
+
+ static CacheKey None;
+};
+
+} // namespace zen
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 172e122e4..59cc74d53 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -12,6 +12,7 @@
#include <zenhttp/httpserver.h>
#include <zenstore/CAS.h>
+#include "cachekey.h"
#include "monitoring/httpstats.h"
#include "structuredcache.h"
#include "structuredcachestore.h"
@@ -36,111 +37,12 @@ using namespace std::literals;
//////////////////////////////////////////////////////////////////////////
-namespace detail { namespace cacheopt {
- constexpr std::string_view Local = "local"sv;
- constexpr std::string_view Remote = "remote"sv;
- constexpr std::string_view Data = "data"sv;
- constexpr std::string_view Meta = "meta"sv;
- constexpr std::string_view Value = "value"sv;
- constexpr std::string_view Attachments = "attachments"sv;
-}} // namespace detail::cacheopt
-
-//////////////////////////////////////////////////////////////////////////
-
-enum class CachePolicy : uint8_t
-{
- None = 0,
- QueryLocal = 1 << 0,
- QueryRemote = 1 << 1,
- Query = QueryLocal | QueryRemote,
- StoreLocal = 1 << 2,
- StoreRemote = 1 << 3,
- Store = StoreLocal | StoreRemote,
- SkipMeta = 1 << 4,
- SkipValue = 1 << 5,
- SkipAttachments = 1 << 6,
- SkipData = SkipMeta | SkipValue | SkipAttachments,
- SkipLocalCopy = 1 << 7,
- Local = QueryLocal | StoreLocal,
- Remote = QueryRemote | StoreRemote,
- Default = Query | Store,
- Disable = None,
-};
-
-gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy);
-
CachePolicy
ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
{
- CachePolicy QueryPolicy = CachePolicy::Query;
-
- {
- std::string_view Opts = QueryParams.GetValue("query"sv);
- if (!Opts.empty())
- {
- QueryPolicy = CachePolicy::None;
- ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) {
- if (Opt == detail::cacheopt::Local)
- {
- QueryPolicy |= CachePolicy::QueryLocal;
- }
- if (Opt == detail::cacheopt::Remote)
- {
- QueryPolicy |= CachePolicy::QueryRemote;
- }
- return true;
- });
- }
- }
-
- CachePolicy StorePolicy = CachePolicy::Store;
-
- {
- std::string_view Opts = QueryParams.GetValue("store"sv);
- if (!Opts.empty())
- {
- StorePolicy = CachePolicy::None;
- ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) {
- if (Opt == detail::cacheopt::Local)
- {
- StorePolicy |= CachePolicy::StoreLocal;
- }
- if (Opt == detail::cacheopt::Remote)
- {
- StorePolicy |= CachePolicy::StoreRemote;
- }
- return true;
- });
- }
- }
-
- CachePolicy SkipPolicy = CachePolicy::None;
-
- {
- std::string_view Opts = QueryParams.GetValue("skip"sv);
- if (!Opts.empty())
- {
- ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) {
- if (Opt == detail::cacheopt::Meta)
- {
- SkipPolicy |= CachePolicy::SkipMeta;
- }
- if (Opt == detail::cacheopt::Value)
- {
- SkipPolicy |= CachePolicy::SkipValue;
- }
- if (Opt == detail::cacheopt::Attachments)
- {
- SkipPolicy |= CachePolicy::SkipAttachments;
- }
- if (Opt == detail::cacheopt::Data)
- {
- SkipPolicy |= CachePolicy::SkipData;
- }
- return true;
- });
- }
- }
+ const CachePolicy QueryPolicy = zen::ParseQueryCachePolicy(QueryParams.GetValue("query"sv));
+ const CachePolicy StorePolicy = zen::ParseStoreCachePolicy(QueryParams.GetValue("store"sv));
+ const CachePolicy SkipPolicy = zen::ParseSkipCachePolicy(QueryParams.GetValue("skip"sv));
return QueryPolicy | StorePolicy | SkipPolicy;
}
@@ -882,7 +784,11 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
void
HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request, CachePolicy Policy)
{
- ZEN_UNUSED(Policy);
+ using namespace fmt::literals;
+
+ const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
+ const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
+ const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
switch (auto Verb = Request.RequestVerb())
{
@@ -898,44 +804,119 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request,
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- CbObject BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload());
+ CbObject BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload());
+ const std::string_view Method = BatchRequest["method"sv].AsString();
- CbPackage Package;
- CbObjectWriter BatchResponse;
+ if (Method != "getcacherecords"sv)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Invalid method '{}'"_format(Method));
+ }
- BatchResponse.BeginArray("records"sv);
+ CbObjectView Params = BatchRequest["params"sv].AsObjectView();
- for (CbFieldView QueryView : BatchRequest["records"sv])
+ std::vector<CacheKey> CacheKeys;
+ std::vector<IoBuffer> CacheValues;
+ std::vector<IoBuffer> Payloads;
+ std::vector<size_t> Missing;
+
+ for (CbFieldView QueryView : Params["cachekeys"sv])
{
- CbObjectView Query = QueryView.AsObjectView();
- const std::string_view Bucket = Query["bucket"sv].AsString();
- const IoHash CacheKey = Query["key"sv].AsHash();
+ CbObjectView Query = QueryView.AsObjectView();
+ CacheKeys.push_back(CacheKey::Create(Query["bucket"sv].AsString(), Query["key"sv].AsHash()));
+ }
- ZenCacheValue CacheValue;
- const bool Hit = m_CacheStore.Get(Bucket, CacheKey, CacheValue);
+ CacheValues.resize(CacheKeys.size());
- if (Hit)
+ for (size_t Idx = 0; const CacheKey& Key : CacheKeys)
+ {
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
{
CbObjectView CacheRecord(CacheValue.Value.Data());
- CacheRecord.IterateAttachments([this, &Package](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ if (!SkipAttachments)
+ {
+ CacheRecord.IterateAttachments([this, &Payloads](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ Payloads.push_back(Chunk);
+ }
+ });
+ }
+
+ CacheValues[Idx] = CacheValue.Value;
+ }
+ else
+ {
+ Missing.push_back(Idx);
+ }
+
+ ++Idx;
+ }
+
+ if (QueryUpstream)
+ {
+ auto UpstreamResult = m_UpstreamCache->GetCacheRecords(
+ CacheKeys,
+ Missing,
+ [this, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) {
+ CbPackage UpstreamPackage;
+ if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue))
{
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ CbObjectView CacheRecord = UpstreamPackage.GetObject();
+
+ CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
+ {
+ m_CidStore.AddChunk(Chunk);
+
+ if (!SkipAttachments)
+ {
+ Payloads.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ }
+ }
+ }
+ });
+
+ CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView());
}
});
+ }
+
+ CbObjectWriter BatchResponse;
- BatchResponse << CacheRecord;
+ BatchResponse.BeginArray("result"sv);
+ for (const IoBuffer& Value : CacheValues)
+ {
+ if (Value)
+ {
+ BatchResponse << CbObjectView(Value.Data());
+ }
+ else
+ {
+ BatchResponse.AddNull();
}
}
-
BatchResponse.EndArray();
+
+ CbPackage Package;
Package.SetObject(BatchResponse.Save());
-
+
+ for (const IoBuffer& Payload : Payloads)
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Payload))));
+ }
+
BinaryWriter MemStream;
Package.Save(MemStream);
-
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
break;
default:
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index c0cd858b6..437b29cd7 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -780,6 +780,47 @@ public:
return {};
}
+ virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ OnCacheGetComplete OnComplete) override
+ {
+ if (!m_Options.ReadUpstream)
+ {
+ return {.Missing = std::vector<size_t>(KeyIndex.begin(), KeyIndex.end())};
+ }
+
+ GetUpstreamCacheBatchResult Result;
+
+ for (size_t Idx : KeyIndex)
+ {
+ const UpstreamCacheKey CacheKey = {CacheKeys[Idx].Bucket, CacheKeys[Idx].Hash};
+
+ GetUpstreamCacheResult CacheResult;
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy())
+ {
+ CacheResult = Endpoint->GetCacheRecord(CacheKey, ZenContentType::kCbPackage);
+ if (CacheResult.Success)
+ {
+ break;
+ }
+ }
+ }
+
+ if (CacheResult.Success)
+ {
+ OnComplete(Idx, CacheResult.Value);
+ }
+ else
+ {
+ Result.Missing.push_back(Idx);
+ }
+ }
+
+ return Result;
+ }
+
virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
{
if (m_Options.ReadUpstream)
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index edc995da6..04554f210 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -2,12 +2,15 @@
#pragma once
+#include "cache/cachekey.h"
+
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/zencore.h>
#include <atomic>
#include <chrono>
+#include <functional>
#include <memory>
namespace zen {
@@ -62,6 +65,11 @@ struct GetUpstreamCacheResult
bool Success = false;
};
+struct GetUpstreamCacheBatchResult
+{
+ std::vector<size_t> Missing;
+};
+
struct PutUpstreamCacheResult
{
std::string Reason;
@@ -88,6 +96,8 @@ struct UpstreamEndpointStats
std::atomic<double> SecondsDown{};
};
+using OnCacheGetComplete = std::function<void(size_t, IoBuffer)>;
+
/**
* The upstream endpont is responsible for handling upload/downloading of cache records.
*/
@@ -129,6 +139,10 @@ public:
virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+ virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ OnCacheGetComplete OnComplete) = 0;
+
virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
struct EnqueueResult
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index d954d3f8d..d90dd009a 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -105,6 +105,7 @@
</ItemDefinitionGroup>
<ItemGroup>
<ClInclude Include="admin\admin.h" />
+ <ClInclude Include="cache\cachekey.h" />
<ClInclude Include="cache\structuredcache.h" />
<ClInclude Include="cache\structuredcachestore.h" />
<ClInclude Include="compute\apply.h" />
@@ -131,6 +132,7 @@
</ItemGroup>
<ItemGroup>
<ClCompile Include="admin\admin.cpp" />
+ <ClCompile Include="cache\cachekey.cpp" />
<ClCompile Include="cache\structuredcache.cpp" />
<ClCompile Include="cache\structuredcachestore.cpp" />
<ClCompile Include="compute\apply.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index 04c6267ba..ae5411afb 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -41,6 +41,9 @@
<ClInclude Include="experimental\vfs.h" />
<ClInclude Include="monitoring\httpstats.h" />
<ClInclude Include="monitoring\httpstatus.h" />
+ <ClInclude Include="cache\cachekey.h">
+ <Filter>cache</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="zenserver.cpp" />
@@ -76,6 +79,9 @@
<ClCompile Include="experimental\vfs.cpp" />
<ClCompile Include="monitoring\httpstats.cpp" />
<ClCompile Include="monitoring\httpstatus.cpp" />
+ <ClCompile Include="cache\cachekey.cpp">
+ <Filter>cache</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="cache">