diff options
| author | Per Larsson <[email protected]> | 2021-11-02 10:50:18 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-11-02 10:50:18 +0100 |
| commit | 4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9 (patch) | |
| tree | d1e6fb1c71a0c8c5e12b2814f309d09d7093c9b3 | |
| parent | Merge branch 'main' into zcache-batch (diff) | |
| download | zen-4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9.tar.xz zen-4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9.zip | |
Added upstream batch API.
| -rw-r--r-- | zencore/include/zencore/iobuffer.h | 1 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 156 | ||||
| -rw-r--r-- | zenserver/cache/cachekey.cpp | 105 | ||||
| -rw-r--r-- | zenserver/cache/cachekey.h | 52 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 225 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 41 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 14 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj.filters | 6 |
9 files changed, 462 insertions, 140 deletions
diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h index 88a72cbba..f3e194bbd 100644 --- a/zencore/include/zencore/iobuffer.h +++ b/zencore/include/zencore/iobuffer.h @@ -382,6 +382,7 @@ public: ZENCORE_API static IoBuffer MakeFromFileHandle(void* FileHandle, uint64_t Offset = 0, uint64_t Size = ~0ull); ZENCORE_API static IoBuffer ReadFromFileMaybe(IoBuffer& InBuffer); inline static IoBuffer MakeCloneFromMemory(const void* Ptr, size_t Sz) { return IoBuffer(IoBuffer::Clone, Ptr, Sz); } + inline static IoBuffer MakeCloneFromMemory(MemoryView Memory) { return IoBuffer(IoBuffer::Clone, Memory.GetData(), Memory.GetSize()); } }; IoHash HashBuffer(IoBuffer& Buffer); diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 876979914..70515770c 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -464,7 +464,9 @@ using namespace std::literals; class full_test_formatter final : public spdlog::formatter { public: - full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) {} + full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) + { + } virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_test_formatter>(m_LogId, m_Epoch); } @@ -1703,8 +1705,8 @@ TEST_CASE("zcache.policy") LocalCfg.Spawn(LocalInst); - zen::IoHash Key; - zen::IoHash PayloadId; + zen::IoHash Key; + zen::IoHash PayloadId; // Store package locally { @@ -1761,8 +1763,8 @@ TEST_CASE("zcache.policy") UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); - zen::IoHash Key; - zen::IoHash PayloadId; + zen::IoHash Key; + zen::IoHash PayloadId; // Store package upstream { @@ -1927,10 +1929,10 @@ TEST_CASE("zcache.batch") Inst.SpawnServer(PortNumber); Inst.WaitUntilReady(); - const std::string_view Bucket = "mybucket"sv; + const std::string_view Bucket = "mastodon"sv; const uint32_t BatchCount = 128; - // Create cachereords + // Create cach records { for (uint32_t Key = 1; Key <= BatchCount; ++Key) { @@ -1948,20 +1950,27 @@ TEST_CASE("zcache.batch") // Get all as a batch { - CbObjectWriter BatchQuery; + CbObjectWriter BatchRequest; + + BatchRequest << "method" + << "getcacherecords"; + + BatchRequest.BeginObject("params"); + BatchRequest.BeginArray("cachekeys"sv); - BatchQuery.BeginArray("records"sv); for (uint32_t Key = 1; Key <= BatchCount; ++Key) { const IoHash CacheKey = CreateCacheKey(Key); - BatchQuery.BeginObject(); - BatchQuery << "bucket"sv << Bucket << "key" << CacheKey; - BatchQuery.EndObject(); + BatchRequest.BeginObject(); + BatchRequest << "bucket"sv << Bucket << "key" << CacheKey; + BatchRequest.EndObject(); } - BatchQuery.EndArray(); + + BatchRequest.EndArray(); + BatchRequest.EndObject(); zen::BinaryWriter Payload; - BatchQuery.Save(Payload); + BatchRequest.Save(Payload); cpr::Response Result = cpr::Get(cpr::Url{"{}/$batch"_format(BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, @@ -1972,16 +1981,127 @@ TEST_CASE("zcache.batch") zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Response); + CHECK(Ok); + + size_t ReturnCount = 0; CbObjectView BatchResponse = Package.GetObject(); - for (uint32_t ExpectedKey = 1; CbFieldView ResponseView : BatchResponse["records"]) + for (uint32_t ExpectedKey = 1; CbFieldView ResponseView : BatchResponse["result"]) { - CbObjectView Response = ResponseView.AsObjectView(); - const uint32_t Key = Response["key"sv].AsUInt32(); - const IoHash Attachment = Response["data"sv].AsHash(); + CbObjectView Response = ResponseView.AsObjectView(); + const uint32_t Key = Response["key"sv].AsUInt32(); + const IoHash AttachmentHash = Response["data"sv].AsHash(); + const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash); + CHECK(Key == ExpectedKey); + CHECK(Attachment != nullptr); + ExpectedKey++; + ReturnCount++; + } + + CHECK(ReturnCount == BatchCount); + } + } + + SUBCASE("get missing cache records") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + Inst.SpawnServer(PortNumber); + Inst.WaitUntilReady(); + + const std::string_view Bucket = "mastodon"sv; + + // Create some cache records + { + for (uint32_t Key = 1; Key <= 5; ++Key) + { + const IoHash CacheKey = CreateCacheKey(Key); + CbPackage CacheRecord = CreateCacheRecord(Key, 4096); + IoBuffer Payload = ToIoBuffer(CacheRecord); + + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, CacheKey)}, + cpr::Body{(const char*)Payload.Data(), Payload.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 201); + } + } + + { + CbObjectWriter BatchRequest; + + BatchRequest << "method" + << "getcacherecords"; + + BatchRequest.BeginObject("params"); + BatchRequest.BeginArray("cachekeys"sv); + + BatchRequest.BeginObject(); + BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(1); + BatchRequest.EndObject(); + BatchRequest.BeginObject(); + BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(11); // Missing + BatchRequest.EndObject(); + BatchRequest.BeginObject(); + BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(2); + BatchRequest.EndObject(); + BatchRequest.BeginObject(); + BatchRequest << "bucket"sv << Bucket << "key" << CreateCacheKey(22); // Missing + BatchRequest.EndObject(); + + BatchRequest.EndArray(); + BatchRequest.EndObject(); + + zen::BinaryWriter Body; + BatchRequest.Save(Body); + + cpr::Response Result = cpr::Get(cpr::Url{"{}/$batch"_format(BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + + zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Response); + CHECK(Ok); + + CbObjectView BatchResponse = Package.GetObject(); + CbArrayView CacheRecords = BatchResponse["result"sv].AsArrayView(); + auto It = CacheRecords.CreateViewIterator(); + + { + CbObjectView CacheRecord = It.AsObjectView(); + const uint32_t Key = CacheRecord["key"sv].AsUInt32(); + const IoHash AttachmentHash = CacheRecord["data"sv].AsHash(); + const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash); + + CHECK(Key == 1); + CHECK(Attachment != nullptr); } + + It++; + CHECK(It.IsNull()); + + It++; + { + CbObjectView CacheRecord = It.AsObjectView(); + const uint32_t Key = CacheRecord["key"sv].AsUInt32(); + const IoHash AttachmentHash = CacheRecord["data"sv].AsHash(); + const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash); + + CHECK(Key == 2); + CHECK(Attachment != nullptr); + } + + It++; + CHECK(It.IsNull()); } } } diff --git a/zenserver/cache/cachekey.cpp b/zenserver/cache/cachekey.cpp new file mode 100644 index 000000000..94ef7fd12 --- /dev/null +++ b/zenserver/cache/cachekey.cpp @@ -0,0 +1,105 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "cachekey.h" + +#include <zencore/string.h> + +namespace zen { + +using namespace std::literals; + +namespace detail { namespace cacheopt { + constexpr std::string_view Local = "local"sv; + constexpr std::string_view Remote = "remote"sv; + constexpr std::string_view Data = "data"sv; + constexpr std::string_view Meta = "meta"sv; + constexpr std::string_view Value = "value"sv; + constexpr std::string_view Attachments = "attachments"sv; +}} // namespace detail::cacheopt + +CachePolicy +ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default) +{ + if (QueryPolicy.empty()) + { + return Default; + } + + CachePolicy Result = CachePolicy::None; + + ForEachStrTok(QueryPolicy, ',', [&Result](const std::string_view& Token) { + if (Token == detail::cacheopt::Local) + { + Result |= CachePolicy::QueryLocal; + } + if (Token == detail::cacheopt::Remote) + { + Result |= CachePolicy::QueryRemote; + } + return true; + }); + + return Result; +} + +CachePolicy +ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default) +{ + if (StorePolicy.empty()) + { + return Default; + } + + CachePolicy Result = CachePolicy::None; + + ForEachStrTok(StorePolicy, ',', [&Result](const std::string_view& Token) { + if (Token == detail::cacheopt::Local) + { + Result |= CachePolicy::StoreLocal; + } + if (Token == detail::cacheopt::Remote) + { + Result |= CachePolicy::StoreRemote; + } + return true; + }); + + return Result; +} + +CachePolicy +ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default) +{ + if (SkipPolicy.empty()) + { + return Default; + } + + CachePolicy Result = CachePolicy::None; + + ForEachStrTok(SkipPolicy, ',', [&Result](const std::string_view& Token) { + if (Token == detail::cacheopt::Meta) + { + Result |= CachePolicy::SkipMeta; + } + if (Token == detail::cacheopt::Value) + { + Result |= CachePolicy::SkipValue; + } + if (Token == detail::cacheopt::Attachments) + { + Result |= CachePolicy::SkipAttachments; + } + if (Token == detail::cacheopt::Data) + { + Result |= CachePolicy::SkipData; + } + return true; + }); + + return Result; +} + +CacheKey CacheKey::None = CacheKey{.Bucket = std::string(), .Hash = IoHash()}; + +} // namespace zen diff --git a/zenserver/cache/cachekey.h b/zenserver/cache/cachekey.h new file mode 100644 index 000000000..eba063699 --- /dev/null +++ b/zenserver/cache/cachekey.h @@ -0,0 +1,52 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> +#include <zencore/string.h> +#include <gsl/gsl-lite.hpp> + +namespace zen { + +enum class CachePolicy : uint8_t +{ + None = 0, + QueryLocal = 1 << 0, + QueryRemote = 1 << 1, + Query = QueryLocal | QueryRemote, + StoreLocal = 1 << 2, + StoreRemote = 1 << 3, + Store = StoreLocal | StoreRemote, + SkipMeta = 1 << 4, + SkipValue = 1 << 5, + SkipAttachments = 1 << 6, + SkipData = SkipMeta | SkipValue | SkipAttachments, + SkipLocalCopy = 1 << 7, + Local = QueryLocal | StoreLocal, + Remote = QueryRemote | StoreRemote, + Default = Query | Store, + Disable = None, +}; + +gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); + +CachePolicy ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default = CachePolicy::Query); + +CachePolicy ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default = CachePolicy::Store); + +CachePolicy ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default = CachePolicy::None); + +struct CacheKey +{ + std::string Bucket; + IoHash Hash; + + static CacheKey Create(std::string_view Bucket, const IoHash& Hash) + { + return {.Bucket = ToLower(Bucket), .Hash = Hash}; + } + + static CacheKey None; +}; + +} // namespace zen diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 172e122e4..59cc74d53 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -12,6 +12,7 @@ #include <zenhttp/httpserver.h> #include <zenstore/CAS.h> +#include "cachekey.h" #include "monitoring/httpstats.h" #include "structuredcache.h" #include "structuredcachestore.h" @@ -36,111 +37,12 @@ using namespace std::literals; ////////////////////////////////////////////////////////////////////////// -namespace detail { namespace cacheopt { - constexpr std::string_view Local = "local"sv; - constexpr std::string_view Remote = "remote"sv; - constexpr std::string_view Data = "data"sv; - constexpr std::string_view Meta = "meta"sv; - constexpr std::string_view Value = "value"sv; - constexpr std::string_view Attachments = "attachments"sv; -}} // namespace detail::cacheopt - -////////////////////////////////////////////////////////////////////////// - -enum class CachePolicy : uint8_t -{ - None = 0, - QueryLocal = 1 << 0, - QueryRemote = 1 << 1, - Query = QueryLocal | QueryRemote, - StoreLocal = 1 << 2, - StoreRemote = 1 << 3, - Store = StoreLocal | StoreRemote, - SkipMeta = 1 << 4, - SkipValue = 1 << 5, - SkipAttachments = 1 << 6, - SkipData = SkipMeta | SkipValue | SkipAttachments, - SkipLocalCopy = 1 << 7, - Local = QueryLocal | StoreLocal, - Remote = QueryRemote | StoreRemote, - Default = Query | Store, - Disable = None, -}; - -gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); - CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { - CachePolicy QueryPolicy = CachePolicy::Query; - - { - std::string_view Opts = QueryParams.GetValue("query"sv); - if (!Opts.empty()) - { - QueryPolicy = CachePolicy::None; - ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Local) - { - QueryPolicy |= CachePolicy::QueryLocal; - } - if (Opt == detail::cacheopt::Remote) - { - QueryPolicy |= CachePolicy::QueryRemote; - } - return true; - }); - } - } - - CachePolicy StorePolicy = CachePolicy::Store; - - { - std::string_view Opts = QueryParams.GetValue("store"sv); - if (!Opts.empty()) - { - StorePolicy = CachePolicy::None; - ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Local) - { - StorePolicy |= CachePolicy::StoreLocal; - } - if (Opt == detail::cacheopt::Remote) - { - StorePolicy |= CachePolicy::StoreRemote; - } - return true; - }); - } - } - - CachePolicy SkipPolicy = CachePolicy::None; - - { - std::string_view Opts = QueryParams.GetValue("skip"sv); - if (!Opts.empty()) - { - ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Meta) - { - SkipPolicy |= CachePolicy::SkipMeta; - } - if (Opt == detail::cacheopt::Value) - { - SkipPolicy |= CachePolicy::SkipValue; - } - if (Opt == detail::cacheopt::Attachments) - { - SkipPolicy |= CachePolicy::SkipAttachments; - } - if (Opt == detail::cacheopt::Data) - { - SkipPolicy |= CachePolicy::SkipData; - } - return true; - }); - } - } + const CachePolicy QueryPolicy = zen::ParseQueryCachePolicy(QueryParams.GetValue("query"sv)); + const CachePolicy StorePolicy = zen::ParseStoreCachePolicy(QueryParams.GetValue("store"sv)); + const CachePolicy SkipPolicy = zen::ParseSkipCachePolicy(QueryParams.GetValue("skip"sv)); return QueryPolicy | StorePolicy | SkipPolicy; } @@ -882,7 +784,11 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& void HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request, CachePolicy Policy) { - ZEN_UNUSED(Policy); + using namespace fmt::literals; + + const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; + const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; + const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); switch (auto Verb = Request.RequestVerb()) { @@ -898,44 +804,119 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request, return Request.WriteResponse(HttpResponseCode::BadRequest); } - CbObject BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload()); + CbObject BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload()); + const std::string_view Method = BatchRequest["method"sv].AsString(); - CbPackage Package; - CbObjectWriter BatchResponse; + if (Method != "getcacherecords"sv) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Invalid method '{}'"_format(Method)); + } - BatchResponse.BeginArray("records"sv); + CbObjectView Params = BatchRequest["params"sv].AsObjectView(); - for (CbFieldView QueryView : BatchRequest["records"sv]) + std::vector<CacheKey> CacheKeys; + std::vector<IoBuffer> CacheValues; + std::vector<IoBuffer> Payloads; + std::vector<size_t> Missing; + + for (CbFieldView QueryView : Params["cachekeys"sv]) { - CbObjectView Query = QueryView.AsObjectView(); - const std::string_view Bucket = Query["bucket"sv].AsString(); - const IoHash CacheKey = Query["key"sv].AsHash(); + CbObjectView Query = QueryView.AsObjectView(); + CacheKeys.push_back(CacheKey::Create(Query["bucket"sv].AsString(), Query["key"sv].AsHash())); + } - ZenCacheValue CacheValue; - const bool Hit = m_CacheStore.Get(Bucket, CacheKey, CacheValue); + CacheValues.resize(CacheKeys.size()); - if (Hit) + for (size_t Idx = 0; const CacheKey& Key : CacheKeys) + { + ZenCacheValue CacheValue; + if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) { CbObjectView CacheRecord(CacheValue.Value.Data()); - CacheRecord.IterateAttachments([this, &Package](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + if (!SkipAttachments) + { + CacheRecord.IterateAttachments([this, &Payloads](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Payloads.push_back(Chunk); + } + }); + } + + CacheValues[Idx] = CacheValue.Value; + } + else + { + Missing.push_back(Idx); + } + + ++Idx; + } + + if (QueryUpstream) + { + auto UpstreamResult = m_UpstreamCache->GetCacheRecords( + CacheKeys, + Missing, + [this, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) { + CbPackage UpstreamPackage; + if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue)) { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + CbObjectView CacheRecord = UpstreamPackage.GetObject(); + + CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash())) + { + if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) + { + m_CidStore.AddChunk(Chunk); + + if (!SkipAttachments) + { + Payloads.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + } + } + } + }); + + CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView()); } }); + } + + CbObjectWriter BatchResponse; - BatchResponse << CacheRecord; + BatchResponse.BeginArray("result"sv); + for (const IoBuffer& Value : CacheValues) + { + if (Value) + { + BatchResponse << CbObjectView(Value.Data()); + } + else + { + BatchResponse.AddNull(); } } - BatchResponse.EndArray(); + + CbPackage Package; Package.SetObject(BatchResponse.Save()); - + + for (const IoBuffer& Payload : Payloads) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Payload)))); + } + BinaryWriter MemStream; Package.Save(MemStream); - - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } break; default: diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c0cd858b6..437b29cd7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -780,6 +780,47 @@ public: return {}; } + virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + OnCacheGetComplete OnComplete) override + { + if (!m_Options.ReadUpstream) + { + return {.Missing = std::vector<size_t>(KeyIndex.begin(), KeyIndex.end())}; + } + + GetUpstreamCacheBatchResult Result; + + for (size_t Idx : KeyIndex) + { + const UpstreamCacheKey CacheKey = {CacheKeys[Idx].Bucket, CacheKeys[Idx].Hash}; + + GetUpstreamCacheResult CacheResult; + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + CacheResult = Endpoint->GetCacheRecord(CacheKey, ZenContentType::kCbPackage); + if (CacheResult.Success) + { + break; + } + } + } + + if (CacheResult.Success) + { + OnComplete(Idx, CacheResult.Value); + } + else + { + Result.Missing.push_back(Idx); + } + } + + return Result; + } + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { if (m_Options.ReadUpstream) diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index edc995da6..04554f210 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -2,12 +2,15 @@ #pragma once +#include "cache/cachekey.h" + #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/zencore.h> #include <atomic> #include <chrono> +#include <functional> #include <memory> namespace zen { @@ -62,6 +65,11 @@ struct GetUpstreamCacheResult bool Success = false; }; +struct GetUpstreamCacheBatchResult +{ + std::vector<size_t> Missing; +}; + struct PutUpstreamCacheResult { std::string Reason; @@ -88,6 +96,8 @@ struct UpstreamEndpointStats std::atomic<double> SecondsDown{}; }; +using OnCacheGetComplete = std::function<void(size_t, IoBuffer)>; + /** * The upstream endpont is responsible for handling upload/downloading of cache records. */ @@ -129,6 +139,10 @@ public: virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + OnCacheGetComplete OnComplete) = 0; + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; struct EnqueueResult diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index d954d3f8d..d90dd009a 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -105,6 +105,7 @@ </ItemDefinitionGroup> <ItemGroup> <ClInclude Include="admin\admin.h" /> + <ClInclude Include="cache\cachekey.h" /> <ClInclude Include="cache\structuredcache.h" /> <ClInclude Include="cache\structuredcachestore.h" /> <ClInclude Include="compute\apply.h" /> @@ -131,6 +132,7 @@ </ItemGroup> <ItemGroup> <ClCompile Include="admin\admin.cpp" /> + <ClCompile Include="cache\cachekey.cpp" /> <ClCompile Include="cache\structuredcache.cpp" /> <ClCompile Include="cache\structuredcachestore.cpp" /> <ClCompile Include="compute\apply.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index 04c6267ba..ae5411afb 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -41,6 +41,9 @@ <ClInclude Include="experimental\vfs.h" /> <ClInclude Include="monitoring\httpstats.h" /> <ClInclude Include="monitoring\httpstatus.h" /> + <ClInclude Include="cache\cachekey.h"> + <Filter>cache</Filter> + </ClInclude> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -76,6 +79,9 @@ <ClCompile Include="experimental\vfs.cpp" /> <ClCompile Include="monitoring\httpstats.cpp" /> <ClCompile Include="monitoring\httpstatus.cpp" /> + <ClCompile Include="cache\cachekey.cpp"> + <Filter>cache</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <Filter Include="cache"> |