diff options
| author | Liam Mitchell <[email protected]> | 2026-03-09 19:06:36 -0700 |
|---|---|---|
| committer | Liam Mitchell <[email protected]> | 2026-03-09 19:06:36 -0700 |
| commit | d1abc50ee9d4fb72efc646e17decafea741caa34 (patch) | |
| tree | e4288e00f2f7ca0391b83d986efcb69d3ba66a83 /src/zencore/logging/asyncsink.cpp | |
| parent | Allow requests with invalid content-types unless specified in command line or... (diff) | |
| parent | updated chunk–block analyser (#818) (diff) | |
| download | zen-d1abc50ee9d4fb72efc646e17decafea741caa34.tar.xz zen-d1abc50ee9d4fb72efc646e17decafea741caa34.zip | |
Merge branch 'main' into lm/restrict-content-type
Diffstat (limited to 'src/zencore/logging/asyncsink.cpp')
| -rw-r--r-- | src/zencore/logging/asyncsink.cpp | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/src/zencore/logging/asyncsink.cpp b/src/zencore/logging/asyncsink.cpp new file mode 100644 index 000000000..02bf9f3ba --- /dev/null +++ b/src/zencore/logging/asyncsink.cpp @@ -0,0 +1,212 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencore/logging/asyncsink.h> + +#include <zencore/blockingqueue.h> +#include <zencore/logging/logmsg.h> +#include <zencore/thread.h> + +#include <future> +#include <string> +#include <thread> + +namespace zen::logging { + +struct AsyncLogMessage +{ + enum class Type : uint8_t + { + Log, + Flush, + Shutdown + }; + + Type MsgType = Type::Log; + + // Points to the LogPoint from upstream logging code. LogMessage guarantees + // this is always valid (either a static LogPoint from ZEN_LOG macros or one + // of the per-level default LogPoints). + const LogPoint* Point = nullptr; + + int ThreadId = 0; + std::string OwnedPayload; + std::string OwnedLoggerName; + std::chrono::system_clock::time_point Time; + + std::shared_ptr<std::promise<void>> FlushPromise; +}; + +struct AsyncSink::Impl +{ + explicit Impl(std::vector<SinkPtr> InSinks) : m_Sinks(std::move(InSinks)) + { + m_WorkerThread = std::thread([this]() { + zen::SetCurrentThreadName("AsyncLog"); + WorkerLoop(); + }); + } + + ~Impl() + { + AsyncLogMessage ShutdownMsg; + ShutdownMsg.MsgType = AsyncLogMessage::Type::Shutdown; + m_Queue.Enqueue(std::move(ShutdownMsg)); + + if (m_WorkerThread.joinable()) + { + m_WorkerThread.join(); + } + } + + void Log(const LogMessage& Msg) + { + AsyncLogMessage AsyncMsg; + AsyncMsg.OwnedPayload = std::string(Msg.GetPayload()); + AsyncMsg.OwnedLoggerName = std::string(Msg.GetLoggerName()); + AsyncMsg.ThreadId = Msg.GetThreadId(); + AsyncMsg.Time = Msg.GetTime(); + AsyncMsg.Point = &Msg.GetLogPoint(); + AsyncMsg.MsgType = AsyncLogMessage::Type::Log; + + m_Queue.Enqueue(std::move(AsyncMsg)); + } + + void Flush() + { + auto Promise = std::make_shared<std::promise<void>>(); + auto Future = Promise->get_future(); + + AsyncLogMessage FlushMsg; + FlushMsg.MsgType = AsyncLogMessage::Type::Flush; + FlushMsg.FlushPromise = std::move(Promise); + + m_Queue.Enqueue(std::move(FlushMsg)); + + Future.get(); + } + + void SetFormatter(std::unique_ptr<Formatter> InFormatter) + { + for (auto& CurrentSink : m_Sinks) + { + CurrentSink->SetFormatter(InFormatter->Clone()); + } + } + +private: + void ForwardLogToSinks(const AsyncLogMessage& AsyncMsg) + { + LogMessage Reconstructed(*AsyncMsg.Point, AsyncMsg.OwnedLoggerName, AsyncMsg.OwnedPayload); + Reconstructed.SetTime(AsyncMsg.Time); + Reconstructed.SetThreadId(AsyncMsg.ThreadId); + + for (auto& CurrentSink : m_Sinks) + { + if (CurrentSink->ShouldLog(Reconstructed.GetLevel())) + { + try + { + CurrentSink->Log(Reconstructed); + } + catch (const std::exception&) + { + } + } + } + } + + void FlushSinks() + { + for (auto& CurrentSink : m_Sinks) + { + try + { + CurrentSink->Flush(); + } + catch (const std::exception&) + { + } + } + } + + void WorkerLoop() + { + AsyncLogMessage Msg; + while (m_Queue.WaitAndDequeue(Msg)) + { + switch (Msg.MsgType) + { + case AsyncLogMessage::Type::Log: + { + ForwardLogToSinks(Msg); + break; + } + + case AsyncLogMessage::Type::Flush: + { + FlushSinks(); + if (Msg.FlushPromise) + { + Msg.FlushPromise->set_value(); + } + break; + } + + case AsyncLogMessage::Type::Shutdown: + { + m_Queue.CompleteAdding(); + + AsyncLogMessage Remaining; + while (m_Queue.WaitAndDequeue(Remaining)) + { + if (Remaining.MsgType == AsyncLogMessage::Type::Log) + { + ForwardLogToSinks(Remaining); + } + else if (Remaining.MsgType == AsyncLogMessage::Type::Flush) + { + FlushSinks(); + if (Remaining.FlushPromise) + { + Remaining.FlushPromise->set_value(); + } + } + } + + FlushSinks(); + return; + } + } + } + } + + std::vector<SinkPtr> m_Sinks; + BlockingQueue<AsyncLogMessage> m_Queue; + std::thread m_WorkerThread; +}; + +AsyncSink::AsyncSink(std::vector<SinkPtr> InSinks) : m_Impl(std::make_unique<Impl>(std::move(InSinks))) +{ +} + +AsyncSink::~AsyncSink() = default; + +void +AsyncSink::Log(const LogMessage& Msg) +{ + m_Impl->Log(Msg); +} + +void +AsyncSink::Flush() +{ + m_Impl->Flush(); +} + +void +AsyncSink::SetFormatter(std::unique_ptr<Formatter> InFormatter) +{ + m_Impl->SetFormatter(std::move(InFormatter)); +} + +} // namespace zen::logging |