diff options
| author | Stefan Boberg <[email protected]> | 2021-08-12 10:05:20 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-08-12 10:05:20 +0200 |
| commit | e664bbc31ff4ea866401c9303c53108242a6cb27 (patch) | |
| tree | 71dccfc377f58042a5776db85738e3cf2e495358 | |
| parent | Implemented Flush() operation for CID/CAS store interfaces (diff) | |
| download | zen-e664bbc31ff4ea866401c9303c53108242a6cb27.tar.xz zen-e664bbc31ff4ea866401c9303c53108242a6cb27.zip | |
Implemented flush operations for cache services
Also implemented basic upstream query interface, which needs a bit more work to be fully functional (chunk propagation / fetching and new propagation policies as per DDC requirements)
| -rw-r--r-- | zenserver/cache/kvcache.cpp | 5 | ||||
| -rw-r--r-- | zenserver/cache/kvcache.h | 1 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 184 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 15 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 6 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 1 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 97 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 54 |
8 files changed, 338 insertions, 25 deletions
diff --git a/zenserver/cache/kvcache.cpp b/zenserver/cache/kvcache.cpp index 4c3b1e33d..2753952ae 100644 --- a/zenserver/cache/kvcache.cpp +++ b/zenserver/cache/kvcache.cpp @@ -205,4 +205,9 @@ HttpKvCacheService::HandleRequest(zen::HttpServerRequest& Request) } } +void +HttpKvCacheService::Flush() +{ +} + } // namespace zen diff --git a/zenserver/cache/kvcache.h b/zenserver/cache/kvcache.h index e601582a4..ce4fb1d36 100644 --- a/zenserver/cache/kvcache.h +++ b/zenserver/cache/kvcache.h @@ -22,6 +22,7 @@ public: virtual const char* BaseUri() const override; virtual void HandleRequest(zen::HttpServerRequest& Request) override; + void Flush(); private: MemoryCacheStore m_cache; diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 9083f764e..7f6f49e06 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -12,6 +12,7 @@ #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" +#include "upstream/zen.h" #include "zenstore/cidstore.h" #include <spdlog/spdlog.h> @@ -22,12 +23,36 @@ namespace zen { using namespace std::literals; +zen::HttpContentType +MapToHttpContentType(zen::ZenContentType Type) +{ + switch (Type) + { + default: + case zen::ZenContentType::kBinary: + return zen::HttpContentType::kBinary; + case zen::ZenContentType::kCbObject: + return zen::HttpContentType::kCbObject; + case zen::ZenContentType::kCbPackage: + return zen::HttpContentType::kCbPackage; + case zen::ZenContentType::kText: + return zen::HttpContentType::kText; + case zen::ZenContentType::kJSON: + return zen::HttpContentType::kJSON; + case zen::ZenContentType::kYAML: + return zen::HttpContentType::kYAML; + } +}; + HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore) -: m_CasStore(InStore) +: m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) +, m_CasStore(InStore) , m_CacheStore(InStore, RootPath) , m_CidStore(InCidStore) { - spdlog::info("initializing structured cache at '{}'", RootPath); + m_Log.set_level(spdlog::level::debug); + + m_Log.info("initializing structured cache at '{}'", RootPath); #if 0 m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, @@ -36,6 +61,14 @@ HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path Roo "0oao91lrhqPiAlaGD0x7"sv /* client id */, "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); #endif + +#if 0 + std::string_view UpstreamSpec = "http://arn-wd-15192.epicgames.net"sv; + + m_Log.info("Using upstream Zen cache at '{}'", UpstreamSpec); + + m_ZenClient = new ZenStructuredCacheClient(UpstreamSpec); +#endif } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -50,6 +83,11 @@ HttpStructuredCacheService::BaseUri() const } void +HttpStructuredCacheService::Flush() +{ +} + +void HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) { CacheRef Ref; @@ -84,8 +122,73 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); + if (!Success && m_ZenClient) + { + ZenStructuredCacheSession Session(*m_ZenClient); + + zen::Stopwatch Timer; + + try + { + Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); + + m_Log.debug("Zen upstream GET ({}/{}) succeeded after {:5}!", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + + // TODO: this is incomplete and needs to propagate any referenced content + + Success = true; + } + catch (std::exception& e) + { + m_Log.warn("Zen upstream GET ({}/{}) FAILED after {:5}: '{}'", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + + throw; + } + } + + if (m_Cloud) + { + // Note that this is not fully functional, pending implementation work on + // the Jupiter end + + CloudCacheSession Session(m_Cloud); + + zen::Stopwatch Timer; + + try + { + Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); + + m_Log.debug("Cloud upstream GET ({}/{}) succeeded after {:5}!", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + + // TODO: this is incomplete and needs to propagate any referenced content + } + catch (std::exception& e) + { + m_Log.warn("Cloud upstream GET ({}/{}) FAILED after {:5}: '{}'", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + + throw; + } + } + if (!Success) { + m_Log.debug("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); + return Request.WriteResponse(zen::HttpResponse::NotFound); } @@ -94,7 +197,13 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Request.SetSuppressResponseBody(); } - return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value); + m_Log.debug("HIT - '{}/{}' ({} bytes, {})", + Ref.BucketSegment, + Ref.HashKey, + Value.Value.Size(), + Value.Value.GetContentType()); + + return Request.WriteResponse(zen::HttpResponse::OK, MapToHttpContentType(Value.Value.GetContentType()), Value.Value); } break; @@ -139,6 +248,8 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (ValidationResult != CbValidateError::None) { + m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); + // TODO: add details in response return Request.WriteResponse(HttpResponse::BadRequest); } @@ -160,15 +271,42 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req } Idx.EndArray(); - } - // TODO: store references in index + // TODO: store references in index + } } m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); - // This is currently synchronous for simplicity and debuggability but should be - // made asynchronous + // This is currently synchronous for simplicity and debuggability but should + // absolutely be made asynchronous. By default these should be deferred + // because the client should not care if the data has propagated upstream or + // not + + if (m_ZenClient) + { + ZenStructuredCacheSession Session(*m_ZenClient); + + zen::Stopwatch Timer; + + try + { + Session.Put(Ref.BucketSegment, Ref.HashKey, Value); + + m_Log.debug("Zen upstream PUT ({}) succeeded after {:5}!", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + } + catch (std::exception& e) + { + m_Log.warn("Zen upstream PUT ({}) FAILED after {:5}: '{}'", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + + throw; + } + } if (m_Cloud) { @@ -179,16 +317,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req try { Session.Put(Ref.BucketSegment, Ref.HashKey, Value); - spdlog::debug("upstream PUT ({}) succeeded after {:5}!", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + + m_Log.debug("upstream PUT ({}) succeeded after {:5}!", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); } catch (std::exception& e) { - spdlog::debug("upstream PUT ({}) failed after {:5}: '{}'", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + m_Log.warn("upstream PUT ({}) FAILED after {:5}: '{}'", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); throw; } @@ -227,16 +366,27 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re case kHead: case kGet: { - // TODO: need to map from uncompressed content address into the storage - // (compressed) content address - zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); if (!Payload) { + m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})", + Ref.BucketSegment, + Ref.HashKey, + Ref.PayloadId, + Payload.Size(), + Payload.GetContentType()); + return Request.WriteResponse(zen::HttpResponse::NotFound); } + m_Log.debug("HIT - '{}/{}/{}' ({} bytes, {})", + Ref.BucketSegment, + Ref.HashKey, + Ref.PayloadId, + Payload.Size(), + Payload.GetContentType()); + if (Verb == kHead) { Request.SetSuppressResponseBody(); diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 95ae6a76c..f42b5bfb7 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -8,10 +8,13 @@ #include "structuredcachestore.h" #include "upstream/jupiter.h" +#include <spdlog/spdlog.h> + namespace zen { class CloudCacheClient; class CidStore; +class ZenStructuredCacheClient; /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -51,6 +54,8 @@ public: virtual void HandleRequest(zen::HttpServerRequest& Request) override; + void Flush(); + private: struct CacheRef { @@ -63,10 +68,12 @@ private: void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref); void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref); - zen::CasStore& m_CasStore; - zen::CidStore& m_CidStore; - ZenCacheStore m_CacheStore; - RefPtr<CloudCacheClient> m_Cloud; + spdlog::logger m_Log; + zen::CasStore& m_CasStore; + zen::CidStore& m_CidStore; + ZenCacheStore m_CacheStore; + RefPtr<CloudCacheClient> m_Cloud; + RefPtr<ZenStructuredCacheClient> m_ZenClient; }; } // namespace zen diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index a7904a1ab..9ed3cf53e 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -81,6 +81,12 @@ ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const } } +void +ZenCacheStore::Flush() +{ + m_DiskLayer.Flush(); +} + ////////////////////////////////////////////////////////////////////////// ZenCacheMemoryLayer::ZenCacheMemoryLayer() diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 81425a29e..781a6e636 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -102,6 +102,7 @@ public: bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + void Flush(); private: std::filesystem::path m_RootDir; diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 7904f9b28..715df6f69 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -7,12 +7,14 @@ #include <zencore/fmtutils.h> #include <zencore/stream.h> +#include "cache/structuredcachestore.h" + +#include <cpr/cpr.h> #include <spdlog/spdlog.h> #include <xxhash.h> #include <gsl/gsl-lite.hpp> namespace zen { - namespace detail { struct MessageHeader { @@ -288,4 +290,97 @@ Mesh::IssueReceive() }); } +////////////////////////////////////////////////////////////////////////// + +namespace detail { + struct ZenCacheSessionState + { + ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {} + ~ZenCacheSessionState() {} + + void Reset() {} + + ZenStructuredCacheClient& OwnerClient; + cpr::Session Session; + }; +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) : m_ServiceUrl(ServiceUrl) +{ +} + +ZenStructuredCacheClient::~ZenStructuredCacheClient() +{ +} + +detail::ZenCacheSessionState* +ZenStructuredCacheClient::AllocSessionState() +{ + detail::ZenCacheSessionState* State = nullptr; + + if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty()) + { + State = m_SessionStateCache.front(); + m_SessionStateCache.pop_front(); + } + + if (State == nullptr) + { + State = new detail::ZenCacheSessionState(*this); + } + + State->Reset(); + + return State; +} + +void +ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) +{ + RwLock::ExclusiveLockScope _(m_SessionStateLock); + m_SessionStateCache.push_front(State); +} + +////////////////////////////////////////////////////////////////////////// + +ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) : m_Client(OuterClient) +{ +} + +ZenStructuredCacheSession::~ZenStructuredCacheSession() +{ +} + +IoBuffer +ZenStructuredCacheSession::Get(std::string_view BucketId, std::string_view Key) +{ + ZEN_UNUSED(BucketId, Key); + + return {}; +} + +void +ZenStructuredCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data) +{ + ZEN_UNUSED(BucketId, Key, Data); +} + +// Structured cache operations + +IoBuffer +ZenStructuredCacheSession::Get(std::string_view BucketId, const IoHash& Key) +{ + ZEN_UNUSED(BucketId, Key); + + return {}; +} + +void +ZenStructuredCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data) +{ + ZEN_UNUSED(BucketId, Key, Data); +} + } // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 75e29bf86..9bdcdda23 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -2,6 +2,8 @@ #pragma once +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> #include <zencore/memory.h> #include <zencore/thread.h> #include <zencore/uid.h> @@ -16,9 +18,12 @@ #include <chrono> +struct ZenCacheValue; + namespace zen { class CbObjectWriter; +class ZenStructuredCacheClient; /** Zen mesh tracker * @@ -72,13 +77,56 @@ private: tsl::robin_map<Oid, PeerInfo, Oid::Hasher> m_KnownPeers; }; -class ZenKvCacheClient +namespace detail { + struct ZenCacheSessionState; +} + +/** Zen Structured Cache session + * + * This provides a context in which cache queries can be performed + * + * These are currently all synchronous. Will need to be made asynchronous + */ +class ZenStructuredCacheSession +{ +public: + ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient); + ~ZenStructuredCacheSession(); + + // Key-value cache operations + IoBuffer Get(std::string_view BucketId, std::string_view Key); + void Put(std::string_view BucketId, std::string_view Key, IoBuffer Data); + + // Structured cache operations + IoBuffer Get(std::string_view BucketId, const IoHash& Key); + void Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data); + +private: + ZenStructuredCacheClient& m_Client; + detail::ZenCacheSessionState* m_SessionState; +}; + +/** Zen Structured Cache client + * + * This represents an endpoint to query -- actual queries should be done via + * ZenStructuredCacheSession + */ +class ZenStructuredCacheClient : public RefCounted { public: - ZenKvCacheClient(); - ~ZenKvCacheClient(); + ZenStructuredCacheClient(std::string_view ServiceUrl); + ~ZenStructuredCacheClient(); private: + std::string m_ServiceUrl; + + RwLock m_SessionStateLock; + std::list<detail::ZenCacheSessionState*> m_SessionStateCache; + + detail::ZenCacheSessionState* AllocSessionState(); + void FreeSessionState(detail::ZenCacheSessionState*); + + friend class ZenStructuredCacheSession; }; } // namespace zen |