aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-09-17 19:11:27 +0200
committerStefan Boberg <[email protected]>2021-09-17 19:11:27 +0200
commita47396e06a8c6ebc2a44135c47d4820e6984c225 (patch)
treee719ed435a2de51e477822d98b0c71083edcca6a
parentImplemented basics for Windows server support (not yet 100% - needs to proper... (diff)
parentAdded upstream cache policy command line option (read|write,readonly,writeonl... (diff)
downloadzen-a47396e06a8c6ebc2a44135c47d4820e6984c225.tar.xz
zen-a47396e06a8c6ebc2a44135c47d4820e6984c225.zip
Merge branch 'main' of https://github.com/EpicGames/zen
-rw-r--r--zencore/include/zencore/string.h39
-rw-r--r--zencore/string.cpp40
-rw-r--r--zenhttp/include/zenhttp/httpserver.h2
-rw-r--r--zenserver-test/zenserver-test.cpp280
-rw-r--r--zenserver/cache/structuredcache.cpp151
-rw-r--r--zenserver/cache/structuredcache.h5
-rw-r--r--zenserver/config.cpp41
-rw-r--r--zenserver/config.h10
-rw-r--r--zenserver/upstream/upstreamcache.cpp24
-rw-r--r--zenserver/upstream/upstreamcache.h4
-rw-r--r--zenserver/zenserver.cpp12
11 files changed, 569 insertions, 39 deletions
diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h
index 2b5f20f86..bb9b1c896 100644
--- a/zencore/include/zencore/string.h
+++ b/zencore/include/zencore/string.h
@@ -622,6 +622,45 @@ ToLower(const std::string_view& InString)
//////////////////////////////////////////////////////////////////////////
+template<typename Fn>
+uint32_t
+ForEachStrTok(const std::string_view& Str, char Delim, Fn&& Func)
+{
+ auto It = Str.begin();
+ auto End = Str.end();
+ uint32_t Count = 0;
+
+ while (It != End)
+ {
+ if (*It == Delim)
+ {
+ It++;
+ continue;
+ }
+
+ std::string_view Remaining{It, End};
+ size_t Idx = Remaining.find(Delim, 0);
+
+ if (Idx == std::string_view::npos)
+ {
+ Idx = Remaining.size();
+ }
+
+ Count++;
+ std::string_view Token{It, It + Idx};
+ if (!Func(Token))
+ {
+ break;
+ }
+
+ It = It + Idx;
+ }
+
+ return Count;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
void string_forcelink(); // internal
} // namespace zen
diff --git a/zencore/string.cpp b/zencore/string.cpp
index 8ea10d2a3..ce1b6d675 100644
--- a/zencore/string.cpp
+++ b/zencore/string.cpp
@@ -912,4 +912,44 @@ TEST_CASE("filepath")
CHECK(FilepathFindExtension(".txt") == std::string_view(".txt"));
}
+TEST_CASE("string")
+{
+ using namespace std::literals;
+
+ SUBCASE("ForEachStrTok")
+ {
+ const auto Tokens = "here,is,my,different,tokens"sv;
+ int32_t ExpectedTokenCount = 5;
+ int32_t TokenCount = 0;
+ StringBuilder<512> Sb;
+
+ TokenCount = ForEachStrTok(Tokens, ',', [&Sb](const std::string_view& Token) {
+ if (Sb.Size())
+ {
+ Sb << ",";
+ }
+ Sb << Token;
+ return true;
+ });
+
+ CHECK(TokenCount == ExpectedTokenCount);
+ CHECK(Sb.ToString() == Tokens);
+
+ ExpectedTokenCount = 1;
+ const auto Str = "mosdef"sv;
+
+ Sb.Reset();
+ TokenCount = ForEachStrTok(Str, ' ', [&Sb](const std::string_view& Token) {
+ Sb << Token;
+ return true;
+ });
+ CHECK(Sb.ToString() == Str);
+ CHECK(TokenCount == ExpectedTokenCount);
+
+ ExpectedTokenCount = 0;
+ TokenCount = ForEachStrTok(""sv, ',', [](const std::string_view&) { return true; });
+ CHECK(TokenCount == ExpectedTokenCount);
+ }
+}
+
} // namespace zen
diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h
index f656c69a8..ed6075c92 100644
--- a/zenhttp/include/zenhttp/httpserver.h
+++ b/zenhttp/include/zenhttp/httpserver.h
@@ -39,7 +39,7 @@ public:
{
std::vector<std::pair<std::string_view, std::string_view>> KvPairs;
- std::string_view GetValue(std::string_view ParamName)
+ std::string_view GetValue(std::string_view ParamName) const
{
for (const auto& Kv : KvPairs)
{
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 17b8b76eb..7629e2ea1 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1402,6 +1402,286 @@ TEST_CASE("zcache.cbpackage")
}
}
+TEST_CASE("zcache.policy")
+{
+ using namespace std::literals;
+
+ struct ZenConfig
+ {
+ std::filesystem::path DataDir;
+ uint16_t Port;
+ std::string BaseUri;
+ std::string Args;
+
+ static ZenConfig New(uint16_t Port = 13337, std::string Args = "")
+ {
+ return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(),
+ .Port = Port,
+ .BaseUri = "http://localhost:{}/z$"_format(Port),
+ .Args = std::move(Args)};
+ }
+
+ static ZenConfig NewWithUpstream(uint16_t UpstreamPort)
+ {
+ return New(13337, "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(UpstreamPort));
+ }
+
+ void Spawn(ZenServerInstance& Inst)
+ {
+ Inst.SetTestDir(DataDir);
+ Inst.SpawnServer(Port, Args);
+ Inst.WaitUntilReady();
+ }
+ };
+
+ auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer {
+ auto Buf = zen::UniqueBuffer::Alloc(Size);
+ uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData());
+ for (uint64_t Idx = 0; Idx < Size; Idx++)
+ {
+ Data[Idx] = Idx % 256;
+ }
+ OutHash = zen::IoHash::HashBuffer(Data, Size);
+ return Buf;
+ };
+
+ auto GeneratePackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
+ auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
+ auto CompressedData = zen::CompressedBuffer::Compress(Data);
+ OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash());
+ zen::CbWriter Obj;
+ Obj.BeginObject("obj"sv);
+ Obj.AddBinaryAttachment("data", OutAttachmentKey);
+ Obj.EndObject();
+ zen::CbPackage Package;
+ Package.SetObject(Obj.Save().AsObject());
+ Package.AddAttachment(zen::CbAttachment(CompressedData));
+
+ return Package;
+ };
+
+ auto ToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
+ zen::MemoryOutStream MemStream;
+ zen::BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+
+ return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ };
+
+ SUBCASE("query - 'local' does not query upstream (binary)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamInst(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalInst(TestEnv);
+ const auto Bucket = "legacy"sv;
+
+ UpstreamCfg.Spawn(UpstreamInst);
+ LocalCfg.Spawn(LocalInst);
+
+ zen::IoHash Key;
+ auto BinaryValue = GenerateData(1024, Key);
+
+ // Store binary cache value upstream
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
+ cpr::Header{{"Content-Type", "application/octet-stream"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local"_format(LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/octet-stream"}});
+ CHECK(Result.status_code == 404);
+ }
+
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/octet-stream"}});
+ CHECK(Result.status_code == 200);
+ }
+ }
+
+ SUBCASE("store - 'local' does not store upstream (binary)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamInst(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalInst(TestEnv);
+ const auto Bucket = "legacy"sv;
+
+ UpstreamCfg.Spawn(UpstreamInst);
+ LocalCfg.Spawn(LocalInst);
+
+ zen::IoHash Key;
+ auto BinaryValue = GenerateData(1024, Key);
+
+ // Store binary cache value locally
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local"_format(LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
+ cpr::Header{{"Content-Type", "application/octet-stream"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/octet-stream"}});
+ CHECK(Result.status_code == 404);
+ }
+
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}});
+ CHECK(Result.status_code == 200);
+ }
+ }
+
+ SUBCASE("store - 'local/remote' stores local and upstream (binary)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamInst(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalInst(TestEnv);
+ const auto Bucket = "legacy"sv;
+
+ UpstreamCfg.Spawn(UpstreamInst);
+ LocalCfg.Spawn(LocalInst);
+
+ zen::IoHash Key;
+ auto BinaryValue = GenerateData(1024, Key);
+
+ // Store binary cache value locally and upstream
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
+ cpr::Header{{"Content-Type", "application/octet-stream"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/octet-stream"}});
+ CHECK(Result.status_code == 200);
+ }
+
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}});
+ CHECK(Result.status_code == 200);
+ }
+ }
+
+ SUBCASE("query - 'local' does not query upstream (cppackage)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamInst(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalInst(TestEnv);
+ const auto Bucket = "legacy"sv;
+
+ UpstreamCfg.Spawn(UpstreamInst);
+ LocalCfg.Spawn(LocalInst);
+
+ zen::IoHash Key;
+ zen::CbPackage Package = GeneratePackage(Key);
+ auto Buf = ToBuffer(Package);
+
+ // Store package upstream
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local"_format(LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 404);
+ }
+
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+ }
+ }
+
+ SUBCASE("store - 'local' does not store upstream (cbpackge)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamInst(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalInst(TestEnv);
+ const auto Bucket = "legacy"sv;
+
+ UpstreamCfg.Spawn(UpstreamInst);
+ LocalCfg.Spawn(LocalInst);
+
+ zen::IoHash Key;
+ zen::CbPackage Package = GeneratePackage(Key);
+ auto Buf = ToBuffer(Package);
+
+ // Store packge locally
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local"_format(LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 404);
+ }
+
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+ }
+ }
+
+ SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamInst(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalInst(TestEnv);
+ const auto Bucket = "legacy"sv;
+
+ UpstreamCfg.Spawn(UpstreamInst);
+ LocalCfg.Spawn(LocalInst);
+
+ zen::IoHash Key;
+ zen::CbPackage Package = GeneratePackage(Key);
+ auto Buf = ToBuffer(Package);
+
+ // Store package locally and upstream
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+ }
+
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+ }
+ }
+}
+
struct RemoteExecutionRequest
{
RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath)
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index da2e8850e..c66b1f98d 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -25,12 +25,125 @@
#include <queue>
#include <thread>
+#include <gsl/gsl-lite.hpp>
+
namespace zen {
using namespace std::literals;
//////////////////////////////////////////////////////////////////////////
+namespace detail { namespace cacheopt {
+ constexpr std::string_view Local = "local"sv;
+ constexpr std::string_view Remote = "remote"sv;
+ constexpr std::string_view Data = "data"sv;
+ constexpr std::string_view Meta = "meta"sv;
+ constexpr std::string_view Value = "value"sv;
+ constexpr std::string_view Attachments = "attachments"sv;
+}} // namespace detail::cacheopt
+
+//////////////////////////////////////////////////////////////////////////
+
+enum class CachePolicy : uint8_t
+{
+ None = 0,
+ QueryLocal = 1 << 0,
+ QueryRemote = 1 << 1,
+ Query = QueryLocal | QueryRemote,
+ StoreLocal = 1 << 2,
+ StoreRemote = 1 << 3,
+ Store = StoreLocal | StoreRemote,
+ SkipMeta = 1 << 4,
+ SkipValue = 1 << 5,
+ SkipAttachments = 1 << 6,
+ SkipData = SkipMeta | SkipValue | SkipAttachments,
+ SkipLocalCopy = 1 << 7,
+ Local = QueryLocal | StoreLocal,
+ Remote = QueryRemote | StoreRemote,
+ Default = Query | Store,
+ Disable = None,
+};
+
+gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy);
+
+CachePolicy
+ParseCachePolicy(const zen::HttpServerRequest::QueryParams& QueryParams)
+{
+ CachePolicy QueryPolicy = CachePolicy::Query;
+
+ {
+ std::string_view Opts = QueryParams.GetValue("query"sv);
+ if (!Opts.empty())
+ {
+ QueryPolicy = CachePolicy::None;
+ zen::ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) {
+ if (Opt == detail::cacheopt::Local)
+ {
+ QueryPolicy |= CachePolicy::QueryLocal;
+ }
+ if (Opt == detail::cacheopt::Remote)
+ {
+ QueryPolicy |= CachePolicy::QueryRemote;
+ }
+ return true;
+ });
+ }
+ }
+
+ CachePolicy StorePolicy = CachePolicy::Store;
+
+ {
+ std::string_view Opts = QueryParams.GetValue("store"sv);
+ if (!Opts.empty())
+ {
+ StorePolicy = CachePolicy::None;
+ zen::ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) {
+ if (Opt == detail::cacheopt::Local)
+ {
+ StorePolicy |= CachePolicy::StoreLocal;
+ }
+ if (Opt == detail::cacheopt::Remote)
+ {
+ StorePolicy |= CachePolicy::StoreRemote;
+ }
+ return true;
+ });
+ }
+ }
+
+ CachePolicy SkipPolicy = CachePolicy::None;
+
+ {
+ std::string_view Opts = QueryParams.GetValue("skip"sv);
+ if (!Opts.empty())
+ {
+ zen::ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) {
+ if (Opt == detail::cacheopt::Meta)
+ {
+ SkipPolicy |= CachePolicy::SkipMeta;
+ }
+ if (Opt == detail::cacheopt::Value)
+ {
+ SkipPolicy |= CachePolicy::SkipValue;
+ }
+ if (Opt == detail::cacheopt::Attachments)
+ {
+ SkipPolicy |= CachePolicy::SkipAttachments;
+ }
+ if (Opt == detail::cacheopt::Data)
+ {
+ SkipPolicy |= CachePolicy::SkipData;
+ }
+ return true;
+ });
+ }
+ }
+
+ return QueryPolicy | StorePolicy | SkipPolicy;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore,
zen::CasStore& InStore,
zen::CidStore& InCidStore,
@@ -78,13 +191,16 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request)
return Request.WriteResponse(zen::HttpResponseCode::BadRequest); // invalid URL
}
+ const auto QueryParams = Request.GetQueryParams();
+ CachePolicy Policy = ParseCachePolicy(QueryParams);
+
if (Ref.PayloadId == IoHash::Zero)
{
- return HandleCacheRecordRequest(Request, Ref);
+ return HandleCacheRecordRequest(Request, Ref, Policy);
}
else
{
- return HandleCachePayloadRequest(Request, Ref);
+ return HandleCachePayloadRequest(Request, Ref, Policy);
}
return;
@@ -121,7 +237,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Req
}
void
-HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref)
+HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy)
{
switch (auto Verb = Request.RequestVerb())
{
@@ -136,7 +252,10 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
bool InUpstreamCache = false;
- if (!Success && m_UpstreamCache)
+ const bool QueryUpstream =
+ !Success && m_UpstreamCache && (zen::CachePolicy::QueryRemote == (Policy & zen::CachePolicy::QueryRemote));
+
+ if (QueryUpstream)
{
const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary
: AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage
@@ -326,13 +445,15 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
const HttpContentType ContentType = Request.RequestContentType();
+ const bool StoreUpstream = m_UpstreamCache && (zen::CachePolicy::StoreRemote == (Policy & zen::CachePolicy::StoreRemote));
+
if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType)
{
// TODO: create a cache record and put value in CAS?
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()));
- if (m_UpstreamCache)
+ if (StoreUpstream)
{
auto Result = m_UpstreamCache->EnqueueUpstream(
{.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}});
@@ -397,15 +518,12 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
MissingRefs.size(),
References.size());
- if (MissingRefs.empty())
+ if (MissingRefs.empty() && StoreUpstream)
{
- // Only enqueue valid cache records, i.e. all referenced payloads exists
- if (m_UpstreamCache)
- {
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(References)});
- }
+ ZEN_ASSERT(m_UpstreamCache);
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(References)});
return Request.WriteResponse(zen::HttpResponseCode::Created);
}
@@ -502,8 +620,9 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
ZenCacheValue CacheValue{.Value = CacheRecordChunk};
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
- if (m_UpstreamCache)
+ if (StoreUpstream)
{
+ ZEN_ASSERT(m_UpstreamCache);
auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
.CacheKey = {Ref.BucketSegment, Ref.HashKey},
.PayloadIds = std::move(PayloadIds)});
@@ -536,7 +655,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
}
void
-HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref)
+HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy)
{
// Note: the URL references the uncompressed payload hash - so this maintains the mapping
// from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash
@@ -544,6 +663,8 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
// this is a PITA but a consequence of the fact that the client side code is not able to
// address data by compressed hash
+ ZEN_UNUSED(Policy);
+
switch (auto Verb = Request.RequestVerb())
{
using enum zen::HttpVerb;
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 8289fd700..796b21d1f 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -17,6 +17,7 @@ namespace zen {
class CasStore;
class CidStore;
class UpstreamCache;
+enum class CachePolicy : uint8_t;
/**
* Structured cache service. Imposes constraints on keys, supports blobs and
@@ -70,8 +71,8 @@ private:
};
[[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef);
- void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref);
- void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref);
+ void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy);
+ void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
spdlog::logger& Log() { return m_Log; }
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 092fc6998..164d2a792 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -55,6 +55,27 @@ PickDefaultStateDirectory()
#endif
+UpstreamCachePolicy
+ParseUpstreamCachePolicy(std::string_view Options)
+{
+ if (Options == "readonly")
+ {
+ return UpstreamCachePolicy::Read;
+ }
+ else if (Options == "writeonly")
+ {
+ return UpstreamCachePolicy::Write;
+ }
+ else if (Options == "disabled")
+ {
+ return UpstreamCachePolicy::Disabled;
+ }
+ else
+ {
+ return UpstreamCachePolicy::ReadWrite;
+ }
+}
+
void
ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig)
{
@@ -113,6 +134,14 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
cxxopts::value<bool>(ServiceConfig.ShouldCrash)->default_value("false"),
"");
+ std::string UpstreamCachePolicyOptions;
+ options.add_option("cache",
+ "",
+ "upstream-cache-policy",
+ "",
+ cxxopts::value<std::string>(UpstreamCachePolicyOptions)->default_value(""),
+ "Upstream cache policy (readwrite|readonly|writeonly|disabled)");
+
options.add_option("cache",
"",
"upstream-jupiter-url",
@@ -178,13 +207,6 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
options.add_option("cache",
"",
- "upstream-enabled",
- "Whether upstream caching is disabled",
- cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.Enabled)->default_value("true"),
- "");
-
- options.add_option("cache",
- "",
"upstream-thread-count",
"Number of threads used for upstream procsssing",
cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"),
@@ -200,6 +222,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
exit(0);
}
+
+ ServiceConfig.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(UpstreamCachePolicyOptions);
}
catch (cxxopts::OptionParseException& e)
{
@@ -276,7 +300,8 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv
if (auto UpstreamConfig = StructuredCacheConfig->get<sol::optional<sol::table>>("upstream"))
{
- ServiceConfig.UpstreamCacheConfig.Enabled = UpstreamConfig->get_or("enable", ServiceConfig.UpstreamCacheConfig.Enabled);
+ std::string Policy = UpstreamConfig->get_or("policy", std::string());
+ ServiceConfig.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(Policy);
ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount = UpstreamConfig->get_or("upstreamthreadcount", 4);
if (auto JupiterConfig = UpstreamConfig->get<sol::optional<sol::table>>("jupiter"))
diff --git a/zenserver/config.h b/zenserver/config.h
index c06102384..6ade1b401 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -36,12 +36,20 @@ struct ZenUpstreamZenConfig
std::string Url;
};
+enum class UpstreamCachePolicy : uint8_t
+{
+ Disabled = 0,
+ Read = 1 << 0,
+ Write = 1 << 1,
+ ReadWrite = Read | Write
+};
+
struct ZenUpstreamCacheConfig
{
ZenUpstreamJupiterConfig JupiterConfig;
ZenUpstreamZenConfig ZenConfig;
int UpstreamThreadCount = 4;
- bool Enabled = false;
+ UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite;
};
struct ZenServiceConfig
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 38d30a795..c015ef3e9 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -559,12 +559,15 @@ public:
virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
- for (auto& Endpoint : m_Endpoints)
+ if (m_Options.ReadUpstream)
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ for (auto& Endpoint : m_Endpoints)
{
- m_Stats.Add(*Endpoint, Result);
- return Result;
+ if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ return Result;
+ }
}
}
@@ -573,12 +576,15 @@ public:
virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
{
- for (auto& Endpoint : m_Endpoints)
+ if (m_Options.ReadUpstream)
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ for (auto& Endpoint : m_Endpoints)
{
- m_Stats.Add(*Endpoint, Result);
- return Result;
+ if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ return Result;
+ }
}
}
@@ -587,7 +593,7 @@ public:
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
{
- if (m_IsRunning.load())
+ if (m_IsRunning.load() && m_Options.WriteUpstream)
{
if (!m_UpstreamThreads.empty())
{
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 327778452..3b054d815 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -36,7 +36,9 @@ struct UpstreamCacheRecord
struct UpstreamCacheOptions
{
- uint32_t ThreadCount = 4;
+ uint32_t ThreadCount = 4;
+ bool ReadUpstream = true;
+ bool WriteUpstream = true;
};
struct GetUpstreamCacheResult
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index f36cfba48..83580b288 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -166,11 +166,15 @@ public:
m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
std::unique_ptr<zen::UpstreamCache> UpstreamCache;
- if (ServiceConfig.UpstreamCacheConfig.Enabled)
+ if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
{
const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ReadUpstream =
+ (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
+ UpstreamOptions.WriteUpstream =
+ (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
if (UpstreamConfig.UpstreamThreadCount < 32)
{
@@ -216,7 +220,11 @@ public:
if (UpstreamCache->Initialize())
{
- ZEN_INFO("upstream cache active");
+ ZEN_INFO("upstream cache active ({})",
+ UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
+ : UpstreamOptions.ReadUpstream ? "READONLY"
+ : UpstreamOptions.WriteUpstream ? "WRITEONLY"
+ : "DISABLED");
}
else
{