// Copyright Epic Games, Inc. All Rights Reserved. #include "sessions.h" #include #include #include #include #include #include namespace zen { using namespace std::literals; namespace { // Per-session log file layout: // [LogFileHeader][Record]* // where Record = [uint32_t ByteLength][CbObject bytes of ByteLength]. // Records are written in order. A partial trailing record (short write after // crash) is ignored on load. constexpr uint32_t kLogFileMagic = 0x5A534C47u; // 'ZSLG' constexpr uint32_t kLogFileVersion = 1; #pragma pack(push, 1) struct LogFileHeader { uint32_t Magic; uint32_t Version; }; #pragma pack(pop) constexpr uint64_t kLogFileHeaderSize = sizeof(LogFileHeader); void WriteLogEntryFields(CbObjectWriter& Writer, const SessionsService::LogEntryInput& Entry) { Writer << "ts" << Entry.Timestamp; if (Entry.Level != logging::Off) { // Store as a small integer (int8_t range) rather than a string — // fixed-width, one byte in the serialized CbObject. Writer << "lvl" << static_cast(Entry.Level); } if (!Entry.LoggerName.empty()) { Writer << "cat" << Entry.LoggerName; } if (!Entry.Message.empty()) { Writer << "msg" << Entry.Message; } // Structured-log template + fields. Only present for UE_LOGFMT-shaped // entries; Message already holds the rendered text for those so a // reader that ignores these two can still display the line. if (!Entry.Format.empty()) { Writer << "fmt" << Entry.Format; if (Entry.Fields.GetSize() > 0) { Writer.AddObject("flds"sv, Entry.Fields); } } } // Parse a serialized record into an input form. The string_views in // the result borrow from `Obj`'s underlying buffer; the caller must // keep that buffer alive (or arena-copy via PreloadEntries) before // the views are used. bool ReadLogEntry(CbObjectView Obj, SessionsService::LogEntryInput& OutInput) { CbFieldView TsField = Obj["ts"sv]; if (!TsField) { return false; } OutInput.Timestamp = TsField.AsDateTime(); // New format: integer. Legacy format (pre-refactor log.bin files on // this same branch): string. Accept either so existing persisted // entries keep their level when loaded. CbFieldView LvlField = Obj["lvl"sv]; if (LvlField.IsInteger()) { const int32_t Lvl = LvlField.AsInt32(); if (Lvl >= 0 && Lvl < logging::LogLevelCount) { OutInput.Level = static_cast(Lvl); } } else if (LvlField.IsString()) { OutInput.Level = logging::ParseLogLevelString(LvlField.AsString()); } OutInput.LoggerName = Obj["cat"sv].AsString(); OutInput.Message = Obj["msg"sv].AsString(); OutInput.Format = Obj["fmt"sv].AsString(); if (CbObjectView FieldsView = Obj["flds"sv].AsObjectView(); FieldsView.GetSize() > 0) { OutInput.Fields = CbObject::Clone(FieldsView); } return true; } void WriteSessionInfoFields(CbObjectWriter& Writer, const SessionsService::SessionInfo& Info) { Writer << "id" << Info.Id; if (!Info.AppName.empty()) { Writer << "app" << Info.AppName; } if (!Info.Mode.empty()) { Writer << "mode" << Info.Mode; } if (!Info.Platform.empty()) { Writer << "plat" << Info.Platform; } if (Info.ClientPid != 0) { Writer << "pid" << Info.ClientPid; } if (Info.ParentSessionId != Oid::Zero) { Writer << "parent_session_id" << Info.ParentSessionId; } if (Info.JobId != Oid::Zero) { Writer << "jobid" << Info.JobId; } Writer << "ca" << Info.CreatedAt; Writer << "ua" << Info.UpdatedAt; if (Info.EndedAt.GetTicks() != 0) { Writer << "ea" << Info.EndedAt; } if (Info.Metadata.GetSize() > 0) { Writer.AddObject("meta"sv, Info.Metadata); } } bool ReadSessionInfo(CbObjectView Obj, SessionsService::SessionInfo& OutInfo) { OutInfo.Id = Obj["id"sv].AsObjectId(); if (OutInfo.Id == Oid::Zero) { return false; } OutInfo.AppName = std::string(Obj["app"sv].AsString()); OutInfo.Mode = std::string(Obj["mode"sv].AsString()); OutInfo.Platform = std::string(Obj["plat"sv].AsString()); OutInfo.ClientPid = Obj["pid"sv].AsUInt32(); OutInfo.ParentSessionId = Obj["parent_session_id"sv].AsObjectId(); OutInfo.JobId = Obj["jobid"sv].AsObjectId(); OutInfo.CreatedAt = Obj["ca"sv].AsDateTime(); OutInfo.UpdatedAt = Obj["ua"sv].AsDateTime(); OutInfo.EndedAt = Obj["ea"sv].AsDateTime(DateTime{0}); CbObjectView MetaView = Obj["meta"sv].AsObjectView(); if (MetaView.GetSize() > 0) { OutInfo.Metadata = CbObject::Clone(MetaView); } return true; } std::filesystem::path SessionDir(const std::filesystem::path& Root, const Oid& Id) { return Root / Id.ToString(); } #if ZEN_PLATFORM_WINDOWS // Turn a Windows process exit code into a human-friendly termination // reason. Most abnormal terminations surface as NTSTATUS values (high // bit set); the ones below are what you'll actually encounter in the // wild. Anything we don't recognize falls through to a formatted hex // (NTSTATUS-shaped) or decimal (application-level) exit code. std::string DescribeWindowsExitCode(uint32_t ExitCode) { struct Named { uint32_t Code; std::string_view Name; }; using namespace std::literals; static constexpr Named kKnown[] = { {0xC000013Au, "interrupted (Ctrl-C)"sv}, {0xC0000005u, "access violation"sv}, {0xC000001Du, "illegal instruction"sv}, {0xC0000094u, "integer divide by zero"sv}, {0xC0000096u, "privileged instruction"sv}, {0xC00000FDu, "stack overflow"sv}, {0xC0000409u, "stack buffer overrun"sv}, {0xC0000374u, "heap corruption"sv}, {0xC0000135u, "DLL not found"sv}, {0xC0000142u, "DLL initialization failed"sv}, {0xC000007Bu, "invalid image format"sv}, {0xC0000420u, "assertion failure"sv}, {0xC0000008u, "invalid handle"sv}, {0xC000008Eu, "float divide by zero"sv}, {0xC0000091u, "float overflow"sv}, {0xC0000093u, "float underflow"sv}, {0x80000003u, "breakpoint"sv}, {0x40000015u, "fatal app exit"sv}, }; for (const Named& Entry : kKnown) { if (Entry.Code == ExitCode) { return fmt::format("process exited ({}, exit code 0x{:08X})", Entry.Name, ExitCode); } } // NTSTATUS-shaped codes have the high bit set; show them as hex so // they're recognizable (and matchable against Microsoft's doc tables). if ((ExitCode & 0x80000000u) != 0) { return fmt::format("process exited (exit code 0x{:08X})", ExitCode); } return fmt::format("process exited (exit code {})", ExitCode); } #endif } // namespace class SessionLog : public TRefCounted { public: explicit SessionLog(std::filesystem::path LogFilePath); void Append(const SessionsService::LogEntryInput& Entry); // LoadTail returns input-form entries: their string_views borrow // from m_OwnedBuffers (held internally) so the caller's PreloadEntries // can intern/copy them into the session arena before the buffers are // dropped at LoadResult destruction. struct LoadResult { std::vector TailEntries; // Backing memory for the views in TailEntries. Each ParsedRecord // keeps a CbObject alive whose payload bytes back the strings. std::vector OwnedBuffers; uint64_t TotalCount = 0; }; LoadResult LoadTail(size_t MaxEntries); private: static LoggerRef Log() { static LoggerRef L(logging::Get("sessions")); return L; } std::filesystem::path m_Path; std::mutex m_Mutex; BasicFile m_File; uint64_t m_WriteOffset = 0; bool m_Enabled = false; }; SessionLog::SessionLog(std::filesystem::path LogFilePath) : m_Path(std::move(LogFilePath)) { std::error_code Ec; std::filesystem::create_directories(m_Path.parent_path(), Ec); m_File.Open(m_Path, BasicFile::Mode::kWrite, Ec); if (Ec) { ZEN_WARN("Session log '{}' could not be opened: {} - persistence disabled", m_Path, Ec.message()); return; } const uint64_t Size = m_File.FileSize(Ec); if (Ec) { m_File.Close(); ZEN_WARN("Session log '{}' could not be sized: {} - persistence disabled", m_Path, Ec.message()); return; } LogFileHeader Header{}; bool NeedsInit = Size < kLogFileHeaderSize; if (!NeedsInit) { // Read is throwing-only; guard so a read failure doesn't escape. try { m_File.Read(&Header, sizeof(Header), 0); } catch (const std::exception& E) { ZEN_WARN("Session log '{}' header read failed: {} - reinitializing", m_Path, E.what()); NeedsInit = true; } if (!NeedsInit && (Header.Magic != kLogFileMagic || Header.Version != kLogFileVersion)) { NeedsInit = true; } } if (NeedsInit) { m_File.SetFileSize(0); Header = LogFileHeader{.Magic = kLogFileMagic, .Version = kLogFileVersion}; m_File.Write(&Header, sizeof(Header), 0, Ec); if (Ec) { m_File.Close(); ZEN_WARN("Session log '{}' header write failed: {} - persistence disabled", m_Path, Ec.message()); return; } m_WriteOffset = kLogFileHeaderSize; } else { m_WriteOffset = Size; } m_Enabled = true; } void SessionLog::Append(const SessionsService::LogEntryInput& Entry) { if (!m_Enabled) { return; } CbObjectWriter Writer; WriteLogEntryFields(Writer, Entry); CbObject Obj = Writer.Save(); // Write directly from the CbObject's owned buffer — no need to allocate // a fresh UniqueBuffer and memcpy just to hand the bytes to BasicFile::Write. const MemoryView View = Obj.GetView(); const uint64_t ObjSize = View.GetSize(); if (ObjSize == 0 || ObjSize > std::numeric_limits::max()) { return; } const uint32_t Len = static_cast(ObjSize); std::lock_guard Lock(m_Mutex); std::error_code Ec; m_File.Write(&Len, sizeof(Len), m_WriteOffset, Ec); if (Ec) { return; } m_File.Write(View.GetData(), ObjSize, m_WriteOffset + sizeof(Len), Ec); if (Ec) { return; } m_WriteOffset += sizeof(Len) + ObjSize; } SessionLog::LoadResult SessionLog::LoadTail(size_t MaxEntries) { std::lock_guard Lock(m_Mutex); LoadResult Result; if (!m_Enabled) { return Result; } std::error_code Ec; const uint64_t Size = m_File.FileSize(Ec); if (Ec || Size <= kLogFileHeaderSize) { return Result; } IoBuffer Buffer; try { Buffer = m_File.ReadRange(kLogFileHeaderSize, Size - kLogFileHeaderSize); } catch (const std::exception& E) { ZEN_WARN("Session log '{}' tail read failed: {}", m_Path, E.what()); return Result; } const uint8_t* Data = reinterpret_cast(Buffer.GetData()); const uint64_t DataSize = Buffer.GetSize(); // Walk all valid record positions, ignoring any partial trailing record. struct RecRef { const uint8_t* Ptr; uint32_t Len; }; std::vector Records; uint64_t Pos = 0; while (Pos + sizeof(uint32_t) <= DataSize) { uint32_t RecLen = 0; std::memcpy(&RecLen, Data + Pos, sizeof(RecLen)); const uint64_t Next = Pos + sizeof(uint32_t) + RecLen; if (RecLen == 0 || Next > DataSize) { break; } Records.push_back(RecRef{.Ptr = Data + Pos + sizeof(uint32_t), .Len = RecLen}); Pos = Next; } // Parse every record so TotalCount reflects how many actually decode // — cursors anchor to this number, and counting CB-corrupt records // would shift subsequent cursor math off. Only the trailing window // of size MaxEntries is materialized into TailEntries; head records // are parsed purely for the count and discarded. const size_t TailStart = Records.size() > MaxEntries ? Records.size() - MaxEntries : 0; Result.TailEntries.reserve(Records.size() - TailStart); Result.OwnedBuffers.reserve(Records.size() - TailStart); for (size_t i = 0; i < Records.size(); ++i) { try { IoBuffer RecBuf = IoBufferBuilder::MakeCloneFromMemory(MemoryView(Records[i].Ptr, Records[i].Len)); CbObject Obj = LoadCompactBinaryObject(std::move(RecBuf)); SessionsService::LogEntryInput Input; if (ReadLogEntry(Obj, Input)) { ++Result.TotalCount; if (i >= TailStart) { // Keep the CbObject alive for as long as the views // in the input are needed — PreloadEntries will copy // the strings into the session arena and we drop the // buffer set when LoadResult is destroyed. Result.TailEntries.push_back(std::move(Input)); Result.OwnedBuffers.push_back(std::move(Obj)); } } } catch (const std::exception&) { // Skip malformed record — does not contribute to TotalCount. } } return Result; } ////////////////////////////////////////////////////////////////////////// class SessionLogStore { public: explicit SessionLogStore(std::filesystem::path StoragePath); Ref GetOrCreateLogForSession(const Oid& SessionId); void WriteSessionInfoFile(const SessionsService::SessionInfo& Info); void DeleteSession(const Oid& SessionId); uint64_t GetSessionSize(const Oid& SessionId) const; struct PersistedSession { SessionsService::SessionInfo Info; std::filesystem::path LogPath; }; std::vector Scan() const; private: static LoggerRef Log() { static LoggerRef L(logging::Get("sessions")); return L; } std::filesystem::path m_StoragePath; }; SessionLogStore::SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath)) { std::error_code Ec; std::filesystem::create_directories(m_StoragePath, Ec); } Ref SessionLogStore::GetOrCreateLogForSession(const Oid& SessionId) { const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId); std::error_code Ec; std::filesystem::create_directories(Dir, Ec); return Ref(new SessionLog(Dir / "log.bin")); } void SessionLogStore::DeleteSession(const Oid& SessionId) { const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId); std::error_code Ec; std::filesystem::remove_all(Dir, Ec); if (Ec) { ZEN_WARN("Failed to remove session directory '{}': {}", Dir, Ec.message()); } } uint64_t SessionLogStore::GetSessionSize(const Oid& SessionId) const { const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId); std::error_code Ec; uint64_t Total = 0; std::filesystem::directory_iterator It{Dir, Ec}; if (Ec) { return 0; } for (const std::filesystem::directory_entry& Entry : It) { std::error_code FileEc; if (Entry.is_regular_file(FileEc)) { const uintmax_t Size = Entry.file_size(FileEc); if (!FileEc) { Total += uint64_t(Size); } } } return Total; } void SessionLogStore::WriteSessionInfoFile(const SessionsService::SessionInfo& Info) { const std::filesystem::path Dir = SessionDir(m_StoragePath, Info.Id); std::error_code Ec; std::filesystem::create_directories(Dir, Ec); CbObjectWriter Writer; WriteSessionInfoFields(Writer, Info); CbObject Obj = Writer.Save(); const MemoryView View = Obj.GetView(); if (View.GetSize() == 0) { return; } TemporaryFile::SafeWriteFile(Dir / "info.cb", View, Ec); } std::vector SessionLogStore::Scan() const { std::vector Result; std::error_code Ec; if (!std::filesystem::exists(m_StoragePath, Ec)) { return Result; } for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator{m_StoragePath, Ec}) { if (Ec || !Entry.is_directory(Ec)) { continue; } const std::filesystem::path InfoPath = Entry.path() / "info.cb"; const std::filesystem::path LogPath = Entry.path() / "log.bin"; if (!std::filesystem::exists(InfoPath, Ec)) { continue; } try { BasicFile InfoFile; std::error_code OpenEc; InfoFile.Open(InfoPath, BasicFile::Mode::kRead, OpenEc); if (OpenEc) { continue; } IoBuffer InfoBuf = InfoFile.ReadAll(); if (InfoBuf.GetSize() == 0) { continue; } CbObject InfoObj = LoadCompactBinaryObject(std::move(InfoBuf)); PersistedSession PS{.Info = SessionsService::SessionInfo{ .Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}, }}; if (!ReadSessionInfo(InfoObj, PS.Info)) { continue; } PS.LogPath = LogPath; Result.push_back(std::move(PS)); } catch (const std::exception& E) { ZEN_WARN("Skipping session directory '{}': {}", Entry.path(), E.what()); } } return Result; } ////////////////////////////////////////////////////////////////////////// SessionsService::Session::Session(const SessionInfo& Info, Ref Log, ProcessHandle ClientProcess) : m_Info(Info) , m_Log(std::move(Log)) , m_ClientProcess(std::move(ClientProcess)) { } SessionsService::Session::~Session() = default; const char* SessionsService::Session::InternLoggerNameLocked(std::string_view Name) { if (Name.empty()) { return ""; } if (auto It = m_InternedLoggerNames.find(Name); It != m_InternedLoggerNames.end()) { return It->second; } const char* Arena = m_LogArena.DuplicateString(Name); // The map's key view borrows from Arena (which lives as long as the // session does), so it's safe to outlive `Name`. m_InternedLoggerNames.emplace(std::string_view{Arena, Name.size()}, Arena); return Arena; } const char* SessionsService::Session::AllocateLogStringLocked(std::string_view Str) { if (Str.empty()) { return ""; } return m_LogArena.DuplicateString(Str); } uint64_t SessionsService::Session::AppendLog(LogEntryInput Input) { // Persist first (outside the deque lock ordering) so disk I/O doesn't // starve cursor readers holding the shared lock. The SessionLog has its // own internal mutex that serializes file writes. if (m_Log) { m_Log->Append(Input); } RwLock::ExclusiveLockScope Lock(m_LogLock); m_LogEntries.emplace_back(); LogEntry& Entry = m_LogEntries.back(); Entry.Timestamp = Input.Timestamp; Entry.Level = Input.Level; Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName); Entry.Message = AllocateLogStringLocked(Input.Message); Entry.Format = AllocateLogStringLocked(Input.Format); Entry.Fields = std::move(Input.Fields); const uint64_t NewCursor = ++m_TotalAppended; while (m_LogEntries.size() > MaxLogEntries) { m_LogEntries.pop_front(); } return NewCursor; } uint64_t SessionsService::Session::AppendLogBatch(std::span Inputs) { if (Inputs.empty()) { return 0; } // Persist first (per-entry; SessionLog's internal mutex serializes // these writes). We do this outside m_LogLock so file I/O doesn't // stall cursor readers. if (m_Log) { for (LogEntryInput& Input : Inputs) { m_Log->Append(Input); } } RwLock::ExclusiveLockScope Lock(m_LogLock); for (LogEntryInput& Input : Inputs) { m_LogEntries.emplace_back(); LogEntry& Entry = m_LogEntries.back(); Entry.Timestamp = Input.Timestamp; Entry.Level = Input.Level; Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName); Entry.Message = AllocateLogStringLocked(Input.Message); Entry.Format = AllocateLogStringLocked(Input.Format); Entry.Fields = std::move(Input.Fields); ++m_TotalAppended; } while (m_LogEntries.size() > MaxLogEntries) { m_LogEntries.pop_front(); } return m_TotalAppended; } void SessionsService::Session::PreloadEntries(std::span Tail, uint64_t TotalCount) { RwLock::ExclusiveLockScope Lock(m_LogLock); m_LogEntries.clear(); for (const LogEntryInput& Input : Tail) { m_LogEntries.emplace_back(); LogEntry& Entry = m_LogEntries.back(); Entry.Timestamp = Input.Timestamp; Entry.Level = Input.Level; Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName); Entry.Message = AllocateLogStringLocked(Input.Message); Entry.Format = AllocateLogStringLocked(Input.Format); Entry.Fields = Input.Fields; } m_TotalAppended = TotalCount; } std::vector SessionsService::Session::GetLogEntries(uint32_t Limit, uint32_t Offset) const { RwLock::SharedLockScope Lock(m_LogLock); const uint32_t Total = uint32_t(m_LogEntries.size()); if (Offset >= Total) { return {}; } const uint32_t Available = Total - Offset; const uint32_t Count = (Limit > 0) ? std::min(Limit, Available) : Available; std::vector Result; Result.reserve(Count); for (uint32_t i = Offset; i < Offset + Count; i++) { Result.push_back(m_LogEntries[i]); } return Result; } uint64_t SessionsService::Session::GetLogCount() const { RwLock::SharedLockScope Lock(m_LogLock); return m_LogEntries.size(); } SessionsService::Session::CursorResult SessionsService::Session::GetLogEntriesAfter(uint64_t AfterCursor) const { RwLock::SharedLockScope Lock(m_LogLock); const uint64_t DequeSize = m_LogEntries.size(); // Cursor 0 means "give me everything currently in the deque". // Otherwise, compute how many new entries were appended since the cursor. uint64_t NewCount = (AfterCursor == 0) ? DequeSize : (m_TotalAppended > AfterCursor ? m_TotalAppended - AfterCursor : 0); // Clamp to what's actually available in the deque (entries may have been evicted). NewCount = std::min(NewCount, DequeSize); std::vector Result; Result.reserve(NewCount); const uint64_t StartIndex = DequeSize - NewCount; for (uint64_t i = StartIndex; i < DequeSize; i++) { Result.push_back(m_LogEntries[i]); } return CursorResult{ .Entries = std::move(Result), .Cursor = m_TotalAppended, .Count = DequeSize, }; } uint64_t SessionsService::AppendLog(const Oid& SessionId, LogEntryInput Input) { // Resolve the session without holding any external lock — GetSession // acquires m_Lock shared briefly and returns a ref-counted handle. Ref Target = GetSession(SessionId); if (!Target) { return 0; } const uint64_t NewCursor = Target->AppendLog(std::move(Input)); // Fire after Session::m_LogLock is released (inside AppendLog) so the // callback can safely call back into this service (e.g. to resolve // the session again and fetch the delta for its subscribers) without // any nested-lock concerns. if (m_LogAppendedCallback) { m_LogAppendedCallback(SessionId, NewCursor); } return NewCursor; } uint64_t SessionsService::AppendLogBatch(const Oid& SessionId, std::span Inputs) { if (Inputs.empty()) { return 0; } Ref Target = GetSession(SessionId); if (!Target) { return 0; } const uint64_t NewCursor = Target->AppendLogBatch(Inputs); // One callback fires for the whole batch — subscribers see all // entries in a single delta rather than N separate deltas. Fired // after Session::m_LogLock is released for the same reason as the // single-entry path. if (m_LogAppendedCallback) { m_LogAppendedCallback(SessionId, NewCursor); } return NewCursor; } void SessionsService::SetLogAppendedCallback(LogAppendedCallback Callback) { m_LogAppendedCallback = std::move(Callback); } ////////////////////////////////////////////////////////////////////////// SessionsService::SessionsService(std::filesystem::path StorageRoot) : m_Log(logging::Get("sessions")) { if (StorageRoot.empty()) { return; } m_SessionLogs = std::make_unique(StorageRoot); // Load all previously-persisted sessions as ended sessions. Their log // tails are preloaded into the in-memory deque so the UI can view them. std::vector Persisted = m_SessionLogs->Scan(); for (SessionLogStore::PersistedSession& PS : Persisted) { // Sessions that were active at shutdown need a synthetic ended time so // they sort correctly in the UI. if (PS.Info.EndedAt.GetTicks() == 0) { PS.Info.EndedAt = PS.Info.UpdatedAt; } Ref S = Ref(new Session(PS.Info)); // Load the log tail (no SessionLog attached — historical sessions do // not receive new appends and the file handle is released here). // PreloadEntries copies/interns each input's strings into the // session's arena, after which Loaded.OwnedBuffers can be // released along with the rest of the LoadResult. SessionLog Reader(PS.LogPath); SessionLog::LoadResult Loaded = Reader.LoadTail(Session::MaxLogEntries); S->PreloadEntries(Loaded.TailEntries, Loaded.TotalCount); m_EndedSessions.push_back(std::move(S)); } ZEN_INFO("Sessions service loaded {} persisted session(s) from '{}'", m_EndedSessions.size(), StorageRoot); } SessionsService::~SessionsService() = default; bool SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, std::string Platform, uint32_t ClientPid, const Oid& ParentSessionId, const Oid& JobId, CbObjectView Metadata) { // Open a process handle eagerly — BEFORE any pid-reuse window opens. On // Windows the handle is tied to the specific process instance, so a // later pid recycle can't fool later liveness checks. Do the syscall // outside the service lock (it's a kernel round-trip). On POSIX this is // effectively a no-op (just stores the pid). ProcessHandle ClientProcess; if (ClientPid != 0) { std::error_code Ec; ClientProcess.Initialize(static_cast(ClientPid), Ec); if (Ec) { ZEN_WARN("Session {} registered with pid {} but OpenProcess failed: {} — liveness tracking disabled", SessionId, ClientPid, Ec.message()); } } SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}}; // Log outside the lock scope - InProcSessionLogSink calls back into // GetSession() which acquires m_Lock shared, so logging while holding // m_Lock exclusively would deadlock. { RwLock::ExclusiveLockScope Lock(m_Lock); if (m_Sessions.contains(SessionId)) { return false; } const DateTime Now = DateTime::Now(); SessionInfo Info{.Id = SessionId, .AppName = AppName, .Mode = Mode, .Platform = Platform, .ClientPid = ClientPid, .ParentSessionId = ParentSessionId, .JobId = JobId, .Metadata = CbObject::Clone(Metadata), .CreatedAt = Now, .UpdatedAt = Now}; Ref Log; if (m_SessionLogs) { Log = m_SessionLogs->GetOrCreateLogForSession(SessionId); } m_Sessions.emplace(SessionId, Ref(new Session(Info, std::move(Log), std::move(ClientProcess)))); PersistedInfo = std::move(Info); } if (m_SessionLogs) { m_SessionLogs->WriteSessionInfoFile(PersistedInfo); } // Include the tracked pid so the log makes it obvious whether the client // opted into liveness tracking (and whether our OpenProcess succeeded). // "pid: 0" = no liveness tracking (remote or client didn't report); // "pid: N" with an immediately-prior warning = OpenProcess failed. ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, Platform: {}, Pid: {}, ParentSessionId: {}, JobId: {})", SessionId, AppName, Mode, Platform, ClientPid, ParentSessionId, JobId); return true; } bool SessionsService::UpdateSession(const Oid& SessionId, CbObjectView Metadata) { SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}}; { RwLock::ExclusiveLockScope Lock(m_Lock); auto It = m_Sessions.find(SessionId); if (It == m_Sessions.end()) { return false; } It.value()->UpdateMetadata(Metadata); PersistedInfo = It.value()->Info(); } if (m_SessionLogs) { m_SessionLogs->WriteSessionInfoFile(PersistedInfo); } ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, PersistedInfo.AppName, PersistedInfo.JobId); return true; } Ref SessionsService::GetSession(const Oid& SessionId) const { RwLock::SharedLockScope Lock(m_Lock); if (auto It = m_Sessions.find(SessionId); It != m_Sessions.end()) { return It->second; } // Fall back to ended sessions so HTTP consumers can fetch logs/metadata // for sessions that have finished (including sessions loaded from disk on // startup as historical ended sessions). for (const Ref& Ended : m_EndedSessions) { if (Ended->Info().Id == SessionId) { return Ended; } } return {}; } std::vector> SessionsService::GetSessions() const { RwLock::SharedLockScope Lock(m_Lock); std::vector> Result; Result.reserve(m_Sessions.size()); for (const auto& [Id, SessionRef] : m_Sessions) { Result.push_back(SessionRef); } return Result; } bool SessionsService::RemoveSession(const Oid& SessionId, std::string_view Reason) { std::string RemovedAppName; Oid RemovedJobId; SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}}; bool Persist = false; Ref Ended; const DateTime EndTime = DateTime::Now(); { RwLock::ExclusiveLockScope Lock(m_Lock); auto It = m_Sessions.find(SessionId); if (It == m_Sessions.end()) { return false; } RemovedAppName = It.value()->Info().AppName; RemovedJobId = It.value()->Info().JobId; Ended = It.value(); Ended->SetEndedAt(EndTime); if (m_SessionLogs) { PersistedInfo = Ended->Info(); Persist = true; } m_EndedSessions.push_back(Ended); m_Sessions.erase(It); } // Synthetic "Session ended" entry is appended *after* m_Lock is // released. AppendLog hits disk via SessionLog::Append, so doing it // inside the exclusive scope would block every other session lookup // / registration / list while the I/O completes. The callback that // pushes the delta to WS subscribers also fires from outside the // lock — same self-deadlock concern as the GetSession path. uint64_t SyntheticEndCursor = 0; { ExtendableStringBuilder<128> MsgBuilder; MsgBuilder << "Session ended"sv; if (!Reason.empty()) { MsgBuilder << ": "sv << Reason; } SyntheticEndCursor = Ended->AppendLog(LogEntryInput{ .Timestamp = EndTime, .Level = logging::Info, .LoggerName = "sessions"sv, .Message = MsgBuilder.ToView(), }); } if (m_LogAppendedCallback) { m_LogAppendedCallback(SessionId, SyntheticEndCursor); } if (Persist) { m_SessionLogs->WriteSessionInfoFile(PersistedInfo); } ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, RemovedAppName, RemovedJobId); return true; } std::vector> SessionsService::GetEndedSessions() const { RwLock::SharedLockScope Lock(m_Lock); return m_EndedSessions; } uint64_t SessionsService::GetSessionCount() const { RwLock::SharedLockScope Lock(m_Lock); return m_Sessions.size(); } // The "when was this session last relevant" timestamp used for age checks // and count-based eviction ordering: EndedAt if set, otherwise UpdatedAt. static DateTime ReferenceTime(const SessionsService::SessionInfo& Info) { return Info.EndedAt.GetTicks() != 0 ? Info.EndedAt : Info.UpdatedAt; } size_t SessionsService::CheckProcessLiveness() { // Snapshot active sessions under a shared lock, then probe liveness and // drop dead ones without holding the service lock (IsRunning() is a // kernel round-trip and RemoveSession() takes the lock exclusively). std::vector> Candidates; { RwLock::SharedLockScope Lock(m_Lock); Candidates.reserve(m_Sessions.size()); for (const auto& [Id, SessionRef] : m_Sessions) { if (SessionRef->GetClientProcess().IsValid()) { Candidates.push_back(SessionRef); } } } size_t Ended = 0; for (const Ref& S : Candidates) { // m_ClientProcess is set once at construction and never mutated, so // reading it here without synchronization is safe. if (!S->GetClientProcess().IsRunning()) { // Build the termination reason. On Windows, GetExitCode() returns // the real OS exit code and is cheap right after !IsRunning(); we // map the common NTSTATUS codes (Ctrl-C, access violation, DLL // init failure, …) to human-readable names. On POSIX the exit // code is only populated via Wait() which we never call, so // stick with the plain reason. std::string Reason = "process exited"; #if ZEN_PLATFORM_WINDOWS const uint32_t ExitCode = static_cast(S->GetClientProcess().GetExitCode()); if (ExitCode != 0) { Reason = DescribeWindowsExitCode(ExitCode); } #endif if (RemoveSession(S->Info().Id, Reason)) { ++Ended; } } } return Ended; } SessionsService::PruneResult SessionsService::PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes) { PruneResult Result; std::vector ToDelete; // Phase 1: age + count pruning (fast; purely in-memory). { RwLock::ExclusiveLockScope Lock(m_Lock); const uint64_t NowTicks = DateTime::Now().GetTicks(); const uint64_t CutoffTicks = NowTicks > MaxAge.GetTicks() ? NowTicks - MaxAge.GetTicks() : 0; const DateTime Cutoff{CutoffTicks}; // Age-based pruning: drop ended sessions whose reference time is // older than Cutoff. auto ExpiredIt = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref& S) { if (ReferenceTime(S->Info()) < Cutoff) { ToDelete.push_back(S->Info().Id); return true; } return false; }); Result.ExpiredByAge = size_t(m_EndedSessions.end() - ExpiredIt); m_EndedSessions.erase(ExpiredIt, m_EndedSessions.end()); // Count-based pruning: keep at most MaxCount sessions total. Active // sessions are never touched; if there are already >= MaxCount active // sessions then all ended sessions get evicted. const size_t ActiveCount = m_Sessions.size(); const size_t EndedTarget = MaxCount > ActiveCount ? MaxCount - ActiveCount : 0; if (m_EndedSessions.size() > EndedTarget) { const size_t ToRemove = m_EndedSessions.size() - EndedTarget; // Move the `ToRemove` oldest entries to the front. std::partial_sort( m_EndedSessions.begin(), m_EndedSessions.begin() + ToRemove, m_EndedSessions.end(), [](const Ref& A, const Ref& B) { return ReferenceTime(A->Info()) < ReferenceTime(B->Info()); }); for (size_t i = 0; i < ToRemove; ++i) { ToDelete.push_back(m_EndedSessions[i]->Info().Id); } m_EndedSessions.erase(m_EndedSessions.begin(), m_EndedSessions.begin() + ToRemove); Result.ExpiredByCount = ToRemove; } } // Phase 1 disk deletion, outside the service lock. if (m_SessionLogs) { for (const Oid& Id : ToDelete) { m_SessionLogs->DeleteSession(Id); } } // Phase 2: storage-footprint pruning. Snapshot remaining ended sessions // (id + reference time) under a shared lock, then stat each directory // outside the lock so we don't hold writers off during filesystem calls. if (!m_SessionLogs) { return Result; } struct Candidate { Oid Id; DateTime RefTime; uint64_t Size = 0; }; std::vector Candidates; { RwLock::SharedLockScope Lock(m_Lock); Candidates.reserve(m_EndedSessions.size()); for (const Ref& S : m_EndedSessions) { Candidates.push_back(Candidate{.Id = S->Info().Id, .RefTime = ReferenceTime(S->Info())}); } } uint64_t TotalBytes = 0; for (Candidate& C : Candidates) { C.Size = m_SessionLogs->GetSessionSize(C.Id); TotalBytes += C.Size; } if (TotalBytes <= MaxStorageBytes) { return Result; } // Oldest first so we evict in chronological order. std::sort(Candidates.begin(), Candidates.end(), [](const Candidate& A, const Candidate& B) { return A.RefTime < B.RefTime; }); std::vector StorageDelete; uint64_t Reclaimed = 0; const uint64_t NeedBytes = TotalBytes - MaxStorageBytes; for (const Candidate& C : Candidates) { if (Reclaimed >= NeedBytes) { break; } StorageDelete.push_back(C.Id); Reclaimed += C.Size; } if (StorageDelete.empty()) { return Result; } // Erase from m_EndedSessions under exclusive lock. Concurrent RemoveSession // calls between the snapshot and here will have inserted new entries at // the back, which we safely leave alone. { RwLock::ExclusiveLockScope Lock(m_Lock); tsl::robin_map IdSet; for (const Oid& Id : StorageDelete) { IdSet[Id] = 1; } auto It = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref& S) { return IdSet.contains(S->Info().Id); }); m_EndedSessions.erase(It, m_EndedSessions.end()); } for (const Oid& Id : StorageDelete) { m_SessionLogs->DeleteSession(Id); } Result.ExpiredByStorage = StorageDelete.size(); return Result; } } // namespace zen