diff options
| author | Stefan Boberg <[email protected]> | 2026-03-15 12:53:46 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2026-03-15 12:53:46 +0100 |
| commit | 0da02a3e6d13c1626e6422f6d7ead3848036a146 (patch) | |
| tree | 94a6cd36c25df00455a727850fb14d440dc5b0c5 | |
| parent | Use gathered buffer sequence in TcpLogStreamSink for batched TCP writes (diff) | |
| download | zen-sb/sessionize.tar.xz zen-sb/sessionize.zip | |
Add sequence numbers to log stream protocol and tests for drop detectionsb/sessionize
TcpLogStreamSink now stamps each message with a monotonic sequence number.
LogStreamListener tracks the expected sequence per session and emits a
synthetic "dropped" notice when gaps appear. Includes tests covering
basic delivery, multi-line splitting, drop detection, and contiguous
sequencing. Also simplifies LogMessage::s_DefaultPoint to a single entry.
| -rw-r--r-- | src/zencore/include/zencore/logging/logmsg.h | 12 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/splitconsole/logstreamlistener.h | 2 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h | 10 | ||||
| -rw-r--r-- | src/zenutil/splitconsole/logstreamlistener.cpp | 164 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 4 |
5 files changed, 179 insertions, 13 deletions
diff --git a/src/zencore/include/zencore/logging/logmsg.h b/src/zencore/include/zencore/logging/logmsg.h index 1d8b6b1b7..45b0e38fb 100644 --- a/src/zencore/include/zencore/logging/logmsg.h +++ b/src/zencore/include/zencore/logging/logmsg.h @@ -44,22 +44,14 @@ struct LogMessage mutable size_t ColorRangeEnd = 0; private: - static constexpr LogPoint s_DefaultPoints[LogLevelCount] = { - {{}, Trace, {}}, - {{}, Debug, {}}, - {{}, Info, {}}, - {{}, Warn, {}}, - {{}, Err, {}}, - {{}, Critical, {}}, - {{}, Off, {}}, - }; + static constexpr LogPoint s_DefaultPoint{{}, Off, {}}; std::string_view m_LoggerName; LogLevel m_Level = Off; std::chrono::system_clock::time_point m_Time; SourceLocation m_Source; std::string_view m_Payload; - const LogPoint* m_Point = &s_DefaultPoints[Off]; + const LogPoint* m_Point = &s_DefaultPoint; int m_ThreadId = 0; }; diff --git a/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h index a0e39ba47..f3b960f51 100644 --- a/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h +++ b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h @@ -56,4 +56,6 @@ private: std::unique_ptr<Impl> m_Impl; }; +void logstreamlistener_forcelink(); + } // namespace zen diff --git a/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h index e6320d9d5..f4ac5ff22 100644 --- a/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h +++ b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h @@ -11,6 +11,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <asio.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#include <atomic> #include <condition_variable> #include <deque> #include <mutex> @@ -60,11 +61,14 @@ public: Text.remove_suffix(1); } - // Build CbObject with text, source, and level fields + uint64_t Seq = m_NextSequence.fetch_add(1, std::memory_order_relaxed); + + // Build CbObject with text, source, level, and sequence number fields CbObjectWriter Writer; Writer.AddString("text", Text); Writer.AddString("source", m_Source); Writer.AddString("level", logging::ToStringView(Msg.GetLevel())); + Writer.AddInteger("seq", Seq); CbObject Obj = Writer.Save(); // Enqueue for async write @@ -172,6 +176,10 @@ private: std::string m_Source; uint32_t m_MaxQueueSize; + // Sequence counter — incremented atomically by Log() callers. + // Gaps in the sequence seen by the receiver indicate dropped messages. + std::atomic<uint64_t> m_NextSequence{0}; + // Queue shared between Log() callers and IO thread std::mutex m_QueueMutex; std::condition_variable m_QueueCv; diff --git a/src/zenutil/splitconsole/logstreamlistener.cpp b/src/zenutil/splitconsole/logstreamlistener.cpp index c51794d27..fa554ba96 100644 --- a/src/zenutil/splitconsole/logstreamlistener.cpp +++ b/src/zenutil/splitconsole/logstreamlistener.cpp @@ -66,6 +66,15 @@ private: std::string_view Text = Obj["text"].AsString(); std::string_view Source = Obj["source"].AsString(); + // Check sequence number for gaps (dropped messages) + uint64_t Seq = Obj["seq"].AsUInt64(); + if (Seq > m_NextExpectedSeq) + { + uint64_t Dropped = Seq - m_NextExpectedSeq; + m_Target.AppendLogLine(fmt::format("[{}] *** {} log message(s) dropped ***", Source.empty() ? "log" : Source, Dropped)); + } + m_NextExpectedSeq = Seq + 1; + // Split multi-line messages into individual AppendLogLine calls so that // each line gets its own row in the target's log output. while (!Text.empty()) @@ -122,7 +131,8 @@ private: asio::ip::tcp::socket m_Socket; LogStreamTarget& m_Target; std::array<uint8_t, 65536> m_ReadBuf{}; - std::size_t m_BufferUsed = 0; + std::size_t m_BufferUsed = 0; + uint64_t m_NextExpectedSeq = 0; }; ////////////////////////////////////////////////////////////////////////// @@ -261,3 +271,155 @@ LogStreamListener::Shutdown() } } // namespace zen + +#if ZEN_WITH_TESTS + +# include <zencore/testing.h> +# include <zenutil/splitconsole/tcplogstreamsink.h> + +namespace zen { + +void +logstreamlistener_forcelink() +{ +} + +namespace { + + class CollectingTarget : public LogStreamTarget + { + public: + void AppendLogLine(std::string_view Text) override + { + std::lock_guard<std::mutex> Lock(m_Mutex); + m_Lines.emplace_back(Text); + m_Cv.notify_all(); + } + + std::vector<std::string> WaitForLines(size_t Count, std::chrono::milliseconds Timeout = std::chrono::milliseconds(5000)) + { + std::unique_lock<std::mutex> Lock(m_Mutex); + m_Cv.wait_for(Lock, Timeout, [&]() { return m_Lines.size() >= Count; }); + return m_Lines; + } + + private: + std::mutex m_Mutex; + std::condition_variable m_Cv; + std::vector<std::string> m_Lines; + }; + + logging::LogMessage MakeLogMessage(std::string_view Text, logging::LogLevel Level = logging::Info) + { + static logging::LogPoint Point{{}, Level, {}}; + Point.Level = Level; + return logging::LogMessage(Point, "test", Text); + } + +} // namespace + +TEST_SUITE_BEGIN("util.logstreamlistener"); + +TEST_CASE("BasicMessageDelivery") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "TestSource", 64); + Sink.Log(MakeLogMessage("hello world")); + Sink.Log(MakeLogMessage("second line")); + } + + auto Lines = Target.WaitForLines(2); + REQUIRE(Lines.size() == 2); + CHECK(Lines[0] == "[TestSource] hello world"); + CHECK(Lines[1] == "[TestSource] second line"); +} + +TEST_CASE("MultiLineMessageSplit") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "src", 64); + Sink.Log(MakeLogMessage("line1\nline2\nline3")); + } + + auto Lines = Target.WaitForLines(3); + REQUIRE(Lines.size() == 3); + CHECK(Lines[0] == "[src] line1"); + CHECK(Lines[1] == "[src] line2"); + CHECK(Lines[2] == "[src] line3"); +} + +TEST_CASE("DroppedMessageDetection") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + // Use a tiny queue so messages get dropped + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "src", 2); + + // Flood the queue while the sink hasn't connected yet — the first messages + // will be evicted before they can be sent. + for (int i = 0; i < 10; i++) + { + Sink.Log(MakeLogMessage(fmt::format("msg{}", i))); + } + } + + // Wait for whatever arrives + auto Lines = Target.WaitForLines(1); + REQUIRE(!Lines.empty()); + + // At least one line should be the "dropped" notification + bool FoundDropNotice = false; + for (auto& Line : Lines) + { + if (Line.find("dropped") != std::string::npos) + { + FoundDropNotice = true; + break; + } + } + CHECK(FoundDropNotice); +} + +TEST_CASE("SequenceNumbersAreContiguous") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + constexpr int NumMessages = 5; + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "seq", 64); + for (int i = 0; i < NumMessages; i++) + { + Sink.Log(MakeLogMessage(fmt::format("msg{}", i))); + } + } + + auto Lines = Target.WaitForLines(NumMessages); + REQUIRE(Lines.size() == NumMessages); + + // No "dropped" notices should appear when nothing is dropped + for (auto& Line : Lines) + { + CHECK(Line.find("dropped") == std::string::npos); + } + + // Verify ordering + for (int i = 0; i < NumMessages; i++) + { + CHECK(Lines[i] == fmt::format("[seq] msg{}", i)); + } +} + +TEST_SUITE_END(); + +} // namespace zen + +#endif diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 291dbeadd..9172e3134 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -4,8 +4,9 @@ #if ZEN_WITH_TESTS -# include <zenutil/rpcrecording.h> # include <zenutil/config/commandlineoptions.h> +# include <zenutil/rpcrecording.h> +# include <zenutil/splitconsole/logstreamlistener.h> # include <zenutil/wildcard.h> namespace zen { @@ -15,6 +16,7 @@ zenutil_forcelinktests() { cache::rpcrecord_forcelink(); commandlineoptions_forcelink(); + logstreamlistener_forcelink(); wildcard_forcelink(); } |