aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-08-12 10:05:20 +0200
committerStefan Boberg <[email protected]>2021-08-12 10:05:20 +0200
commite664bbc31ff4ea866401c9303c53108242a6cb27 (patch)
tree71dccfc377f58042a5776db85738e3cf2e495358
parentImplemented Flush() operation for CID/CAS store interfaces (diff)
downloadzen-e664bbc31ff4ea866401c9303c53108242a6cb27.tar.xz
zen-e664bbc31ff4ea866401c9303c53108242a6cb27.zip
Implemented flush operations for cache services
Also implemented basic upstream query interface, which needs a bit more work to be fully functional (chunk propagation / fetching and new propagation policies as per DDC requirements)
-rw-r--r--zenserver/cache/kvcache.cpp5
-rw-r--r--zenserver/cache/kvcache.h1
-rw-r--r--zenserver/cache/structuredcache.cpp184
-rw-r--r--zenserver/cache/structuredcache.h15
-rw-r--r--zenserver/cache/structuredcachestore.cpp6
-rw-r--r--zenserver/cache/structuredcachestore.h1
-rw-r--r--zenserver/upstream/zen.cpp97
-rw-r--r--zenserver/upstream/zen.h54
8 files changed, 338 insertions, 25 deletions
diff --git a/zenserver/cache/kvcache.cpp b/zenserver/cache/kvcache.cpp
index 4c3b1e33d..2753952ae 100644
--- a/zenserver/cache/kvcache.cpp
+++ b/zenserver/cache/kvcache.cpp
@@ -205,4 +205,9 @@ HttpKvCacheService::HandleRequest(zen::HttpServerRequest& Request)
}
}
+void
+HttpKvCacheService::Flush()
+{
+}
+
} // namespace zen
diff --git a/zenserver/cache/kvcache.h b/zenserver/cache/kvcache.h
index e601582a4..ce4fb1d36 100644
--- a/zenserver/cache/kvcache.h
+++ b/zenserver/cache/kvcache.h
@@ -22,6 +22,7 @@ public:
virtual const char* BaseUri() const override;
virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ void Flush();
private:
MemoryCacheStore m_cache;
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 9083f764e..7f6f49e06 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -12,6 +12,7 @@
#include "structuredcache.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
+#include "upstream/zen.h"
#include "zenstore/cidstore.h"
#include <spdlog/spdlog.h>
@@ -22,12 +23,36 @@ namespace zen {
using namespace std::literals;
+zen::HttpContentType
+MapToHttpContentType(zen::ZenContentType Type)
+{
+ switch (Type)
+ {
+ default:
+ case zen::ZenContentType::kBinary:
+ return zen::HttpContentType::kBinary;
+ case zen::ZenContentType::kCbObject:
+ return zen::HttpContentType::kCbObject;
+ case zen::ZenContentType::kCbPackage:
+ return zen::HttpContentType::kCbPackage;
+ case zen::ZenContentType::kText:
+ return zen::HttpContentType::kText;
+ case zen::ZenContentType::kJSON:
+ return zen::HttpContentType::kJSON;
+ case zen::ZenContentType::kYAML:
+ return zen::HttpContentType::kYAML;
+ }
+};
+
HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore)
-: m_CasStore(InStore)
+: m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks()))
+, m_CasStore(InStore)
, m_CacheStore(InStore, RootPath)
, m_CidStore(InCidStore)
{
- spdlog::info("initializing structured cache at '{}'", RootPath);
+ m_Log.set_level(spdlog::level::debug);
+
+ m_Log.info("initializing structured cache at '{}'", RootPath);
#if 0
m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv,
@@ -36,6 +61,14 @@ HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path Roo
"0oao91lrhqPiAlaGD0x7"sv /* client id */,
"-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */);
#endif
+
+#if 0
+ std::string_view UpstreamSpec = "http://arn-wd-15192.epicgames.net"sv;
+
+ m_Log.info("Using upstream Zen cache at '{}'", UpstreamSpec);
+
+ m_ZenClient = new ZenStructuredCacheClient(UpstreamSpec);
+#endif
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -50,6 +83,11 @@ HttpStructuredCacheService::BaseUri() const
}
void
+HttpStructuredCacheService::Flush()
+{
+}
+
+void
HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request)
{
CacheRef Ref;
@@ -84,8 +122,73 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
ZenCacheValue Value;
bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
+ if (!Success && m_ZenClient)
+ {
+ ZenStructuredCacheSession Session(*m_ZenClient);
+
+ zen::Stopwatch Timer;
+
+ try
+ {
+ Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey);
+
+ m_Log.debug("Zen upstream GET ({}/{}) succeeded after {:5}!",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+
+ // TODO: this is incomplete and needs to propagate any referenced content
+
+ Success = true;
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("Zen upstream GET ({}/{}) FAILED after {:5}: '{}'",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+
+ throw;
+ }
+ }
+
+ if (m_Cloud)
+ {
+ // Note that this is not fully functional, pending implementation work on
+ // the Jupiter end
+
+ CloudCacheSession Session(m_Cloud);
+
+ zen::Stopwatch Timer;
+
+ try
+ {
+ Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey);
+
+ m_Log.debug("Cloud upstream GET ({}/{}) succeeded after {:5}!",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+
+ // TODO: this is incomplete and needs to propagate any referenced content
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("Cloud upstream GET ({}/{}) FAILED after {:5}: '{}'",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+
+ throw;
+ }
+ }
+
if (!Success)
{
+ m_Log.debug("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey);
+
return Request.WriteResponse(zen::HttpResponse::NotFound);
}
@@ -94,7 +197,13 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Request.SetSuppressResponseBody();
}
- return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value);
+ m_Log.debug("HIT - '{}/{}' ({} bytes, {})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Value.Value.Size(),
+ Value.Value.GetContentType());
+
+ return Request.WriteResponse(zen::HttpResponse::OK, MapToHttpContentType(Value.Value.GetContentType()), Value.Value);
}
break;
@@ -139,6 +248,8 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
if (ValidationResult != CbValidateError::None)
{
+ m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size());
+
// TODO: add details in response
return Request.WriteResponse(HttpResponse::BadRequest);
}
@@ -160,15 +271,42 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
}
Idx.EndArray();
- }
- // TODO: store references in index
+ // TODO: store references in index
+ }
}
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
- // This is currently synchronous for simplicity and debuggability but should be
- // made asynchronous
+ // This is currently synchronous for simplicity and debuggability but should
+ // absolutely be made asynchronous. By default these should be deferred
+ // because the client should not care if the data has propagated upstream or
+ // not
+
+ if (m_ZenClient)
+ {
+ ZenStructuredCacheSession Session(*m_ZenClient);
+
+ zen::Stopwatch Timer;
+
+ try
+ {
+ Session.Put(Ref.BucketSegment, Ref.HashKey, Value);
+
+ m_Log.debug("Zen upstream PUT ({}) succeeded after {:5}!",
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("Zen upstream PUT ({}) FAILED after {:5}: '{}'",
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+
+ throw;
+ }
+ }
if (m_Cloud)
{
@@ -179,16 +317,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
try
{
Session.Put(Ref.BucketSegment, Ref.HashKey, Value);
- spdlog::debug("upstream PUT ({}) succeeded after {:5}!",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+
+ m_Log.debug("upstream PUT ({}) succeeded after {:5}!",
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
}
catch (std::exception& e)
{
- spdlog::debug("upstream PUT ({}) failed after {:5}: '{}'",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ m_Log.warn("upstream PUT ({}) FAILED after {:5}: '{}'",
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
throw;
}
@@ -227,16 +366,27 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
case kHead:
case kGet:
{
- // TODO: need to map from uncompressed content address into the storage
- // (compressed) content address
-
zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
if (!Payload)
{
+ m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.PayloadId,
+ Payload.Size(),
+ Payload.GetContentType());
+
return Request.WriteResponse(zen::HttpResponse::NotFound);
}
+ m_Log.debug("HIT - '{}/{}/{}' ({} bytes, {})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.PayloadId,
+ Payload.Size(),
+ Payload.GetContentType());
+
if (Verb == kHead)
{
Request.SetSuppressResponseBody();
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 95ae6a76c..f42b5bfb7 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -8,10 +8,13 @@
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
+#include <spdlog/spdlog.h>
+
namespace zen {
class CloudCacheClient;
class CidStore;
+class ZenStructuredCacheClient;
/**
* Structured cache service. Imposes constraints on keys, supports blobs and
@@ -51,6 +54,8 @@ public:
virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ void Flush();
+
private:
struct CacheRef
{
@@ -63,10 +68,12 @@ private:
void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref);
void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref);
- zen::CasStore& m_CasStore;
- zen::CidStore& m_CidStore;
- ZenCacheStore m_CacheStore;
- RefPtr<CloudCacheClient> m_Cloud;
+ spdlog::logger m_Log;
+ zen::CasStore& m_CasStore;
+ zen::CidStore& m_CidStore;
+ ZenCacheStore m_CacheStore;
+ RefPtr<CloudCacheClient> m_Cloud;
+ RefPtr<ZenStructuredCacheClient> m_ZenClient;
};
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index a7904a1ab..9ed3cf53e 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -81,6 +81,12 @@ ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const
}
}
+void
+ZenCacheStore::Flush()
+{
+ m_DiskLayer.Flush();
+}
+
//////////////////////////////////////////////////////////////////////////
ZenCacheMemoryLayer::ZenCacheMemoryLayer()
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 81425a29e..781a6e636 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -102,6 +102,7 @@ public:
bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue);
void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value);
+ void Flush();
private:
std::filesystem::path m_RootDir;
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 7904f9b28..715df6f69 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -7,12 +7,14 @@
#include <zencore/fmtutils.h>
#include <zencore/stream.h>
+#include "cache/structuredcachestore.h"
+
+#include <cpr/cpr.h>
#include <spdlog/spdlog.h>
#include <xxhash.h>
#include <gsl/gsl-lite.hpp>
namespace zen {
-
namespace detail {
struct MessageHeader
{
@@ -288,4 +290,97 @@ Mesh::IssueReceive()
});
}
+//////////////////////////////////////////////////////////////////////////
+
+namespace detail {
+ struct ZenCacheSessionState
+ {
+ ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {}
+ ~ZenCacheSessionState() {}
+
+ void Reset() {}
+
+ ZenStructuredCacheClient& OwnerClient;
+ cpr::Session Session;
+ };
+} // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) : m_ServiceUrl(ServiceUrl)
+{
+}
+
+ZenStructuredCacheClient::~ZenStructuredCacheClient()
+{
+}
+
+detail::ZenCacheSessionState*
+ZenStructuredCacheClient::AllocSessionState()
+{
+ detail::ZenCacheSessionState* State = nullptr;
+
+ if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty())
+ {
+ State = m_SessionStateCache.front();
+ m_SessionStateCache.pop_front();
+ }
+
+ if (State == nullptr)
+ {
+ State = new detail::ZenCacheSessionState(*this);
+ }
+
+ State->Reset();
+
+ return State;
+}
+
+void
+ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
+{
+ RwLock::ExclusiveLockScope _(m_SessionStateLock);
+ m_SessionStateCache.push_front(State);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) : m_Client(OuterClient)
+{
+}
+
+ZenStructuredCacheSession::~ZenStructuredCacheSession()
+{
+}
+
+IoBuffer
+ZenStructuredCacheSession::Get(std::string_view BucketId, std::string_view Key)
+{
+ ZEN_UNUSED(BucketId, Key);
+
+ return {};
+}
+
+void
+ZenStructuredCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data)
+{
+ ZEN_UNUSED(BucketId, Key, Data);
+}
+
+// Structured cache operations
+
+IoBuffer
+ZenStructuredCacheSession::Get(std::string_view BucketId, const IoHash& Key)
+{
+ ZEN_UNUSED(BucketId, Key);
+
+ return {};
+}
+
+void
+ZenStructuredCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data)
+{
+ ZEN_UNUSED(BucketId, Key, Data);
+}
+
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 75e29bf86..9bdcdda23 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -2,6 +2,8 @@
#pragma once
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
#include <zencore/memory.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
@@ -16,9 +18,12 @@
#include <chrono>
+struct ZenCacheValue;
+
namespace zen {
class CbObjectWriter;
+class ZenStructuredCacheClient;
/** Zen mesh tracker
*
@@ -72,13 +77,56 @@ private:
tsl::robin_map<Oid, PeerInfo, Oid::Hasher> m_KnownPeers;
};
-class ZenKvCacheClient
+namespace detail {
+ struct ZenCacheSessionState;
+}
+
+/** Zen Structured Cache session
+ *
+ * This provides a context in which cache queries can be performed
+ *
+ * These are currently all synchronous. Will need to be made asynchronous
+ */
+class ZenStructuredCacheSession
+{
+public:
+ ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient);
+ ~ZenStructuredCacheSession();
+
+ // Key-value cache operations
+ IoBuffer Get(std::string_view BucketId, std::string_view Key);
+ void Put(std::string_view BucketId, std::string_view Key, IoBuffer Data);
+
+ // Structured cache operations
+ IoBuffer Get(std::string_view BucketId, const IoHash& Key);
+ void Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data);
+
+private:
+ ZenStructuredCacheClient& m_Client;
+ detail::ZenCacheSessionState* m_SessionState;
+};
+
+/** Zen Structured Cache client
+ *
+ * This represents an endpoint to query -- actual queries should be done via
+ * ZenStructuredCacheSession
+ */
+class ZenStructuredCacheClient : public RefCounted
{
public:
- ZenKvCacheClient();
- ~ZenKvCacheClient();
+ ZenStructuredCacheClient(std::string_view ServiceUrl);
+ ~ZenStructuredCacheClient();
private:
+ std::string m_ServiceUrl;
+
+ RwLock m_SessionStateLock;
+ std::list<detail::ZenCacheSessionState*> m_SessionStateCache;
+
+ detail::ZenCacheSessionState* AllocSessionState();
+ void FreeSessionState(detail::ZenCacheSessionState*);
+
+ friend class ZenStructuredCacheSession;
};
} // namespace zen