aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-08-31 15:01:46 +0200
committerPer Larsson <[email protected]>2021-08-31 15:16:22 +0200
commitfd3946f2b2b013af01fdf60f67afb655c38c1901 (patch)
treeeca4abed5d71a157e185699f4e9668a92b756ca8 /zenserver
parentRemoved unused packages from vcpkg.json (diff)
downloadzen-fd3946f2b2b013af01fdf60f67afb655c38c1901.tar.xz
zen-fd3946f2b2b013af01fdf60f67afb655c38c1901.zip
Asynchronous upstream caching to Jupiter
Co-authored-by: Stefan Boberg <[email protected]>
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/cache/kvcache.cpp12
-rw-r--r--zenserver/cache/structuredcache.cpp343
-rw-r--r--zenserver/cache/structuredcache.h27
-rw-r--r--zenserver/cache/structuredcachestore.cpp1
-rw-r--r--zenserver/config.cpp16
-rw-r--r--zenserver/config.h1
-rw-r--r--zenserver/diag/logging.cpp64
-rw-r--r--zenserver/diag/logging.h10
-rw-r--r--zenserver/upstream/jupiter.cpp235
-rw-r--r--zenserver/upstream/jupiter.h38
-rw-r--r--zenserver/upstream/upstreamcache.cpp347
-rw-r--r--zenserver/upstream/upstreamcache.h82
-rw-r--r--zenserver/zenserver.cpp28
-rw-r--r--zenserver/zenserver.vcxproj2
-rw-r--r--zenserver/zenserver.vcxproj.filters6
15 files changed, 926 insertions, 286 deletions
diff --git a/zenserver/cache/kvcache.cpp b/zenserver/cache/kvcache.cpp
index 096edcef7..fbbb932ea 100644
--- a/zenserver/cache/kvcache.cpp
+++ b/zenserver/cache/kvcache.cpp
@@ -69,6 +69,7 @@ HttpKvCacheService::HttpKvCacheService()
{
m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv,
"ue4.ddc"sv /* namespace */,
+ "test.ddc"sv /* blob store namespace */,
"https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */,
"0oao91lrhqPiAlaGD0x7"sv /* client id */,
"-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */);
@@ -115,16 +116,16 @@ HttpKvCacheService::HandleRequest(zen::HttpServerRequest& Request)
zen::Stopwatch Timer;
- if (IoBuffer CloudValue = Session.Get("default", Key))
+ if (CloudCacheResult Result = Session.GetDerivedData("default", Key); Result.Success)
{
Success = true;
spdlog::debug("upstream HIT after {:5} {:6}! {}",
zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- NiceBytes(CloudValue.Size()),
+ NiceBytes(Result.Value.Size()),
Key);
- Value.Value = CloudValue;
+ Value.Value = Result.Value;
}
else
{
@@ -175,9 +176,10 @@ HttpKvCacheService::HandleRequest(zen::HttpServerRequest& Request)
zen::Stopwatch Timer;
- Session.Put("default", Key, Value.Value);
+ CloudCacheResult Result = Session.PutDerivedData("default", Key, Value.Value);
- spdlog::debug("upstream PUT took {:5} {:6}! {}",
+ spdlog::debug("upstream PUT '{}', took {:5} {:6}! {}",
+ Result.Success ? "OK" : "FAILED",
zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
NiceBytes(Value.Value.Size()),
Key);
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index def1adb90..1e3eb5dcd 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -12,12 +12,16 @@
#include "structuredcache.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
+#include "upstream/upstreamcache.h"
#include "upstream/zen.h"
#include "zenstore/cidstore.h"
#include <spdlog/spdlog.h>
#include <algorithm>
+#include <atomic>
#include <filesystem>
+#include <queue>
+#include <thread>
namespace zen {
@@ -44,31 +48,19 @@ MapToHttpContentType(zen::ZenContentType Type)
}
};
-HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore)
+//////////////////////////////////////////////////////////////////////////
+
+HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore,
+ zen::CasStore& InStore,
+ zen::CidStore& InCidStore,
+ std::unique_ptr<UpstreamCache> UpstreamCache)
: m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks()))
+, m_CacheStore(InCacheStore)
, m_CasStore(InStore)
-, m_CacheStore(InStore, RootPath)
, m_CidStore(InCidStore)
+, m_UpstreamCache(std::move(UpstreamCache))
{
m_Log.set_level(spdlog::level::debug);
-
- m_Log.info("initializing structured cache at '{}'", RootPath);
-
-#if 0
- m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv,
- "ue4.ddc"sv /* namespace */,
- "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */,
- "0oao91lrhqPiAlaGD0x7"sv /* client id */,
- "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */);
-#endif
-
-#if 0
- std::string_view UpstreamSpec = "http://arn-wd-15192.epicgames.net"sv;
-
- m_Log.info("Using upstream Zen cache at '{}'", UpstreamSpec);
-
- m_ZenClient = new ZenStructuredCacheClient(UpstreamSpec);
-#endif
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -161,66 +153,55 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
ZenCacheValue Value;
bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
- if (!Success && m_ZenClient)
+ if (!Success && m_UpstreamCache)
{
- ZenStructuredCacheSession Session(*m_ZenClient);
-
- zen::Stopwatch Timer;
+ const ZenContentType CacheRecordType =
+ Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject;
- try
+ if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
+ UpstreamResult.Success)
{
- Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey);
+ Value.Value = UpstreamResult.Value;
+ Success = true;
- m_Log.debug("Zen upstream GET ({}/{}) succeeded after {:5}!",
- Ref.BucketSegment,
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
-
- // TODO: this is incomplete and needs to propagate any referenced content
-
- Success = true;
- }
- catch (std::exception& e)
- {
- m_Log.warn("Zen upstream GET ({}/{}) FAILED after {:5}: '{}'",
- Ref.BucketSegment,
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
-
- throw;
- }
- }
-
- if (m_Cloud)
- {
- // Note that this is not fully functional, pending implementation work on
- // the Jupiter end
-
- CloudCacheSession Session(m_Cloud);
-
- zen::Stopwatch Timer;
-
- try
- {
- Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey);
-
- m_Log.debug("Cloud upstream GET ({}/{}) succeeded after {:5}!",
- Ref.BucketSegment,
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+ if (CacheRecordType == ZenContentType::kCbObject)
+ {
+ const zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()),
+ zen::CbValidateMode::All);
- // TODO: this is incomplete and needs to propagate any referenced content
- }
- catch (std::exception& e)
- {
- m_Log.warn("Cloud upstream GET ({}/{}) FAILED after {:5}: '{}'",
- Ref.BucketSegment,
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ if (ValidationResult == CbValidateError::None)
+ {
+ zen::CbObjectView Cbo(UpstreamResult.Value.Data());
+
+ std::vector<IoHash> References;
+ Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+
+ if (!References.empty())
+ {
+ zen::CbObjectWriter Idx;
+ Idx.BeginArray("references");
+ for (const IoHash& Hash : References)
+ {
+ Idx.AddHash(Hash);
+ }
+ Idx.EndArray();
+
+ Value.IndexData = Idx.Save();
+ }
+ }
+ else
+ {
+ Value.Value = IoBuffer();
+ Success = false;
+ m_Log.warn("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey);
+ }
+ }
- throw;
+ if (Success)
+ {
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ }
}
}
@@ -248,140 +229,130 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
case kPut:
{
- if (zen::IoBuffer Body = Request.ReadPayload())
- {
- if (Body.Size() == 0)
- {
- return Request.WriteResponse(zen::HttpResponse::BadRequest);
- }
+ zen::IoBuffer Body = Request.ReadPayload();
- ZenCacheValue Value;
- Value.Value = Body;
+ if (!Body || Body.Size() == 0)
+ {
+ return Request.WriteResponse(zen::HttpResponse::BadRequest);
+ }
- HttpContentType ContentType = Request.RequestContentType();
+ const HttpContentType ContentType = Request.RequestContentType();
- bool IsCompactBinary;
+ bool IsCompactBinary = false;
- switch (ContentType)
- {
- case HttpContentType::kUnknownContentType:
- case HttpContentType::kBinary:
- IsCompactBinary = false;
- break;
+ switch (ContentType)
+ {
+ case HttpContentType::kUnknownContentType:
+ case HttpContentType::kBinary:
+ IsCompactBinary = false;
+ break;
- case HttpContentType::kCbObject:
- IsCompactBinary = true;
- break;
+ case HttpContentType::kCbObject:
+ IsCompactBinary = true;
+ break;
- default:
- return Request.WriteResponse(zen::HttpResponse::BadRequest);
- }
+ default:
+ return Request.WriteResponse(zen::HttpResponse::BadRequest);
+ }
- // Compute index data
+ if (!IsCompactBinary)
+ {
+ // TODO: create a cache record and put value in CAS?
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+ m_Log.debug("PUT (binary) - '{}/{}' ({} bytes, {})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Body.Size(),
+ Body.GetContentType());
- if (IsCompactBinary)
+ if (m_UpstreamCache)
{
- // Validate payload before accessing it
- zen::CbValidateError ValidationResult =
- zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
-
- if (ValidationResult != CbValidateError::None)
- {
- m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size());
-
- // TODO: add details in response
- return Request.WriteResponse(HttpResponse::BadRequest);
- }
-
- // Extract data for index
- zen::CbObjectView Cbo(Body.Data());
+ auto Result = m_UpstreamCache->EnqueueUpstream(
+ {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}});
+ ZEN_ASSERT(Result.Success);
+ }
- std::vector<IoHash> References;
- Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+ return Request.WriteResponse(zen::HttpResponse::Created);
+ }
- if (!References.empty())
- {
- zen::CbObjectWriter Idx;
- Idx.BeginArray("r");
+ // Validate payload before accessing it
+ const zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
- for (const IoHash& Hash : References)
- {
- Idx.AddHash(Hash);
- }
+ if (ValidationResult != CbValidateError::None)
+ {
+ m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size());
- Idx.EndArray();
+ // TODO: add details in response, kText || kCbObject?
+ return Request.WriteResponse(HttpResponse::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
+ }
- // TODO: store references in index
- }
- }
+ // Extract referenced payload hashes
+ zen::CbObjectView Cbo(Body.Data());
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ std::vector<IoHash> References;
+ std::vector<IoHash> MissingRefs;
+ Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
- m_Log.debug("PUT - '{}/{}' ({} bytes, {})",
- Ref.BucketSegment,
- Ref.HashKey,
- Value.Value.Size(),
- Value.Value.GetContentType());
+ ZenCacheValue CacheValue;
+ CacheValue.Value = Body;
- // absolutely be made asynchronous. By default these should be deferred
- // because the client should not care if the data has propagated upstream or
- // not
+ if (!References.empty())
+ {
+ zen::CbObjectWriter Idx;
+ Idx.BeginArray("references");
- if (m_ZenClient)
+ for (const IoHash& Hash : References)
{
- ZenStructuredCacheSession Session(*m_ZenClient);
-
- zen::Stopwatch Timer;
-
- try
- {
- Session.Put(Ref.BucketSegment, Ref.HashKey, Value);
-
- m_Log.debug("Zen upstream PUT ({}) succeeded after {:5}!",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
- }
- catch (std::exception& e)
+ Idx.AddHash(Hash);
+ if (!m_CidStore.ContainsChunk(Hash))
{
- m_Log.warn("Zen upstream PUT ({}) FAILED after {:5}: '{}'",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
-
- throw;
+ MissingRefs.push_back(Hash);
}
}
- if (m_Cloud)
- {
- CloudCacheSession Session(m_Cloud);
+ Idx.EndArray();
- zen::Stopwatch Timer;
+ CacheValue.IndexData = Idx.Save();
+ }
- try
- {
- Session.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
- m_Log.debug("upstream PUT ({}) succeeded after {:5}!",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
- }
- catch (std::exception& e)
- {
- m_Log.warn("upstream PUT ({}) FAILED after {:5}: '{}'",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ m_Log.debug("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ CacheValue.Value.Size(),
+ CacheValue.Value.GetContentType(),
+ References.size(),
+ MissingRefs.size());
- throw;
- }
+ if (MissingRefs.empty())
+ {
+ // Only enqueue valid cache records, i.e. all referenced payloads exists
+ if (m_UpstreamCache)
+ {
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(References)});
+ ZEN_ASSERT(Result.Success);
}
return Request.WriteResponse(zen::HttpResponse::Created);
}
else
{
- return;
+ // TODO: Binary attachments?
+ zen::CbObjectWriter Response;
+ Response.BeginArray("needs");
+ for (const IoHash& MissingRef : MissingRefs)
+ {
+ Response.AddHash(MissingRef);
+ m_Log.debug("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef);
+ }
+ Response.EndArray();
+
+ // Return Created | BadRequest?
+ return Request.WriteResponse(zen::HttpResponse::Created, Response.Save());
}
}
break;
@@ -412,6 +383,26 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
{
zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
+ if (!Payload && m_UpstreamCache)
+ {
+ if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
+ UpstreamResult.Success)
+ {
+ if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Payload)))
+ {
+ Payload = UpstreamResult.Payload;
+ zen::IoHash ChunkHash = zen::IoHash::HashMemory(Payload);
+ zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash);
+
+ m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
+ }
+ else
+ {
+ m_Log.warn("got uncompressed upstream cache payload");
+ }
+ }
+ }
+
if (!Payload)
{
m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})",
@@ -512,7 +503,7 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach
return false;
}
- OutRef.BucketSegment = Key.substr(0, BucketSplitOffset);
+ OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset));
if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); }))
{
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 73b0825dc..b90301d84 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -3,18 +3,17 @@
#pragma once
#include <zencore/httpserver.h>
-#include <zencore/refcount.h>
-
-#include "structuredcachestore.h"
-#include "upstream/jupiter.h"
#include <spdlog/spdlog.h>
+#include <memory>
+
+class ZenCacheStore;
namespace zen {
-class CloudCacheClient;
+class CasStore;
class CidStore;
-class ZenStructuredCacheClient;
+class UpstreamCache;
/**
* Structured cache service. Imposes constraints on keys, supports blobs and
@@ -47,7 +46,10 @@ class ZenStructuredCacheClient;
class HttpStructuredCacheService : public zen::HttpService
{
public:
- HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore);
+ HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ zen::CasStore& InCasStore,
+ zen::CidStore& InCidStore,
+ std::unique_ptr<UpstreamCache> UpstreamCache);
~HttpStructuredCacheService();
virtual const char* BaseUri() const override;
@@ -69,12 +71,11 @@ private:
void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
- spdlog::logger m_Log;
- zen::CasStore& m_CasStore;
- zen::CidStore& m_CidStore;
- ZenCacheStore m_CacheStore;
- RefPtr<CloudCacheClient> m_Cloud;
- RefPtr<ZenStructuredCacheClient> m_ZenClient;
+ spdlog::logger m_Log;
+ ZenCacheStore& m_CacheStore;
+ zen::CasStore& m_CasStore;
+ zen::CidStore& m_CidStore;
+ std::unique_ptr<UpstreamCache> m_UpstreamCache;
};
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 79bcf4838..140ff1853 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -31,6 +31,7 @@ using namespace fmt::literals;
ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir}
{
+ spdlog::info("initializing structured cache at '{}'", RootDir);
zen::CreateDirectories(RootDir);
}
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 6d725e55b..6e8b48703 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -95,6 +95,12 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
cxxopts::value<bool>(ServiceConfig.ShouldCrash)->default_value("false"),
"");
+ options.add_option("cache",
+ "",
+ "enable-upstream-cache",
+ "Whether upstream caching is enabled",
+ cxxopts::value<bool>(ServiceConfig.UpstreamCacheEnabled)->default_value("false"),
+ "");
try
{
auto result = options.parse(argc, argv);
@@ -164,9 +170,11 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv
throw std::exception("fatal zen global config script ({}) failure: {}"_format(ConfigScript, e.what()).c_str());
}
- ServiceConfig.LegacyCacheEnabled = lua["legacycache"]["enable"];
- const std::string path = lua["legacycache"]["readpath"];
- ServiceConfig.StructuredCacheEnabled = lua["structuredcache"]["enable"];
- ServiceConfig.MeshEnabled = lua["mesh"]["enable"];
+
+ ServiceConfig.LegacyCacheEnabled = lua["legacycache"]["enable"].get_or(ServiceConfig.LegacyCacheEnabled);
+ const std::string path = lua["legacycache"]["readpath"].get_or(std::string());
+ ServiceConfig.StructuredCacheEnabled = lua["structuredcache"]["enable"].get_or(ServiceConfig.StructuredCacheEnabled);
+ ServiceConfig.MeshEnabled = lua["mesh"]["enable"].get_or(ServiceConfig.MeshEnabled);
+ ServiceConfig.UpstreamCacheEnabled = lua["structuredcache"]["upstream"]["enable"].get_or(ServiceConfig.UpstreamCacheEnabled);
}
}
diff --git a/zenserver/config.h b/zenserver/config.h
index 53538314d..04498a520 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -20,6 +20,7 @@ struct ZenServiceConfig
{
bool LegacyCacheEnabled = false;
bool StructuredCacheEnabled = true;
+ bool UpstreamCacheEnabled = false;
bool ShouldCrash = false; // Option for testing crash handling
bool MeshEnabled = false; // Experimental p2p mesh discovery
std::string FlockId; // Id for grouping test instances into sets
diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp
index 2583784c3..23933f12e 100644
--- a/zenserver/diag/logging.cpp
+++ b/zenserver/diag/logging.cpp
@@ -4,13 +4,14 @@
#include "config.h"
-#include <zencore/string.h>
#include <spdlog/pattern_formatter.h>
#include <spdlog/sinks/ansicolor_sink.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
+#include <zencore/string.h>
#include <memory>
+#include "spdlog/sinks/basic_file_sink.h"
// Custom logging -- test code, this should be tweaked
@@ -188,20 +189,38 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
{
EnableVTMode();
- std::filesystem::path LogPath = GlobalOptions.DataDir / "logs/zenserver.txt";
+ std::filesystem::path LogPath = GlobalOptions.DataDir / "logs/zenserver.log";
+
+ spdlog::level::level_enum LogLevel = spdlog::level::debug;
- auto file_sink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), /* truncate */ true);
- file_sink->set_level(spdlog::level::trace);
+ // Sinks
+ auto ConsoleSink = std::make_shared<spdlog::sinks::ansicolor_stdout_sink_mt>();
+ auto FileSink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), /* truncate */ true);
- auto& sinks = spdlog::default_logger()->sinks();
- sinks.clear();
- sinks.push_back(std::make_shared<spdlog::sinks::ansicolor_stdout_sink_mt>());
- sinks.push_back(file_sink);
+ // Default
+ auto DefaultLogger = spdlog::default_logger();
+ auto& Sinks = spdlog::default_logger()->sinks();
+ Sinks.clear();
+ Sinks.push_back(ConsoleSink);
+ Sinks.push_back(FileSink);
+ DefaultLogger->set_level(LogLevel);
- spdlog::set_level(spdlog::level::debug);
+ // Jupiter - only log HTTP traffic to file
+ auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink);
+ spdlog::register_logger(JupiterLogger);
+ JupiterLogger->set_level(LogLevel);
+
+ spdlog::set_level(LogLevel);
spdlog::set_formatter(std::make_unique<logging::full_formatter>(GlobalOptions.LogId, std::chrono::system_clock::now()));
}
+void
+ShutdownLogging()
+{
+ spdlog::drop_all();
+ spdlog::shutdown();
+}
+
spdlog::logger&
ConsoleLog()
{
@@ -211,3 +230,30 @@ ConsoleLog()
return *ConLogger;
}
+
+namespace zen::logging {
+
+spdlog::logger&
+Default()
+{
+ return *spdlog::default_logger();
+}
+
+spdlog::logger&
+Get(std::string_view Name)
+{
+ std::shared_ptr<spdlog::logger> Logger = spdlog::get(std::string(Name));
+ if (!Logger)
+ {
+ Logger = std::make_shared<spdlog::logger>(std::string(Name),
+ begin(spdlog::default_logger()->sinks()),
+ end(spdlog::default_logger()->sinks()));
+
+ Logger->set_level(spdlog::default_logger()->level());
+ spdlog::register_logger(Logger);
+ }
+
+ return *Logger;
+}
+
+} // namespace zen::logging
diff --git a/zenserver/diag/logging.h b/zenserver/diag/logging.h
index 52ec5d1bd..bc93898ad 100644
--- a/zenserver/diag/logging.h
+++ b/zenserver/diag/logging.h
@@ -7,4 +7,14 @@ struct ZenServerOptions;
void InitializeLogging(const ZenServerOptions& GlobalOptions);
+void ShutdownLogging();
+
spdlog::logger& ConsoleLog();
+
+namespace zen::logging {
+
+spdlog::logger& Default();
+
+spdlog::logger& Get(std::string_view Name);
+
+} // namespace zen::logging
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 977bcc712..09be2c776 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -3,6 +3,7 @@
#include "jupiter.h"
#include "cache/structuredcachestore.h"
+#include "diag/logging.h"
#include <fmt/format.h>
#include <zencore/compactbinary.h>
@@ -25,7 +26,6 @@
# pragma comment(lib, "Wldap32.lib")
#endif
-#include <spdlog/spdlog.h>
#include <json11.hpp>
using namespace std::literals;
@@ -51,9 +51,47 @@ namespace detail {
CloudCacheClient& OwnerClient;
cpr::Session Session;
};
+
+ void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response)
+ {
+ std::string_view ContentType = "unknown"sv;
+ if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
+ {
+ ContentType = It->second;
+ }
+
+ const double Bytes = Verb == "GET"sv ? Response.downloaded_bytes : Response.uploaded_bytes;
+
+ const bool IsBinary =
+ ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || ContentType == "application/octet-stream";
+
+ if (IsBinary)
+ {
+ Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}' '{}' Bytes, Reason: '{}'",
+ Verb,
+ Response.url.str(),
+ Response.status_code,
+ Response.elapsed,
+ ContentType,
+ Bytes,
+ Response.reason);
+ }
+ else
+ {
+ Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}': '{}', Reason: '{}'",
+ Verb,
+ Response.url.str(),
+ Response.status_code,
+ Response.elapsed,
+ ContentType,
+ Response.text,
+ Response.reason);
+ }
+ }
+
} // namespace detail
-CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_CacheClient(OuterClient)
+CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_Log(OuterClient->Logger()), m_CacheClient(OuterClient)
{
m_SessionState = m_CacheClient->AllocSessionState();
}
@@ -63,94 +101,163 @@ CloudCacheSession::~CloudCacheSession()
m_CacheClient->FreeSessionState(m_SessionState);
}
-#define TESTING_PREFIX "aaaaa"
-
-IoBuffer
-CloudCacheSession::Get(std::string_view BucketId, std::string_view Key)
+CloudCacheResult
+CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key)
{
+ std::string Auth;
+ m_CacheClient->AcquireAccessToken(Auth);
+
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key << ".raw";
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw";
- auto& Session = m_SessionState->Session;
- Session.SetUrl(cpr::Url{Uri.c_str()});
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", Auth}});
cpr::Response Response = Session.Get();
- if (!Response.error)
+ detail::Log(m_Log, "GET"sv, Response);
+
+ if (Response.status_code == 200)
{
- return IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true};
}
- return {};
+ return {.Success = false};
}
-void
-CloudCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data)
+CloudCacheResult
+CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key)
{
+ return GetDerivedData(BucketId, Key.ToHexString());
+}
+
+CloudCacheResult
+CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key)
+{
+ std::string Auth;
+ m_CacheClient->AcquireAccessToken(Auth);
+
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
+ << Key.ToHexString();
- auto& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-cb"}});
- IoHash Hash = IoHash::HashMemory(Data.Data(), Data.Size());
+ cpr::Response Response = Session.Get();
+ detail::Log(m_Log, "GET"sv, Response);
+ if (Response.status_code == 200)
+ {
+ return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true};
+ }
+
+ return {.Success = false};
+}
+
+CloudCacheResult
+CloudCacheSession::GetCompressedBlob(const IoHash& Key)
+{
std::string Auth;
m_CacheClient->AcquireAccessToken(Auth);
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(
- cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}});
- Session.SetOption(cpr::Body{(const char*)Data.Data(), Data.Size()});
+ Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-comp"}});
- cpr::Response Response = Session.Put();
+ cpr::Response Response = Session.Get();
+ detail::Log(m_Log, "GET"sv, Response);
- if (Response.error)
+ if (Response.status_code == 200)
{
- spdlog::warn("PUT failed: '{}'", Response.error.message);
+ return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true};
}
+
+ return {.Success = false};
}
-void
-CloudCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data)
+CloudCacheResult
+CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData)
{
+ IoHash Hash = IoHash::HashMemory(DerivedData.Data(), DerivedData.Size());
+
+ std::string Auth;
+ m_CacheClient->AcquireAccessToken(Auth);
+
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key;
auto& Session = m_SessionState->Session;
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(
+ cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}});
+ Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()});
+
+ cpr::Response Response = Session.Put();
+ detail::Log(m_Log, "PUT"sv, Response);
+
+ return {.Success = Response.status_code == 200};
+}
+
+CloudCacheResult
+CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData)
+{
+ return PutDerivedData(BucketId, Key.ToHexString(), DerivedData);
+}
+
+CloudCacheResult
+CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref)
+{
+ IoHash Hash = IoHash::HashMemory(Ref.Data(), Ref.Size());
+
std::string Auth;
m_CacheClient->AcquireAccessToken(Auth);
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
+ << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(
+ cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()});
- if (Data.Value.GetContentType() == ZenContentType::kCbObject)
- {
- CbObjectView Cbo(Data.Value.Data());
- const IoHash Hash = Cbo.GetHash();
- const MemoryView DataView = Cbo.GetView();
+ cpr::Response Response = Session.Put();
+ detail::Log(m_Log, "PUT"sv, Response);
- Session.SetOption(
- cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
+ return {.Success = Response.status_code == 200};
+}
- Session.SetOption(cpr::Body{reinterpret_cast<const char*>(DataView.GetData()), DataView.GetSize()});
- }
- else
- {
- const IoHash Hash = IoHash::HashMemory(Data.Value.Data(), Data.Value.Size());
+CloudCacheResult
+CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
+{
+ std::string Auth;
+ m_CacheClient->AcquireAccessToken(Auth);
- Session.SetOption(
- cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
- Session.SetOption(cpr::Body{reinterpret_cast<const char*>(Data.Value.Data()), Data.Value.Size()});
- }
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Content-Type", "application/x-ue-comp"}});
+ Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()});
cpr::Response Response = Session.Put();
+ detail::Log(m_Log, "PUT"sv, Response);
- if (Response.error)
- {
- spdlog::warn("PUT failed: '{}'", Response.error.message);
- }
+ return {.Success = Response.status_code == 200};
}
std::vector<IoHash>
@@ -158,7 +265,7 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>&
{
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/s/" << m_CacheClient->Namespace();
+ Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace();
ZEN_UNUSED(BucketId, ChunkHashes);
@@ -167,17 +274,6 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>&
//////////////////////////////////////////////////////////////////////////
-IoBuffer
-CloudCacheSession::Get(std::string_view BucketId, const IoHash& Key)
-{
- StringBuilder<64> KeyString;
- Key.ToHexString(KeyString);
-
- return Get(BucketId, KeyString);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
std::string
CloudCacheAccessToken::GetAuthorizationHeaderValue()
{
@@ -197,27 +293,30 @@ CloudCacheAccessToken::SetToken(std::string_view Token)
//////////////////////////////////////////////////////////////////////////
//
// ServiceUrl: https://jupiter.devtools.epicgames.com
-// Namespace: ue4.ddc
+// DdcNamespace: ue4.ddc
// OAuthClientId: 0oao91lrhqPiAlaGD0x7
// OAuthProvider: https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token
// OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d
//
CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl,
- std::string_view Namespace,
+ std::string_view DdcNamespace,
+ std::string_view BlobStoreNamespace,
std::string_view OAuthProvider,
std::string_view OAuthClientId,
std::string_view OAuthSecret)
-: m_ServiceUrl(ServiceUrl)
+: m_Log(zen::logging::Get("jupiter"))
+, m_ServiceUrl(ServiceUrl)
, m_OAuthFullUri(OAuthProvider)
-, m_Namespace(Namespace)
+, m_DdcNamespace(DdcNamespace)
+, m_BlobStoreNamespace(BlobStoreNamespace)
, m_DefaultBucket("default")
, m_OAuthClientId(OAuthClientId)
, m_OAuthSecret(OAuthSecret)
{
if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv))
{
- spdlog::warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str());
+ m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str());
m_IsValid = false;
return;
@@ -229,7 +328,7 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl,
if (SchemePos == std::string::npos)
{
- spdlog::warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl);
+ m_Log.warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl);
m_IsValid = false;
return;
@@ -239,7 +338,7 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl,
if (DomainEnd == std::string::npos)
{
- spdlog::warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl);
+ m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl);
m_IsValid = false;
return;
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 2ed458142..61d1bd99c 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -6,6 +6,8 @@
#include <zencore/refcount.h>
#include <zencore/thread.h>
+#include <spdlog/spdlog.h>
+
#include <atomic>
#include <list>
#include <memory>
@@ -39,6 +41,12 @@ private:
std::atomic<uint32_t> m_Serial;
};
+struct CloudCacheResult
+{
+ IoBuffer Value;
+ bool Success = false;
+};
+
/**
* Context for performing Jupiter operations
*
@@ -52,17 +60,20 @@ public:
CloudCacheSession(CloudCacheClient* OuterClient);
~CloudCacheSession();
- // Key-value cache operations
- IoBuffer Get(std::string_view BucketId, std::string_view Key);
- void Put(std::string_view BucketId, std::string_view Key, IoBuffer Data);
+ CloudCacheResult GetDerivedData(std::string_view BucketId, std::string_view Key);
+ CloudCacheResult GetDerivedData(std::string_view BucketId, const IoHash& Key);
+ CloudCacheResult GetRef(std::string_view BucketId, const IoHash& Key);
+ CloudCacheResult GetCompressedBlob(const IoHash& Key);
- // Structured cache operations
- IoBuffer Get(std::string_view BucketId, const IoHash& Key);
- void Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data);
+ CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData);
+ CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData);
+ CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref);
+ CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob);
std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
private:
+ spdlog::logger& m_Log;
RefPtr<CloudCacheClient> m_CacheClient;
detail::CloudCacheSessionState* m_SessionState;
};
@@ -74,28 +85,35 @@ class CloudCacheClient : public RefCounted
{
public:
CloudCacheClient(std::string_view ServiceUrl,
- std::string_view Namespace,
+ std::string_view DdcNamespace,
+ std::string_view BlobStoreNamespace,
std::string_view OAuthProvider,
std::string_view OAuthClientId,
std::string_view OAuthSecret);
~CloudCacheClient();
bool AcquireAccessToken(std::string& AuthorizationHeaderValue);
- std::string_view Namespace() const { return m_Namespace; }
+ std::string_view DdcNamespace() const { return m_DdcNamespace; }
+ std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; }
std::string_view DefaultBucket() const { return m_DefaultBucket; }
std::string_view ServiceUrl() const { return m_ServiceUrl; }
+ bool IsValid() const { return m_IsValid; }
+
+ spdlog::logger& Logger() { return m_Log; }
private:
- bool m_IsValid = false;
+ spdlog::logger& m_Log;
std::string m_ServiceUrl;
std::string m_OAuthDomain;
std::string m_OAuthUriPath;
std::string m_OAuthFullUri;
- std::string m_Namespace;
+ std::string m_DdcNamespace;
+ std::string m_BlobStoreNamespace;
std::string m_DefaultBucket;
std::string m_OAuthClientId;
std::string m_OAuthSecret;
CloudCacheAccessToken m_AccessToken;
+ bool m_IsValid = false;
RwLock m_SessionStateLock;
std::list<detail::CloudCacheSessionState*> m_SessionStateCache;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
new file mode 100644
index 000000000..ecd51a706
--- /dev/null
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -0,0 +1,347 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "upstreamcache.h"
+#include "jupiter.h"
+#include "zen.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/timer.h>
+#include <zenstore/cas.h>
+#include <zenstore/cidstore.h>
+
+#include "cache/structuredcachestore.h"
+#include "diag/logging.h"
+
+#include <atomic>
+#include <deque>
+#include <thread>
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace detail {
+
+ template<typename T>
+ class BlockingQueue
+ {
+ public:
+ BlockingQueue() = default;
+
+ ~BlockingQueue() { CompleteAdding(); }
+
+ void Enqueue(T&& Item)
+ {
+ {
+ std::lock_guard Lock(m_Lock);
+ m_Queue.emplace_back(std::move(Item));
+ }
+
+ m_NewItemSignal.notify_one();
+ }
+
+ bool WaitAndDequeue(T& Item)
+ {
+ if (m_CompleteAdding.load())
+ {
+ return false;
+ }
+
+ std::unique_lock Lock(m_Lock);
+ m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
+
+ if (!m_Queue.empty())
+ {
+ Item = std::move(m_Queue.front());
+ m_Queue.pop_front();
+
+ return true;
+ }
+
+ return false;
+ }
+
+ void CompleteAdding()
+ {
+ if (!m_CompleteAdding.load())
+ {
+ m_CompleteAdding.store(true);
+ m_NewItemSignal.notify_all();
+ }
+ }
+
+ std::size_t Num() const
+ {
+ std::unique_lock Lock(m_Lock);
+ return m_Queue.size();
+ }
+
+ private:
+ mutable std::mutex m_Lock;
+ std::condition_variable m_NewItemSignal;
+ std::deque<T> m_Queue;
+ std::atomic_bool m_CompleteAdding{false};
+ };
+
+} // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+class DefaultUpstreamCache final : public UpstreamCache
+{
+public:
+ DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_Options(Options)
+ , m_CacheStore(CacheStore)
+ , m_CidStore(CidStore)
+ {
+ if (m_Options.JupiterEnabled)
+ {
+ m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint,
+ m_Options.JupiterDdcNamespace,
+ m_Options.JupiterBlobStoreNamespace,
+ m_Options.JupiterOAuthProvider,
+ m_Options.JupiterOAuthClientId,
+ m_Options.JupiterOAuthSecret);
+
+ std::string TmpAuthHeader;
+ if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid())
+ {
+ m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'",
+ m_Options.JupiterEndpoint,
+ m_Options.JupiterDdcNamespace,
+ m_Options.JupiterBlobStoreNamespace);
+ }
+ else
+ {
+ m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint);
+ }
+ }
+
+ m_IsRunning = m_CloudClient && m_CloudClient->IsValid();
+
+ if (m_IsRunning)
+ {
+ m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount);
+ for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
+ {
+ m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
+ }
+ }
+ else
+ {
+ m_Log.warn("upstream disabled, no valid endpoints");
+ }
+ }
+
+ virtual ~DefaultUpstreamCache() { Shutdown(); }
+
+ virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ {
+ if (m_CloudClient && m_CloudClient->IsValid())
+ {
+ zen::Stopwatch Timer;
+
+ try
+ {
+ CloudCacheSession Session(m_CloudClient);
+ CloudCacheResult Result;
+
+ if (Type == ZenContentType::kBinary)
+ {
+ Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash);
+ }
+ else
+ {
+ Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash);
+ }
+
+ return {.Value = Result.Value, .Success = Result.Success};
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'",
+ CacheKey.Bucket,
+ CacheKey.Hash,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+ }
+ }
+
+ return {};
+ }
+
+ virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ if (m_CloudClient && m_CloudClient->IsValid())
+ {
+ zen::Stopwatch Timer;
+
+ try
+ {
+ CloudCacheSession Session(m_CloudClient);
+
+ CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
+ return {.Payload = Result.Value, .Success = Result.Success};
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'",
+ PayloadKey.CacheKey.Bucket,
+ PayloadKey.CacheKey.Hash,
+ PayloadKey.PayloadId,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+ }
+ }
+
+ return {};
+ }
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ {
+ if (m_IsRunning.load())
+ {
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ return {.Success = true};
+ }
+
+ return {};
+ }
+
+private:
+ void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
+ {
+ const uint32_t MaxAttempts = 3;
+
+ if (m_CloudClient && m_CloudClient->IsValid())
+ {
+ CloudCacheSession Session(m_CloudClient);
+ ZenCacheValue CacheValue;
+ if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue))
+ {
+ m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash);
+ return;
+ }
+
+ if (CacheRecord.Type == ZenContentType::kBinary)
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value);
+ }
+
+ if (!Result.Success)
+ {
+ m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ MaxAttempts);
+ }
+ }
+ else
+ {
+ ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject);
+
+ CloudCacheResult Result;
+ for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ {
+ Result.Success = false;
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ {
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCompressedBlob(PayloadId, Payload);
+ }
+ }
+
+ if (!Result.Success)
+ {
+ m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts);
+ break;
+ }
+ }
+
+ if (Result.Success)
+ {
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value);
+ }
+ }
+
+ if (!Result.Success)
+ {
+ m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
+ }
+ }
+ }
+ }
+
+ void ProcessUpstreamQueue()
+ {
+ for (;;)
+ {
+ UpstreamCacheRecord CacheRecord;
+ if (m_UpstreamQueue.WaitAndDequeue(CacheRecord))
+ {
+ try
+ {
+ ProcessCacheRecord(std::move(CacheRecord));
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what());
+ }
+ }
+
+ if (!m_IsRunning.load())
+ {
+ break;
+ }
+ }
+ }
+
+ void Shutdown()
+ {
+ if (m_IsRunning.load())
+ {
+ m_IsRunning.store(false);
+ m_UpstreamQueue.CompleteAdding();
+
+ for (std::thread& Thread : m_UpstreamThreads)
+ {
+ Thread.join();
+ }
+
+ m_UpstreamThreads.clear();
+ }
+ }
+
+ using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
+
+ spdlog::logger& m_Log;
+ UpstreamCacheOptions m_Options;
+ ::ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
+ UpstreamQueue m_UpstreamQueue;
+ RefPtr<CloudCacheClient> m_CloudClient;
+ RefPtr<ZenStructuredCacheClient> m_ZenClient;
+ std::vector<std::thread> m_UpstreamThreads;
+ std::atomic_bool m_IsRunning{false};
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<UpstreamCache>
+MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
+{
+ return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore);
+}
+
+} // namespace zen
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
new file mode 100644
index 000000000..23a542151
--- /dev/null
+++ b/zenserver/upstream/upstreamcache.h
@@ -0,0 +1,82 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/zencore.h>
+
+#include <memory>
+
+class ZenCacheStore;
+
+namespace zen {
+
+class CidStore;
+
+struct UpstreamCacheKey
+{
+ std::string Bucket;
+ IoHash Hash;
+};
+
+struct UpstreamPayloadKey
+{
+ UpstreamCacheKey CacheKey;
+ IoHash PayloadId;
+};
+
+struct UpstreamCacheRecord
+{
+ ZenContentType Type = ZenContentType::kBinary;
+ UpstreamCacheKey CacheKey;
+ std::vector<IoHash> PayloadIds;
+};
+
+struct UpstreamCacheOptions
+{
+ uint32_t ThreadCount = 4;
+ bool JupiterEnabled = true;
+ std::string_view JupiterEndpoint;
+ std::string_view JupiterDdcNamespace;
+ std::string_view JupiterBlobStoreNamespace;
+ std::string_view JupiterOAuthProvider;
+ std::string_view JupiterOAuthClientId;
+ std::string_view JupiterOAuthSecret;
+};
+
+/**
+ * Manages one or more upstream cache endpoints.
+ */
+class UpstreamCache
+{
+public:
+ virtual ~UpstreamCache() = default;
+
+ struct GetCacheRecordResult
+ {
+ IoBuffer Value;
+ bool Success = false;
+ };
+
+ virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+
+ struct GetCachePayloadResult
+ {
+ IoBuffer Payload;
+ bool Success = false;
+ };
+
+ virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
+
+ struct EnqueueResult
+ {
+ bool Success = false;
+ };
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+};
+
+std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore);
+
+} // namespace zen
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 32a468452..395ae5fc2 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -54,12 +54,14 @@
#include "admin/admin.h"
#include "cache/kvcache.h"
#include "cache/structuredcache.h"
+#include "cache/structuredcachestore.h"
#include "compute/apply.h"
#include "diag/diagsvcs.h"
#include "experimental/usnjournal.h"
#include "projectstore.h"
#include "testing/launch.h"
#include "upstream/jupiter.h"
+#include "upstream/upstreamcache.h"
#include "upstream/zen.h"
#include "zenstore/gc.h"
#include "zenstore/scrub.h"
@@ -139,7 +141,28 @@ public:
if (ServiceConfig.StructuredCacheEnabled)
{
spdlog::info("instantiating structured cache service");
- m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(m_DataRoot / "cache", *m_CasStore, *m_CidStore));
+ m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
+
+ std::unique_ptr<zen::UpstreamCache> UpstreamCache;
+ if (ServiceConfig.UpstreamCacheEnabled)
+ {
+ using namespace std::literals;
+
+ zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ThreadCount = 4;
+ UpstreamOptions.JupiterEnabled = true;
+ UpstreamOptions.JupiterEndpoint = "https://jupiter.devtools-dev.epicgames.com"sv;
+ UpstreamOptions.JupiterDdcNamespace = "ue4.ddc"sv;
+ UpstreamOptions.JupiterBlobStoreNamespace = "test.ddc"sv;
+ UpstreamOptions.JupiterOAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv;
+ UpstreamOptions.JupiterOAuthClientId = "0oao91lrhqPiAlaGD0x7"sv;
+ UpstreamOptions.JupiterOAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv;
+
+ UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
+ }
+
+ m_StructuredCacheService.reset(
+ new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache)));
}
else
{
@@ -297,6 +320,7 @@ private:
zen::HttpServer m_Http;
std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()};
std::unique_ptr<zen::CidStore> m_CidStore;
+ std::unique_ptr<ZenCacheStore> m_CacheStore;
zen::CasGc m_Gc{*m_CasStore};
zen::CasScrubber m_Scrubber{*m_CasStore};
HttpTestService m_TestService;
@@ -397,5 +421,7 @@ main(int argc, char* argv[])
SPDLOG_CRITICAL("Caught exception in main: {}", e.what());
}
+ ShutdownLogging();
+
return 0;
}
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index 6c87b4a68..9b2fc891e 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -120,6 +120,7 @@
<ClInclude Include="diag\diagsvcs.h" />
<ClInclude Include="experimental\usnjournal.h" />
<ClInclude Include="targetver.h" />
+ <ClInclude Include="upstream\upstreamcache.h" />
<ClInclude Include="upstream\zen.h" />
<ClInclude Include="vfs.h" />
</ItemGroup>
@@ -138,6 +139,7 @@
<ClCompile Include="cache\cachestore.cpp" />
<ClCompile Include="casstore.cpp" />
<ClCompile Include="experimental\usnjournal.cpp" />
+ <ClCompile Include="upstream\upstreamcache.cpp" />
<ClCompile Include="upstream\zen.cpp" />
<ClCompile Include="vfs.cpp" />
<ClCompile Include="zenserver.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index 79fcfb803..1fe902731 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -38,6 +38,9 @@
<ClInclude Include="cache\structuredcachestore.h" />
<ClInclude Include="compute\apply.h" />
<ClInclude Include="sos\sos.h" />
+ <ClInclude Include="upstream\upstreamcache.h">
+ <Filter>upstream</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="zenserver.cpp" />
@@ -71,6 +74,9 @@
<ClCompile Include="cache\structuredcachestore.cpp" />
<ClCompile Include="compute\apply.cpp" />
<ClCompile Include="sos\sos.cpp" />
+ <ClCompile Include="upstream\upstreamcache.cpp">
+ <Filter>upstream</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="cache">