// Copyright Epic Games, Inc. All Rights Reserved. #include #include namespace zen::logging { AsyncSink::AsyncSink(std::vector InSinks) : m_Sinks(std::move(InSinks)) { m_WorkerThread = std::thread([this]() { zen::SetCurrentThreadName("AsyncLog"); WorkerLoop(); }); } AsyncSink::~AsyncSink() { AsyncLogMessage ShutdownMsg; ShutdownMsg.MsgType = AsyncLogMessage::Type::Shutdown; m_Queue.Enqueue(std::move(ShutdownMsg)); if (m_WorkerThread.joinable()) { m_WorkerThread.join(); } } void AsyncSink::Log(const LogMessage& Msg) { AsyncLogMessage AsyncMsg; AsyncMsg.OwnedPayload = std::string(Msg.Payload); AsyncMsg.OwnedLoggerName = std::string(Msg.LoggerName); AsyncMsg.Level = Msg.Level; AsyncMsg.Time = Msg.Time; AsyncMsg.ThreadId = Msg.ThreadId; AsyncMsg.Source = Msg.Source; AsyncMsg.MsgType = AsyncLogMessage::Type::Log; m_Queue.Enqueue(std::move(AsyncMsg)); } void AsyncSink::Flush() { auto Promise = std::make_shared>(); auto Future = Promise->get_future(); AsyncLogMessage FlushMsg; FlushMsg.MsgType = AsyncLogMessage::Type::Flush; FlushMsg.FlushPromise = std::move(Promise); m_Queue.Enqueue(std::move(FlushMsg)); Future.get(); } void AsyncSink::SetFormatter(std::unique_ptr InFormatter) { for (auto& CurrentSink : m_Sinks) { CurrentSink->SetFormatter(InFormatter->Clone()); } } void AsyncSink::ForwardLogToSinks(const AsyncLogMessage& AsyncMsg) { LogMessage Reconstructed; Reconstructed.Payload = AsyncMsg.OwnedPayload; Reconstructed.LoggerName = AsyncMsg.OwnedLoggerName; Reconstructed.Level = AsyncMsg.Level; Reconstructed.Time = AsyncMsg.Time; Reconstructed.ThreadId = AsyncMsg.ThreadId; Reconstructed.Source = AsyncMsg.Source; for (auto& CurrentSink : m_Sinks) { if (CurrentSink->ShouldLog(Reconstructed.Level)) { try { CurrentSink->Log(Reconstructed); } catch (const std::exception&) { } } } } void AsyncSink::FlushSinks() { for (auto& CurrentSink : m_Sinks) { try { CurrentSink->Flush(); } catch (const std::exception&) { } } } void AsyncSink::WorkerLoop() { AsyncLogMessage Msg; while (m_Queue.WaitAndDequeue(Msg)) { switch (Msg.MsgType) { case AsyncLogMessage::Type::Log: { ForwardLogToSinks(Msg); break; } case AsyncLogMessage::Type::Flush: { FlushSinks(); if (Msg.FlushPromise) { Msg.FlushPromise->set_value(); } break; } case AsyncLogMessage::Type::Shutdown: { // Signal no more messages will be enqueued so the drain loop // terminates once the queue is empty instead of blocking. m_Queue.CompleteAdding(); // Drain any remaining messages AsyncLogMessage Remaining; while (m_Queue.WaitAndDequeue(Remaining)) { if (Remaining.MsgType == AsyncLogMessage::Type::Log) { ForwardLogToSinks(Remaining); } else if (Remaining.MsgType == AsyncLogMessage::Type::Flush) { FlushSinks(); if (Remaining.FlushPromise) { Remaining.FlushPromise->set_value(); } } } FlushSinks(); return; } } } } } // namespace zen::logging