aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/logging/asyncsink.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencore/logging/asyncsink.cpp')
-rw-r--r--src/zencore/logging/asyncsink.cpp212
1 files changed, 212 insertions, 0 deletions
diff --git a/src/zencore/logging/asyncsink.cpp b/src/zencore/logging/asyncsink.cpp
new file mode 100644
index 000000000..02bf9f3ba
--- /dev/null
+++ b/src/zencore/logging/asyncsink.cpp
@@ -0,0 +1,212 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/logging/asyncsink.h>
+
+#include <zencore/blockingqueue.h>
+#include <zencore/logging/logmsg.h>
+#include <zencore/thread.h>
+
+#include <future>
+#include <string>
+#include <thread>
+
+namespace zen::logging {
+
+struct AsyncLogMessage
+{
+ enum class Type : uint8_t
+ {
+ Log,
+ Flush,
+ Shutdown
+ };
+
+ Type MsgType = Type::Log;
+
+ // Points to the LogPoint from upstream logging code. LogMessage guarantees
+ // this is always valid (either a static LogPoint from ZEN_LOG macros or one
+ // of the per-level default LogPoints).
+ const LogPoint* Point = nullptr;
+
+ int ThreadId = 0;
+ std::string OwnedPayload;
+ std::string OwnedLoggerName;
+ std::chrono::system_clock::time_point Time;
+
+ std::shared_ptr<std::promise<void>> FlushPromise;
+};
+
+struct AsyncSink::Impl
+{
+ explicit Impl(std::vector<SinkPtr> InSinks) : m_Sinks(std::move(InSinks))
+ {
+ m_WorkerThread = std::thread([this]() {
+ zen::SetCurrentThreadName("AsyncLog");
+ WorkerLoop();
+ });
+ }
+
+ ~Impl()
+ {
+ AsyncLogMessage ShutdownMsg;
+ ShutdownMsg.MsgType = AsyncLogMessage::Type::Shutdown;
+ m_Queue.Enqueue(std::move(ShutdownMsg));
+
+ if (m_WorkerThread.joinable())
+ {
+ m_WorkerThread.join();
+ }
+ }
+
+ void Log(const LogMessage& Msg)
+ {
+ AsyncLogMessage AsyncMsg;
+ AsyncMsg.OwnedPayload = std::string(Msg.GetPayload());
+ AsyncMsg.OwnedLoggerName = std::string(Msg.GetLoggerName());
+ AsyncMsg.ThreadId = Msg.GetThreadId();
+ AsyncMsg.Time = Msg.GetTime();
+ AsyncMsg.Point = &Msg.GetLogPoint();
+ AsyncMsg.MsgType = AsyncLogMessage::Type::Log;
+
+ m_Queue.Enqueue(std::move(AsyncMsg));
+ }
+
+ void Flush()
+ {
+ auto Promise = std::make_shared<std::promise<void>>();
+ auto Future = Promise->get_future();
+
+ AsyncLogMessage FlushMsg;
+ FlushMsg.MsgType = AsyncLogMessage::Type::Flush;
+ FlushMsg.FlushPromise = std::move(Promise);
+
+ m_Queue.Enqueue(std::move(FlushMsg));
+
+ Future.get();
+ }
+
+ void SetFormatter(std::unique_ptr<Formatter> InFormatter)
+ {
+ for (auto& CurrentSink : m_Sinks)
+ {
+ CurrentSink->SetFormatter(InFormatter->Clone());
+ }
+ }
+
+private:
+ void ForwardLogToSinks(const AsyncLogMessage& AsyncMsg)
+ {
+ LogMessage Reconstructed(*AsyncMsg.Point, AsyncMsg.OwnedLoggerName, AsyncMsg.OwnedPayload);
+ Reconstructed.SetTime(AsyncMsg.Time);
+ Reconstructed.SetThreadId(AsyncMsg.ThreadId);
+
+ for (auto& CurrentSink : m_Sinks)
+ {
+ if (CurrentSink->ShouldLog(Reconstructed.GetLevel()))
+ {
+ try
+ {
+ CurrentSink->Log(Reconstructed);
+ }
+ catch (const std::exception&)
+ {
+ }
+ }
+ }
+ }
+
+ void FlushSinks()
+ {
+ for (auto& CurrentSink : m_Sinks)
+ {
+ try
+ {
+ CurrentSink->Flush();
+ }
+ catch (const std::exception&)
+ {
+ }
+ }
+ }
+
+ void WorkerLoop()
+ {
+ AsyncLogMessage Msg;
+ while (m_Queue.WaitAndDequeue(Msg))
+ {
+ switch (Msg.MsgType)
+ {
+ case AsyncLogMessage::Type::Log:
+ {
+ ForwardLogToSinks(Msg);
+ break;
+ }
+
+ case AsyncLogMessage::Type::Flush:
+ {
+ FlushSinks();
+ if (Msg.FlushPromise)
+ {
+ Msg.FlushPromise->set_value();
+ }
+ break;
+ }
+
+ case AsyncLogMessage::Type::Shutdown:
+ {
+ m_Queue.CompleteAdding();
+
+ AsyncLogMessage Remaining;
+ while (m_Queue.WaitAndDequeue(Remaining))
+ {
+ if (Remaining.MsgType == AsyncLogMessage::Type::Log)
+ {
+ ForwardLogToSinks(Remaining);
+ }
+ else if (Remaining.MsgType == AsyncLogMessage::Type::Flush)
+ {
+ FlushSinks();
+ if (Remaining.FlushPromise)
+ {
+ Remaining.FlushPromise->set_value();
+ }
+ }
+ }
+
+ FlushSinks();
+ return;
+ }
+ }
+ }
+ }
+
+ std::vector<SinkPtr> m_Sinks;
+ BlockingQueue<AsyncLogMessage> m_Queue;
+ std::thread m_WorkerThread;
+};
+
+AsyncSink::AsyncSink(std::vector<SinkPtr> InSinks) : m_Impl(std::make_unique<Impl>(std::move(InSinks)))
+{
+}
+
+AsyncSink::~AsyncSink() = default;
+
+void
+AsyncSink::Log(const LogMessage& Msg)
+{
+ m_Impl->Log(Msg);
+}
+
+void
+AsyncSink::Flush()
+{
+ m_Impl->Flush();
+}
+
+void
+AsyncSink::SetFormatter(std::unique_ptr<Formatter> InFormatter)
+{
+ m_Impl->SetFormatter(std::move(InFormatter));
+}
+
+} // namespace zen::logging