aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-11-17 08:30:40 +0100
committerMartin Ridgers <[email protected]>2021-11-17 08:30:40 +0100
commit5e6b243d7fee946cc8126e54ffaee050fce7433d (patch)
tree4a953b941e794c9fe978987696a1acf50fed5858
parentHooked up zenserver:main() to ZenEntryPoint::Run() on POSIX (diff)
parentFormat fix. (diff)
downloadzen-5e6b243d7fee946cc8126e54ffaee050fce7433d.tar.xz
zen-5e6b243d7fee946cc8126e54ffaee050fce7433d.zip
Merged main
-rw-r--r--zenserver/config.cpp16
-rw-r--r--zenserver/config.h8
-rw-r--r--zenserver/upstream/upstreamapply.cpp12
-rw-r--r--zenserver/upstream/upstreamcache.cpp21
-rw-r--r--zenserver/upstream/upstreamcache.h3
-rw-r--r--zenserver/upstream/zen.cpp34
-rw-r--r--zenserver/upstream/zen.h18
-rw-r--r--zenserver/zenserver.cpp5
8 files changed, 87 insertions, 30 deletions
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 94226ef26..7afac3406 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -243,7 +243,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
"",
"upstream-thread-count",
"Number of threads used for upstream procsssing",
- cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"),
+ cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"),
"");
options.add_option("cache",
@@ -253,6 +253,20 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"),
"");
+ options.add_option("cache",
+ "",
+ "upstream-connect-timeout-ms",
+ "Connect timeout in millisecond(s). Default 5000 ms.",
+ cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.ConnectTimeoutMilliseconds)->default_value("5000"),
+ "");
+
+ options.add_option("cache",
+ "",
+ "upstream-timeout-ms",
+ "Timeout in millisecond(s). Default 0 ms",
+ cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.TimeoutMilliseconds)->default_value("0"),
+ "");
+
try
{
auto result = options.parse(argc, argv);
diff --git a/zenserver/config.h b/zenserver/config.h
index 55e846352..e7b228b91 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -58,9 +58,11 @@ struct ZenUpstreamCacheConfig
{
ZenUpstreamJupiterConfig JupiterConfig;
ZenUpstreamZenConfig ZenConfig;
- int UpstreamThreadCount = 4;
- UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite;
- bool StatsEnabled = false;
+ int32_t UpstreamThreadCount = 4;
+ int32_t ConnectTimeoutMilliseconds = 5000;
+ int32_t TimeoutMilliseconds = 0;
+ UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite;
+ bool StatsEnabled = false;
};
struct ZenServiceConfig
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index f32b08959..a0c6a91cf 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -800,13 +800,13 @@ namespace detail {
{
using namespace fmt::literals;
- std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString();
+ std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString();
// TODO: Enable when Horde accepts the UE style Host Platforms (Win64, Linux, Mac)
- //CbObject Requirements = BuildRequirements("OSFamily == '{}'"_format(HostPlatform), {}, false);
- CbObject Requirements = BuildRequirements("OSFamily == 'Windows'", {}, false);
- const IoHash RequirementsId = Requirements.GetHash();
- Data.Objects[RequirementsId] = std::move(Requirements);
- Data.RequirementsId = RequirementsId;
+ // CbObject Requirements = BuildRequirements("OSFamily == '{}'"_format(HostPlatform), {}, false);
+ CbObject Requirements = BuildRequirements("OSFamily == 'Windows'", {}, false);
+ const IoHash RequirementsId = Requirements.GetHash();
+ Data.Objects[RequirementsId] = std::move(Requirements);
+ Data.RequirementsId = RequirementsId;
}
CbObject Task = BuildTask(ExecutablePath,
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 1655f2b68..dec61ad65 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -442,9 +442,13 @@ namespace detail {
};
public:
- ZenUpstreamEndpoint(std::span<std::string const> Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN")
+ ZenUpstreamEndpoint(const ZenClientOptions& Options)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_DisplayName("ZEN")
+ , m_ConnectTimeout(Options.ConnectTimeout)
+ , m_Timeout(Options.Timeout)
{
- for (const auto& Url : Urls)
+ for (const auto& Url : Options.Urls)
{
m_Endpoints.push_back({.Url = Url});
}
@@ -461,7 +465,7 @@ namespace detail {
{
m_ServiceUrl = Ep.Url;
m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
- m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+ m_Client = new ZenStructuredCacheClient({.Url = m_ServiceUrl, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
m_HealthOk = true;
return {.Ok = true};
@@ -486,7 +490,8 @@ namespace detail {
{
m_ServiceUrl = Ep.Url;
m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
- m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+ m_Client =
+ new ZenStructuredCacheClient({.Url = m_ServiceUrl, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
m_HealthOk = true;
return {.Ok = true};
@@ -825,7 +830,7 @@ namespace detail {
{
for (ZenEndpoint& Ep : m_Endpoints)
{
- ZenStructuredCacheClient Client(Ep.Url);
+ ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)});
ZenStructuredCacheSession Session(Client);
const int32_t SampleCount = 2;
@@ -858,6 +863,8 @@ namespace detail {
std::string m_ServiceUrl;
std::vector<ZenEndpoint> m_Endpoints;
std::string m_DisplayName;
+ std::chrono::milliseconds m_ConnectTimeout;
+ std::chrono::milliseconds m_Timeout;
RefPtr<ZenStructuredCacheClient> m_Client;
UpstreamEndpointStats m_Stats;
std::atomic_bool m_HealthOk{false};
@@ -1347,9 +1354,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
}
std::unique_ptr<UpstreamEndpoint>
-MakeZenUpstreamEndpoint(std::span<std::string const> Urls)
+MakeZenUpstreamEndpoint(const ZenClientOptions& Options)
{
- return std::make_unique<detail::ZenUpstreamEndpoint>(Urls);
+ return std::make_unique<detail::ZenUpstreamEndpoint>(Options);
}
} // namespace zen
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 4b0c17181..82f1c9fd8 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -20,6 +20,7 @@ class CbObjectWriter;
class CidStore;
class ZenCacheStore;
struct CloudCacheClientOptions;
+struct ZenClientOptions;
struct UpstreamCacheRecord
{
@@ -174,6 +175,6 @@ std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Opt
std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options);
-std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::span<std::string const> Urls);
+std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(const ZenClientOptions& Options);
} // namespace zen
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 9ba767098..3e5a42c22 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -313,6 +313,15 @@ namespace detail {
void Reset() {}
+ cpr::Session& GetSession()
+ {
+ OwnerClient.InitializeSessionState(*this);
+ return Session;
+ }
+
+ private:
+ friend class ZenStructuredCacheClient;
+
ZenStructuredCacheClient& OwnerClient;
cpr::Session Session;
};
@@ -321,9 +330,11 @@ namespace detail {
//////////////////////////////////////////////////////////////////////////
-ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl)
+ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenClientOptions& Options)
: m_Log(logging::Get(std::string_view("zenclient")))
-, m_ServiceUrl(ServiceUrl)
+, m_ServiceUrl(Options.Url)
+, m_ConnectTimeout(Options.ConnectTimeout)
+, m_Timeout(Options.Timeout)
{
}
@@ -359,6 +370,13 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
m_SessionStateCache.push_front(State);
}
+void
+ZenStructuredCacheClient::InitializeSessionState(detail::ZenCacheSessionState& State)
+{
+ State.Session.SetConnectTimeout(m_ConnectTimeout);
+ State.Session.SetTimeout(m_Timeout);
+}
+
//////////////////////////////////////////////////////////////////////////
using namespace std::literals;
@@ -381,7 +399,7 @@ ZenStructuredCacheSession::CheckHealth()
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/health/check";
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
cpr::Response Response = Session.Get();
@@ -399,7 +417,7 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Accept",
@@ -427,7 +445,7 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});
@@ -452,7 +470,7 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Content-Type",
@@ -480,7 +498,7 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}});
@@ -508,7 +526,7 @@ ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
BinaryWriter Body;
Request.CopyTo(Body);
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 1fbfed7dd..ac3abc931 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -101,6 +101,14 @@ struct ZenCacheResult
bool Success = false;
};
+struct ZenClientOptions
+{
+ std::string_view Url;
+ std::span<std::string const> Urls;
+ std::chrono::milliseconds ConnectTimeout{};
+ std::chrono::milliseconds Timeout{};
+};
+
/** Zen Structured Cache session
*
* This provides a context in which cache queries can be performed
@@ -136,7 +144,7 @@ private:
class ZenStructuredCacheClient : public RefCounted
{
public:
- ZenStructuredCacheClient(std::string_view ServiceUrl);
+ ZenStructuredCacheClient(const ZenClientOptions& Options);
~ZenStructuredCacheClient();
std::string_view ServiceUrl() const { return m_ServiceUrl; }
@@ -144,16 +152,20 @@ public:
inline spdlog::logger& Log() { return m_Log; }
private:
- spdlog::logger& m_Log;
- std::string m_ServiceUrl;
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
+ std::chrono::milliseconds m_ConnectTimeout;
+ std::chrono::milliseconds m_Timeout;
RwLock m_SessionStateLock;
std::list<detail::ZenCacheSessionState*> m_SessionStateCache;
detail::ZenCacheSessionState* AllocSessionState();
void FreeSessionState(detail::ZenCacheSessionState*);
+ void InitializeSessionState(detail::ZenCacheSessionState& State);
friend class ZenStructuredCacheSession;
+ friend struct detail::ZenCacheSessionState;
};
} // namespace zen
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index f0648698c..b18378862 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -673,7 +673,10 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
if (!ZenUrls.empty())
{
- std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(ZenUrls);
+ std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint =
+ zen::MakeZenUpstreamEndpoint({.Urls = ZenUrls,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)});
UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
}
}