diff options
| author | Dan Engelbrecht <[email protected]> | 2022-08-19 03:30:54 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-08-19 03:30:54 -0700 |
| commit | 606274a83d71928f8621c1d23648a26e8f79fa7d (patch) | |
| tree | d951d51e1a7919463aeec398612e97b47f99f788 | |
| parent | bump vcpkg version to 2022.08.15 (#146) (diff) | |
| download | zen-606274a83d71928f8621c1d23648a26e8f79fa7d.tar.xz zen-606274a83d71928f8621c1d23648a26e8f79fa7d.zip | |
De/fix crash on non responding upstream (#145)
* Fix ZenStructuredCacheClient lifetime issues
| -rw-r--r-- | zenhttp/httpasio.cpp | 5 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 280 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 60 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 24 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 4 | ||||
| -rw-r--r-- | zenutil/zenserverprocess.cpp | 1 |
6 files changed, 344 insertions, 30 deletions
diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp index bc3ee754d..e4b85710d 100644 --- a/zenhttp/httpasio.cpp +++ b/zenhttp/httpasio.cpp @@ -987,7 +987,10 @@ struct HttpAcceptor m_Acceptor.async_accept(SocketRef, [this, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable { if (Ec) { - // TODO: Error condition - please handle and report properly + ZEN_ERROR("asio async_accept, connection failed to '{}:{}' reason '{}'", + m_Acceptor.local_endpoint().address().to_string(), + m_Acceptor.local_endpoint().port(), + Ec.message()); } else { diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index a61c16916..0c519dc7e 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -68,6 +68,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # define ZEN_TEST_WITH_RUNNER 1 # include <zencore/testing.h> +# include <zencore/workthreadpool.h> #endif using namespace std::literals; @@ -637,6 +638,16 @@ namespace utils { return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort)); } + static ZenConfig NewWithThreadedUpstreams(std::span<uint16_t> UpstreamPorts, bool Debug) + { + std::string Args = Debug ? "--debug" : ""; + for (uint16_t Port : UpstreamPorts) + { + Args = fmt::format("{}{}--upstream-zen-url=http://localhost:{}", Args, Args.length() > 0 ? " " : "", Port); + } + return New(13337, Args); + } + void Spawn(ZenServerInstance& Inst) { Inst.SetTestDir(DataDir); @@ -1351,8 +1362,7 @@ TEST_CASE("zcache.rpc") { zen::CbPackage Response; cacherequests::GetCacheRecordsResult Result; - // std::vector<zen::CbFieldView> Records; - bool Success; + bool Success; }; auto GetCacheRecords = [](std::string_view BaseUri, @@ -1521,6 +1531,272 @@ TEST_CASE("zcache.rpc") } } +TEST_CASE("zcache.failing.upstream") +{ + // This is an exploratory test that takes a long time to run, so lets skip it by default + if (true) + { + return; + } + + using namespace std::literals; + using namespace utils; + + const uint16_t Upstream1PortNumber = 13338; + ZenConfig Upstream1Cfg = ZenConfig::New(Upstream1PortNumber); + Upstream1Cfg.Args += (" --http asio"); + ZenServerInstance Upstream1Server(TestEnv); + + const uint16_t Upstream2PortNumber = 13339; + ZenConfig Upstream2Cfg = ZenConfig::New(Upstream2PortNumber); + Upstream2Cfg.Args += (" --http asio"); + ZenServerInstance Upstream2Server(TestEnv); + + std::vector<std::uint16_t> UpstreamPorts = {Upstream1PortNumber, Upstream2PortNumber}; + ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(UpstreamPorts, false); + LocalCfg.Args += (" --http asio --upstream-thread-count 2"); + ZenServerInstance LocalServer(TestEnv); + const uint16_t LocalPortNumber = 13337; + const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1PortNumber); + const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2PortNumber); + + SpawnServer(Upstream1Server, Upstream1Cfg); + SpawnServer(Upstream2Server, Upstream2Cfg); + SpawnServer(LocalServer, LocalCfg); + bool Upstream1Running = true; + bool Upstream2Running = true; + + using namespace std::literals; + + auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy RecordPolicy) { + std::vector<uint32_t> Data; + Data.resize(PayloadSize / 4); + for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx) + { + Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx; + } + + CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4)); + Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); + }; + + auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t Num, + size_t KeyOffset, + size_t PayloadSize = 8192) -> std::vector<CacheKey> { + std::vector<zen::CacheKey> OutKeys; + + cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; + for (size_t Key = 1; Key <= Num; ++Key) + { + zen::IoHash KeyHash; + ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key; + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + + AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); + OutKeys.push_back(CacheKey); + } + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + if (Result.status_code != 200) + { + ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason); + OutKeys.clear(); + } + + return OutKeys; + }; + + struct GetCacheRecordResult + { + zen::CbPackage Response; + cacherequests::GetCacheRecordsResult Result; + bool Success = false; + }; + + auto GetCacheRecords = [](std::string_view BaseUri, + std::string_view Namespace, + std::span<zen::CacheKey> Keys, + zen::CachePolicy Policy) -> GetCacheRecordResult { + cacherequests::GetCacheRecordsRequest Request = {.DefaultPolicy = Policy, .Namespace = std::string(Namespace)}; + for (const CacheKey& Key : Keys) + { + Request.Requests.push_back({.Key = Key}); + } + + CbObjectWriter RequestWriter; + CHECK(Request.Format(RequestWriter)); + + BinaryWriter Body; + RequestWriter.Save(Body); + + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + GetCacheRecordResult OutResult; + + if (Result.status_code == 200) + { + CbPackage Response; + if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()))) + { + OutResult.Response = std::move(Response); + CHECK(OutResult.Result.Parse(OutResult.Response)); + OutResult.Success = true; + } + } + else + { + ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", Result.reason, Result.status_code); + } + + return OutResult; + }; + + // Populate with some simple data + + CachePolicy Policy = CachePolicy::Default; + + const size_t ThreadCount = 128; + const size_t KeyMultiplier = 16384; + const size_t RecordsPerRequest = 64; + WorkerThreadPool Pool(ThreadCount); + + std::atomic_size_t Completed = 0; + + auto Keys = new std::vector<CacheKey>[ThreadCount * KeyMultiplier]; + RwLock KeysLock; + + for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) + { + size_t Iteration = I; + Pool.ScheduleWork([&] { + std::vector<CacheKey> NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest); + if (NewKeys.size() != RecordsPerRequest) + { + ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration); + Completed.fetch_add(1); + return; + } + { + RwLock::ExclusiveLockScope _(KeysLock); + Keys[Iteration].swap(NewKeys); + } + Completed.fetch_add(1); + }); + } + bool UseUpstream1 = false; + while (Completed < ThreadCount * KeyMultiplier) + { + Sleep(8000); + + if (UseUpstream1) + { + if (Upstream2Running) + { + Upstream2Server.EnableTermination(); + Upstream2Server.Shutdown(); + Sleep(100); + Upstream2Running = false; + } + if (!Upstream1Running) + { + SpawnServer(Upstream1Server, Upstream1Cfg); + Upstream1Running = true; + } + UseUpstream1 = !UseUpstream1; + } + else + { + if (Upstream1Running) + { + Upstream1Server.EnableTermination(); + Upstream1Server.Shutdown(); + Sleep(100); + Upstream1Running = false; + } + if (!Upstream2Running) + { + SpawnServer(Upstream2Server, Upstream2Cfg); + Upstream2Running = true; + } + UseUpstream1 = !UseUpstream1; + } + } + + Completed = 0; + for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) + { + size_t Iteration = I; + std::vector<CacheKey>& LocalKeys = Keys[Iteration]; + if (LocalKeys.empty()) + { + Completed.fetch_add(1); + continue; + } + Pool.ScheduleWork([&] { + GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy); + + if (!Result.Success) + { + ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration); + Completed.fetch_add(1); + return; + } + + if (Result.Result.Results.size() != LocalKeys.size()) + { + ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration); + Completed.fetch_add(1); + return; + } + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + const CacheKey& ExpectedKey = LocalKeys[Index++]; + if (!Record) + { + continue; + } + if (Record->Key != ExpectedKey) + { + continue; + } + if (Record->Values.size() != 1) + { + continue; + } + + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + if (!Value.Body) + { + continue; + } + } + } + Completed.fetch_add(1); + }); + } + while (Completed < ThreadCount * KeyMultiplier) + { + Sleep(10); + } +} + TEST_CASE("zcache.rpc.allpolicies") { using namespace std::literals; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 7d1f72004..40d627862 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -717,10 +717,15 @@ namespace detail { const ZenEndpoint& Ep = GetEndpoint(); - m_Info.Url = Ep.Url; + if (m_Info.Url != Ep.Url) + { + ZEN_INFO("Setting Zen upstream URL to '{}'", Ep.Url); + m_Info.Url = Ep.Url; + } if (Ep.Ok) { + RwLock::ExclusiveLockScope _(m_ClientLock); m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_Status.Set(UpstreamEndpointState::kOk); } @@ -749,7 +754,7 @@ namespace detail { try { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -820,7 +825,7 @@ namespace detail { ZenCacheResult Result; { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); Result = Session.InvokeRpc(BatchRequest.Save()); } @@ -864,7 +869,7 @@ namespace detail { try { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -952,7 +957,7 @@ namespace detail { ZenCacheResult Result; { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); Result = Session.InvokeRpc(BatchRequest.Save()); } @@ -1028,7 +1033,7 @@ namespace detail { try { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); ZenCacheResult Result; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; @@ -1180,12 +1185,24 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + Ref<ZenStructuredCacheClient> GetClientRef() + { + // m_Client can be modified at any time by a different thread. + // Make sure we safely bump the refcount inside a scope lock + RwLock::SharedLockScope _(m_ClientLock); + ZEN_ASSERT(m_Client); + Ref<ZenStructuredCacheClient> ClientRef(m_Client); + _.ReleaseNow(); + return ClientRef; + } + const ZenEndpoint& GetEndpoint() { for (ZenEndpoint& Ep : m_Endpoints) { - ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)}); - ZenStructuredCacheSession Session(Client); + Ref<ZenStructuredCacheClient> Client( + new ZenStructuredCacheClient({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)})); + ZenStructuredCacheSession Session(std::move(Client)); const int32_t SampleCount = 2; Ep.Ok = false; @@ -1220,6 +1237,7 @@ namespace detail { std::vector<ZenEndpoint> m_Endpoints; std::chrono::milliseconds m_ConnectTimeout; std::chrono::milliseconds m_Timeout; + RwLock m_ClientLock; RefPtr<ZenStructuredCacheClient> m_Client; }; @@ -1582,7 +1600,10 @@ private: if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { - ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); + ZEN_WARN("process upstream FAILED, '{}/{}/{}', cache record doesn't exist", + CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash); return; } @@ -1623,7 +1644,8 @@ private: if (!Result.Success) { - ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", + ZEN_WARN("upload cache record '{}/{}/{}' FAILED, endpoint '{}', reason '{}'", + CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Endpoint->GetEndpointInfo().Url, @@ -1645,7 +1667,11 @@ private: } catch (std::exception& Err) { - ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what()); + ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'", + CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + Err.what()); } } @@ -1677,8 +1703,16 @@ private: for (auto& Endpoint : m_Endpoints) { - if (Endpoint->GetState() == UpstreamEndpointState::kError || - Endpoint->GetState() == UpstreamEndpointState::kUnauthorized) + UpstreamEndpointState State = Endpoint->GetState(); + if (State == UpstreamEndpointState::kError) + { + Endpoints.push_back(Endpoint.get()); + ZEN_WARN("HEALTH - endpoint '{} - {}' is in error state '{}'", + Endpoint->GetEndpointInfo().Name, + Endpoint->GetEndpointInfo().Url, + Endpoint->GetStatus().Reason); + } + if (State == UpstreamEndpointState::kUnauthorized) { Endpoints.push_back(Endpoint.get()); } diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index cd6a531ca..b837f767c 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -377,23 +377,23 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) using namespace std::literals; -ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) -: m_Log(OuterClient.Log()) -, m_Client(OuterClient) +ZenStructuredCacheSession::ZenStructuredCacheSession(Ref<ZenStructuredCacheClient>&& OuterClient) +: m_Log(OuterClient->Log()) +, m_Client(std::move(OuterClient)) { - m_SessionState = m_Client.AllocSessionState(); + m_SessionState = m_Client->AllocSessionState(); } ZenStructuredCacheSession::~ZenStructuredCacheSession() { - m_Client.FreeSessionState(m_SessionState); + m_Client->FreeSessionState(m_SessionState); } ZenCacheResult ZenStructuredCacheSession::CheckHealth() { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/health/check"; + Uri << m_Client->ServiceUrl() << "/health/check"; cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); @@ -411,7 +411,7 @@ ZenCacheResult ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/"; + Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; @@ -443,7 +443,7 @@ ZenStructuredCacheSession::GetCacheValue(std::string_view Namespace, const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/"; + Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; @@ -481,7 +481,7 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, ZenContentType Type) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/"; + Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; @@ -517,7 +517,7 @@ ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, IoBuffer Payload) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/"; + Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; @@ -546,7 +546,7 @@ ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/$rpc"; + Uri << m_Client->ServiceUrl() << "/z$/$rpc"; BinaryWriter Body; Request.CopyTo(Body); @@ -579,7 +579,7 @@ ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/$rpc"; + Uri << m_Client->ServiceUrl() << "/z$/$rpc"; SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten(); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index e8590f940..955cfa107 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -124,7 +124,7 @@ struct ZenStructuredCacheClientOptions class ZenStructuredCacheSession { public: - ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient); + ZenStructuredCacheSession(Ref<ZenStructuredCacheClient>&& OuterClient); ~ZenStructuredCacheSession(); ZenCacheResult CheckHealth(); @@ -147,7 +147,7 @@ private: inline spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - ZenStructuredCacheClient& m_Client; + Ref<ZenStructuredCacheClient> m_Client; detail::ZenCacheSessionState* m_SessionState; }; diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index 3a4957b76..ae347dd53 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -466,6 +466,7 @@ ZenServerInstance::Shutdown() { ZEN_INFO("Terminating zenserver process"); m_Process.Terminate(111); + m_Process.Reset(); } else { |