diff options
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 182 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 4 | ||||
| -rw-r--r-- | zenserver/config.cpp | 21 | ||||
| -rw-r--r-- | zenserver/resource.h | 18 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 30 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 10 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 210 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 44 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 29 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 12 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 4 | ||||
| -rw-r--r-- | zenserver/zenserver.rc | 81 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj | 7 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj.filters | 7 |
14 files changed, 479 insertions, 180 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index faef3eb12..3ac1ec37f 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -575,7 +575,6 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (Attachment->IsCompressedBinary()) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - const uint64_t ChunkSize = Chunk.GetCompressed().GetSize(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); ValidAttachments.emplace_back(InsertResult.DecompressedId); @@ -635,16 +634,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } void -HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { - // Note: the URL references the uncompressed payload hash - so this maintains the mapping - // from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash - // - // this is a PITA but a consequence of the fact that the client side code is not able to - // address data by compressed hash - - ZEN_UNUSED(Policy); - switch (auto Verb = Request.RequestVerb()) { using enum HttpVerb; @@ -652,112 +643,107 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request case kHead: case kGet: { - IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); - bool InUpstreamCache = false; - - if (!Payload && m_UpstreamCache) - { - if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); - UpstreamResult.Success) - { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) - { - Payload = UpstreamResult.Value; - IoHash ChunkHash = IoHash::HashBuffer(Payload); - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); - InUpstreamCache = true; - - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); - } - else - { - ZEN_WARN("got uncompressed upstream cache payload"); - } - } - } - - if (!Payload) - { - ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", - Ref.BucketSegment, - Ref.HashKey, - Ref.PayloadId, - NiceBytes(Payload.Size()), - Payload.GetContentType(), - InUpstreamCache ? "UPSTREAM" : "LOCAL"); - + HandleGetCachePayload(Request, Ref, Policy); if (Verb == kHead) { Request.SetSuppressResponseBody(); } - - return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); } break; - case kPut: + HandlePutCachePayload(Request, Ref, Policy); + break; + default: + break; + } +} + +void +HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +{ + IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + bool InUpstreamCache = false; + const bool QueryUpstream = !Payload && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + + if (QueryUpstream) + { + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); + UpstreamResult.Success) + { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { - if (IoBuffer Body = Request.ReadPayload()) - { - if (Body.Size() == 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Empty payload not permitted"); - } + Payload = UpstreamResult.Value; + IoHash ChunkHash = IoHash::HashBuffer(Payload); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); + InUpstreamCache = true; + + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + } + else + { + ZEN_WARN("got uncompressed upstream cache payload"); + } + } + } - IoHash ChunkHash = IoHash::HashBuffer(Body); + if (!Payload) + { + ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); + return Request.WriteResponse(HttpResponseCode::NotFound); + } - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); + ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", + Ref.BucketSegment, + Ref.HashKey, + Ref.PayloadId, + NiceBytes(Payload.Size()), + Payload.GetContentType(), + InUpstreamCache ? "UPSTREAM" : "LOCAL"); - if (!Compressed) - { - // All attachment payloads need to be in compressed buffer format - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Attachments must be compressed"); - } - else - { - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) - { - // the URL specified content id and content hashes don't match! - return Request.WriteResponse(HttpResponseCode::BadRequest); - } + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); +} - CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); +void +HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +{ + // Note: Individual cache payloads are not propagated upstream until a valid cache record has been stored + ZEN_UNUSED(Policy); - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + IoBuffer Body = Request.ReadPayload(); - ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", - Ref.BucketSegment, - Ref.HashKey, - Ref.PayloadId, - NiceBytes(Body.Size()), - Body.GetContentType(), - Result.New ? "NEW" : "OLD"); + if (!Body || Body.Size() == 0) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } - if (Result.New) - { - return Request.WriteResponse(HttpResponseCode::Created); - } - else - { - return Request.WriteResponse(HttpResponseCode::OK); - } - } - } - } - break; + IoHash ChunkHash = IoHash::HashBuffer(Body); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); - case kPost: - break; + if (!Compressed) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); + } - default: - break; + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv); } + + CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); + + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + + ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", + Ref.BucketSegment, + Ref.HashKey, + Ref.PayloadId, + NiceBytes(Body.Size()), + Body.GetContentType(), + Result.New ? "NEW" : "OLD"); + + const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; + + Request.WriteResponse(ResponseCode); } bool diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index edf011ae4..3fdaa1236 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -74,7 +74,9 @@ private: void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); - void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); + void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); + void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); + void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); spdlog::logger& Log() { return m_Log; } diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 164d2a792..c21638258 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -17,6 +17,8 @@ #include <zencore/logging.h> #include <sol/sol.hpp> +#include <conio.h> + #if ZEN_PLATFORM_WINDOWS // Used for getting My Documents for default data directory @@ -219,7 +221,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z if (result.count("help")) { zen::logging::ConsoleLog().info("{}", options.help()); - + zen::logging::ConsoleLog().info("Press any key to exit!"); + _getch(); exit(0); } @@ -274,14 +277,24 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv try { - sol::load_result config = lua.load(std::string_view((const char*)LuaScript.Data(), LuaScript.Size()), "zencfg"); + sol::load_result config = lua.load(std::string_view((const char*)LuaScript.Data(), LuaScript.Size()), "zen_cfg"); + + if (!config.valid()) + { + sol::error err = config; + + std::string ErrorString = sol::to_string(config.status()); + + throw std::runtime_error("{} error: {}"_format(ErrorString, err.what())); + } + config(); } catch (std::exception& e) { - ZEN_ERROR("config script failure: {}", e.what()); + ZEN_ERROR("config failure: {}", e.what()); - throw std::runtime_error("fatal zen global config script ({}) failure: {}"_format(ConfigScript, e.what()).c_str()); + throw std::runtime_error("failed to run global config script ('{}'): {}"_format(ConfigScript, e.what()).c_str()); } ServiceConfig.MeshEnabled = lua["mesh"]["enable"].get_or(ServiceConfig.MeshEnabled); diff --git a/zenserver/resource.h b/zenserver/resource.h new file mode 100644 index 000000000..f2e3b471b --- /dev/null +++ b/zenserver/resource.h @@ -0,0 +1,18 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +//{{NO_DEPENDENCIES}} +// Microsoft Visual C++ generated include file. +// Used by zenserver.rc +// +#define IDI_ICON1 101 + +// Next default values for new objects +// +#ifdef APSTUDIO_INVOKED +# ifndef APSTUDIO_READONLY_SYMBOLS +# define _APS_NEXT_RESOURCE_VALUE 102 +# define _APS_NEXT_COMMAND_VALUE 40001 +# define _APS_NEXT_CONTROL_VALUE 1001 +# define _APS_NEXT_SYMED_VALUE 101 +# endif +#endif diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 2e74602db..14da8cbcc 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -90,6 +90,11 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -121,6 +126,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -144,6 +154,11 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -171,6 +186,11 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; @@ -204,6 +224,11 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; @@ -227,6 +252,11 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 21217387c..94e7e7680 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -42,10 +42,12 @@ private: struct CloudCacheResult { - IoBuffer Response; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + IoBuffer Response; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + int32_t ErrorCode = {}; + std::string Reason; + bool Success = false; }; /** diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index d6b6d44be..9d43462c0 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -23,7 +23,6 @@ #include <algorithm> #include <atomic> #include <deque> -#include <limits> #include <thread> #include <unordered_map> @@ -106,11 +105,23 @@ namespace detail { virtual ~JupiterUpstreamEndpoint() = default; - virtual bool Initialize() override + virtual bool IsHealthy() const override { return m_HealthOk.load(); } + + virtual UpstreamEndpointHealth CheckHealth() override { - CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.Authenticate(); - return Result.Success; + try + { + CloudCacheSession Session(m_Client); + const CloudCacheResult Result = Session.Authenticate(); + + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; + } + catch (std::exception& Err) + { + return {.Reason = Err.what(), .Ok = false}; + } } virtual std::string_view DisplayName() const override { return m_DisplayName; } @@ -143,6 +154,7 @@ namespace detail { CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + Result.ErrorCode = AttachmentResult.ErrorCode; if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) { @@ -169,14 +181,16 @@ namespace detail { } } + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -187,14 +201,16 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -277,6 +293,7 @@ namespace detail { bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; + std::atomic_bool m_HealthOk{false}; }; class ZenUpstreamEndpoint final : public UpstreamEndpoint @@ -285,27 +302,33 @@ namespace detail { ZenUpstreamEndpoint(std::string_view ServiceUrl) { using namespace fmt::literals; - m_DisplayName = "Zen - '{}'"_format(ServiceUrl); + m_DisplayName = "Zen - {}"_format(ServiceUrl); m_Client = new ZenStructuredCacheClient(ServiceUrl); } ~ZenUpstreamEndpoint() = default; - virtual bool Initialize() override + virtual bool IsHealthy() const override { return m_HealthOk; } + + virtual UpstreamEndpointHealth CheckHealth() override { try { ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; - for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt) + + for (int32_t Attempt = 0, MaxAttempts = 2; Attempt < MaxAttempts && !Result.Success; ++Attempt) { - Result = Session.SayHello(); + Result = Session.CheckHealth(); } - return Result.Success; + + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), .Ok = m_HealthOk}; } - catch (std::exception&) + catch (std::exception& Err) { - return false; + return {.Reason = Err.what(), .Ok = false}; } } @@ -318,14 +341,16 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -337,14 +362,16 @@ namespace detail { const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -390,6 +417,8 @@ namespace detail { CacheRecord.CacheKey.Hash, PackagePayload, CacheRecord.Type); + + m_HealthOk = Result.ErrorCode == 0; } TotalBytes = Result.Bytes; @@ -406,6 +435,8 @@ namespace detail { CacheRecord.CacheKey.Hash, CacheRecord.PayloadIds[Idx], Payloads[Idx]); + + m_HealthOk = Result.ErrorCode == 0; } TotalBytes += Result.Bytes; @@ -425,6 +456,8 @@ namespace detail { { Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + + m_HealthOk = Result.ErrorCode == 0; } TotalBytes += Result.Bytes; @@ -435,6 +468,7 @@ namespace detail { } catch (std::exception& e) { + m_HealthOk = false; return {.Reason = std::string(e.what()), .Success = false}; } } @@ -442,6 +476,7 @@ namespace detail { private: std::string m_DisplayName; RefPtr<ZenStructuredCacheClient> m_Client; + std::atomic_bool m_HealthOk{false}; }; } // namespace detail @@ -548,27 +583,35 @@ public: virtual bool Initialize() override { - auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) { - const bool Ok = Endpoint->Initialize(); - ZEN_INFO("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED"); - return !Ok; - }); + for (auto& Endpoint : m_Endpoints) + { + const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); + if (Health.Ok) + { + ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName()); + } + else + { + ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } - m_Endpoints.erase(NewEnd, std::end(m_Endpoints)); - m_IsRunning = !m_Endpoints.empty(); + m_RunState.IsRunning = !m_Endpoints.empty(); - if (m_IsRunning) + if (m_RunState.IsRunning) { for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); } + + m_EndpointMonitorThread = std::thread(&DefaultUpstreamCache::MonitorEndpoints, this); } - return m_IsRunning; + return m_RunState.IsRunning; } - virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } + virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { @@ -576,10 +619,13 @@ public: { for (auto& Endpoint : m_Endpoints) { - if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + if (Endpoint->IsHealthy()) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } } @@ -593,10 +639,13 @@ public: { for (auto& Endpoint : m_Endpoints) { - if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + if (Endpoint->IsHealthy()) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } } @@ -606,7 +655,7 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { - if (m_IsRunning.load() && m_Options.WriteUpstream) + if (m_RunState.IsRunning && m_Options.WriteUpstream) { if (!m_UpstreamThreads.empty()) { @@ -655,18 +704,21 @@ private: for (auto& Endpoint : m_Endpoints) { - const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - if (Result.Success) + if (Endpoint->IsHealthy()) { - m_Stats.Add(*Endpoint, Result); - } - else - { - ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - Endpoint->DisplayName(), - Result.Reason); + const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + if (Result.Success) + { + m_Stats.Add(*Endpoint, Result); + } + else + { + ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->DisplayName(), + Result.Reason); + } } } } @@ -688,33 +740,82 @@ private: } } - if (!m_IsRunning.load()) + if (!m_RunState.IsRunning) { break; } } } + void MonitorEndpoints() + { + for (;;) + { + { + std::unique_lock lk(m_RunState.Mutex); + if (m_RunState.ExitSignal.wait_for(lk, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) + { + break; + } + } + + for (auto& Endpoint : m_Endpoints) + { + if (!Endpoint->IsHealthy()) + { + if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + { + ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); + } + else + { + ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + } + } + } + void Shutdown() { - if (m_IsRunning.load()) + if (m_RunState.Stop()) { - m_IsRunning.store(false); m_UpstreamQueue.CompleteAdding(); - for (std::thread& Thread : m_UpstreamThreads) { Thread.join(); } + m_EndpointMonitorThread.join(); m_UpstreamThreads.clear(); m_Endpoints.clear(); } } + spdlog::logger& Log() { return m_Log; } + using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; - spdlog::logger& Log() { return m_Log; } + struct RunState + { + std::mutex Mutex; + std::condition_variable ExitSignal; + std::atomic_bool IsRunning{false}; + + bool Stop() + { + bool Stopped = false; + { + std::lock_guard _(Mutex); + Stopped = IsRunning.exchange(false); + } + if (Stopped) + { + ExitSignal.notify_all(); + } + return Stopped; + } + }; spdlog::logger& m_Log; UpstreamCacheOptions m_Options; @@ -724,7 +825,8 @@ private: UpstreamStats m_Stats; std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; std::vector<std::thread> m_UpstreamThreads; - std::atomic_bool m_IsRunning{false}; + std::thread m_EndpointMonitorThread; + RunState m_RunState; }; ////////////////////////////////////////////////////////////////////////// diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 142fe260f..96ee8bddc 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -6,6 +6,7 @@ #include <zencore/iohash.h> #include <zencore/zencore.h> +#include <chrono> #include <memory> namespace zen { @@ -35,18 +36,33 @@ struct UpstreamCacheRecord struct UpstreamCacheOptions { - uint32_t ThreadCount = 4; - bool ReadUpstream = true; - bool WriteUpstream = true; + std::chrono::seconds HealthCheckInterval{5}; + uint32_t ThreadCount = 4; + bool ReadUpstream = true; + bool WriteUpstream = true; +}; + +enum class UpstreamStatusCode : uint8_t +{ + Ok, + Error +}; + +struct UpstreamError +{ + UpstreamStatusCode StatusCode = UpstreamStatusCode::Ok; + std::string Reason; + + explicit operator bool() const { return StatusCode != UpstreamStatusCode::Ok; } }; struct GetUpstreamCacheResult { - IoBuffer Value; - std::string Reason; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + IoBuffer Value; + UpstreamError Error; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + bool Success = false; }; struct PutUpstreamCacheResult @@ -57,6 +73,12 @@ struct PutUpstreamCacheResult bool Success = false; }; +struct UpstreamEndpointHealth +{ + std::string Reason; + bool Ok = false; +}; + /** * The upstream endpont is responsible for handling upload/downloading of cache records. */ @@ -65,7 +87,9 @@ class UpstreamEndpoint public: virtual ~UpstreamEndpoint() = default; - virtual bool Initialize() = 0; + virtual bool IsHealthy() const = 0; + + virtual UpstreamEndpointHealth CheckHealth() = 0; virtual std::string_view DisplayName() const = 0; @@ -88,7 +112,7 @@ public: virtual bool Initialize() = 0; - virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; + virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 7f689d7f3..530bed32a 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -382,15 +382,20 @@ ZenStructuredCacheSession::~ZenStructuredCacheSession() } ZenCacheResult -ZenStructuredCacheSession::SayHello() +ZenStructuredCacheSession::CheckHealth() { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/test/hello"; + Uri << m_Client.ServiceUrl() << "/health/check"; cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); cpr::Response Response = Session.Get(); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } @@ -411,6 +416,11 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -431,6 +441,11 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -455,6 +470,11 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; @@ -475,6 +495,11 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 36cfd1217..158be668a 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -91,10 +91,12 @@ namespace detail { struct ZenCacheResult { - IoBuffer Response; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + IoBuffer Response; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + int32_t ErrorCode = {}; + std::string Reason; + bool Success = false; }; /** Zen Structured Cache session @@ -109,7 +111,7 @@ public: ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient); ~ZenStructuredCacheSession(); - ZenCacheResult SayHello(); + ZenCacheResult CheckHealth(); ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index cf24dc224..e3b61568f 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -193,7 +193,7 @@ public: if (!UpstreamConfig.ZenConfig.Url.empty()) { std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Url); - UpstreamCache->AddEndpoint(std::move(ZenEndpoint)); + UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); } { @@ -221,7 +221,7 @@ public: if (!Options.ServiceUrl.empty()) { std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); - UpstreamCache->AddEndpoint(std::move(JupiterEndpoint)); + UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); } } diff --git a/zenserver/zenserver.rc b/zenserver/zenserver.rc new file mode 100644 index 000000000..c063436ef --- /dev/null +++ b/zenserver/zenserver.rc @@ -0,0 +1,81 @@ +// Microsoft Visual C++ generated resource script. +// +#include "resource.h" + +#define APSTUDIO_READONLY_SYMBOLS +///////////////////////////////////////////////////////////////////////////// +// +// Generated from the TEXTINCLUDE 2 resource. +// +#include "winres.h" + +///////////////////////////////////////////////////////////////////////////// +#undef APSTUDIO_READONLY_SYMBOLS + +///////////////////////////////////////////////////////////////////////////// +// English (United States) resources + +#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENU) +LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_US +#pragma code_page(1252) + +///////////////////////////////////////////////////////////////////////////// +// +// Icon +// + +// Icon with lowest ID value placed first to ensure application icon +// remains consistent on all systems. +IDI_ICON1 ICON "..\\UnrealEngine.ico" + +#endif // English (United States) resources +///////////////////////////////////////////////////////////////////////////// + + +///////////////////////////////////////////////////////////////////////////// +// English (United Kingdom) resources + +#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENG) +LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_UK +#pragma code_page(1252) + +#ifdef APSTUDIO_INVOKED +///////////////////////////////////////////////////////////////////////////// +// +// TEXTINCLUDE +// + +1 TEXTINCLUDE +BEGIN + "resource.h\0" +END + +2 TEXTINCLUDE +BEGIN + "#include ""winres.h""\r\n" + "\0" +END + +3 TEXTINCLUDE +BEGIN + "\r\n" + "\0" +END + +#endif // APSTUDIO_INVOKED + +#endif // English (United Kingdom) resources +///////////////////////////////////////////////////////////////////////////// + + + +#ifndef APSTUDIO_INVOKED +///////////////////////////////////////////////////////////////////////////// +// +// Generated from the TEXTINCLUDE 3 resource. +// + + +///////////////////////////////////////////////////////////////////////////// +#endif // not APSTUDIO_INVOKED + diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index db657d192..1671d98a6 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -109,6 +109,7 @@ <ClInclude Include="compute\apply.h" /> <ClInclude Include="config.h" /> <ClInclude Include="diag\logging.h" /> + <ClInclude Include="resource.h" /> <ClInclude Include="sos\sos.h" /> <ClInclude Include="testing\httptest.h" /> <ClInclude Include="upstream\jupiter.h" /> @@ -163,6 +164,12 @@ <ItemGroup> <None Include="xmake.lua" /> </ItemGroup> + <ItemGroup> + <ResourceCompile Include="zenserver.rc" /> + </ItemGroup> + <ItemGroup> + <Image Include="..\UnrealEngine.ico" /> + </ItemGroup> <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> <ImportGroup Label="ExtensionTargets"> </ImportGroup> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index 250c55812..c51a8eb76 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -40,6 +40,7 @@ </ClInclude> <ClInclude Include="testing\httptest.h" /> <ClInclude Include="windows\service.h" /> + <ClInclude Include="resource.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -96,4 +97,10 @@ <ItemGroup> <None Include="xmake.lua" /> </ItemGroup> + <ItemGroup> + <ResourceCompile Include="zenserver.rc" /> + </ItemGroup> + <ItemGroup> + <Image Include="..\UnrealEngine.ico" /> + </ItemGroup> </Project>
\ No newline at end of file |