diff options
| author | Martin Ridgers <[email protected]> | 2021-11-17 08:30:40 +0100 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-11-17 08:30:40 +0100 |
| commit | 5e6b243d7fee946cc8126e54ffaee050fce7433d (patch) | |
| tree | 4a953b941e794c9fe978987696a1acf50fed5858 | |
| parent | Hooked up zenserver:main() to ZenEntryPoint::Run() on POSIX (diff) | |
| parent | Format fix. (diff) | |
| download | zen-5e6b243d7fee946cc8126e54ffaee050fce7433d.tar.xz zen-5e6b243d7fee946cc8126e54ffaee050fce7433d.zip | |
Merged main
| -rw-r--r-- | zenserver/config.cpp | 16 | ||||
| -rw-r--r-- | zenserver/config.h | 8 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 12 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 21 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 3 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 34 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 18 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 5 |
8 files changed, 87 insertions, 30 deletions
diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 94226ef26..7afac3406 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -243,7 +243,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z "", "upstream-thread-count", "Number of threads used for upstream procsssing", - cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"), + cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"), ""); options.add_option("cache", @@ -253,6 +253,20 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"), ""); + options.add_option("cache", + "", + "upstream-connect-timeout-ms", + "Connect timeout in millisecond(s). Default 5000 ms.", + cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.ConnectTimeoutMilliseconds)->default_value("5000"), + ""); + + options.add_option("cache", + "", + "upstream-timeout-ms", + "Timeout in millisecond(s). Default 0 ms", + cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.TimeoutMilliseconds)->default_value("0"), + ""); + try { auto result = options.parse(argc, argv); diff --git a/zenserver/config.h b/zenserver/config.h index 55e846352..e7b228b91 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -58,9 +58,11 @@ struct ZenUpstreamCacheConfig { ZenUpstreamJupiterConfig JupiterConfig; ZenUpstreamZenConfig ZenConfig; - int UpstreamThreadCount = 4; - UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite; - bool StatsEnabled = false; + int32_t UpstreamThreadCount = 4; + int32_t ConnectTimeoutMilliseconds = 5000; + int32_t TimeoutMilliseconds = 0; + UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite; + bool StatsEnabled = false; }; struct ZenServiceConfig diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index f32b08959..a0c6a91cf 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -800,13 +800,13 @@ namespace detail { { using namespace fmt::literals; - std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); + std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); // TODO: Enable when Horde accepts the UE style Host Platforms (Win64, Linux, Mac) - //CbObject Requirements = BuildRequirements("OSFamily == '{}'"_format(HostPlatform), {}, false); - CbObject Requirements = BuildRequirements("OSFamily == 'Windows'", {}, false); - const IoHash RequirementsId = Requirements.GetHash(); - Data.Objects[RequirementsId] = std::move(Requirements); - Data.RequirementsId = RequirementsId; + // CbObject Requirements = BuildRequirements("OSFamily == '{}'"_format(HostPlatform), {}, false); + CbObject Requirements = BuildRequirements("OSFamily == 'Windows'", {}, false); + const IoHash RequirementsId = Requirements.GetHash(); + Data.Objects[RequirementsId] = std::move(Requirements); + Data.RequirementsId = RequirementsId; } CbObject Task = BuildTask(ExecutablePath, diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 1655f2b68..dec61ad65 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -442,9 +442,13 @@ namespace detail { }; public: - ZenUpstreamEndpoint(std::span<std::string const> Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN") + ZenUpstreamEndpoint(const ZenClientOptions& Options) + : m_Log(zen::logging::Get("upstream")) + , m_DisplayName("ZEN") + , m_ConnectTimeout(Options.ConnectTimeout) + , m_Timeout(Options.Timeout) { - for (const auto& Url : Urls) + for (const auto& Url : Options.Urls) { m_Endpoints.push_back({.Url = Url}); } @@ -461,7 +465,7 @@ namespace detail { { m_ServiceUrl = Ep.Url; m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); - m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + m_Client = new ZenStructuredCacheClient({.Url = m_ServiceUrl, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; @@ -486,7 +490,8 @@ namespace detail { { m_ServiceUrl = Ep.Url; m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); - m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + m_Client = + new ZenStructuredCacheClient({.Url = m_ServiceUrl, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; @@ -825,7 +830,7 @@ namespace detail { { for (ZenEndpoint& Ep : m_Endpoints) { - ZenStructuredCacheClient Client(Ep.Url); + ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)}); ZenStructuredCacheSession Session(Client); const int32_t SampleCount = 2; @@ -858,6 +863,8 @@ namespace detail { std::string m_ServiceUrl; std::vector<ZenEndpoint> m_Endpoints; std::string m_DisplayName; + std::chrono::milliseconds m_ConnectTimeout; + std::chrono::milliseconds m_Timeout; RefPtr<ZenStructuredCacheClient> m_Client; UpstreamEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; @@ -1347,9 +1354,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) } std::unique_ptr<UpstreamEndpoint> -MakeZenUpstreamEndpoint(std::span<std::string const> Urls) +MakeZenUpstreamEndpoint(const ZenClientOptions& Options) { - return std::make_unique<detail::ZenUpstreamEndpoint>(Urls); + return std::make_unique<detail::ZenUpstreamEndpoint>(Options); } } // namespace zen diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 4b0c17181..82f1c9fd8 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -20,6 +20,7 @@ class CbObjectWriter; class CidStore; class ZenCacheStore; struct CloudCacheClientOptions; +struct ZenClientOptions; struct UpstreamCacheRecord { @@ -174,6 +175,6 @@ std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Opt std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options); -std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::span<std::string const> Urls); +std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(const ZenClientOptions& Options); } // namespace zen diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 9ba767098..3e5a42c22 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -313,6 +313,15 @@ namespace detail { void Reset() {} + cpr::Session& GetSession() + { + OwnerClient.InitializeSessionState(*this); + return Session; + } + + private: + friend class ZenStructuredCacheClient; + ZenStructuredCacheClient& OwnerClient; cpr::Session Session; }; @@ -321,9 +330,11 @@ namespace detail { ////////////////////////////////////////////////////////////////////////// -ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) +ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenClientOptions& Options) : m_Log(logging::Get(std::string_view("zenclient"))) -, m_ServiceUrl(ServiceUrl) +, m_ServiceUrl(Options.Url) +, m_ConnectTimeout(Options.ConnectTimeout) +, m_Timeout(Options.Timeout) { } @@ -359,6 +370,13 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) m_SessionStateCache.push_front(State); } +void +ZenStructuredCacheClient::InitializeSessionState(detail::ZenCacheSessionState& State) +{ + State.Session.SetConnectTimeout(m_ConnectTimeout); + State.Session.SetTimeout(m_Timeout); +} + ////////////////////////////////////////////////////////////////////////// using namespace std::literals; @@ -381,7 +399,7 @@ ZenStructuredCacheSession::CheckHealth() ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/health/check"; - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); cpr::Response Response = Session.Get(); @@ -399,7 +417,7 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Accept", @@ -427,7 +445,7 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); @@ -452,7 +470,7 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", @@ -480,7 +498,7 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}}); @@ -508,7 +526,7 @@ ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) BinaryWriter Body; Request.CopyTo(Body); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 1fbfed7dd..ac3abc931 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -101,6 +101,14 @@ struct ZenCacheResult bool Success = false; }; +struct ZenClientOptions +{ + std::string_view Url; + std::span<std::string const> Urls; + std::chrono::milliseconds ConnectTimeout{}; + std::chrono::milliseconds Timeout{}; +}; + /** Zen Structured Cache session * * This provides a context in which cache queries can be performed @@ -136,7 +144,7 @@ private: class ZenStructuredCacheClient : public RefCounted { public: - ZenStructuredCacheClient(std::string_view ServiceUrl); + ZenStructuredCacheClient(const ZenClientOptions& Options); ~ZenStructuredCacheClient(); std::string_view ServiceUrl() const { return m_ServiceUrl; } @@ -144,16 +152,20 @@ public: inline spdlog::logger& Log() { return m_Log; } private: - spdlog::logger& m_Log; - std::string m_ServiceUrl; + spdlog::logger& m_Log; + std::string m_ServiceUrl; + std::chrono::milliseconds m_ConnectTimeout; + std::chrono::milliseconds m_Timeout; RwLock m_SessionStateLock; std::list<detail::ZenCacheSessionState*> m_SessionStateCache; detail::ZenCacheSessionState* AllocSessionState(); void FreeSessionState(detail::ZenCacheSessionState*); + void InitializeSessionState(detail::ZenCacheSessionState& State); friend class ZenStructuredCacheSession; + friend struct detail::ZenCacheSessionState; }; } // namespace zen diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index f0648698c..b18378862 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -673,7 +673,10 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) if (!ZenUrls.empty()) { - std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(ZenUrls); + std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = + zen::MakeZenUpstreamEndpoint({.Urls = ZenUrls, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}); UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); } } |