diff options
Diffstat (limited to 'src/zenutil/splitconsole')
| -rw-r--r-- | src/zenutil/splitconsole/logstreamlistener.cpp | 164 |
1 files changed, 163 insertions, 1 deletions
diff --git a/src/zenutil/splitconsole/logstreamlistener.cpp b/src/zenutil/splitconsole/logstreamlistener.cpp index c51794d27..fa554ba96 100644 --- a/src/zenutil/splitconsole/logstreamlistener.cpp +++ b/src/zenutil/splitconsole/logstreamlistener.cpp @@ -66,6 +66,15 @@ private: std::string_view Text = Obj["text"].AsString(); std::string_view Source = Obj["source"].AsString(); + // Check sequence number for gaps (dropped messages) + uint64_t Seq = Obj["seq"].AsUInt64(); + if (Seq > m_NextExpectedSeq) + { + uint64_t Dropped = Seq - m_NextExpectedSeq; + m_Target.AppendLogLine(fmt::format("[{}] *** {} log message(s) dropped ***", Source.empty() ? "log" : Source, Dropped)); + } + m_NextExpectedSeq = Seq + 1; + // Split multi-line messages into individual AppendLogLine calls so that // each line gets its own row in the target's log output. while (!Text.empty()) @@ -122,7 +131,8 @@ private: asio::ip::tcp::socket m_Socket; LogStreamTarget& m_Target; std::array<uint8_t, 65536> m_ReadBuf{}; - std::size_t m_BufferUsed = 0; + std::size_t m_BufferUsed = 0; + uint64_t m_NextExpectedSeq = 0; }; ////////////////////////////////////////////////////////////////////////// @@ -261,3 +271,155 @@ LogStreamListener::Shutdown() } } // namespace zen + +#if ZEN_WITH_TESTS + +# include <zencore/testing.h> +# include <zenutil/splitconsole/tcplogstreamsink.h> + +namespace zen { + +void +logstreamlistener_forcelink() +{ +} + +namespace { + + class CollectingTarget : public LogStreamTarget + { + public: + void AppendLogLine(std::string_view Text) override + { + std::lock_guard<std::mutex> Lock(m_Mutex); + m_Lines.emplace_back(Text); + m_Cv.notify_all(); + } + + std::vector<std::string> WaitForLines(size_t Count, std::chrono::milliseconds Timeout = std::chrono::milliseconds(5000)) + { + std::unique_lock<std::mutex> Lock(m_Mutex); + m_Cv.wait_for(Lock, Timeout, [&]() { return m_Lines.size() >= Count; }); + return m_Lines; + } + + private: + std::mutex m_Mutex; + std::condition_variable m_Cv; + std::vector<std::string> m_Lines; + }; + + logging::LogMessage MakeLogMessage(std::string_view Text, logging::LogLevel Level = logging::Info) + { + static logging::LogPoint Point{{}, Level, {}}; + Point.Level = Level; + return logging::LogMessage(Point, "test", Text); + } + +} // namespace + +TEST_SUITE_BEGIN("util.logstreamlistener"); + +TEST_CASE("BasicMessageDelivery") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "TestSource", 64); + Sink.Log(MakeLogMessage("hello world")); + Sink.Log(MakeLogMessage("second line")); + } + + auto Lines = Target.WaitForLines(2); + REQUIRE(Lines.size() == 2); + CHECK(Lines[0] == "[TestSource] hello world"); + CHECK(Lines[1] == "[TestSource] second line"); +} + +TEST_CASE("MultiLineMessageSplit") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "src", 64); + Sink.Log(MakeLogMessage("line1\nline2\nline3")); + } + + auto Lines = Target.WaitForLines(3); + REQUIRE(Lines.size() == 3); + CHECK(Lines[0] == "[src] line1"); + CHECK(Lines[1] == "[src] line2"); + CHECK(Lines[2] == "[src] line3"); +} + +TEST_CASE("DroppedMessageDetection") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + // Use a tiny queue so messages get dropped + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "src", 2); + + // Flood the queue while the sink hasn't connected yet — the first messages + // will be evicted before they can be sent. + for (int i = 0; i < 10; i++) + { + Sink.Log(MakeLogMessage(fmt::format("msg{}", i))); + } + } + + // Wait for whatever arrives + auto Lines = Target.WaitForLines(1); + REQUIRE(!Lines.empty()); + + // At least one line should be the "dropped" notification + bool FoundDropNotice = false; + for (auto& Line : Lines) + { + if (Line.find("dropped") != std::string::npos) + { + FoundDropNotice = true; + break; + } + } + CHECK(FoundDropNotice); +} + +TEST_CASE("SequenceNumbersAreContiguous") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + constexpr int NumMessages = 5; + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "seq", 64); + for (int i = 0; i < NumMessages; i++) + { + Sink.Log(MakeLogMessage(fmt::format("msg{}", i))); + } + } + + auto Lines = Target.WaitForLines(NumMessages); + REQUIRE(Lines.size() == NumMessages); + + // No "dropped" notices should appear when nothing is dropped + for (auto& Line : Lines) + { + CHECK(Line.find("dropped") == std::string::npos); + } + + // Verify ordering + for (int i = 0; i < NumMessages; i++) + { + CHECK(Lines[i] == fmt::format("[seq] msg{}", i)); + } +} + +TEST_SUITE_END(); + +} // namespace zen + +#endif |