From bba212f57958f7b554ad41d608e16df66f397db8 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 16 Sep 2021 15:52:28 +0200 Subject: Minor CbPackage serialization tweaks --- zenhttp/httpshared.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index 2dbf95959..b0c5493db 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -123,23 +123,22 @@ ParsePackageMessage(IoBuffer Payload, std::function AttachmentEntries{new CbAttachmentEntry[ChunkCount]}; Reader.Read(AttachmentEntries.get(), sizeof(CbAttachmentEntry) * ChunkCount); + CbPackage Package; + for (uint32_t i = 0; i < ChunkCount; ++i) { const CbAttachmentEntry& Entry = AttachmentEntries[i]; -- cgit v1.2.3 From 4b1b3edc21acf3b19649a85e49a57464bf169314 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 16 Sep 2021 21:04:35 +0200 Subject: Added ProcessMonitor class, which is used to monitor a number of sponsor processes, to control Zen instance lifetime --- zencore/include/zencore/thread.h | 27 ++++++++++++++- zencore/thread.cpp | 75 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/zencore/include/zencore/thread.h b/zencore/include/zencore/thread.h index b18da6031..0e34d6518 100644 --- a/zencore/include/zencore/thread.h +++ b/zencore/include/zencore/thread.h @@ -8,6 +8,8 @@ # include #endif +#include + namespace zen { /** @@ -93,6 +95,7 @@ public: ZENCORE_API void Set(); ZENCORE_API void Reset(); ZENCORE_API bool Wait(int TimeoutMs = -1); + ZENCORE_API void Close(); protected: explicit Event(void* EventHandle) : m_EventHandle(EventHandle) {} @@ -125,7 +128,7 @@ private: }; /** Basic process abstraction - */ + */ class ProcessHandle { public: @@ -150,6 +153,28 @@ private: int m_Pid = 0; }; +/** Process monitor - monitors a list of running processes via polling + + Intended to be used to monitor a set of "sponsor" processes, where + we need to determine when none of them remain alive + + */ + +class ProcessMonitor +{ +public: + ProcessMonitor(); + ~ProcessMonitor(); + + ZENCORE_API bool IsRunning(); + ZENCORE_API void AddPid(int Pid); + ZENCORE_API bool IsActive() const; + +private: + mutable RwLock m_Lock; + std::vector m_ProcessHandles; +}; + ZENCORE_API bool IsProcessRunning(int pid); ZENCORE_API int GetCurrentProcessId(); diff --git a/zencore/thread.cpp b/zencore/thread.cpp index 598466bb4..ee00d38d4 100644 --- a/zencore/thread.cpp +++ b/zencore/thread.cpp @@ -76,6 +76,13 @@ Event::Reset() ResetEvent(m_EventHandle); } +void +Event::Close() +{ + CloseHandle(m_EventHandle); + m_EventHandle = nullptr; +} + bool Event::Wait(int TimeoutMs) { @@ -168,6 +175,7 @@ ProcessHandle::Initialize(void* ProcessHandle) ZEN_ASSERT(m_ProcessHandle == nullptr); // TODO: perform some debug verification here to verify it's a valid handle? m_ProcessHandle = ProcessHandle; + m_Pid = GetProcessId(m_ProcessHandle); } ProcessHandle::~ProcessHandle() @@ -255,6 +263,73 @@ ProcessHandle::Wait(int TimeoutMs) return false; } +////////////////////////////////////////////////////////////////////////// + +ProcessMonitor::ProcessMonitor() +{ +} + +ProcessMonitor::~ProcessMonitor() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + for (HANDLE& Proc : m_ProcessHandles) + { + CloseHandle(Proc); + Proc = 0; + } +} + +bool +ProcessMonitor::IsRunning() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + bool FoundOne = false; + + for (HANDLE& Proc : m_ProcessHandles) + { + DWORD ExitCode = 0; + GetExitCodeProcess(Proc, &ExitCode); + + if (ExitCode != STILL_ACTIVE) + { + CloseHandle(Proc); + Proc = 0; + } + else + { + // Still alive + FoundOne = true; + } + } + + std::erase_if(m_ProcessHandles, [](HANDLE Handle) { return Handle == 0; }); + + return FoundOne; +} + +void +ProcessMonitor::AddPid(int Pid) +{ + HANDLE ProcessHandle = OpenProcess(PROCESS_QUERY_INFORMATION | SYNCHRONIZE, FALSE, Pid); + + if (ProcessHandle) + { + RwLock::ExclusiveLockScope _(m_Lock); + m_ProcessHandles.push_back(ProcessHandle); + } +} + +bool +ProcessMonitor::IsActive() const +{ + RwLock::SharedLockScope _(m_Lock); + return m_ProcessHandles.empty() == false; +} + +////////////////////////////////////////////////////////////////////////// + bool IsProcessRunning(int pid) { -- cgit v1.2.3 From 80e904aa7cab7980249cf127573eec5b12b3e01d Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 16 Sep 2021 21:04:56 +0200 Subject: Added ZEN_CONSOLE macro, which logs the output directly to console --- zencore/include/zencore/logging.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/zencore/include/zencore/logging.h b/zencore/include/zencore/logging.h index 8e6b3a244..e98509bf8 100644 --- a/zencore/include/zencore/logging.h +++ b/zencore/include/zencore/logging.h @@ -80,3 +80,10 @@ using zen::Log; using namespace std::literals; \ Log().critical(fmtstr##sv, __VA_ARGS__); \ } while (false) + +#define ZEN_CONSOLE(fmtstr, ...) \ + do \ + { \ + using namespace std::literals; \ + ConsoleLog().info(fmtstr##sv, __VA_ARGS__); \ + } while (false) -- cgit v1.2.3 From f26790ef625625eb9b62799e646e772ae0291ca7 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 16 Sep 2021 21:05:22 +0200 Subject: Improved top/ps behaviour --- zen/cmds/top.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/zen/cmds/top.cpp b/zen/cmds/top.cpp index 5bb11d0a0..b0d684705 100644 --- a/zen/cmds/top.cpp +++ b/zen/cmds/top.cpp @@ -23,7 +23,9 @@ TopCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZenServerState State; if (!State.InitializeReadOnly()) { - ZEN_INFO("no Zen state found"); + ZEN_CONSOLE("no Zen state found"); + + return 0; } State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_INFO("Port {} : pid {}", Entry.ListenPort, Entry.Pid); }); @@ -47,10 +49,12 @@ PsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZenServerState State; if (!State.InitializeReadOnly()) { - ZEN_INFO("no Zen state found"); + ZEN_CONSOLE("no Zen state found"); + + return 0; } - State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_INFO("Port {} : pid {}", Entry.ListenPort, Entry.Pid); }); + State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_CONSOLE("Port {} : pid {}", Entry.ListenPort, Entry.Pid); }); return 0; } -- cgit v1.2.3 From e5d44a89fac05c90ef7d784a6e82029f4f9b2065 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 16 Sep 2021 21:05:39 +0200 Subject: Added stubs for scrub CLI command --- zen/cmds/scrub.cpp | 19 +++++++++++++++++++ zen/cmds/scrub.h | 23 +++++++++++++++++++++++ zen/zen.vcxproj | 2 ++ zen/zen.vcxproj.filters | 2 ++ 4 files changed, 46 insertions(+) create mode 100644 zen/cmds/scrub.cpp create mode 100644 zen/cmds/scrub.h diff --git a/zen/cmds/scrub.cpp b/zen/cmds/scrub.cpp new file mode 100644 index 000000000..a9b8505ec --- /dev/null +++ b/zen/cmds/scrub.cpp @@ -0,0 +1,19 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "scrub.h" + +using namespace std::literals; + +ScrubCommand::ScrubCommand() +{ +} + +ScrubCommand::~ScrubCommand() = default; + +int +ScrubCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions, argc, argv); + + return 0; +} diff --git a/zen/cmds/scrub.h b/zen/cmds/scrub.h new file mode 100644 index 000000000..1bfb4ad6c --- /dev/null +++ b/zen/cmds/scrub.h @@ -0,0 +1,23 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../internalfile.h" +#include "../zen.h" + +#include + +/** Scrub storage + */ +class ScrubCommand : public ZenCmdBase +{ +public: + ScrubCommand(); + ~ScrubCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options* Options() override { return &m_Options; } + +private: + cxxopts::Options m_Options{"scrub", "Scrub zen storage"}; +}; diff --git a/zen/zen.vcxproj b/zen/zen.vcxproj index 4f0691fab..ff10f4c59 100644 --- a/zen/zen.vcxproj +++ b/zen/zen.vcxproj @@ -100,6 +100,7 @@ + @@ -114,6 +115,7 @@ + diff --git a/zen/zen.vcxproj.filters b/zen/zen.vcxproj.filters index 47b321727..a38771944 100644 --- a/zen/zen.vcxproj.filters +++ b/zen/zen.vcxproj.filters @@ -27,6 +27,7 @@ + @@ -55,6 +56,7 @@ + -- cgit v1.2.3 From 287f6280fcb32820f2c69def0bdced96d804429b Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 16 Sep 2021 21:08:47 +0200 Subject: Changed how sponsor processes are managed We can now monitor more than one process and if a new process is started on the same port we will hand over the owner pid to the process which is already executing before exiting. Note that this is only done if there is actually already an owner process in the instance list. --- zenserver-test/zenserver-test.cpp | 62 +++++++++++++++++++++++++++ zenserver/zenserver.cpp | 87 +++++++++++++++++++++++++++++--------- zenutil/include/zenserverprocess.h | 12 +++++- zenutil/zenserverprocess.cpp | 58 ++++++++++++++++++++++++- 4 files changed, 194 insertions(+), 25 deletions(-) diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 8baf57d7e..973ef874a 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1491,4 +1491,66 @@ TEST_CASE("http.package") CHECK_EQ(ResponsePackage, TestPackage); } +# if 0 +TEST_CASE("lifetime.owner") +{ + // This test is designed to verify that the hand-over of sponsor processes is handled + // correctly for the case when a second or third process is launched on the same port + // + // Due to the nature of it, it cannot be + + const uint16_t PortNumber = 23456; + + ZenServerInstance Zen1(TestEnv); + std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); + Zen1.SetTestDir(TestDir1); + Zen1.SpawnServer(PortNumber); + Zen1.WaitUntilReady(); + Zen1.Detach(); + + ZenServerInstance Zen2(TestEnv); + std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); + Zen2.SetTestDir(TestDir2); + Zen2.SpawnServer(PortNumber); + Zen2.WaitUntilReady(); + Zen2.Detach(); +} + +TEST_CASE("lifetime.owner.2") +{ + // This test is designed to verify that the hand-over of sponsor processes is handled + // correctly for the case when a second or third process is launched on the same port + // + // Due to the nature of it, it cannot be + + const uint16_t PortNumber = 13456; + + std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); + std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); + + ZenServerInstance Zen1(TestEnv); + Zen1.SetTestDir(TestDir1); + Zen1.SpawnServer(PortNumber); + Zen1.WaitUntilReady(); + + ZenServerInstance Zen2(TestEnv); + Zen2.SetTestDir(TestDir2); + Zen2.SetOwnerPid(Zen1.GetPid()); + Zen2.SpawnServer(PortNumber + 1); + Zen2.Detach(); + + ZenServerInstance Zen3(TestEnv); + Zen3.SetTestDir(TestDir2); + Zen3.SetOwnerPid(Zen1.GetPid()); + Zen3.SpawnServer(PortNumber + 1); + Zen3.Detach(); + + ZenServerInstance Zen4(TestEnv); + Zen4.SetTestDir(TestDir2); + Zen4.SetOwnerPid(Zen1.GetPid()); + Zen4.SpawnServer(PortNumber + 1); + Zen4.Detach(); +} +# endif + #endif diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 3b56d8683..aa4a42fd7 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -84,9 +84,12 @@ class ZenServer { + ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; + public: - void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid) + void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry) { + m_ServerEntry = ServerEntry; using namespace fmt::literals; ZEN_INFO(ZEN_APP_NAME " initializing"); @@ -94,23 +97,29 @@ public: if (ParentPid) { - m_Process.Initialize(ParentPid); + zen::ProcessHandle OwnerProcess; + OwnerProcess.Initialize(ParentPid); - if (!m_Process.IsValid()) + if (!OwnerProcess.IsValid()) { ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid); + + // If the pid is not reachable should we just shut down immediately? the intended owner process + // could have been killed or somehow crashed already } else { ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid); } + + m_ProcessMonitor.AddPid(ParentPid); } // Initialize/check mutex based on base port std::string MutexName = "zen_{}"_format(BasePort); - if (zen::NamedMutex::Exists(MutexName) || (m_ServerMutex.Create(MutexName) == false)) + if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false))) { throw std::runtime_error("Failed to create mutex '{}' - is another instance already running?"_format(MutexName).c_str()); } @@ -267,7 +276,7 @@ public: void Run() { - if (m_Process.IsValid()) + if (m_ProcessMonitor.IsActive()) { EnqueueTimer(); } @@ -282,7 +291,7 @@ public: ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ "); } - ZEN_INFO(ZEN_APP_NAME " now running"); + ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId()); #if USE_SENTRY sentry_clear_modulecache(); @@ -332,13 +341,33 @@ public: void CheckOwnerPid() { - if (m_Process.IsRunning()) + // Pick up any new "owner" processes + + std::set AddedPids; + + for (auto& PidEntry : m_ServerEntry->SponsorPids) + { + if (uint32_t ThisPid = PidEntry.load(std::memory_order::memory_order_relaxed)) + { + if (PidEntry.compare_exchange_strong(ThisPid, 0)) + { + if (AddedPids.insert(ThisPid).second) + { + m_ProcessMonitor.AddPid(ThisPid); + + ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid); + } + } + } + } + + if (m_ProcessMonitor.IsRunning()) { EnqueueTimer(); } else { - ZEN_INFO(ZEN_APP_NAME " exiting since parent process id {} is gone", m_Process.Pid()); + ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone"); RequestExit(0); } @@ -366,7 +395,7 @@ private: std::jthread m_IoRunner; asio::io_context m_IoContext; asio::steady_timer m_PidCheckTimer{m_IoContext}; - zen::ProcessHandle m_Process; + zen::ProcessMonitor m_ProcessMonitor; zen::NamedMutex m_ServerMutex; zen::Ref m_Http; @@ -411,27 +440,41 @@ main(int argc, char* argv[]) auto _ = zen::MakeGuard([&] { sentry_close(); }); #endif - // Prototype config system, let's see how this pans out + try + { + // Prototype config system, we'll see how this pans out + // + // TODO: we need to report any parse errors here - ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig); + ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig); - ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort); + ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort); - try - { ZenServerState ServerState; ServerState.Initialize(); ServerState.Sweep(); - if (ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort)) + ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort); + + if (Entry) { // Instance already running for this port? Should double check pid ZEN_WARN("Looks like there is already a process listening to this port (pid: {})", Entry->Pid); + + if (GlobalOptions.OwnerPid) + { + Entry->AddSponsorProcess(GlobalOptions.OwnerPid); + + std::exit(0); + } } - else + + Entry = ServerState.Register(GlobalOptions.BasePort); + + if (GlobalOptions.OwnerPid) { - ServerState.Register(GlobalOptions.BasePort); + Entry->AddSponsorProcess(GlobalOptions.OwnerPid); } std::unique_ptr ShutdownThread; @@ -445,15 +488,17 @@ main(int argc, char* argv[]) Server.SetDataRoot(GlobalOptions.DataDir); Server.SetTestMode(GlobalOptions.IsTest); Server.SetDedicatedMode(GlobalOptions.IsDedicated); - Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid); + Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry); // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName); - ShutdownEvent->Wait(); - ZEN_INFO("shutdown signal received"); - Server.RequestExit(0); + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal received"); + Server.RequestExit(0); + } }}); // If we have a parent process, establish the mechanisms we need diff --git a/zenutil/include/zenserverprocess.h b/zenutil/include/zenserverprocess.h index b81d61c25..7fcacf788 100644 --- a/zenutil/include/zenserverprocess.h +++ b/zenutil/include/zenserverprocess.h @@ -10,6 +10,7 @@ #include #include +#include class ZenServerEnvironment { @@ -44,6 +45,9 @@ struct ZenServerInstance [[nodiscard]] bool WaitUntilReady(int Timeout); void EnableTermination() { m_Terminate = true; } void EnableMesh() { m_MeshEnabled = true; } + void Detach(); + inline int GetPid() { return m_Process.Pid(); } + inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; } void SetTestDir(std::filesystem::path TestDir) { @@ -66,6 +70,7 @@ private: std::filesystem::path m_TestDir; bool m_MeshEnabled = false; int m_BasePort = 0; + std::optional m_OwnerPid; void CreateShutdownEvent(int BasePort); }; @@ -90,7 +95,9 @@ public: std::atomic ListenPort; std::atomic Flags; uint8_t SessionId[12]; + std::atomic SponsorPids[32]; uint8_t Padding[12]; + uint8_t Padding2[96]; enum class FlagsEnum : uint16_t { @@ -101,9 +108,10 @@ public: void Reset(); void SignalShutdownRequest(); + bool AddSponsorProcess(uint32_t Pid); }; - static_assert(sizeof(ZenServerEntry) == 32); + static_assert(sizeof(ZenServerEntry) == 256); void Initialize(); [[nodiscard]] bool InitializeReadOnly(); @@ -115,6 +123,6 @@ public: private: void* m_hMapFile = nullptr; ZenServerEntry* m_Data; - int m_MaxEntryCount = 4096 / sizeof(ZenServerEntry); + int m_MaxEntryCount = 131072 / sizeof(ZenServerEntry); ZenServerEntry* m_OurEntry = nullptr; }; diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index 00f5d4c0d..5142c6a54 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -199,6 +200,9 @@ ZenServerState::Register(int ListenPort) Entry.Pid = Pid; Entry.Flags = 0; + const zen::Oid SesId = zen::GetSessionId(); + memcpy(Entry.SessionId, &SesId, sizeof SesId); + return &Entry; } } @@ -264,6 +268,30 @@ ZenServerState::ZenServerEntry::SignalShutdownRequest() Flags |= uint16_t(FlagsEnum::kShutdownPlease); } +bool +ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd) +{ + for (std::atomic& PidEntry : SponsorPids) + { + if (PidEntry.load(std::memory_order::memory_order_relaxed) == 0) + { + uint32_t Expected = 0; + if (PidEntry.compare_exchange_strong(Expected, uint16_t(PidToAdd))) + { + // Success! + return true; + } + } + else if (PidEntry.load(std::memory_order::memory_order_relaxed) == PidToAdd) + { + // Success, the because pid is already in the list + return true; + } + } + + return false; +} + ////////////////////////////////////////////////////////////////////////// std::atomic TestCounter{0}; @@ -395,7 +423,17 @@ ZenServerInstance::SpawnServer(int BasePort) if (IsTest) { - CommandLine << " --test --owner-pid " << MyPid << " --log-id " << LogId; + if (!m_OwnerPid.has_value()) + { + m_OwnerPid = MyPid; + } + + CommandLine << " --test --log-id " << LogId; + } + + if (m_OwnerPid.has_value()) + { + CommandLine << " --owner-pid " << m_OwnerPid.value(); } CommandLine << " --child-id " << ChildEventName; @@ -548,10 +586,26 @@ ZenServerInstance::AttachToRunningServer(int BasePort) CreateShutdownEvent(BasePort); } +void +ZenServerInstance::Detach() +{ + if (m_Process.IsValid()) + { + m_Process.Reset(); + m_ShutdownEvent.Close(); + } +} + void ZenServerInstance::WaitUntilReady() { - m_ReadyEvent.Wait(); + while (m_ReadyEvent.Wait(100) == false) + { + if (!m_Process.IsRunning() || !m_Process.IsValid()) + { + return; + } + } } bool -- cgit v1.2.3