diff options
| author | Stefan Boberg <[email protected]> | 2025-10-22 17:57:29 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-22 17:57:29 +0200 |
| commit | 5c139e2d8a260544bc5e730de0440edbab4b0f03 (patch) | |
| tree | b477208925fe3b373d4833460b90d61a8051cf05 /src/zentelemetry/include | |
| parent | 5.7.7-pre3 (diff) | |
| download | zen-5c139e2d8a260544bc5e730de0440edbab4b0f03.tar.xz zen-5c139e2d8a260544bc5e730de0440edbab4b0f03.zip | |
add support for OTLP logging/tracing (#599)
- adds `zentelemetry` project which houses new functionality for serializing logs and traces in OpenTelemetry Protocol format (OTLP)
- moved existing stats functionality from `zencore` to `zentelemetry`
- adds `TRefCounted<T>` for vtable-less refcounting
- adds `MemoryArena` class which allows for linear allocation of memory from chunks
- adds `protozero` which is used to encode OTLP protobuf messages
Diffstat (limited to 'src/zentelemetry/include')
| -rw-r--r-- | src/zentelemetry/include/zentelemetry/otlpencoder.h | 66 | ||||
| -rw-r--r-- | src/zentelemetry/include/zentelemetry/otlptrace.h | 268 | ||||
| -rw-r--r-- | src/zentelemetry/include/zentelemetry/stats.h | 356 | ||||
| -rw-r--r-- | src/zentelemetry/include/zentelemetry/zentelemetry.h | 9 |
4 files changed, 699 insertions, 0 deletions
diff --git a/src/zentelemetry/include/zentelemetry/otlpencoder.h b/src/zentelemetry/include/zentelemetry/otlpencoder.h new file mode 100644 index 000000000..ed6665781 --- /dev/null +++ b/src/zentelemetry/include/zentelemetry/otlpencoder.h @@ -0,0 +1,66 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/thread.h> +#include <zentelemetry/otlptrace.h> +#include <string> +#include <string_view> +#include <unordered_map> + +#if ZEN_WITH_OTEL + +# include <protozero/pbf_builder.hpp> +# include <protozero/types.hpp> + +namespace spdlog { namespace details { + struct log_msg; +}} // namespace spdlog::details + +namespace zen::otel { +enum class Resource : protozero::pbf_tag_type; +} + +namespace zen { + +/** + * Encoder for OpenTelemetry OTLP Protobuf format + * + * Designed to encode log messages and metrics into the OTLP Protobuf format + * for transmission to an OpenTelemetry collector or compatible backend. + * + * Currently, only log messages and basic resource attributes are supported. + * + * Some initial support for metrics encoding is also included. + * + * Transmission is expected to be handled externally, e.g. via HTTP client. 
+ * + */ + +class OtlpEncoder +{ +public: + OtlpEncoder(); + ~OtlpEncoder(); + + void AddResourceAttribute(const std::string_view& Key, const std::string_view& Value); + void AddResourceAttribute(const std::string_view& Key, int64_t Value); + + std::string FormatOtelProtobuf(const spdlog::details::log_msg& Msg) const; + std::string FormatOtelMetrics() const; + std::string FormatOtelTrace(zen::otel::TraceId Trace, std::span<const zen::otel::Span*> Spans) const; + + OtlpEncoder& operator=(const OtlpEncoder&) = delete; + OtlpEncoder(const OtlpEncoder&) = delete; + +private: + mutable RwLock m_ResourceLock; + std::unordered_map<std::string, std::string> m_ResourceAttributes; + std::unordered_map<std::string, int64_t> m_ResourceIntAttributes; + + void AppendResourceAttributes(protozero::pbf_builder<otel::Resource>& Res) const; +}; + +} // namespace zen + +#endif diff --git a/src/zentelemetry/include/zentelemetry/otlptrace.h b/src/zentelemetry/include/zentelemetry/otlptrace.h new file mode 100644 index 000000000..ebecff91a --- /dev/null +++ b/src/zentelemetry/include/zentelemetry/otlptrace.h @@ -0,0 +1,268 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include <zenbase/refcount.h> +#include <zencore/uid.h> + +#include <algorithm> +#include <compare> +#include <cstddef> +#include <cstdint> +#include <cstring> +#include <span> +#include <string> +#include <string_view> +#include <utility> + +// Default to enabled, but let the build override the switch (e.g. -DZEN_WITH_OTEL=0) +// instead of unconditionally redefining it here +#ifndef ZEN_WITH_OTEL +# define ZEN_WITH_OTEL 1 +#endif + +// ZEN_OTEL_SPAN(Name) declares a ::zen::otel::ScopedSpan local whose identifier is +// made unique per source line via __LINE__; it expands to nothing when +// ZEN_WITH_OTEL is 0 +#if ZEN_WITH_OTEL +# define ZEN_CONCAT(a, b) a##b +# define IMPL_ZEN_OTEL_SPAN(Name, Line) ::zen::otel::ScopedSpan ZEN_CONCAT(Span$, Line)(Name); +# define ZEN_OTEL_SPAN(Name) IMPL_ZEN_OTEL_SPAN(Name, __LINE__) +#else +# define ZEN_OTEL_SPAN(Name) +#endif + +namespace zen { +class MemoryArena; +} + +#if ZEN_WITH_OTEL + +namespace zen::otel { + +using AttributeList = std::span<std::pair<std::string, std::string>>; + +class Tracer; + +Tracer& GetTracer(); + +// OTLP Span ID +// +// kSize (8) raw bytes. A default-constructed ID is all zeroes and converts to +// false; any other value converts to true. + +struct SpanId +{ + constexpr static size_t kSize = 8; + + inline SpanId() noexcept : Id{0} {} + explicit SpanId(const std::span<const uint8_t, kSize> Bytes) noexcept { std::copy(Bytes.begin(), Bytes.end(), Id); } + // Copies the first kSize bytes of the Oid's bits + explicit SpanId(const Oid& InId) noexcept { memcpy(Id, reinterpret_cast<const uint8_t*>(InId.OidBits), sizeof Id); } + + auto operator<=>(const SpanId& Rhs) const = default; + inline operator bool() const { return *this != SpanId(); } + + static SpanId NewSpanId(); + + // Raw ID bytes (kSize of them, not NUL terminated) + const char* GetData() const { return reinterpret_cast<const char*>(Id); } + +private: + uint8_t Id[kSize]; +}; + +// OTLP Trace ID +// +// kSize (16) raw bytes. A default-constructed ID is all zeroes and converts to +// false; any other value converts to true. + +struct TraceId +{ + constexpr static size_t kSize = 16; + + std::span<const uint8_t> GetBytes() const { return std::span<const uint8_t>(Id, kSize); } + + inline TraceId() noexcept : Id{0} {} + explicit TraceId(const std::span<const uint8_t, kSize> Bytes) noexcept { std::copy(Bytes.begin(), Bytes.end(), Id); } + + auto operator<=>(const TraceId& Rhs) const = default; + inline operator bool() const { return *this != TraceId(); } + + static TraceId NewTraceId(); + + // Raw ID bytes (kSize of them, not NUL terminated) + inline const char* GetData() const { return reinterpret_cast<const char*>(Id); } + +private: + uint8_t Id[kSize]; +}; + +struct AttributePair +{ + const char* Key = nullptr; + union + { + const char* StringValue; + uint64_t NumericValue; + } Value; + + uint64_t Flags = 0; + + enum + { + 
kFlags_IsNumeric = 1 << 0, + kFlags_IsString = 1 << 1 + }; + + // Value is a union, so exactly one of the two type flags may be set at a time: + // each setter stores its value and replaces the other type's flag, leaving any + // unrelated bits in Flags untouched + + bool IsNumeric() const { return (Flags & kFlags_IsNumeric) != 0; } + uint64_t GetNumericValue() const { return Value.NumericValue; } + void SetNumericValue(uint64_t InValue) + { + Value.NumericValue = InValue; + Flags = (Flags & ~static_cast<uint64_t>(kFlags_IsString)) | kFlags_IsNumeric; + } + + bool IsString() const { return (Flags & kFlags_IsString) != 0; } + const char* GetStringValue() const { return Value.StringValue; } + // Stores the pointer only; the string itself is not copied here + void SetStringValue(const char* InValue) + { + Value.StringValue = InValue; + Flags = (Flags & ~static_cast<uint64_t>(kFlags_IsNumeric)) | kFlags_IsString; + } + + AttributePair* Next = nullptr; +}; + +struct Event +{ + Event* NextEvent = nullptr; + uint64_t Timestamp = 0; + const char* Name = nullptr; + AttributePair* Attributes = nullptr; +}; + +/** Span within a trace + * + * A span represents a single operation within a trace. Spans can be nested + * to form a trace tree. + * + */ + +struct Span final : public TRefCounted<Span> +{ + Span& operator=(const Span&) = delete; + Span(const Span&) = delete; + + void AddEvent(std::string_view Name); + void AddEvent(std::string_view Name, uint64_t Timestamp); + void AddEvent(std::string_view Name, const AttributeList& Attributes); + + void AddAttribute(std::string_view Key, std::string_view Value); + void AddAttribute(std::string_view Key, uint64_t Value); + void AddAttributes(const AttributeList& Attributes); + + void End(); + + enum class Kind : uint8_t // This must match otel::SpanKind + { + kInternal = 1, + kServer = 2, + kClient = 3, + kProducer = 4, + kConsumer = 5 + }; + + Kind GetKind() const { return m_Kind; } + void SetKind(Kind InKind) { m_Kind = InKind; } + + SpanId GetSpanId() const { return m_SpanId; } + const char* GetName() const { return m_Name; } + uint64_t GetStartTime() const { return m_StartTime; } + uint64_t GetEndTime() const { return m_EndTime; } + const Span* GetParentSpan() const { return m_ParentSpan; } + const AttributePair* GetAttributes() const { return m_Attributes; } + const Event* GetEvents() const { 
return m_Events; } + + // This is used to get the current span/trace ID for logging purposes + static SpanId GetCurrentSpanId(TraceId& OutTraceId); + static Span* GetCurrentSpan(); + +protected: + void* operator new(size_t Size) = delete; + void* operator new(size_t Size, MemoryArena& Arena); + void operator delete(void* Ptr) = delete; + void operator delete(void* Ptr, MemoryArena& Arena); + + friend class ScopedSpan; + friend class Tracer; + friend class TRefCounted<Span>; + + void DeleteThis() noexcept { End(); } + +private: + // Note: for now there's no locking on the span. To support spans across + // threads, we'll need to add locking around event addition and ending + // and linking of child spans etc + + MemoryArena& m_Arena; + Span* m_NextSibling = nullptr; + Span* m_FirstChild = nullptr; + Span* m_ParentSpan = nullptr; + Span* m_SpanChain = nullptr; + Event* m_Events = nullptr; + AttributePair* m_Attributes = nullptr; + SpanId m_SpanId; + const char* m_Name = nullptr; + uint64_t m_StartTime = 0; + uint64_t m_EndTime = 0; + Kind m_Kind = Kind::kInternal; + bool m_Ended = false; + + Span(MemoryArena& Arena, std::string_view Name, Span* SpanChain); + ~Span(); +}; + +/** Scoped span helper + * + * Automatically ends the span when it goes out of scope + */ + +class ScopedSpan final +{ +public: + ScopedSpan(std::string_view Name); + ScopedSpan() = delete; + ~ScopedSpan(); + + ScopedSpan& operator=(ScopedSpan&& Rhs) = default; + ScopedSpan& operator=(const ScopedSpan& Rhs) = default; + ScopedSpan(ScopedSpan&& Rhs) = default; + ScopedSpan(const ScopedSpan& Rhs) = default; + + Span* operator->() const { return m_Span.Get(); } + +private: + ScopedSpan(Span* InSpan, Tracer* InTracer); + + Ref<Tracer> m_Tracer; // This needs to precede the span ref to ensure proper destruction order + Ref<Span> m_Span; + + friend class Tracer; +}; + +/** + * Tracer for creating spans and managing trace state. 
+ * + * This represents a single trace context identified by a trace ID, + * and is at the root of all spans created within that trace. + * + */ + +class Tracer final : public RefCounted +{ +public: + static ScopedSpan CreateSpan(std::string_view Name); + +private: + struct Impl; + Impl* m_Impl; + + Tracer(); + ~Tracer(); + + static Tracer* GetTracer(); + + friend class ScopedSpan; +}; + +class TraceRecorder : public RefCounted +{ +public: + virtual void RecordSpans(TraceId Trace, std::span<const Span*> Spans) = 0; +}; + +void SetTraceRecorder(Ref<TraceRecorder> Recorder); +bool IsRecording(); + +void otlptrace_forcelink(); + +} // namespace zen::otel +#endif diff --git a/src/zentelemetry/include/zentelemetry/stats.h b/src/zentelemetry/include/zentelemetry/stats.h new file mode 100644 index 000000000..3e67bac1c --- /dev/null +++ b/src/zentelemetry/include/zentelemetry/stats.h @@ -0,0 +1,356 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zentelemetry.h" + +#include <zenbase/concepts.h> + +#include <atomic> +#include <string_view> +#include <vector> + +namespace zen { +class CbObjectWriter; +} + +namespace zen::metrics { + +template<typename T> +class Gauge +{ +public: + Gauge() : m_Value{0} {} + + T Value() const { return m_Value; } + void SetValue(T Value) { m_Value = Value; } + +private: + std::atomic<T> m_Value; +}; + +/** Stats counter + * + * A counter is modified by adding or subtracting a value from a current value. 
+ * This would typically be used to track number of requests in flight, number + * of active jobs etc + * + */ +class Counter +{ +public: + inline void SetValue(uint64_t Value) { m_count = Value; } + inline uint64_t Value() const { return m_count; } + + inline void Increment(int64_t AddValue) { m_count.fetch_add(AddValue); } + inline void Decrement(int64_t SubValue) { m_count.fetch_sub(SubValue); } + inline void Clear() { m_count.store(0, std::memory_order_release); } + +private: + std::atomic<uint64_t> m_count{0}; +}; + +/** Exponential Weighted Moving Average + + This is very raw, to use as little state as possible. If we + want to use this more broadly in user code we should perhaps + add a more user-friendly wrapper + */ + +class RawEWMA +{ +public: + /// <summary> + /// Update EWMA with new measure + /// </summary> + /// <param name="Alpha">Smoothing factor (between 0 and 1)</param> + /// <param name="Interval">Elapsed time since last</param> + /// <param name="Count">Value</param> + /// <param name="IsInitialUpdate">Whether this is the first update or not</param> + void Tick(double Alpha, uint64_t Interval, uint64_t Count, bool IsInitialUpdate); + double Rate() const; + +private: + std::atomic<double> m_Rate = 0; +}; + +/// <summary> +/// Tracks rate of events over time (i.e requests/sec), using +/// exponential moving averages +/// </summary> +class Meter +{ +public: + Meter(); + ~Meter(); + + inline uint64_t Count() const { return m_TotalCount; } + double Rate1(); // One-minute rate + double Rate5(); // Five-minute rate + double Rate15(); // Fifteen-minute rate + double MeanRate() const; // Mean rate since instantiation of this meter + void Mark(uint64_t Count = 1); // Register one or more events + +private: + std::atomic<uint64_t> m_TotalCount{0}; // Accumulator counting number of marks since beginning + std::atomic<uint64_t> m_PendingCount{0}; // Pending EWMA update accumulator + std::atomic<uint64_t> m_StartTick{0}; // Time this was instantiated (for 
mean) + std::atomic<uint64_t> m_LastTick{0}; // Timestamp of last EWMA tick + std::atomic<int64_t> m_Remainder{0}; // Tracks the "modulo" of tick time + bool m_IsFirstTick = true; + RawEWMA m_RateM1; + RawEWMA m_RateM5; + RawEWMA m_RateM15; + + void TickIfNecessary(); + void Tick(); +}; + +/** Moment-in-time snapshot of a distribution + */ +class SampleSnapshot +{ +public: + SampleSnapshot(std::vector<double>&& Values); + ~SampleSnapshot(); + + uint32_t Size() const { return (uint32_t)m_Values.size(); } + double GetQuantileValue(double Quantile); + double GetMedian() { return GetQuantileValue(0.5); } + double Get75Percentile() { return GetQuantileValue(0.75); } + double Get95Percentile() { return GetQuantileValue(0.95); } + double Get98Percentile() { return GetQuantileValue(0.98); } + double Get99Percentile() { return GetQuantileValue(0.99); } + double Get999Percentile() { return GetQuantileValue(0.999); } + const std::vector<double>& GetValues() const; + +private: + std::vector<double> m_Values; +}; + +/** Randomly selects samples from a stream. Uses Vitter's + Algorithm R to produce a statistically representative sample. 
+ + http://www.cs.umd.edu/~samir/498/vitter.pdf - Random Sampling with a Reservoir + */ + +class UniformSample +{ +public: + UniformSample(uint32_t ReservoirSize); + ~UniformSample(); + + void Clear(); + uint32_t Size() const; + void Update(int64_t Value); + SampleSnapshot Snapshot() const; + + template<Invocable<int64_t> T> + void IterateValues(T Callback) const + { + for (const auto& Value : m_Values) + { + Callback(Value); + } + } + +private: + std::atomic<uint64_t> m_SampleCounter{0}; + std::vector<std::atomic<int64_t>> m_Values; +}; + +/** Track (probabilistic) sample distribution along with min/max + */ +class Histogram +{ +public: + Histogram(int32_t SampleCount = 1028); + ~Histogram(); + + void Clear(); + void Update(int64_t Value); + int64_t Max() const; + int64_t Min() const; + double Mean() const; + uint64_t Count() const; + SampleSnapshot Snapshot() const { return m_Sample.Snapshot(); } + +private: + UniformSample m_Sample; + std::atomic<int64_t> m_Min{0}; + std::atomic<int64_t> m_Max{0}; + std::atomic<int64_t> m_Sum{0}; + std::atomic<int64_t> m_Count{0}; +}; + +/** Track timing and frequency of some operation + + Example usage would be to track frequency and duration of network + requests, or function calls. 
+ + */ +class OperationTiming +{ +public: + OperationTiming(int32_t SampleCount = 514); + ~OperationTiming(); + + void Update(int64_t Duration); + int64_t Max() const; + int64_t Min() const; + double Mean() const; + uint64_t Count() const; + SampleSnapshot Snapshot() const { return m_Histogram.Snapshot(); } + + double Rate1() { return m_Meter.Rate1(); } + double Rate5() { return m_Meter.Rate5(); } + double Rate15() { return m_Meter.Rate15(); } + double MeanRate() const { return m_Meter.MeanRate(); } + + struct Scope + { + Scope(OperationTiming& Outer); + ~Scope(); + + void Stop(); + void Cancel(); + + private: + OperationTiming& m_Outer; + uint64_t m_StartTick; + }; + +private: + Meter m_Meter; + Histogram m_Histogram; +}; + +struct MeterSnapshot +{ + uint64_t Count; + double MeanRate; + double Rate1; + double Rate5; + double Rate15; +}; + +struct HistogramSnapshot +{ + double Count; + double Avg; + double Min; + double Max; + double P75; + double P95; + double P99; + double P999; +}; + +struct StatsSnapshot +{ + MeterSnapshot Meter; + HistogramSnapshot Histogram; +}; + +struct RequestStatsSnapshot +{ + StatsSnapshot Requests; + StatsSnapshot Bytes; +}; + +/** Metrics for network requests + + Aggregates tracking of duration, payload sizes into a single + class + + */ +class RequestStats +{ +public: + RequestStats(int32_t SampleCount = 514); + ~RequestStats(); + + void Update(int64_t Duration, int64_t Bytes); + uint64_t Count() const; + + // Timing + // + // Duration statistics are read from m_RequestTimeHistogram (the histogram + // DurationSnapshot() also reads) and request rates from m_RequestMeter; the + // payload-size histogram is only used by the Bytes accessors + + int64_t MaxDuration() const { return m_RequestTimeHistogram.Max(); } + int64_t MinDuration() const { return m_RequestTimeHistogram.Min(); } + double MeanDuration() const { return m_RequestTimeHistogram.Mean(); } + SampleSnapshot DurationSnapshot() const { return m_RequestTimeHistogram.Snapshot(); } + double Rate1() { return m_RequestMeter.Rate1(); } + double Rate5() { return m_RequestMeter.Rate5(); } + double Rate15() { return m_RequestMeter.Rate15(); } + double MeanRate() const { return m_RequestMeter.MeanRate(); } + + // Bytes + + 
int64_t MaxBytes() const { return m_BytesHistogram.Max(); } + int64_t MinBytes() const { return m_BytesHistogram.Min(); } + double MeanBytes() const { return m_BytesHistogram.Mean(); } + SampleSnapshot BytesSnapshot() const { return m_BytesHistogram.Snapshot(); } + double ByteRate1() { return m_BytesMeter.Rate1(); } + double ByteRate5() { return m_BytesMeter.Rate5(); } + double ByteRate15() { return m_BytesMeter.Rate15(); } + double ByteMeanRate() const { return m_BytesMeter.MeanRate(); } + + struct Scope + { + Scope(RequestStats& Outer, int64_t Bytes); + ~Scope(); + + void SetBytes(int64_t Bytes) { m_Bytes = Bytes; } + void Stop(); + void Cancel(); + + private: + RequestStats& m_Outer; + uint64_t m_StartTick; + int64_t m_Bytes; + }; + + void EmitSnapshot(std::string_view Tag, CbObjectWriter& Cbo); + + RequestStatsSnapshot Snapshot(); + +private: + static StatsSnapshot GetSnapshot(Meter& M, Histogram& H, double ConversionFactor) + { + SampleSnapshot Snap = H.Snapshot(); + return StatsSnapshot{ + .Meter = {.Count = M.Count(), .MeanRate = M.MeanRate(), .Rate1 = M.Rate1(), .Rate5 = M.Rate5(), .Rate15 = M.Rate15()}, + .Histogram = {.Count = H.Count() * ConversionFactor, + .Avg = H.Mean() * ConversionFactor, + .Min = H.Min() * ConversionFactor, + .Max = H.Max() * ConversionFactor, + .P75 = Snap.Get75Percentile() * ConversionFactor, + .P95 = Snap.Get95Percentile() * ConversionFactor, + .P99 = Snap.Get99Percentile() * ConversionFactor, + .P999 = Snap.Get999Percentile() * ConversionFactor}}; + } + + Meter m_RequestMeter; + Meter m_BytesMeter; + Histogram m_RequestTimeHistogram; + Histogram m_BytesHistogram; +}; + +void EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo); +void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor); +void EmitSnapshot(std::string_view Tag, Meter& Stat, CbObjectWriter& Cbo); + +void EmitSnapshot(const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor); + 
+void EmitSnapshot(std::string_view Tag, const MeterSnapshot& Snapshot, CbObjectWriter& Cbo); +void EmitSnapshot(std::string_view Tag, const HistogramSnapshot& Snapshot, CbObjectWriter& Cbo); +void EmitSnapshot(std::string_view Tag, const StatsSnapshot& Snapshot, CbObjectWriter& Cbo); +void EmitSnapshot(std::string_view Tag, const RequestStatsSnapshot& Snapshot, CbObjectWriter& Cbo); + +} // namespace zen::metrics + +namespace zen { + +extern void stats_forcelink(); + +} // namespace zen diff --git a/src/zentelemetry/include/zentelemetry/zentelemetry.h b/src/zentelemetry/include/zentelemetry/zentelemetry.h new file mode 100644 index 000000000..fc811974b --- /dev/null +++ b/src/zentelemetry/include/zentelemetry/zentelemetry.h @@ -0,0 +1,9 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +namespace zen { + +void zentelemetry_forcelinktests(); + +} |