diff options
| author | Stefan Boberg <[email protected]> | 2026-03-23 12:54:14 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-23 12:54:14 +0100 |
| commit | 8e2c307bdb501db0ab0ce2d51bc61b552855ee11 (patch) | |
| tree | 8f9be7e926bc555318a68794ee75ad5ad0dd979f /src | |
| parent | Logger simplification (#883) (diff) | |
| download | zen-8e2c307bdb501db0ab0ce2d51bc61b552855ee11.tar.xz zen-8e2c307bdb501db0ab0ce2d51bc61b552855ee11.zip | |
Unique session/client tracking using HyperLogLog (#884)
## Summary
Adds probabilistic cardinality estimation for tracking unique HTTP clients and sessions using a HyperLogLog implementation.
- Add a `HyperLogLog<Precision>` template in `zentelemetry` with thread-safe lock-free register updates, merge support, and XXH3 hashing
- Feed client IP addresses (via raw bytes) and session IDs (via `Oid` bytes) into their respective HyperLogLog estimators from both the ASIO and http.sys server backends
- Emit `distinct_clients` and `distinct_sessions` cardinality estimates in HTTP `CollectStats()`
- Add tests covering empty, single, duplicates, accuracy, merge, and clear scenarios
## Why HyperLogLog
Tracking exact unique counts would require storing every observed IP or session ID. HyperLogLog provides a memory-bounded probabilistic estimate (~1–2% error) using only a few KB of memory regardless of traffic volume.
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenhttp/httpserver.cpp | 3 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpserver.h | 16 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpasio.cpp | 28 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.cpp | 17 | ||||
| -rw-r--r-- | src/zentelemetry/hyperloglog.cpp | 127 | ||||
| -rw-r--r-- | src/zentelemetry/include/zentelemetry/hyperloglog.h | 165 | ||||
| -rw-r--r-- | src/zentelemetry/zentelemetry.cpp | 2 |
7 files changed, 356 insertions, 2 deletions
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp index e5cfbcbae..a46c5b851 100644 --- a/src/zenhttp/httpserver.cpp +++ b/src/zenhttp/httpserver.cpp @@ -988,6 +988,9 @@ HttpServer::CollectStats() } Cbo.EndObject(); + Cbo << "distinct_clients" << m_ClientAddresses.Count(); + Cbo << "distinct_sessions" << m_ClientSessions.Count(); + Cbo.BeginObject("websockets"); { Cbo << "active_connections" << GetActiveWebSocketConnectionCount(); diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h index a7d7f4d9c..633eb06be 100644 --- a/src/zenhttp/include/zenhttp/httpserver.h +++ b/src/zenhttp/include/zenhttp/httpserver.h @@ -13,6 +13,7 @@ #include <zencore/uid.h> #include <zenhttp/httpcommon.h> +#include <zentelemetry/hyperloglog.h> #include <zentelemetry/stats.h> #include <filesystem> @@ -265,6 +266,19 @@ public: /** Mark that a request has been handled. Called by server implementations. */ void MarkRequest() { m_RequestMeter.Mark(); } + /** Record a client address for distinct-client tracking. Pass the raw + * address bytes (4 bytes for IPv4, 16 for IPv6) to avoid string conversion. */ + void MarkClientAddress(const void* AddressBytes, size_t Size) { m_ClientAddresses.Add(AddressBytes, Size); } + + /** Record a session ID for distinct-session tracking. */ + void MarkSessionId(const Oid& SessionId) + { + if (SessionId) + { + m_ClientSessions.Add(&SessionId.OidBits, sizeof(SessionId.OidBits)); + } + } + /** Set a default redirect path for root requests */ void SetDefaultRedirect(std::string_view Path) { m_DefaultRedirect = Path; } @@ -297,6 +311,8 @@ private: int m_EffectiveHttpsPort = 0; std::string m_ExternalHost; metrics::Meter m_RequestMeter; + metrics::HyperLogLog<12> m_ClientAddresses; // ~4 KiB, ~1.6% error — sufficient for client counting + metrics::HyperLogLog<12> m_ClientSessions; std::string m_DefaultRedirect; std::atomic<uint64_t> m_ActiveWebSocketConnections{0}; std::atomic<uint64_t> m_WsFramesReceived{0}; diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index a2cae8762..7972777b8 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -1330,14 +1330,36 @@ HttpServerConnectionT<SocketType>::HandleRequest() { auto RemoteEndpoint = m_Socket->remote_endpoint(); IsLocalConnection = m_Socket->local_endpoint().address() == RemoteEndpoint.address(); - RemoteAddress = RemoteEndpoint.address().to_string(); + auto Addr = RemoteEndpoint.address(); + RemoteAddress = Addr.to_string(); + if (Addr.is_v4()) + { + auto Bytes = Addr.to_v4().to_bytes(); + m_Server.m_HttpServer->MarkClientAddress(Bytes.data(), Bytes.size()); + } + else + { + auto Bytes = Addr.to_v6().to_bytes(); + m_Server.m_HttpServer->MarkClientAddress(Bytes.data(), Bytes.size()); + } } #if ZEN_USE_OPENSSL else if constexpr (std::is_same_v<SocketType, SslSocket>) { auto RemoteEndpoint = m_Socket->lowest_layer().remote_endpoint(); IsLocalConnection = m_Socket->lowest_layer().local_endpoint().address() == RemoteEndpoint.address(); - RemoteAddress = RemoteEndpoint.address().to_string(); + auto Addr = RemoteEndpoint.address(); + RemoteAddress = Addr.to_string(); + if (Addr.is_v4()) + { + auto Bytes = Addr.to_v4().to_bytes(); + m_Server.m_HttpServer->MarkClientAddress(Bytes.data(), Bytes.size()); + } + else + { + auto Bytes = Addr.to_v6().to_bytes(); + m_Server.m_HttpServer->MarkClientAddress(Bytes.data(), Bytes.size()); + } } #endif else @@ -1345,6 +1367,8 @@ HttpServerConnectionT<SocketType>::HandleRequest() RemoteAddress = "unix"; } + m_Server.m_HttpServer->MarkSessionId(m_RequestData.SessionId()); + HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body(), diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 9fe9a2254..2cad97725 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -2020,6 +2020,23 @@ HttpSysTransaction::InvokeRequestHandler(HttpService& Service, IoBuffer Payload) m_HttpServer.MarkRequest(); + // Track distinct client addresses + { + const SOCKADDR* SockAddr = HttpRequest()->Address.pRemoteAddress; + if (SockAddr->sa_family == AF_INET) + { + const SOCKADDR_IN* V4 = reinterpret_cast<const SOCKADDR_IN*>(SockAddr); + m_HttpServer.MarkClientAddress(&V4->sin_addr, sizeof(V4->sin_addr)); + } + else if (SockAddr->sa_family == AF_INET6) + { + const SOCKADDR_IN6* V6 = reinterpret_cast<const SOCKADDR_IN6*>(SockAddr); + m_HttpServer.MarkClientAddress(&V6->sin6_addr, sizeof(V6->sin6_addr)); + } + } + + m_HttpServer.MarkSessionId(ThisRequest.SessionId()); + // Default request handling # if ZEN_WITH_OTEL diff --git a/src/zentelemetry/hyperloglog.cpp b/src/zentelemetry/hyperloglog.cpp new file mode 100644 index 000000000..3a06fc59f --- /dev/null +++ b/src/zentelemetry/hyperloglog.cpp @@ -0,0 +1,127 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zentelemetry/hyperloglog.h> + +namespace zen { +void +hyperloglog_forcelink() +{ +} +} // namespace zen + +#if ZEN_WITH_TESTS + +# include <zencore/testing.h> + +# include <fmt/format.h> + +# include <set> +# include <string> + +namespace zen::tests { + +TEST_SUITE_BEGIN("telemetry.hyperloglog"); + +TEST_CASE("hyperloglog.empty") +{ + metrics::HyperLogLog<> Hll; + CHECK_EQ(Hll.Count(), 0); +} + +TEST_CASE("hyperloglog.single_element") +{ + metrics::HyperLogLog<> Hll; + Hll.Add("hello"); + CHECK(Hll.Count() >= 1); + CHECK(Hll.Count() <= 2); +} + +TEST_CASE("hyperloglog.duplicates") +{ + metrics::HyperLogLog<> Hll; + for (int i = 0; i < 1000; ++i) + { + Hll.Add("same_value"); + } + CHECK_EQ(Hll.Count(), 1); +} + +TEST_CASE("hyperloglog.estimate_accuracy") +{ + // With precision 14, expected error is ~0.81% + // Use a generous margin (5%) for the test to be reliable + constexpr uint64_t N = 100000; + + metrics::HyperLogLog<14> Hll; + for (uint64_t i = 0; i < N; ++i) + { + std::string Value = fmt::format("item_{}", i); + Hll.Add(Value); + } + + uint64_t Estimate = Hll.Count(); + double Error = std::abs(static_cast<double>(Estimate) - static_cast<double>(N)) / static_cast<double>(N); + + CHECK(Error < 0.05); +} + +TEST_CASE("hyperloglog.small_cardinality") +{ + metrics::HyperLogLog<> Hll; + for (int i = 0; i < 10; ++i) + { + std::string Value = fmt::format("key_{}", i); + Hll.Add(Value); + } + + uint64_t Estimate = Hll.Count(); + CHECK(Estimate >= 8); + CHECK(Estimate <= 12); +} + +TEST_CASE("hyperloglog.clear") +{ + metrics::HyperLogLog<> Hll; + for (int i = 0; i < 100; ++i) + { + std::string Value = fmt::format("v_{}", i); + Hll.Add(Value); + } + CHECK(Hll.Count() > 0); + + Hll.Clear(); + CHECK_EQ(Hll.Count(), 0); +} + +TEST_CASE("hyperloglog.merge") +{ + constexpr uint64_t N = 50000; + + metrics::HyperLogLog<14> A; + metrics::HyperLogLog<14> B; + + // A gets even numbers, B gets odd numbers + for (uint64_t i = 0; i < N; ++i) + { + std::string Value = fmt::format("item_{}", i * 2); + A.Add(Value); + } + for (uint64_t i = 0; i < N; ++i) + { + std::string Value = fmt::format("item_{}", i * 2 + 1); + B.Add(Value); + } + + A.Merge(B); + + uint64_t Estimate = A.Count(); + double Error = std::abs(static_cast<double>(Estimate) - static_cast<double>(N * 2)) / static_cast<double>(N * 2); + + CHECK(Error < 0.05); +} + +TEST_SUITE_END(); + +} // namespace zen::tests + +#endif // ZEN_WITH_TESTS diff --git a/src/zentelemetry/include/zentelemetry/hyperloglog.h b/src/zentelemetry/include/zentelemetry/hyperloglog.h new file mode 100644 index 000000000..2daf75a43 --- /dev/null +++ b/src/zentelemetry/include/zentelemetry/hyperloglog.h @@ -0,0 +1,165 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zentelemetry.h" + +#include <zencore/intmath.h> +#include <zencore/xxhash.h> + +#include <array> +#include <atomic> +#include <cstdint> +#include <string_view> + +namespace zen::metrics { + +/** HyperLogLog cardinality estimator. + * + * Estimates the number of distinct elements in a stream using O(2^Precision) + * bytes of memory. The relative error is approximately 1.04 / sqrt(2^Precision). + * + * @tparam Precision Number of bits used for register indexing (4..16). + * Higher precision = more memory but lower error. + * Default 14 gives 16384 registers (~16 KiB) and ~0.81% error. + * + * All operations are thread-safe. Add() uses relaxed atomic compare-exchange + * so concurrent updates are lock-free but may occasionally lose a race (which + * only means a slightly delayed register update, not a correctness issue). + */ +template<uint8_t Precision = 14> +class HyperLogLog +{ + static_assert(Precision >= 4 && Precision <= 16, "Precision must be between 4 and 16"); + + static constexpr uint32_t RegisterCount = 1u << Precision; + static constexpr uint64_t IndexMask = RegisterCount - 1; + +public: + HyperLogLog() { Clear(); } + + /** Add a raw byte sequence to the sketch. */ + void Add(const void* Data, size_t Size) + { + XXH3_128 Hash = XXH3_128::HashMemory(Data, Size); + AddHash(Hash); + } + + /** Add a string value to the sketch. */ + void Add(std::string_view Value) { Add(Value.data(), Value.size()); } + + /** Add a pre-computed hash to the sketch. */ + void AddHash(const XXH3_128& Hash) + { + // Use the first 8 bytes of the hash as our 64-bit value + uint64_t H; + memcpy(&H, Hash.Hash, sizeof(H)); + + uint32_t Index = static_cast<uint32_t>(H >> (64 - Precision)) & IndexMask; + uint64_t W = H << Precision; + + // Count leading zeros of W + 1 (the +1 ensures we never store 0) + uint8_t Rank = static_cast<uint8_t>(CountLeadingZeros64(W) + 1); + + // Atomically update the register to the maximum observed rank + uint8_t Current = m_Registers[Index].load(std::memory_order_relaxed); + while (Rank > Current) + { + if (m_Registers[Index].compare_exchange_weak(Current, Rank, std::memory_order_relaxed)) + { + break; + } + } + } + + /** Estimate the number of distinct elements added. */ + double Estimate() const + { + // Compute the raw harmonic mean estimate + double Sum = 0.0; + uint32_t ZeroCount = 0; + + for (uint32_t i = 0; i < RegisterCount; ++i) + { + uint8_t Val = m_Registers[i].load(std::memory_order_relaxed); + Sum += 1.0 / static_cast<double>(1ull << Val); + if (Val == 0) + { + ++ZeroCount; + } + } + + double Alpha = AlphaM(); + double RawEstimate = Alpha * RegisterCount * RegisterCount / Sum; + + // Small range correction using LinearCounting + if (RawEstimate <= 2.5 * RegisterCount && ZeroCount > 0) + { + return RegisterCount * std::log(static_cast<double>(RegisterCount) / ZeroCount); + } + + return RawEstimate; + } + + /** Return the estimate as an integer. */ + uint64_t Count() const + { + double E = Estimate(); + return E < 0.5 ? 0 : static_cast<uint64_t>(E + 0.5); + } + + /** Reset all registers to zero. */ + void Clear() + { + for (uint32_t i = 0; i < RegisterCount; ++i) + { + m_Registers[i].store(0, std::memory_order_relaxed); + } + } + + /** Merge another HyperLogLog sketch into this one (union). */ + void Merge(const HyperLogLog& Other) + { + for (uint32_t i = 0; i < RegisterCount; ++i) + { + uint8_t OtherVal = Other.m_Registers[i].load(std::memory_order_relaxed); + uint8_t Current = m_Registers[i].load(std::memory_order_relaxed); + while (OtherVal > Current) + { + if (m_Registers[i].compare_exchange_weak(Current, OtherVal, std::memory_order_relaxed)) + { + break; + } + } + } + } + +private: + std::array<std::atomic<uint8_t>, RegisterCount> m_Registers; + + static double AlphaM() + { + if constexpr (Precision == 4) + { + return 0.673; + } + else if constexpr (Precision == 5) + { + return 0.697; + } + else if constexpr (Precision == 6) + { + return 0.709; + } + else + { + return 0.7213 / (1.0 + 1.079 / RegisterCount); + } + } +}; + +} // namespace zen::metrics + +namespace zen { +void hyperloglog_forcelink(); +} // namespace zen diff --git a/src/zentelemetry/zentelemetry.cpp b/src/zentelemetry/zentelemetry.cpp index ed6ad13b9..63af8a450 100644 --- a/src/zentelemetry/zentelemetry.cpp +++ b/src/zentelemetry/zentelemetry.cpp @@ -2,6 +2,7 @@ #include "zentelemetry/zentelemetry.h" +#include "zentelemetry/hyperloglog.h" #include "zentelemetry/otlptrace.h" #include "zentelemetry/stats.h" @@ -11,6 +12,7 @@ void zentelemetry_forcelinktests() { zen::stats_forcelink(); + zen::hyperloglog_forcelink(); #if ZEN_WITH_OTEL zen::otel::otlptrace_forcelink(); #endif |