aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-15 12:53:46 +0100
committerStefan Boberg <[email protected]>2026-03-15 12:53:46 +0100
commit0da02a3e6d13c1626e6422f6d7ead3848036a146 (patch)
tree94a6cd36c25df00455a727850fb14d440dc5b0c5
parentUse gathered buffer sequence in TcpLogStreamSink for batched TCP writes (diff)
downloadzen-sb/sessionize.tar.xz
zen-sb/sessionize.zip
Add sequence numbers to log stream protocol and tests for drop detectionsb/sessionize
TcpLogStreamSink now stamps each message with a monotonic sequence number. LogStreamListener tracks the expected sequence per session and emits a synthetic "dropped" notice when gaps appear. Includes tests covering basic delivery, multi-line splitting, drop detection, and contiguous sequencing. Also simplifies LogMessage::s_DefaultPoint to a single entry.
-rw-r--r--src/zencore/include/zencore/logging/logmsg.h12
-rw-r--r--src/zenutil/include/zenutil/splitconsole/logstreamlistener.h2
-rw-r--r--src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h10
-rw-r--r--src/zenutil/splitconsole/logstreamlistener.cpp164
-rw-r--r--src/zenutil/zenutil.cpp4
5 files changed, 179 insertions, 13 deletions
diff --git a/src/zencore/include/zencore/logging/logmsg.h b/src/zencore/include/zencore/logging/logmsg.h
index 1d8b6b1b7..45b0e38fb 100644
--- a/src/zencore/include/zencore/logging/logmsg.h
+++ b/src/zencore/include/zencore/logging/logmsg.h
@@ -44,22 +44,14 @@ struct LogMessage
mutable size_t ColorRangeEnd = 0;
private:
- static constexpr LogPoint s_DefaultPoints[LogLevelCount] = {
- {{}, Trace, {}},
- {{}, Debug, {}},
- {{}, Info, {}},
- {{}, Warn, {}},
- {{}, Err, {}},
- {{}, Critical, {}},
- {{}, Off, {}},
- };
+ static constexpr LogPoint s_DefaultPoint{{}, Off, {}};
std::string_view m_LoggerName;
LogLevel m_Level = Off;
std::chrono::system_clock::time_point m_Time;
SourceLocation m_Source;
std::string_view m_Payload;
- const LogPoint* m_Point = &s_DefaultPoints[Off];
+ const LogPoint* m_Point = &s_DefaultPoint;
int m_ThreadId = 0;
};
diff --git a/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h
index a0e39ba47..f3b960f51 100644
--- a/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h
+++ b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h
@@ -56,4 +56,6 @@ private:
std::unique_ptr<Impl> m_Impl;
};
+void logstreamlistener_forcelink();
+
} // namespace zen
diff --git a/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h
index e6320d9d5..f4ac5ff22 100644
--- a/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h
+++ b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h
@@ -11,6 +11,7 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <asio.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
@@ -60,11 +61,14 @@ public:
Text.remove_suffix(1);
}
- // Build CbObject with text, source, and level fields
+ uint64_t Seq = m_NextSequence.fetch_add(1, std::memory_order_relaxed);
+
+ // Build CbObject with text, source, level, and sequence number fields
CbObjectWriter Writer;
Writer.AddString("text", Text);
Writer.AddString("source", m_Source);
Writer.AddString("level", logging::ToStringView(Msg.GetLevel()));
+ Writer.AddInteger("seq", Seq);
CbObject Obj = Writer.Save();
// Enqueue for async write
@@ -172,6 +176,10 @@ private:
std::string m_Source;
uint32_t m_MaxQueueSize;
+ // Sequence counter — incremented atomically by Log() callers.
+ // Gaps in the sequence seen by the receiver indicate dropped messages.
+ std::atomic<uint64_t> m_NextSequence{0};
+
// Queue shared between Log() callers and IO thread
std::mutex m_QueueMutex;
std::condition_variable m_QueueCv;
diff --git a/src/zenutil/splitconsole/logstreamlistener.cpp b/src/zenutil/splitconsole/logstreamlistener.cpp
index c51794d27..fa554ba96 100644
--- a/src/zenutil/splitconsole/logstreamlistener.cpp
+++ b/src/zenutil/splitconsole/logstreamlistener.cpp
@@ -66,6 +66,15 @@ private:
std::string_view Text = Obj["text"].AsString();
std::string_view Source = Obj["source"].AsString();
+ // Check sequence number for gaps (dropped messages)
+ uint64_t Seq = Obj["seq"].AsUInt64();
+ if (Seq > m_NextExpectedSeq)
+ {
+ uint64_t Dropped = Seq - m_NextExpectedSeq;
+ m_Target.AppendLogLine(fmt::format("[{}] *** {} log message(s) dropped ***", Source.empty() ? "log" : Source, Dropped));
+ }
+ m_NextExpectedSeq = Seq + 1;
+
// Split multi-line messages into individual AppendLogLine calls so that
// each line gets its own row in the target's log output.
while (!Text.empty())
@@ -122,7 +131,8 @@ private:
asio::ip::tcp::socket m_Socket;
LogStreamTarget& m_Target;
std::array<uint8_t, 65536> m_ReadBuf{};
- std::size_t m_BufferUsed = 0;
+ std::size_t m_BufferUsed = 0;
+ uint64_t m_NextExpectedSeq = 0;
};
//////////////////////////////////////////////////////////////////////////
@@ -261,3 +271,155 @@ LogStreamListener::Shutdown()
}
} // namespace zen
+
+#if ZEN_WITH_TESTS
+
+# include <zencore/testing.h>
+# include <zenutil/splitconsole/tcplogstreamsink.h>
+
+namespace zen {
+
+void
+logstreamlistener_forcelink()
+{
+}
+
+namespace {
+
+ class CollectingTarget : public LogStreamTarget
+ {
+ public:
+ void AppendLogLine(std::string_view Text) override
+ {
+ std::lock_guard<std::mutex> Lock(m_Mutex);
+ m_Lines.emplace_back(Text);
+ m_Cv.notify_all();
+ }
+
+ std::vector<std::string> WaitForLines(size_t Count, std::chrono::milliseconds Timeout = std::chrono::milliseconds(5000))
+ {
+ std::unique_lock<std::mutex> Lock(m_Mutex);
+ m_Cv.wait_for(Lock, Timeout, [&]() { return m_Lines.size() >= Count; });
+ return m_Lines;
+ }
+
+ private:
+ std::mutex m_Mutex;
+ std::condition_variable m_Cv;
+ std::vector<std::string> m_Lines;
+ };
+
+ logging::LogMessage MakeLogMessage(std::string_view Text, logging::LogLevel Level = logging::Info)
+ {
+ static logging::LogPoint Point{{}, Level, {}};
+ Point.Level = Level;
+ return logging::LogMessage(Point, "test", Text);
+ }
+
+} // namespace
+
+TEST_SUITE_BEGIN("util.logstreamlistener");
+
+TEST_CASE("BasicMessageDelivery")
+{
+ CollectingTarget Target;
+ LogStreamListener Listener(Target);
+
+ {
+ TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "TestSource", 64);
+ Sink.Log(MakeLogMessage("hello world"));
+ Sink.Log(MakeLogMessage("second line"));
+ }
+
+ auto Lines = Target.WaitForLines(2);
+ REQUIRE(Lines.size() == 2);
+ CHECK(Lines[0] == "[TestSource] hello world");
+ CHECK(Lines[1] == "[TestSource] second line");
+}
+
+TEST_CASE("MultiLineMessageSplit")
+{
+ CollectingTarget Target;
+ LogStreamListener Listener(Target);
+
+ {
+ TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "src", 64);
+ Sink.Log(MakeLogMessage("line1\nline2\nline3"));
+ }
+
+ auto Lines = Target.WaitForLines(3);
+ REQUIRE(Lines.size() == 3);
+ CHECK(Lines[0] == "[src] line1");
+ CHECK(Lines[1] == "[src] line2");
+ CHECK(Lines[2] == "[src] line3");
+}
+
+TEST_CASE("DroppedMessageDetection")
+{
+ CollectingTarget Target;
+ LogStreamListener Listener(Target);
+
+ // Use a tiny queue so messages get dropped
+ {
+ TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "src", 2);
+
+ // Flood the queue while the sink hasn't connected yet — the first messages
+ // will be evicted before they can be sent.
+ for (int i = 0; i < 10; i++)
+ {
+ Sink.Log(MakeLogMessage(fmt::format("msg{}", i)));
+ }
+ }
+
+ // Wait for whatever arrives
+ auto Lines = Target.WaitForLines(1);
+ REQUIRE(!Lines.empty());
+
+ // At least one line should be the "dropped" notification
+ bool FoundDropNotice = false;
+ for (auto& Line : Lines)
+ {
+ if (Line.find("dropped") != std::string::npos)
+ {
+ FoundDropNotice = true;
+ break;
+ }
+ }
+ CHECK(FoundDropNotice);
+}
+
+TEST_CASE("SequenceNumbersAreContiguous")
+{
+ CollectingTarget Target;
+ LogStreamListener Listener(Target);
+
+ constexpr int NumMessages = 5;
+ {
+ TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "seq", 64);
+ for (int i = 0; i < NumMessages; i++)
+ {
+ Sink.Log(MakeLogMessage(fmt::format("msg{}", i)));
+ }
+ }
+
+ auto Lines = Target.WaitForLines(NumMessages);
+ REQUIRE(Lines.size() == NumMessages);
+
+ // No "dropped" notices should appear when nothing is dropped
+ for (auto& Line : Lines)
+ {
+ CHECK(Line.find("dropped") == std::string::npos);
+ }
+
+ // Verify ordering
+ for (int i = 0; i < NumMessages; i++)
+ {
+ CHECK(Lines[i] == fmt::format("[seq] msg{}", i));
+ }
+}
+
+TEST_SUITE_END();
+
+} // namespace zen
+
+#endif
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index 291dbeadd..9172e3134 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -4,8 +4,9 @@
#if ZEN_WITH_TESTS
-# include <zenutil/rpcrecording.h>
# include <zenutil/config/commandlineoptions.h>
+# include <zenutil/rpcrecording.h>
+# include <zenutil/splitconsole/logstreamlistener.h>
# include <zenutil/wildcard.h>
namespace zen {
@@ -15,6 +16,7 @@ zenutil_forcelinktests()
{
cache::rpcrecord_forcelink();
commandlineoptions_forcelink();
+ logstreamlistener_forcelink();
wildcard_forcelink();
}