aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-08-12 10:05:20 +0200
committerStefan Boberg <[email protected]>2021-08-12 10:05:20 +0200
commite664bbc31ff4ea866401c9303c53108242a6cb27 (patch)
tree71dccfc377f58042a5776db85738e3cf2e495358 /zenserver/cache/structuredcache.cpp
parentImplemented Flush() operation for CID/CAS store interfaces (diff)
downloadzen-e664bbc31ff4ea866401c9303c53108242a6cb27.tar.xz
zen-e664bbc31ff4ea866401c9303c53108242a6cb27.zip
Implemented flush operations for cache services
Also implemented basic upstream query interface, which needs a bit more work to be fully functional (chunk propagation / fetching and new propagation policies as per DDC requirements)
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp184
1 files changed, 167 insertions, 17 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 9083f764e..7f6f49e06 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -12,6 +12,7 @@
#include "structuredcache.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
+#include "upstream/zen.h"
#include "zenstore/cidstore.h"
#include <spdlog/spdlog.h>
@@ -22,12 +23,36 @@ namespace zen {
using namespace std::literals;
+zen::HttpContentType
+MapToHttpContentType(zen::ZenContentType Type)
+{
+ switch (Type)
+ {
+ default:
+ case zen::ZenContentType::kBinary:
+ return zen::HttpContentType::kBinary;
+ case zen::ZenContentType::kCbObject:
+ return zen::HttpContentType::kCbObject;
+ case zen::ZenContentType::kCbPackage:
+ return zen::HttpContentType::kCbPackage;
+ case zen::ZenContentType::kText:
+ return zen::HttpContentType::kText;
+ case zen::ZenContentType::kJSON:
+ return zen::HttpContentType::kJSON;
+ case zen::ZenContentType::kYAML:
+ return zen::HttpContentType::kYAML;
+ }
+};
+
HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore)
-: m_CasStore(InStore)
+: m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks()))
+, m_CasStore(InStore)
, m_CacheStore(InStore, RootPath)
, m_CidStore(InCidStore)
{
- spdlog::info("initializing structured cache at '{}'", RootPath);
+ m_Log.set_level(spdlog::level::debug);
+
+ m_Log.info("initializing structured cache at '{}'", RootPath);
#if 0
m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv,
@@ -36,6 +61,14 @@ HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path Roo
"0oao91lrhqPiAlaGD0x7"sv /* client id */,
"-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */);
#endif
+
+#if 0
+ std::string_view UpstreamSpec = "http://arn-wd-15192.epicgames.net"sv;
+
+ m_Log.info("Using upstream Zen cache at '{}'", UpstreamSpec);
+
+ m_ZenClient = new ZenStructuredCacheClient(UpstreamSpec);
+#endif
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -50,6 +83,11 @@ HttpStructuredCacheService::BaseUri() const
}
void
+HttpStructuredCacheService::Flush()
+{
+}
+
+void
HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request)
{
CacheRef Ref;
@@ -84,8 +122,73 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
ZenCacheValue Value;
bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
+ if (!Success && m_ZenClient)
+ {
+ ZenStructuredCacheSession Session(*m_ZenClient);
+
+ zen::Stopwatch Timer;
+
+ try
+ {
+ Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey);
+
+ m_Log.debug("Zen upstream GET ({}/{}) succeeded after {:5}!",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+
+ // TODO: this is incomplete and needs to propagate any referenced content
+
+ Success = true;
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("Zen upstream GET ({}/{}) FAILED after {:5}: '{}'",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+
+ throw;
+ }
+ }
+
+ if (m_Cloud)
+ {
+ // Note that this is not fully functional, pending implementation work on
+ // the Jupiter end
+
+ CloudCacheSession Session(m_Cloud);
+
+ zen::Stopwatch Timer;
+
+ try
+ {
+ Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey);
+
+ m_Log.debug("Cloud upstream GET ({}/{}) succeeded after {:5}!",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+
+ // TODO: this is incomplete and needs to propagate any referenced content
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("Cloud upstream GET ({}/{}) FAILED after {:5}: '{}'",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+
+ throw;
+ }
+ }
+
if (!Success)
{
+ m_Log.debug("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey);
+
return Request.WriteResponse(zen::HttpResponse::NotFound);
}
@@ -94,7 +197,13 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Request.SetSuppressResponseBody();
}
- return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value);
+ m_Log.debug("HIT - '{}/{}' ({} bytes, {})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Value.Value.Size(),
+ Value.Value.GetContentType());
+
+ return Request.WriteResponse(zen::HttpResponse::OK, MapToHttpContentType(Value.Value.GetContentType()), Value.Value);
}
break;
@@ -139,6 +248,8 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
if (ValidationResult != CbValidateError::None)
{
+ m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size());
+
// TODO: add details in response
return Request.WriteResponse(HttpResponse::BadRequest);
}
@@ -160,15 +271,42 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
}
Idx.EndArray();
- }
- // TODO: store references in index
+ // TODO: store references in index
+ }
}
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
- // This is currently synchronous for simplicity and debuggability but should be
- // made asynchronous
+ // This is currently synchronous for simplicity and debuggability but should
+ // absolutely be made asynchronous. By default these should be deferred
+ // because the client should not care if the data has propagated upstream or
+ // not
+
+ if (m_ZenClient)
+ {
+ ZenStructuredCacheSession Session(*m_ZenClient);
+
+ zen::Stopwatch Timer;
+
+ try
+ {
+ Session.Put(Ref.BucketSegment, Ref.HashKey, Value);
+
+ m_Log.debug("Zen upstream PUT ({}) succeeded after {:5}!",
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("Zen upstream PUT ({}) FAILED after {:5}: '{}'",
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+
+ throw;
+ }
+ }
if (m_Cloud)
{
@@ -179,16 +317,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
try
{
Session.Put(Ref.BucketSegment, Ref.HashKey, Value);
- spdlog::debug("upstream PUT ({}) succeeded after {:5}!",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+
+ m_Log.debug("upstream PUT ({}) succeeded after {:5}!",
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
}
catch (std::exception& e)
{
- spdlog::debug("upstream PUT ({}) failed after {:5}: '{}'",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ m_Log.warn("upstream PUT ({}) FAILED after {:5}: '{}'",
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
throw;
}
@@ -227,16 +366,27 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
case kHead:
case kGet:
{
- // TODO: need to map from uncompressed content address into the storage
- // (compressed) content address
-
zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
if (!Payload)
{
+ m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.PayloadId,
+ Payload.Size(),
+ Payload.GetContentType());
+
return Request.WriteResponse(zen::HttpResponse::NotFound);
}
+ m_Log.debug("HIT - '{}/{}/{}' ({} bytes, {})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.PayloadId,
+ Payload.Size(),
+ Payload.GetContentType());
+
if (Verb == kHead)
{
Request.SetSuppressResponseBody();