diff options
| author | Stefan Boberg <[email protected]> | 2021-09-17 19:11:27 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-09-17 19:11:27 +0200 |
| commit | a47396e06a8c6ebc2a44135c47d4820e6984c225 (patch) | |
| tree | e719ed435a2de51e477822d98b0c71083edcca6a | |
| parent | Implemented basics for Windows server support (not yet 100% - needs to proper... (diff) | |
| parent | Added upstream cache policy command line option (read|write,readonly,writeonl... (diff) | |
| download | zen-a47396e06a8c6ebc2a44135c47d4820e6984c225.tar.xz zen-a47396e06a8c6ebc2a44135c47d4820e6984c225.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
| -rw-r--r-- | zencore/include/zencore/string.h | 39 | ||||
| -rw-r--r-- | zencore/string.cpp | 40 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpserver.h | 2 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 280 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 151 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 5 | ||||
| -rw-r--r-- | zenserver/config.cpp | 41 | ||||
| -rw-r--r-- | zenserver/config.h | 10 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 24 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 4 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 12 |
11 files changed, 569 insertions, 39 deletions
diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h index 2b5f20f86..bb9b1c896 100644 --- a/zencore/include/zencore/string.h +++ b/zencore/include/zencore/string.h @@ -622,6 +622,45 @@ ToLower(const std::string_view& InString) ////////////////////////////////////////////////////////////////////////// +template<typename Fn> +uint32_t +ForEachStrTok(const std::string_view& Str, char Delim, Fn&& Func) +{ + auto It = Str.begin(); + auto End = Str.end(); + uint32_t Count = 0; + + while (It != End) + { + if (*It == Delim) + { + It++; + continue; + } + + std::string_view Remaining{It, End}; + size_t Idx = Remaining.find(Delim, 0); + + if (Idx == std::string_view::npos) + { + Idx = Remaining.size(); + } + + Count++; + std::string_view Token{It, It + Idx}; + if (!Func(Token)) + { + break; + } + + It = It + Idx; + } + + return Count; +} + +////////////////////////////////////////////////////////////////////////// + void string_forcelink(); // internal } // namespace zen diff --git a/zencore/string.cpp b/zencore/string.cpp index 8ea10d2a3..ce1b6d675 100644 --- a/zencore/string.cpp +++ b/zencore/string.cpp @@ -912,4 +912,44 @@ TEST_CASE("filepath") CHECK(FilepathFindExtension(".txt") == std::string_view(".txt")); } +TEST_CASE("string") +{ + using namespace std::literals; + + SUBCASE("ForEachStrTok") + { + const auto Tokens = "here,is,my,different,tokens"sv; + int32_t ExpectedTokenCount = 5; + int32_t TokenCount = 0; + StringBuilder<512> Sb; + + TokenCount = ForEachStrTok(Tokens, ',', [&Sb](const std::string_view& Token) { + if (Sb.Size()) + { + Sb << ","; + } + Sb << Token; + return true; + }); + + CHECK(TokenCount == ExpectedTokenCount); + CHECK(Sb.ToString() == Tokens); + + ExpectedTokenCount = 1; + const auto Str = "mosdef"sv; + + Sb.Reset(); + TokenCount = ForEachStrTok(Str, ' ', [&Sb](const std::string_view& Token) { + Sb << Token; + return true; + }); + CHECK(Sb.ToString() == Str); + CHECK(TokenCount == ExpectedTokenCount); + + ExpectedTokenCount = 0; + TokenCount = ForEachStrTok(""sv, ',', [](const std::string_view&) { return true; }); + CHECK(TokenCount == ExpectedTokenCount); + } +} + } // namespace zen diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h index f656c69a8..ed6075c92 100644 --- a/zenhttp/include/zenhttp/httpserver.h +++ b/zenhttp/include/zenhttp/httpserver.h @@ -39,7 +39,7 @@ public: { std::vector<std::pair<std::string_view, std::string_view>> KvPairs; - std::string_view GetValue(std::string_view ParamName) + std::string_view GetValue(std::string_view ParamName) const { for (const auto& Kv : KvPairs) { diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 17b8b76eb..7629e2ea1 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1402,6 +1402,286 @@ TEST_CASE("zcache.cbpackage") } } +TEST_CASE("zcache.policy") +{ + using namespace std::literals; + + struct ZenConfig + { + std::filesystem::path DataDir; + uint16_t Port; + std::string BaseUri; + std::string Args; + + static ZenConfig New(uint16_t Port = 13337, std::string Args = "") + { + return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), + .Port = Port, + .BaseUri = "http://localhost:{}/z$"_format(Port), + .Args = std::move(Args)}; + } + + static ZenConfig NewWithUpstream(uint16_t UpstreamPort) + { + return New(13337, "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(UpstreamPort)); + } + + void Spawn(ZenServerInstance& Inst) + { + Inst.SetTestDir(DataDir); + Inst.SpawnServer(Port, Args); + Inst.WaitUntilReady(); + } + }; + + auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer { + auto Buf = zen::UniqueBuffer::Alloc(Size); + uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData()); + for (uint64_t Idx = 0; Idx < Size; Idx++) + { + Data[Idx] = Idx % 256; + } + OutHash = zen::IoHash::HashBuffer(Data, Size); + return Buf; + }; + + auto GeneratePackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { + auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); + auto CompressedData = zen::CompressedBuffer::Compress(Data); + OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash()); + zen::CbWriter Obj; + Obj.BeginObject("obj"sv); + Obj.AddBinaryAttachment("data", OutAttachmentKey); + Obj.EndObject(); + zen::CbPackage Package; + Package.SetObject(Obj.Save().AsObject()); + Package.AddAttachment(zen::CbAttachment(CompressedData)); + + return Package; + }; + + auto ToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { + zen::MemoryOutStream MemStream; + zen::BinaryWriter Writer(MemStream); + Package.Save(Writer); + + return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + }; + + SUBCASE("query - 'local' does not query upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value upstream + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local' does not store upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value locally + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local/remote' stores local and upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value locally and upstream + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("query - 'local' does not query upstream (cppackage)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::CbPackage Package = GeneratePackage(Key); + auto Buf = ToBuffer(Package); + + // Store package upstream + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local' does not store upstream (cbpackge)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::CbPackage Package = GeneratePackage(Key); + auto Buf = ToBuffer(Package); + + // Store packge locally + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::CbPackage Package = GeneratePackage(Key); + auto Buf = ToBuffer(Package); + + // Store package locally and upstream + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + } +} + struct RemoteExecutionRequest { RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath) diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index da2e8850e..c66b1f98d 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -25,12 +25,125 @@ #include <queue> #include <thread> +#include <gsl/gsl-lite.hpp> + namespace zen { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// +namespace detail { namespace cacheopt { + constexpr std::string_view Local = "local"sv; + constexpr std::string_view Remote = "remote"sv; + constexpr std::string_view Data = "data"sv; + constexpr std::string_view Meta = "meta"sv; + constexpr std::string_view Value = "value"sv; + constexpr std::string_view Attachments = "attachments"sv; +}} // namespace detail::cacheopt + +////////////////////////////////////////////////////////////////////////// + +enum class CachePolicy : uint8_t +{ + None = 0, + QueryLocal = 1 << 0, + QueryRemote = 1 << 1, + Query = QueryLocal | QueryRemote, + StoreLocal = 1 << 2, + StoreRemote = 1 << 3, + Store = StoreLocal | StoreRemote, + SkipMeta = 1 << 4, + SkipValue = 1 << 5, + SkipAttachments = 1 << 6, + SkipData = SkipMeta | SkipValue | SkipAttachments, + SkipLocalCopy = 1 << 7, + Local = QueryLocal | StoreLocal, + Remote = QueryRemote | StoreRemote, + Default = Query | Store, + Disable = None, +}; + +gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); + +CachePolicy +ParseCachePolicy(const zen::HttpServerRequest::QueryParams& QueryParams) +{ + CachePolicy QueryPolicy = CachePolicy::Query; + + { + std::string_view Opts = QueryParams.GetValue("query"sv); + if (!Opts.empty()) + { + QueryPolicy = CachePolicy::None; + zen::ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Local) + { + QueryPolicy |= CachePolicy::QueryLocal; + } + if (Opt == detail::cacheopt::Remote) + { + QueryPolicy |= CachePolicy::QueryRemote; + } + return true; + }); + } + } + + CachePolicy StorePolicy = CachePolicy::Store; + + { + std::string_view Opts = QueryParams.GetValue("store"sv); + if (!Opts.empty()) + { + StorePolicy = CachePolicy::None; + zen::ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Local) + { + StorePolicy |= CachePolicy::StoreLocal; + } + if (Opt == detail::cacheopt::Remote) + { + StorePolicy |= CachePolicy::StoreRemote; + } + return true; + }); + } + } + + CachePolicy SkipPolicy = CachePolicy::None; + + { + std::string_view Opts = QueryParams.GetValue("skip"sv); + if (!Opts.empty()) + { + zen::ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Meta) + { + SkipPolicy |= CachePolicy::SkipMeta; + } + if (Opt == detail::cacheopt::Value) + { + SkipPolicy |= CachePolicy::SkipValue; + } + if (Opt == detail::cacheopt::Attachments) + { + SkipPolicy |= CachePolicy::SkipAttachments; + } + if (Opt == detail::cacheopt::Data) + { + SkipPolicy |= CachePolicy::SkipData; + } + return true; + }); + } + } + + return QueryPolicy | StorePolicy | SkipPolicy; +} + +////////////////////////////////////////////////////////////////////////// + HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore, zen::CasStore& InStore, zen::CidStore& InCidStore, @@ -78,13 +191,16 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) return Request.WriteResponse(zen::HttpResponseCode::BadRequest); // invalid URL } + const auto QueryParams = Request.GetQueryParams(); + CachePolicy Policy = ParseCachePolicy(QueryParams); + if (Ref.PayloadId == IoHash::Zero) { - return HandleCacheRecordRequest(Request, Ref); + return HandleCacheRecordRequest(Request, Ref, Policy); } else { - return HandleCachePayloadRequest(Request, Ref); + return HandleCachePayloadRequest(Request, Ref, Policy); } return; @@ -121,7 +237,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) { switch (auto Verb = Request.RequestVerb()) { @@ -136,7 +252,10 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); bool InUpstreamCache = false; - if (!Success && m_UpstreamCache) + const bool QueryUpstream = + !Success && m_UpstreamCache && (zen::CachePolicy::QueryRemote == (Policy & zen::CachePolicy::QueryRemote)); + + if (QueryUpstream) { const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage @@ -326,13 +445,15 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req const HttpContentType ContentType = Request.RequestContentType(); + const bool StoreUpstream = m_UpstreamCache && (zen::CachePolicy::StoreRemote == (Policy & zen::CachePolicy::StoreRemote)); + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) { // TODO: create a cache record and put value in CAS? m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); - if (m_UpstreamCache) + if (StoreUpstream) { auto Result = m_UpstreamCache->EnqueueUpstream( {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); @@ -397,15 +518,12 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req MissingRefs.size(), References.size()); - if (MissingRefs.empty()) + if (MissingRefs.empty() && StoreUpstream) { - // Only enqueue valid cache records, i.e. all referenced payloads exists - if (m_UpstreamCache) - { - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, - .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(References)}); - } + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); return Request.WriteResponse(zen::HttpResponseCode::Created); } @@ -502,8 +620,9 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZenCacheValue CacheValue{.Value = CacheRecordChunk}; m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - if (m_UpstreamCache) + if (StoreUpstream) { + ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(PayloadIds)}); @@ -536,7 +655,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) { // Note: the URL references the uncompressed payload hash - so this maintains the mapping // from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash @@ -544,6 +663,8 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re // this is a PITA but a consequence of the fact that the client side code is not able to // address data by compressed hash + ZEN_UNUSED(Policy); + switch (auto Verb = Request.RequestVerb()) { using enum zen::HttpVerb; diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 8289fd700..796b21d1f 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -17,6 +17,7 @@ namespace zen { class CasStore; class CidStore; class UpstreamCache; +enum class CachePolicy : uint8_t; /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -70,8 +71,8 @@ private: }; [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); - void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref); - void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref); + void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); + void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); spdlog::logger& Log() { return m_Log; } diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 092fc6998..164d2a792 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -55,6 +55,27 @@ PickDefaultStateDirectory() #endif +UpstreamCachePolicy +ParseUpstreamCachePolicy(std::string_view Options) +{ + if (Options == "readonly") + { + return UpstreamCachePolicy::Read; + } + else if (Options == "writeonly") + { + return UpstreamCachePolicy::Write; + } + else if (Options == "disabled") + { + return UpstreamCachePolicy::Disabled; + } + else + { + return UpstreamCachePolicy::ReadWrite; + } +} + void ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig) { @@ -113,6 +134,14 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z cxxopts::value<bool>(ServiceConfig.ShouldCrash)->default_value("false"), ""); + std::string UpstreamCachePolicyOptions; + options.add_option("cache", + "", + "upstream-cache-policy", + "", + cxxopts::value<std::string>(UpstreamCachePolicyOptions)->default_value(""), + "Upstream cache policy (readwrite|readonly|writeonly|disabled)"); + options.add_option("cache", "", "upstream-jupiter-url", @@ -178,13 +207,6 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z options.add_option("cache", "", - "upstream-enabled", - "Whether upstream caching is disabled", - cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.Enabled)->default_value("true"), - ""); - - options.add_option("cache", - "", "upstream-thread-count", "Number of threads used for upstream procsssing", cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"), @@ -200,6 +222,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z exit(0); } + + ServiceConfig.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(UpstreamCachePolicyOptions); } catch (cxxopts::OptionParseException& e) { @@ -276,7 +300,8 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv if (auto UpstreamConfig = StructuredCacheConfig->get<sol::optional<sol::table>>("upstream")) { - ServiceConfig.UpstreamCacheConfig.Enabled = UpstreamConfig->get_or("enable", ServiceConfig.UpstreamCacheConfig.Enabled); + std::string Policy = UpstreamConfig->get_or("policy", std::string()); + ServiceConfig.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(Policy); ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount = UpstreamConfig->get_or("upstreamthreadcount", 4); if (auto JupiterConfig = UpstreamConfig->get<sol::optional<sol::table>>("jupiter")) diff --git a/zenserver/config.h b/zenserver/config.h index c06102384..6ade1b401 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -36,12 +36,20 @@ struct ZenUpstreamZenConfig std::string Url; }; +enum class UpstreamCachePolicy : uint8_t +{ + Disabled = 0, + Read = 1 << 0, + Write = 1 << 1, + ReadWrite = Read | Write +}; + struct ZenUpstreamCacheConfig { ZenUpstreamJupiterConfig JupiterConfig; ZenUpstreamZenConfig ZenConfig; int UpstreamThreadCount = 4; - bool Enabled = false; + UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite; }; struct ZenServiceConfig diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 38d30a795..c015ef3e9 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -559,12 +559,15 @@ public: virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { - for (auto& Endpoint : m_Endpoints) + if (m_Options.ReadUpstream) { - if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + for (auto& Endpoint : m_Endpoints) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } @@ -573,12 +576,15 @@ public: virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { - for (auto& Endpoint : m_Endpoints) + if (m_Options.ReadUpstream) { - if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + for (auto& Endpoint : m_Endpoints) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } @@ -587,7 +593,7 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { - if (m_IsRunning.load()) + if (m_IsRunning.load() && m_Options.WriteUpstream) { if (!m_UpstreamThreads.empty()) { diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 327778452..3b054d815 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -36,7 +36,9 @@ struct UpstreamCacheRecord struct UpstreamCacheOptions { - uint32_t ThreadCount = 4; + uint32_t ThreadCount = 4; + bool ReadUpstream = true; + bool WriteUpstream = true; }; struct GetUpstreamCacheResult diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index f36cfba48..83580b288 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -166,11 +166,15 @@ public: m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); std::unique_ptr<zen::UpstreamCache> UpstreamCache; - if (ServiceConfig.UpstreamCacheConfig.Enabled) + if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) { const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; zen::UpstreamCacheOptions UpstreamOptions; + UpstreamOptions.ReadUpstream = + (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; + UpstreamOptions.WriteUpstream = + (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; if (UpstreamConfig.UpstreamThreadCount < 32) { @@ -216,7 +220,11 @@ public: if (UpstreamCache->Initialize()) { - ZEN_INFO("upstream cache active"); + ZEN_INFO("upstream cache active ({})", + UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" + : UpstreamOptions.ReadUpstream ? "READONLY" + : UpstreamOptions.WriteUpstream ? "WRITEONLY" + : "DISABLED"); } else { |