// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include namespace zen::logging { struct AsyncLogMessage { enum class Type : uint8_t { Log, Flush, Shutdown }; Type MsgType = Type::Log; // Points to the LogPoint from upstream logging code. LogMessage guarantees // this is always valid (either a static LogPoint from ZEN_LOG macros or one // of the per-level default LogPoints). const LogPoint* Point = nullptr; int ThreadId = 0; std::string OwnedPayload; std::string OwnedLoggerName; std::chrono::system_clock::time_point Time; std::shared_ptr> FlushPromise; }; struct AsyncSink::Impl { explicit Impl(std::vector InSinks) : m_Sinks(std::move(InSinks)) { m_WorkerThread = std::thread([this]() { zen::SetCurrentThreadName("AsyncLog"); WorkerLoop(); }); } ~Impl() { AsyncLogMessage ShutdownMsg; ShutdownMsg.MsgType = AsyncLogMessage::Type::Shutdown; m_Queue.Enqueue(std::move(ShutdownMsg)); if (m_WorkerThread.joinable()) { m_WorkerThread.join(); } } void Log(const LogMessage& Msg) { AsyncLogMessage AsyncMsg; AsyncMsg.OwnedPayload = std::string(Msg.GetPayload()); AsyncMsg.OwnedLoggerName = std::string(Msg.GetLoggerName()); AsyncMsg.ThreadId = Msg.GetThreadId(); AsyncMsg.Time = Msg.GetTime(); AsyncMsg.Point = &Msg.GetLogPoint(); AsyncMsg.MsgType = AsyncLogMessage::Type::Log; m_Queue.Enqueue(std::move(AsyncMsg)); } void Flush() { auto Promise = std::make_shared>(); auto Future = Promise->get_future(); AsyncLogMessage FlushMsg; FlushMsg.MsgType = AsyncLogMessage::Type::Flush; FlushMsg.FlushPromise = std::move(Promise); m_Queue.Enqueue(std::move(FlushMsg)); Future.get(); } void SetFormatter(std::unique_ptr InFormatter) { for (auto& CurrentSink : m_Sinks) { CurrentSink->SetFormatter(InFormatter->Clone()); } } private: void ForwardLogToSinks(const AsyncLogMessage& AsyncMsg) { LogMessage Reconstructed(*AsyncMsg.Point, AsyncMsg.OwnedLoggerName, AsyncMsg.OwnedPayload); Reconstructed.SetTime(AsyncMsg.Time); Reconstructed.SetThreadId(AsyncMsg.ThreadId); for (auto& CurrentSink : m_Sinks) { if (CurrentSink->ShouldLog(Reconstructed.GetLevel())) { try { CurrentSink->Log(Reconstructed); } catch (const std::exception&) { } } } } void FlushSinks() { for (auto& CurrentSink : m_Sinks) { try { CurrentSink->Flush(); } catch (const std::exception&) { } } } void WorkerLoop() { AsyncLogMessage Msg; while (m_Queue.WaitAndDequeue(Msg)) { switch (Msg.MsgType) { case AsyncLogMessage::Type::Log: { ForwardLogToSinks(Msg); break; } case AsyncLogMessage::Type::Flush: { FlushSinks(); if (Msg.FlushPromise) { Msg.FlushPromise->set_value(); } break; } case AsyncLogMessage::Type::Shutdown: { m_Queue.CompleteAdding(); AsyncLogMessage Remaining; while (m_Queue.WaitAndDequeue(Remaining)) { if (Remaining.MsgType == AsyncLogMessage::Type::Log) { ForwardLogToSinks(Remaining); } else if (Remaining.MsgType == AsyncLogMessage::Type::Flush) { FlushSinks(); if (Remaining.FlushPromise) { Remaining.FlushPromise->set_value(); } } } FlushSinks(); return; } } } } std::vector m_Sinks; BlockingQueue m_Queue; std::thread m_WorkerThread; }; AsyncSink::AsyncSink(std::vector InSinks) : m_Impl(std::make_unique(std::move(InSinks))) { } AsyncSink::~AsyncSink() = default; void AsyncSink::Log(const LogMessage& Msg) { m_Impl->Log(Msg); } void AsyncSink::Flush() { m_Impl->Flush(); } void AsyncSink::SetFormatter(std::unique_ptr InFormatter) { m_Impl->SetFormatter(std::move(InFormatter)); } } // namespace zen::logging