diff options
Diffstat (limited to 'src/zenserver/sessions/sessions.cpp')
| -rw-r--r-- | src/zenserver/sessions/sessions.cpp | 1166 |
1 files changed, 1120 insertions, 46 deletions
diff --git a/src/zenserver/sessions/sessions.cpp b/src/zenserver/sessions/sessions.cpp index 9d4e3120c..470117c6a 100644 --- a/src/zenserver/sessions/sessions.cpp +++ b/src/zenserver/sessions/sessions.cpp @@ -3,59 +3,733 @@ #include "sessions.h" #include <zencore/basicfile.h> +#include <zencore/compactbinarybuilder.h> #include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> #include <zencore/logging.h> +#include <mutex> + namespace zen { using namespace std::literals; +namespace { + + // Per-session log file layout: + // [LogFileHeader][Record]* + // where Record = [uint32_t ByteLength][CbObject bytes of ByteLength]. + // Records are written in order. A partial trailing record (short write after + // crash) is ignored on load. + constexpr uint32_t kLogFileMagic = 0x5A534C47u; // 'ZSLG' + constexpr uint32_t kLogFileVersion = 1; + +#pragma pack(push, 1) + struct LogFileHeader + { + uint32_t Magic; + uint32_t Version; + }; +#pragma pack(pop) + + constexpr uint64_t kLogFileHeaderSize = sizeof(LogFileHeader); + + void WriteLogEntryFields(CbObjectWriter& Writer, const SessionsService::LogEntryInput& Entry) + { + Writer << "ts" << Entry.Timestamp; + if (Entry.Level != logging::Off) + { + // Store as a small integer (int8_t range) rather than a string — + // fixed-width, one byte in the serialized CbObject. + Writer << "lvl" << static_cast<int32_t>(Entry.Level); + } + if (!Entry.LoggerName.empty()) + { + Writer << "cat" << Entry.LoggerName; + } + if (!Entry.Message.empty()) + { + Writer << "msg" << Entry.Message; + } + // Structured-log template + fields. Only present for UE_LOGFMT-shaped + // entries; Message already holds the rendered text for those so a + // reader that ignores these two can still display the line. + if (!Entry.Format.empty()) + { + Writer << "fmt" << Entry.Format; + if (Entry.Fields.GetSize() > 0) + { + Writer.AddObject("flds"sv, Entry.Fields); + } + } + } + + // Parse a serialized record into an input form. The string_views in + // the result borrow from `Obj`'s underlying buffer; the caller must + // keep that buffer alive (or arena-copy via PreloadEntries) before + // the views are used. + bool ReadLogEntry(CbObjectView Obj, SessionsService::LogEntryInput& OutInput) + { + CbFieldView TsField = Obj["ts"sv]; + if (!TsField) + { + return false; + } + OutInput.Timestamp = TsField.AsDateTime(); + + // New format: integer. Legacy format (pre-refactor log.bin files on + // this same branch): string. Accept either so existing persisted + // entries keep their level when loaded. + CbFieldView LvlField = Obj["lvl"sv]; + if (LvlField.IsInteger()) + { + const int32_t Lvl = LvlField.AsInt32(); + if (Lvl >= 0 && Lvl < logging::LogLevelCount) + { + OutInput.Level = static_cast<logging::LogLevel>(Lvl); + } + } + else if (LvlField.IsString()) + { + OutInput.Level = logging::ParseLogLevelString(LvlField.AsString()); + } + + OutInput.LoggerName = Obj["cat"sv].AsString(); + OutInput.Message = Obj["msg"sv].AsString(); + OutInput.Format = Obj["fmt"sv].AsString(); + if (CbObjectView FieldsView = Obj["flds"sv].AsObjectView(); FieldsView.GetSize() > 0) + { + OutInput.Fields = CbObject::Clone(FieldsView); + } + return true; + } + + void WriteSessionInfoFields(CbObjectWriter& Writer, const SessionsService::SessionInfo& Info) + { + Writer << "id" << Info.Id; + if (!Info.AppName.empty()) + { + Writer << "app" << Info.AppName; + } + if (!Info.Mode.empty()) + { + Writer << "mode" << Info.Mode; + } + if (!Info.Platform.empty()) + { + Writer << "plat" << Info.Platform; + } + if (Info.ClientPid != 0) + { + Writer << "pid" << Info.ClientPid; + } + if (Info.ParentSessionId != Oid::Zero) + { + Writer << "parent_session_id" << Info.ParentSessionId; + } + if (Info.JobId != Oid::Zero) + { + Writer << "jobid" << Info.JobId; + } + Writer << "ca" << Info.CreatedAt; + Writer << "ua" << Info.UpdatedAt; + if (Info.EndedAt.GetTicks() != 0) + { + Writer << "ea" << Info.EndedAt; + } + if (Info.Metadata.GetSize() > 0) + { + Writer.AddObject("meta"sv, Info.Metadata); + } + } + + bool ReadSessionInfo(CbObjectView Obj, SessionsService::SessionInfo& OutInfo) + { + OutInfo.Id = Obj["id"sv].AsObjectId(); + if (OutInfo.Id == Oid::Zero) + { + return false; + } + OutInfo.AppName = std::string(Obj["app"sv].AsString()); + OutInfo.Mode = std::string(Obj["mode"sv].AsString()); + OutInfo.Platform = std::string(Obj["plat"sv].AsString()); + OutInfo.ClientPid = Obj["pid"sv].AsUInt32(); + OutInfo.ParentSessionId = Obj["parent_session_id"sv].AsObjectId(); + OutInfo.JobId = Obj["jobid"sv].AsObjectId(); + OutInfo.CreatedAt = Obj["ca"sv].AsDateTime(); + OutInfo.UpdatedAt = Obj["ua"sv].AsDateTime(); + OutInfo.EndedAt = Obj["ea"sv].AsDateTime(DateTime{0}); + CbObjectView MetaView = Obj["meta"sv].AsObjectView(); + if (MetaView.GetSize() > 0) + { + OutInfo.Metadata = CbObject::Clone(MetaView); + } + return true; + } + + std::filesystem::path SessionDir(const std::filesystem::path& Root, const Oid& Id) { return Root / Id.ToString(); } + +#if ZEN_PLATFORM_WINDOWS + // Turn a Windows process exit code into a human-friendly termination + // reason. Most abnormal terminations surface as NTSTATUS values (high + // bit set); the ones below are what you'll actually encounter in the + // wild. Anything we don't recognize falls through to a formatted hex + // (NTSTATUS-shaped) or decimal (application-level) exit code. + std::string DescribeWindowsExitCode(uint32_t ExitCode) + { + struct Named + { + uint32_t Code; + std::string_view Name; + }; + using namespace std::literals; + static constexpr Named kKnown[] = { + {0xC000013Au, "interrupted (Ctrl-C)"sv}, + {0xC0000005u, "access violation"sv}, + {0xC000001Du, "illegal instruction"sv}, + {0xC0000094u, "integer divide by zero"sv}, + {0xC0000096u, "privileged instruction"sv}, + {0xC00000FDu, "stack overflow"sv}, + {0xC0000409u, "stack buffer overrun"sv}, + {0xC0000374u, "heap corruption"sv}, + {0xC0000135u, "DLL not found"sv}, + {0xC0000142u, "DLL initialization failed"sv}, + {0xC000007Bu, "invalid image format"sv}, + {0xC0000420u, "assertion failure"sv}, + {0xC0000008u, "invalid handle"sv}, + {0xC000008Eu, "float divide by zero"sv}, + {0xC0000091u, "float overflow"sv}, + {0xC0000093u, "float underflow"sv}, + {0x80000003u, "breakpoint"sv}, + {0x40000015u, "fatal app exit"sv}, + }; + for (const Named& Entry : kKnown) + { + if (Entry.Code == ExitCode) + { + return fmt::format("process exited ({}, exit code 0x{:08X})", Entry.Name, ExitCode); + } + } + // NTSTATUS-shaped codes have the high bit set; show them as hex so + // they're recognizable (and matchable against Microsoft's doc tables). + if ((ExitCode & 0x80000000u) != 0) + { + return fmt::format("process exited (exit code 0x{:08X})", ExitCode); + } + return fmt::format("process exited (exit code {})", ExitCode); + } +#endif + +} // namespace + class SessionLog : public TRefCounted<SessionLog> { public: - SessionLog(std::filesystem::path LogFilePath) { m_LogFile.Open(LogFilePath, BasicFile::Mode::kWrite); } + explicit SessionLog(std::filesystem::path LogFilePath); + + void Append(const SessionsService::LogEntryInput& Entry); + + // LoadTail returns input-form entries: their string_views borrow + // from m_OwnedBuffers (held internally) so the caller's PreloadEntries + // can intern/copy them into the session arena before the buffers are + // dropped at LoadResult destruction. + struct LoadResult + { + std::vector<SessionsService::LogEntryInput> TailEntries; + // Backing memory for the views in TailEntries. Each ParsedRecord + // keeps a CbObject alive whose payload bytes back the strings. + std::vector<CbObject> OwnedBuffers; + uint64_t TotalCount = 0; + }; + LoadResult LoadTail(size_t MaxEntries); private: - BasicFile m_LogFile; + static LoggerRef Log() + { + static LoggerRef L(logging::Get("sessions")); + return L; + } + + std::filesystem::path m_Path; + std::mutex m_Mutex; + BasicFile m_File; + uint64_t m_WriteOffset = 0; + bool m_Enabled = false; }; -class SessionLogStore +SessionLog::SessionLog(std::filesystem::path LogFilePath) : m_Path(std::move(LogFilePath)) { -public: - SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath)) {} + std::error_code Ec; + std::filesystem::create_directories(m_Path.parent_path(), Ec); - ~SessionLogStore() = default; + m_File.Open(m_Path, BasicFile::Mode::kWrite, Ec); + if (Ec) + { + ZEN_WARN("Session log '{}' could not be opened: {} - persistence disabled", m_Path, Ec.message()); + return; + } - Ref<SessionLog> GetLogForSession(const Oid& SessionId) + const uint64_t Size = m_File.FileSize(Ec); + if (Ec) { - // For now, just return a new log for each session. We can implement actual log storage and retrieval later. - return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log"))); + m_File.Close(); + ZEN_WARN("Session log '{}' could not be sized: {} - persistence disabled", m_Path, Ec.message()); + return; } - Ref<SessionLog> CreateLogForSession(const Oid& SessionId) + LogFileHeader Header{}; + bool NeedsInit = Size < kLogFileHeaderSize; + if (!NeedsInit) + { + // Read is throwing-only; guard so a read failure doesn't escape. + try + { + m_File.Read(&Header, sizeof(Header), 0); + } + catch (const std::exception& E) + { + ZEN_WARN("Session log '{}' header read failed: {} - reinitializing", m_Path, E.what()); + NeedsInit = true; + } + if (!NeedsInit && (Header.Magic != kLogFileMagic || Header.Version != kLogFileVersion)) + { + NeedsInit = true; + } + } + + if (NeedsInit) + { + m_File.SetFileSize(0); + Header = LogFileHeader{.Magic = kLogFileMagic, .Version = kLogFileVersion}; + m_File.Write(&Header, sizeof(Header), 0, Ec); + if (Ec) + { + m_File.Close(); + ZEN_WARN("Session log '{}' header write failed: {} - persistence disabled", m_Path, Ec.message()); + return; + } + m_WriteOffset = kLogFileHeaderSize; + } + else + { + m_WriteOffset = Size; + } + + m_Enabled = true; +} + +void +SessionLog::Append(const SessionsService::LogEntryInput& Entry) +{ + if (!m_Enabled) { - // For now, just return a new log for each session. We can implement actual log storage and retrieval later. - return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log"))); + return; } + CbObjectWriter Writer; + WriteLogEntryFields(Writer, Entry); + CbObject Obj = Writer.Save(); + + // Write directly from the CbObject's owned buffer — no need to allocate + // a fresh UniqueBuffer and memcpy just to hand the bytes to BasicFile::Write. + const MemoryView View = Obj.GetView(); + const uint64_t ObjSize = View.GetSize(); + if (ObjSize == 0 || ObjSize > std::numeric_limits<uint32_t>::max()) + { + return; + } + const uint32_t Len = static_cast<uint32_t>(ObjSize); + + std::lock_guard<std::mutex> Lock(m_Mutex); + std::error_code Ec; + m_File.Write(&Len, sizeof(Len), m_WriteOffset, Ec); + if (Ec) + { + return; + } + m_File.Write(View.GetData(), ObjSize, m_WriteOffset + sizeof(Len), Ec); + if (Ec) + { + return; + } + m_WriteOffset += sizeof(Len) + ObjSize; +} + +SessionLog::LoadResult +SessionLog::LoadTail(size_t MaxEntries) +{ + std::lock_guard<std::mutex> Lock(m_Mutex); + LoadResult Result; + + if (!m_Enabled) + { + return Result; + } + + std::error_code Ec; + const uint64_t Size = m_File.FileSize(Ec); + if (Ec || Size <= kLogFileHeaderSize) + { + return Result; + } + + IoBuffer Buffer; + try + { + Buffer = m_File.ReadRange(kLogFileHeaderSize, Size - kLogFileHeaderSize); + } + catch (const std::exception& E) + { + ZEN_WARN("Session log '{}' tail read failed: {}", m_Path, E.what()); + return Result; + } + const uint8_t* Data = reinterpret_cast<const uint8_t*>(Buffer.GetData()); + const uint64_t DataSize = Buffer.GetSize(); + + // Walk all valid record positions, ignoring any partial trailing record. + struct RecRef + { + const uint8_t* Ptr; + uint32_t Len; + }; + std::vector<RecRef> Records; + uint64_t Pos = 0; + while (Pos + sizeof(uint32_t) <= DataSize) + { + uint32_t RecLen = 0; + std::memcpy(&RecLen, Data + Pos, sizeof(RecLen)); + const uint64_t Next = Pos + sizeof(uint32_t) + RecLen; + if (RecLen == 0 || Next > DataSize) + { + break; + } + Records.push_back(RecRef{.Ptr = Data + Pos + sizeof(uint32_t), .Len = RecLen}); + Pos = Next; + } + + // Parse every record so TotalCount reflects how many actually decode + // — cursors anchor to this number, and counting CB-corrupt records + // would shift subsequent cursor math off. Only the trailing window + // of size MaxEntries is materialized into TailEntries; head records + // are parsed purely for the count and discarded. + const size_t TailStart = Records.size() > MaxEntries ? Records.size() - MaxEntries : 0; + Result.TailEntries.reserve(Records.size() - TailStart); + Result.OwnedBuffers.reserve(Records.size() - TailStart); + + for (size_t i = 0; i < Records.size(); ++i) + { + try + { + IoBuffer RecBuf = IoBufferBuilder::MakeCloneFromMemory(MemoryView(Records[i].Ptr, Records[i].Len)); + CbObject Obj = LoadCompactBinaryObject(std::move(RecBuf)); + + SessionsService::LogEntryInput Input; + if (ReadLogEntry(Obj, Input)) + { + ++Result.TotalCount; + if (i >= TailStart) + { + // Keep the CbObject alive for as long as the views + // in the input are needed — PreloadEntries will copy + // the strings into the session arena and we drop the + // buffer set when LoadResult is destroyed. + Result.TailEntries.push_back(std::move(Input)); + Result.OwnedBuffers.push_back(std::move(Obj)); + } + } + } + catch (const std::exception&) + { + // Skip malformed record — does not contribute to TotalCount. + } + } + + return Result; +} + +////////////////////////////////////////////////////////////////////////// + +class SessionLogStore +{ +public: + explicit SessionLogStore(std::filesystem::path StoragePath); + + Ref<SessionLog> GetOrCreateLogForSession(const Oid& SessionId); + void WriteSessionInfoFile(const SessionsService::SessionInfo& Info); + void DeleteSession(const Oid& SessionId); + uint64_t GetSessionSize(const Oid& SessionId) const; + + struct PersistedSession + { + SessionsService::SessionInfo Info; + std::filesystem::path LogPath; + }; + std::vector<PersistedSession> Scan() const; + private: + static LoggerRef Log() + { + static LoggerRef L(logging::Get("sessions")); + return L; + } + std::filesystem::path m_StoragePath; }; -SessionsService::Session::Session(const SessionInfo& Info) : m_Info(Info) +SessionLogStore::SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath)) { + std::error_code Ec; + std::filesystem::create_directories(m_StoragePath, Ec); +} + +Ref<SessionLog> +SessionLogStore::GetOrCreateLogForSession(const Oid& SessionId) +{ + const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId); + std::error_code Ec; + std::filesystem::create_directories(Dir, Ec); + return Ref(new SessionLog(Dir / "log.bin")); +} + +void +SessionLogStore::DeleteSession(const Oid& SessionId) +{ + const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId); + std::error_code Ec; + std::filesystem::remove_all(Dir, Ec); + if (Ec) + { + ZEN_WARN("Failed to remove session directory '{}': {}", Dir, Ec.message()); + } +} + +uint64_t +SessionLogStore::GetSessionSize(const Oid& SessionId) const +{ + const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId); + std::error_code Ec; + uint64_t Total = 0; + std::filesystem::directory_iterator It{Dir, Ec}; + if (Ec) + { + return 0; + } + for (const std::filesystem::directory_entry& Entry : It) + { + std::error_code FileEc; + if (Entry.is_regular_file(FileEc)) + { + const uintmax_t Size = Entry.file_size(FileEc); + if (!FileEc) + { + Total += uint64_t(Size); + } + } + } + return Total; } -SessionsService::Session::~Session() = default; void -SessionsService::Session::AppendLog(LogEntry Entry) +SessionLogStore::WriteSessionInfoFile(const SessionsService::SessionInfo& Info) { + const std::filesystem::path Dir = SessionDir(m_StoragePath, Info.Id); + std::error_code Ec; + std::filesystem::create_directories(Dir, Ec); + + CbObjectWriter Writer; + WriteSessionInfoFields(Writer, Info); + CbObject Obj = Writer.Save(); + const MemoryView View = Obj.GetView(); + if (View.GetSize() == 0) + { + return; + } + TemporaryFile::SafeWriteFile(Dir / "info.cb", View, Ec); +} + +std::vector<SessionLogStore::PersistedSession> +SessionLogStore::Scan() const +{ + std::vector<PersistedSession> Result; + std::error_code Ec; + + if (!std::filesystem::exists(m_StoragePath, Ec)) + { + return Result; + } + + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator{m_StoragePath, Ec}) + { + if (Ec || !Entry.is_directory(Ec)) + { + continue; + } + + const std::filesystem::path InfoPath = Entry.path() / "info.cb"; + const std::filesystem::path LogPath = Entry.path() / "log.bin"; + + if (!std::filesystem::exists(InfoPath, Ec)) + { + continue; + } + + try + { + BasicFile InfoFile; + std::error_code OpenEc; + InfoFile.Open(InfoPath, BasicFile::Mode::kRead, OpenEc); + if (OpenEc) + { + continue; + } + IoBuffer InfoBuf = InfoFile.ReadAll(); + if (InfoBuf.GetSize() == 0) + { + continue; + } + CbObject InfoObj = LoadCompactBinaryObject(std::move(InfoBuf)); + PersistedSession PS{.Info = SessionsService::SessionInfo{ + .Id = Oid::Zero, + .CreatedAt = DateTime{0}, + .UpdatedAt = DateTime{0}, + }}; + if (!ReadSessionInfo(InfoObj, PS.Info)) + { + continue; + } + PS.LogPath = LogPath; + Result.push_back(std::move(PS)); + } + catch (const std::exception& E) + { + ZEN_WARN("Skipping session directory '{}': {}", Entry.path(), E.what()); + } + } + + return Result; +} + +////////////////////////////////////////////////////////////////////////// + +SessionsService::Session::Session(const SessionInfo& Info, Ref<SessionLog> Log, ProcessHandle ClientProcess) +: m_Info(Info) +, m_Log(std::move(Log)) +, m_ClientProcess(std::move(ClientProcess)) +{ +} +SessionsService::Session::~Session() = default; + +const char* +SessionsService::Session::InternLoggerNameLocked(std::string_view Name) +{ + if (Name.empty()) + { + return ""; + } + if (auto It = m_InternedLoggerNames.find(Name); It != m_InternedLoggerNames.end()) + { + return It->second; + } + const char* Arena = m_LogArena.DuplicateString(Name); + // The map's key view borrows from Arena (which lives as long as the + // session does), so it's safe to outlive `Name`. + m_InternedLoggerNames.emplace(std::string_view{Arena, Name.size()}, Arena); + return Arena; +} + +const char* +SessionsService::Session::AllocateLogStringLocked(std::string_view Str) +{ + if (Str.empty()) + { + return ""; + } + return m_LogArena.DuplicateString(Str); +} + +uint64_t +SessionsService::Session::AppendLog(LogEntryInput Input) +{ + // Persist first (outside the deque lock ordering) so disk I/O doesn't + // starve cursor readers holding the shared lock. The SessionLog has its + // own internal mutex that serializes file writes. + if (m_Log) + { + m_Log->Append(Input); + } + RwLock::ExclusiveLockScope Lock(m_LogLock); - m_LogEntries.push_back(std::move(Entry)); - ++m_TotalAppended; + m_LogEntries.emplace_back(); + LogEntry& Entry = m_LogEntries.back(); + Entry.Timestamp = Input.Timestamp; + Entry.Level = Input.Level; + Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName); + Entry.Message = AllocateLogStringLocked(Input.Message); + Entry.Format = AllocateLogStringLocked(Input.Format); + Entry.Fields = std::move(Input.Fields); + const uint64_t NewCursor = ++m_TotalAppended; while (m_LogEntries.size() > MaxLogEntries) { m_LogEntries.pop_front(); } + return NewCursor; +} + +uint64_t +SessionsService::Session::AppendLogBatch(std::span<LogEntryInput> Inputs) +{ + if (Inputs.empty()) + { + return 0; + } + + // Persist first (per-entry; SessionLog's internal mutex serializes + // these writes). We do this outside m_LogLock so file I/O doesn't + // stall cursor readers. + if (m_Log) + { + for (LogEntryInput& Input : Inputs) + { + m_Log->Append(Input); + } + } + + RwLock::ExclusiveLockScope Lock(m_LogLock); + for (LogEntryInput& Input : Inputs) + { + m_LogEntries.emplace_back(); + LogEntry& Entry = m_LogEntries.back(); + Entry.Timestamp = Input.Timestamp; + Entry.Level = Input.Level; + Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName); + Entry.Message = AllocateLogStringLocked(Input.Message); + Entry.Format = AllocateLogStringLocked(Input.Format); + Entry.Fields = std::move(Input.Fields); + ++m_TotalAppended; + } + while (m_LogEntries.size() > MaxLogEntries) + { + m_LogEntries.pop_front(); + } + return m_TotalAppended; +} + +void +SessionsService::Session::PreloadEntries(std::span<const LogEntryInput> Tail, uint64_t TotalCount) +{ + RwLock::ExclusiveLockScope Lock(m_LogLock); + m_LogEntries.clear(); + for (const LogEntryInput& Input : Tail) + { + m_LogEntries.emplace_back(); + LogEntry& Entry = m_LogEntries.back(); + Entry.Timestamp = Input.Timestamp; + Entry.Level = Input.Level; + Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName); + Entry.Message = AllocateLogStringLocked(Input.Message); + Entry.Format = AllocateLogStringLocked(Input.Format); + Entry.Fields = Input.Fields; + } + m_TotalAppended = TotalCount; } std::vector<SessionsService::LogEntry> @@ -118,17 +792,134 @@ SessionsService::Session::GetLogEntriesAfter(uint64_t AfterCursor) const }; } +uint64_t +SessionsService::AppendLog(const Oid& SessionId, LogEntryInput Input) +{ + // Resolve the session without holding any external lock — GetSession + // acquires m_Lock shared briefly and returns a ref-counted handle. + Ref<Session> Target = GetSession(SessionId); + if (!Target) + { + return 0; + } + + const uint64_t NewCursor = Target->AppendLog(std::move(Input)); + + // Fire after Session::m_LogLock is released (inside AppendLog) so the + // callback can safely call back into this service (e.g. to resolve + // the session again and fetch the delta for its subscribers) without + // any nested-lock concerns. + if (m_LogAppendedCallback) + { + m_LogAppendedCallback(SessionId, NewCursor); + } + return NewCursor; +} + +uint64_t +SessionsService::AppendLogBatch(const Oid& SessionId, std::span<LogEntryInput> Inputs) +{ + if (Inputs.empty()) + { + return 0; + } + Ref<Session> Target = GetSession(SessionId); + if (!Target) + { + return 0; + } + + const uint64_t NewCursor = Target->AppendLogBatch(Inputs); + + // One callback fires for the whole batch — subscribers see all + // entries in a single delta rather than N separate deltas. Fired + // after Session::m_LogLock is released for the same reason as the + // single-entry path. + if (m_LogAppendedCallback) + { + m_LogAppendedCallback(SessionId, NewCursor); + } + return NewCursor; +} + +void +SessionsService::SetLogAppendedCallback(LogAppendedCallback Callback) +{ + m_LogAppendedCallback = std::move(Callback); +} + ////////////////////////////////////////////////////////////////////////// -SessionsService::SessionsService() : m_Log(logging::Get("sessions")) +SessionsService::SessionsService(std::filesystem::path StorageRoot) : m_Log(logging::Get("sessions")) { + if (StorageRoot.empty()) + { + return; + } + + m_SessionLogs = std::make_unique<SessionLogStore>(StorageRoot); + + // Load all previously-persisted sessions as ended sessions. Their log + // tails are preloaded into the in-memory deque so the UI can view them. + std::vector<SessionLogStore::PersistedSession> Persisted = m_SessionLogs->Scan(); + for (SessionLogStore::PersistedSession& PS : Persisted) + { + // Sessions that were active at shutdown need a synthetic ended time so + // they sort correctly in the UI. + if (PS.Info.EndedAt.GetTicks() == 0) + { + PS.Info.EndedAt = PS.Info.UpdatedAt; + } + + Ref<Session> S = Ref(new Session(PS.Info)); + + // Load the log tail (no SessionLog attached — historical sessions do + // not receive new appends and the file handle is released here). + // PreloadEntries copies/interns each input's strings into the + // session's arena, after which Loaded.OwnedBuffers can be + // released along with the rest of the LoadResult. + SessionLog Reader(PS.LogPath); + SessionLog::LoadResult Loaded = Reader.LoadTail(Session::MaxLogEntries); + S->PreloadEntries(Loaded.TailEntries, Loaded.TotalCount); + + m_EndedSessions.push_back(std::move(S)); + } + + ZEN_INFO("Sessions service loaded {} persisted session(s) from '{}'", m_EndedSessions.size(), StorageRoot); } SessionsService::~SessionsService() = default; bool -SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata) +SessionsService::RegisterSession(const Oid& SessionId, + std::string AppName, + std::string Mode, + std::string Platform, + uint32_t ClientPid, + const Oid& ParentSessionId, + const Oid& JobId, + CbObjectView Metadata) { + // Open a process handle eagerly — BEFORE any pid-reuse window opens. On + // Windows the handle is tied to the specific process instance, so a + // later pid recycle can't fool later liveness checks. Do the syscall + // outside the service lock (it's a kernel round-trip). On POSIX this is + // effectively a no-op (just stores the pid). + ProcessHandle ClientProcess; + if (ClientPid != 0) + { + std::error_code Ec; + ClientProcess.Initialize(static_cast<int>(ClientPid), Ec); + if (Ec) + { + ZEN_WARN("Session {} registered with pid {} but OpenProcess failed: {} — liveness tracking disabled", + SessionId, + ClientPid, + Ec.message()); + } + } + + SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}}; // Log outside the lock scope - InProcSessionLogSink calls back into // GetSession() which acquires m_Lock shared, so logging while holding // m_Lock exclusively would deadlock. @@ -141,35 +932,69 @@ SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std: } const DateTime Now = DateTime::Now(); - m_Sessions.emplace(SessionId, - Ref(new Session(SessionInfo{.Id = SessionId, - .AppName = AppName, - .Mode = Mode, - .JobId = JobId, - .Metadata = CbObject::Clone(Metadata), - .CreatedAt = Now, - .UpdatedAt = Now}))); + SessionInfo Info{.Id = SessionId, + .AppName = AppName, + .Mode = Mode, + .Platform = Platform, + .ClientPid = ClientPid, + .ParentSessionId = ParentSessionId, + .JobId = JobId, + .Metadata = CbObject::Clone(Metadata), + .CreatedAt = Now, + .UpdatedAt = Now}; + + Ref<SessionLog> Log; + if (m_SessionLogs) + { + Log = m_SessionLogs->GetOrCreateLogForSession(SessionId); + } + m_Sessions.emplace(SessionId, Ref(new Session(Info, std::move(Log), std::move(ClientProcess)))); + PersistedInfo = std::move(Info); + } + + if (m_SessionLogs) + { + m_SessionLogs->WriteSessionInfoFile(PersistedInfo); } - ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, JobId: {})", SessionId, AppName, Mode, JobId); + // Include the tracked pid so the log makes it obvious whether the client + // opted into liveness tracking (and whether our OpenProcess succeeded). + // "pid: 0" = no liveness tracking (remote or client didn't report); + // "pid: N" with an immediately-prior warning = OpenProcess failed. + ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, Platform: {}, Pid: {}, ParentSessionId: {}, JobId: {})", + SessionId, + AppName, + Mode, + Platform, + ClientPid, + ParentSessionId, + JobId); return true; } bool SessionsService::UpdateSession(const Oid& SessionId, CbObjectView Metadata) { - RwLock::ExclusiveLockScope Lock(m_Lock); - - auto It = m_Sessions.find(SessionId); - if (It == m_Sessions.end()) + SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}}; { - return false; + RwLock::ExclusiveLockScope Lock(m_Lock); + + auto It = m_Sessions.find(SessionId); + if (It == m_Sessions.end()) + { + return false; + } + + It.value()->UpdateMetadata(Metadata); + PersistedInfo = It.value()->Info(); } - It.value()->UpdateMetadata(Metadata); + if (m_SessionLogs) + { + m_SessionLogs->WriteSessionInfoFile(PersistedInfo); + } - const SessionInfo& Info = It.value()->Info(); - ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, Info.AppName, Info.JobId); + ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, PersistedInfo.AppName, PersistedInfo.JobId); return true; } @@ -178,13 +1003,23 @@ SessionsService::GetSession(const Oid& SessionId) const { RwLock::SharedLockScope Lock(m_Lock); - auto It = m_Sessions.find(SessionId); - if (It == m_Sessions.end()) + if (auto It = m_Sessions.find(SessionId); It != m_Sessions.end()) { - return {}; + return It->second; + } + + // Fall back to ended sessions so HTTP consumers can fetch logs/metadata + // for sessions that have finished (including sessions loaded from disk on + // startup as historical ended sessions). + for (const Ref<Session>& Ended : m_EndedSessions) + { + if (Ended->Info().Id == SessionId) + { + return Ended; + } } - return It->second; + return {}; } std::vector<Ref<SessionsService::Session>> @@ -202,10 +1037,14 @@ SessionsService::GetSessions() const } bool -SessionsService::RemoveSession(const Oid& SessionId) +SessionsService::RemoveSession(const Oid& SessionId, std::string_view Reason) { - std::string RemovedAppName; - Oid RemovedJobId; + std::string RemovedAppName; + Oid RemovedJobId; + SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}}; + bool Persist = false; + Ref<Session> Ended; + const DateTime EndTime = DateTime::Now(); { RwLock::ExclusiveLockScope Lock(m_Lock); @@ -219,13 +1058,50 @@ SessionsService::RemoveSession(const Oid& SessionId) RemovedAppName = It.value()->Info().AppName; RemovedJobId = It.value()->Info().JobId; - Ref<Session> Ended = It.value(); - Ended->SetEndedAt(DateTime::Now()); - m_EndedSessions.push_back(std::move(Ended)); + Ended = It.value(); + Ended->SetEndedAt(EndTime); + if (m_SessionLogs) + { + PersistedInfo = Ended->Info(); + Persist = true; + } + m_EndedSessions.push_back(Ended); m_Sessions.erase(It); } + // Synthetic "Session ended" entry is appended *after* m_Lock is + // released. AppendLog hits disk via SessionLog::Append, so doing it + // inside the exclusive scope would block every other session lookup + // / registration / list while the I/O completes. The callback that + // pushes the delta to WS subscribers also fires from outside the + // lock — same self-deadlock concern as the GetSession path. + uint64_t SyntheticEndCursor = 0; + { + ExtendableStringBuilder<128> MsgBuilder; + MsgBuilder << "Session ended"sv; + if (!Reason.empty()) + { + MsgBuilder << ": "sv << Reason; + } + SyntheticEndCursor = Ended->AppendLog(LogEntryInput{ + .Timestamp = EndTime, + .Level = logging::Info, + .LoggerName = "sessions"sv, + .Message = MsgBuilder.ToView(), + }); + } + + if (m_LogAppendedCallback) + { + m_LogAppendedCallback(SessionId, SyntheticEndCursor); + } + + if (Persist) + { + m_SessionLogs->WriteSessionInfoFile(PersistedInfo); + } + ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, RemovedAppName, RemovedJobId); return true; } @@ -244,4 +1120,202 @@ SessionsService::GetSessionCount() const return m_Sessions.size(); } +// The "when was this session last relevant" timestamp used for age checks +// and count-based eviction ordering: EndedAt if set, otherwise UpdatedAt. +static DateTime +ReferenceTime(const SessionsService::SessionInfo& Info) +{ + return Info.EndedAt.GetTicks() != 0 ? Info.EndedAt : Info.UpdatedAt; +} + +size_t +SessionsService::CheckProcessLiveness() +{ + // Snapshot active sessions under a shared lock, then probe liveness and + // drop dead ones without holding the service lock (IsRunning() is a + // kernel round-trip and RemoveSession() takes the lock exclusively). + std::vector<Ref<Session>> Candidates; + { + RwLock::SharedLockScope Lock(m_Lock); + Candidates.reserve(m_Sessions.size()); + for (const auto& [Id, SessionRef] : m_Sessions) + { + if (SessionRef->GetClientProcess().IsValid()) + { + Candidates.push_back(SessionRef); + } + } + } + + size_t Ended = 0; + for (const Ref<Session>& S : Candidates) + { + // m_ClientProcess is set once at construction and never mutated, so + // reading it here without synchronization is safe. + if (!S->GetClientProcess().IsRunning()) + { + // Build the termination reason. On Windows, GetExitCode() returns + // the real OS exit code and is cheap right after !IsRunning(); we + // map the common NTSTATUS codes (Ctrl-C, access violation, DLL + // init failure, …) to human-readable names. On POSIX the exit + // code is only populated via Wait() which we never call, so + // stick with the plain reason. + std::string Reason = "process exited"; +#if ZEN_PLATFORM_WINDOWS + const uint32_t ExitCode = static_cast<uint32_t>(S->GetClientProcess().GetExitCode()); + if (ExitCode != 0) + { + Reason = DescribeWindowsExitCode(ExitCode); + } +#endif + if (RemoveSession(S->Info().Id, Reason)) + { + ++Ended; + } + } + } + return Ended; +} + +SessionsService::PruneResult +SessionsService::PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes) +{ + PruneResult Result; + std::vector<Oid> ToDelete; + + // Phase 1: age + count pruning (fast; purely in-memory). + { + RwLock::ExclusiveLockScope Lock(m_Lock); + + const uint64_t NowTicks = DateTime::Now().GetTicks(); + const uint64_t CutoffTicks = NowTicks > MaxAge.GetTicks() ? NowTicks - MaxAge.GetTicks() : 0; + const DateTime Cutoff{CutoffTicks}; + + // Age-based pruning: drop ended sessions whose reference time is + // older than Cutoff. + auto ExpiredIt = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref<Session>& S) { + if (ReferenceTime(S->Info()) < Cutoff) + { + ToDelete.push_back(S->Info().Id); + return true; + } + return false; + }); + Result.ExpiredByAge = size_t(m_EndedSessions.end() - ExpiredIt); + m_EndedSessions.erase(ExpiredIt, m_EndedSessions.end()); + + // Count-based pruning: keep at most MaxCount sessions total. Active + // sessions are never touched; if there are already >= MaxCount active + // sessions then all ended sessions get evicted. + const size_t ActiveCount = m_Sessions.size(); + const size_t EndedTarget = MaxCount > ActiveCount ? MaxCount - ActiveCount : 0; + if (m_EndedSessions.size() > EndedTarget) + { + const size_t ToRemove = m_EndedSessions.size() - EndedTarget; + // Move the `ToRemove` oldest entries to the front. + std::partial_sort( + m_EndedSessions.begin(), + m_EndedSessions.begin() + ToRemove, + m_EndedSessions.end(), + [](const Ref<Session>& A, const Ref<Session>& B) { return ReferenceTime(A->Info()) < ReferenceTime(B->Info()); }); + for (size_t i = 0; i < ToRemove; ++i) + { + ToDelete.push_back(m_EndedSessions[i]->Info().Id); + } + m_EndedSessions.erase(m_EndedSessions.begin(), m_EndedSessions.begin() + ToRemove); + Result.ExpiredByCount = ToRemove; + } + } + + // Phase 1 disk deletion, outside the service lock. + if (m_SessionLogs) + { + for (const Oid& Id : ToDelete) + { + m_SessionLogs->DeleteSession(Id); + } + } + + // Phase 2: storage-footprint pruning. Snapshot remaining ended sessions + // (id + reference time) under a shared lock, then stat each directory + // outside the lock so we don't hold writers off during filesystem calls. + if (!m_SessionLogs) + { + return Result; + } + + struct Candidate + { + Oid Id; + DateTime RefTime; + uint64_t Size = 0; + }; + std::vector<Candidate> Candidates; + { + RwLock::SharedLockScope Lock(m_Lock); + Candidates.reserve(m_EndedSessions.size()); + for (const Ref<Session>& S : m_EndedSessions) + { + Candidates.push_back(Candidate{.Id = S->Info().Id, .RefTime = ReferenceTime(S->Info())}); + } + } + + uint64_t TotalBytes = 0; + for (Candidate& C : Candidates) + { + C.Size = m_SessionLogs->GetSessionSize(C.Id); + TotalBytes += C.Size; + } + + if (TotalBytes <= MaxStorageBytes) + { + return Result; + } + + // Oldest first so we evict in chronological order. + std::sort(Candidates.begin(), Candidates.end(), [](const Candidate& A, const Candidate& B) { return A.RefTime < B.RefTime; }); + + std::vector<Oid> StorageDelete; + uint64_t Reclaimed = 0; + const uint64_t NeedBytes = TotalBytes - MaxStorageBytes; + for (const Candidate& C : Candidates) + { + if (Reclaimed >= NeedBytes) + { + break; + } + StorageDelete.push_back(C.Id); + Reclaimed += C.Size; + } + + if (StorageDelete.empty()) + { + return Result; + } + + // Erase from m_EndedSessions under exclusive lock. Concurrent RemoveSession + // calls between the snapshot and here will have inserted new entries at + // the back, which we safely leave alone. + { + RwLock::ExclusiveLockScope Lock(m_Lock); + tsl::robin_map<Oid, uint8_t, Oid::Hasher> IdSet; + for (const Oid& Id : StorageDelete) + { + IdSet[Id] = 1; + } + auto It = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref<Session>& S) { + return IdSet.contains(S->Info().Id); + }); + m_EndedSessions.erase(It, m_EndedSessions.end()); + } + + for (const Oid& Id : StorageDelete) + { + m_SessionLogs->DeleteSession(Id); + } + Result.ExpiredByStorage = StorageDelete.size(); + + return Result; +} + } // namespace zen |