diff options
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/include/zenutil/logging.h | 24 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/sessionsclient.h | 21 | ||||
| -rw-r--r-- | src/zenutil/logging/logging.cpp | 88 | ||||
| -rw-r--r-- | src/zenutil/sessionsclient.cpp | 35 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 42 |
5 files changed, 197 insertions, 13 deletions
diff --git a/src/zenutil/include/zenutil/logging.h b/src/zenutil/include/zenutil/logging.h index 6abf6a96f..94a45e46f 100644 --- a/src/zenutil/include/zenutil/logging.h +++ b/src/zenutil/include/zenutil/logging.h @@ -19,8 +19,9 @@ // namespace zen::logging { +class BacklogSink; class BroadcastSink; -} +} // namespace zen::logging namespace zen { @@ -46,4 +47,25 @@ void ShutdownLogging(); logging::SinkPtr GetFileSink(); Ref<logging::BroadcastSink> GetDefaultBroadcastSink(); +/// The default backlog sink, installed alongside the broadcast sink at +/// BeginInitializeLogging. Captures every log line until DisableLogBacklog() +/// is called. Use AttachSinkWithBacklogReplay() to add a sink that should +/// receive the backlog, or call Replay() directly on this sink for ad-hoc +/// replay into a target. May return nullptr before logging is initialized +/// or after the backlog is disabled. +Ref<logging::BacklogSink> GetLogBacklogSink(); + +/// Add a sink to the default broadcast and replay the captured backlog +/// into it before any new messages reach it. Use this for any sink that +/// should see the early-startup window — e.g. an in-proc session log +/// sink, a Sentry breadcrumb sink, an OTLP forwarder. After the backlog +/// has been disabled this is equivalent to a plain AddSink. +void AttachSinkWithBacklogReplay(logging::SinkPtr Sink); + +/// Stop capturing into the backlog and free its buffer. Idempotent. +/// Call this once the bootstrap window has closed — typically right +/// before the server enters its run loop or the CLI dispatches its +/// command. Subsequent log calls bypass the backlog cheaply. +void DisableLogBacklog(); + } // namespace zen diff --git a/src/zenutil/include/zenutil/sessionsclient.h b/src/zenutil/include/zenutil/sessionsclient.h index c144a9baa..ae2364279 100644 --- a/src/zenutil/include/zenutil/sessionsclient.h +++ b/src/zenutil/include/zenutil/sessionsclient.h @@ -24,12 +24,21 @@ class SessionsServiceClient public: struct Options { - std::string TargetUrl; // Base URL of the target zenserver (e.g. "http://localhost:8558") - std::string AppName; // Application name to register - std::string Mode; // Server mode (e.g. "Server", "Compute", "Proxy") - Oid SessionId = Oid::Zero; // Session ID to register under - Oid JobId = Oid::Zero; // Optional job ID - HttpClientSettings ClientSettings; // Optional; timeouts are overridden internally (e.g. for unix sockets) + std::string TargetUrl; // Base URL of the target zenserver (e.g. "http://localhost:8558") + std::string AppName; // Application name to register + std::string Mode; // Server mode (e.g. "Server", "Compute", "Proxy") + std::string Platform; // Client platform; empty = auto-detect via GetRuntimePlatformName() + // PID the server uses to track this client's liveness. 0 = don't + // report. Auto-filled to GetCurrentProcessId() when TargetUrl looks + // local (unix socket, localhost, 127.0.0.1); set explicitly to + // override or suppress. + uint32_t ClientPid = 0; + Oid SessionId = Oid::Zero; // Session ID to register under + Oid ParentSessionId = Oid::Zero; // Optional parent session ID + // Optional task/action identifier. Use this to associate the session with + // a specific unit of work while ParentSessionId links process/session ancestry. + Oid JobId = Oid::Zero; + HttpClientSettings ClientSettings; // Optional; timeouts are overridden internally (e.g. for unix sockets) }; /// Command sent to the background worker thread. diff --git a/src/zenutil/logging/logging.cpp b/src/zenutil/logging/logging.cpp index 936e3c4fd..06e8f920e 100644 --- a/src/zenutil/logging/logging.cpp +++ b/src/zenutil/logging/logging.cpp @@ -8,6 +8,7 @@ #include <zencore/logging.h> #include <zencore/logging/ansicolorsink.h> #include <zencore/logging/asyncsink.h> +#include <zencore/logging/backlogsink.h> #include <zencore/logging/broadcastsink.h> #include <zencore/logging/logger.h> #include <zencore/logging/msvcsink.h> @@ -27,6 +28,7 @@ namespace zen { static bool g_IsLoggingInitialized; logging::SinkPtr g_FileSink; Ref<logging::BroadcastSink> g_BroadcastSink; +Ref<logging::BacklogSink> g_BacklogSink; logging::SinkPtr GetFileSink() @@ -40,6 +42,83 @@ GetDefaultBroadcastSink() return g_BroadcastSink; } +Ref<logging::BacklogSink> +GetLogBacklogSink() +{ + return g_BacklogSink; +} + +void +AttachSinkWithBacklogReplay(logging::SinkPtr Sink) +{ + if (!Sink) + { + return; + } + // Drain any AsyncSink queue first so messages emitted just before this + // call have a chance to land in the backlog before we snapshot its + // cursor. Without this we'd still be correct (AsyncSink would deliver + // the queued lines to the new sink directly via the broadcast after + // AddSink runs), but the new sink would observe them out of timestamp + // order — replayed history first, then late drains interleaved with + // fresh post-attach messages. + zen::logging::FlushLogging(); + + if (!g_BroadcastSink) + { + return; + } + + // Subscribe Sink to the broadcast and snapshot the backlog cursor + // atomically. The broadcast's exclusive lock blocks fanout while it's + // held, so during the callback no thread can be inside backlog.Log() + // and the cursor we capture is exact w.r.t. the moment Sink became + // visible to fanout. + // + // After the callback returns and the lock is released, two things are + // true: + // - Any backlog entry at index < Cursor was added before Sink became + // a fanout target, so Sink did NOT receive it via Log() — Replay + // must deliver it. + // - Any backlog entry at index >= Cursor was added after Sink became + // a fanout target, so Sink already received it via Log() — Replay + // must skip it. + // This closes the prior race where a message arriving between Replay + // and AddSink would land in the backlog only and be lost on + // DisableLogBacklog. + size_t Cursor = 0; + g_BroadcastSink->AddSinkAtomic(Sink, [&]() { + if (g_BacklogSink && g_BacklogSink->IsEnabled()) + { + Cursor = g_BacklogSink->Size(); + } + }); + + if (Cursor > 0) + { + g_BacklogSink->Replay(*Sink, Cursor); + } +} + +void +DisableLogBacklog() +{ + if (!g_BacklogSink) + { + return; + } + // Remove from the broadcast first so subsequent log lines don't even + // fan out to a now-no-op sink — one fewer per-log overhead for the + // rest of the process lifetime. Disable() then frees the arena, and + // dropping our own Ref releases the object. + if (g_BroadcastSink) + { + g_BroadcastSink->RemoveSink(logging::SinkPtr(g_BacklogSink.Get())); + } + g_BacklogSink->Disable(); + g_BacklogSink = nullptr; +} + void InitializeLogging(const LoggingOptions& LogOptions) { @@ -129,6 +208,14 @@ BeginInitializeLogging(const LoggingOptions& LogOptions) // a child sink later is immediately visible to every logger. std::vector<logging::SinkPtr> BroadcastChildren; + // Install the backlog sink as the first broadcast child so it sees + // every line emitted from this point until the bootstrap window + // closes (DisableLogBacklog at server-run-loop / CLI-dispatch). Sinks + // attached later via AttachSinkWithBacklogReplay can replay the + // captured window into themselves so they don't miss the early logs. + g_BacklogSink = Ref<logging::BacklogSink>(new logging::BacklogSink()); + BroadcastChildren.push_back(logging::SinkPtr(g_BacklogSink.Get())); + if (LogOptions.NoConsoleOutput) { zen::logging::SuppressConsoleLog(); @@ -274,6 +361,7 @@ ShutdownLogging() g_FileSink = nullptr; g_BroadcastSink = nullptr; + g_BacklogSink = nullptr; } } // namespace zen diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp index 6ba997a62..f8dab4fb9 100644 --- a/src/zenutil/sessionsclient.cpp +++ b/src/zenutil/sessionsclient.cpp @@ -6,6 +6,8 @@ #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> #include <zencore/logging/logmsg.h> +#include <zencore/process.h> +#include <zencore/system.h> #include <zencore/thread.h> #include <vector> @@ -70,6 +72,27 @@ SessionsServiceClient::SessionsServiceClient(Options Opts) m_Options.TargetUrl.pop_back(); } + // Auto-detect the platform if the caller didn't set one explicitly. + if (m_Options.Platform.empty()) + { + m_Options.Platform = std::string(GetRuntimePlatformName()); + } + + // Auto-fill ClientPid when we can reasonably assume the target is on the + // same machine. The server ALSO defensively gates pid acceptance on + // IsLocalMachineRequest(), so sending a pid for a non-local URL doesn't + // cause false positives — this heuristic just avoids the redundant send. + if (m_Options.ClientPid == 0) + { + const bool IsUnixSocket = !m_Options.ClientSettings.UnixSocketPath.empty(); + const bool LooksLocal = IsUnixSocket || m_Options.TargetUrl.find("localhost") != std::string::npos || + m_Options.TargetUrl.find("127.0.0.1") != std::string::npos; + if (LooksLocal) + { + m_Options.ClientPid = static_cast<uint32_t>(GetCurrentProcessId()); + } + } + m_WorkerThread = std::thread([this]() { zen::SetCurrentThreadName("SessionIO"); WorkerLoop(); @@ -98,6 +121,18 @@ SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const { Writer << "mode" << m_Options.Mode; } + if (!m_Options.Platform.empty()) + { + Writer << "platform" << m_Options.Platform; + } + if (m_Options.ClientPid != 0) + { + Writer << "pid" << m_Options.ClientPid; + } + if (m_Options.ParentSessionId != Oid::Zero) + { + Writer << "parent_session_id" << m_Options.ParentSessionId; + } if (m_Options.JobId != Oid::Zero) { Writer << "jobid" << m_Options.JobId; diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index 2d4334ffa..16232333f 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -15,6 +15,7 @@ #include <zencore/timer.h> #include <atomic> +#include <cctype> #include <string> #include <gsl/gsl-lite.hpp> @@ -555,6 +556,28 @@ ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd, uint64_t Ti static constexpr size_t kInstanceInfoSize = 4096; +// Token-aware search for a CLI flag (e.g. "--parent-session") within an +// argument string. Avoids false positives like "--parent-session-foo" by +// requiring the match to start at the beginning or after whitespace, and to +// end at the end of the string, at '=', or at whitespace. +static bool +HasCliFlag(std::string_view Args, std::string_view Flag) +{ + size_t Pos = 0; + while ((Pos = Args.find(Flag, Pos)) != std::string_view::npos) + { + const bool LeftOk = (Pos == 0) || std::isspace(static_cast<unsigned char>(Args[Pos - 1])); + const size_t End = Pos + Flag.size(); + const bool RightOk = (End == Args.size()) || (Args[End] == '=') || std::isspace(static_cast<unsigned char>(Args[End])); + if (LeftOk && RightOk) + { + return true; + } + Pos = End; + } + return false; +} + ZenServerInstanceInfo::ZenServerInstanceInfo() = default; ZenServerInstanceInfo::~ZenServerInstanceInfo() @@ -1119,9 +1142,10 @@ ZenServerInstance::SpawnServerInternal(int ChildId, std::string_view ServerArgs, ExtendableStringBuilder<512> CommandLine; { - const std::string ExeUtf8 = PathToUtf8(Executable); + ExtendableStringBuilder<260> ExeUtf8; + PathToUtf8(Executable, ExeUtf8); constexpr AsciiSet QuoteChars = " \t\""; - if (AsciiSet::HasAny(ExeUtf8.c_str(), QuoteChars)) + if (AsciiSet::HasAny(ExeUtf8.ToView(), QuoteChars)) { CommandLine << '"' << ExeUtf8 << '"'; } @@ -1147,6 +1171,11 @@ ZenServerInstance::SpawnServerInternal(int ChildId, std::string_view ServerArgs, CommandLine << " --enable-execution-history=false"; } + if (!HasCliFlag(ServerArgs, "--parent-session")) + { + CommandLine << " --parent-session " << GetSessionIdString(); + } + if (!ServerArgs.empty()) { CommandLine << " " << ServerArgs; @@ -1246,7 +1275,7 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr CommandLine << " --test --log-id " << m_Name; CommandLine << " --no-sentry"; - if (AdditionalServerArgs.find("--system-dir") == std::string_view::npos) + if (!HasCliFlag(AdditionalServerArgs, "--system-dir")) { CommandLine << " --system-dir "; PathToUtf8((m_Env.CreateNewTestDir() / "system-dir").c_str(), CommandLine); @@ -1665,10 +1694,11 @@ StartupZenServer(LoggerRef LogRef, const StartupZenServerOptions& Options) ZenServerInstance Server(ServerEnvironment, Options.Mode); Server.SetEnableExecutionHistory(Options.EnableExecutionHistory); - std::string ServerArguments(Options.ExtraArgs); - if ((Options.Port != 0) && (ServerArguments.find("--port") == std::string::npos)) + ExtendableStringBuilder<256> ServerArguments; + ServerArguments << Options.ExtraArgs; + if (Options.Port != 0 && !HasCliFlag(ServerArguments, "--port")) { - ServerArguments.append(fmt::format(" --port {}", Options.Port)); + ServerArguments << " --port " << Options.Port; } Server.SpawnServer(ServerArguments, Options.OpenConsole, /*WaitTimeoutMs*/ 0); |