aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-23 12:54:14 +0100
committerGitHub Enterprise <[email protected]>2026-03-23 12:54:14 +0100
commit8e2c307bdb501db0ab0ce2d51bc61b552855ee11 (patch)
tree8f9be7e926bc555318a68794ee75ad5ad0dd979f /src
parentLogger simplification (#883) (diff)
downloadzen-8e2c307bdb501db0ab0ce2d51bc61b552855ee11.tar.xz
zen-8e2c307bdb501db0ab0ce2d51bc61b552855ee11.zip
Unique session/client tracking using HyperLogLog (#884)
## Summary Adds probabilistic cardinality estimation for tracking unique HTTP clients and sessions using a HyperLogLog implementation. - Add a `HyperLogLog<Precision>` template in `zentelemetry` with thread-safe lock-free register updates, merge support, and XXH3 hashing - Feed client IP addresses (via raw bytes) and session IDs (via `Oid` bytes) into their respective HyperLogLog estimators from both the ASIO and http.sys server backends - Emit `distinct_clients` and `distinct_sessions` cardinality estimates in HTTP `CollectStats()` - Add tests covering empty, single, duplicates, accuracy, merge, and clear scenarios ## Why HyperLogLog Tracking exact unique counts would require storing every observed IP or session ID. HyperLogLog provides a memory-bounded probabilistic estimate (~1–2% error) using only a few KB of memory regardless of traffic volume.
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/httpserver.cpp3
-rw-r--r--src/zenhttp/include/zenhttp/httpserver.h16
-rw-r--r--src/zenhttp/servers/httpasio.cpp28
-rw-r--r--src/zenhttp/servers/httpsys.cpp17
-rw-r--r--src/zentelemetry/hyperloglog.cpp127
-rw-r--r--src/zentelemetry/include/zentelemetry/hyperloglog.h165
-rw-r--r--src/zentelemetry/zentelemetry.cpp2
7 files changed, 356 insertions, 2 deletions
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp
index e5cfbcbae..a46c5b851 100644
--- a/src/zenhttp/httpserver.cpp
+++ b/src/zenhttp/httpserver.cpp
@@ -988,6 +988,9 @@ HttpServer::CollectStats()
}
Cbo.EndObject();
+ Cbo << "distinct_clients" << m_ClientAddresses.Count();
+ Cbo << "distinct_sessions" << m_ClientSessions.Count();
+
Cbo.BeginObject("websockets");
{
Cbo << "active_connections" << GetActiveWebSocketConnectionCount();
diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h
index a7d7f4d9c..633eb06be 100644
--- a/src/zenhttp/include/zenhttp/httpserver.h
+++ b/src/zenhttp/include/zenhttp/httpserver.h
@@ -13,6 +13,7 @@
#include <zencore/uid.h>
#include <zenhttp/httpcommon.h>
+#include <zentelemetry/hyperloglog.h>
#include <zentelemetry/stats.h>
#include <filesystem>
@@ -265,6 +266,19 @@ public:
/** Mark that a request has been handled. Called by server implementations. */
void MarkRequest() { m_RequestMeter.Mark(); }
+ /** Record a client address for distinct-client tracking. Pass the raw
+ * address bytes (4 bytes for IPv4, 16 for IPv6) to avoid string conversion. */
+ void MarkClientAddress(const void* AddressBytes, size_t Size) { m_ClientAddresses.Add(AddressBytes, Size); }
+
+ /** Record a session ID for distinct-session tracking. */
+ void MarkSessionId(const Oid& SessionId)
+ {
+ if (SessionId)
+ {
+ m_ClientSessions.Add(&SessionId.OidBits, sizeof(SessionId.OidBits));
+ }
+ }
+
/** Set a default redirect path for root requests */
void SetDefaultRedirect(std::string_view Path) { m_DefaultRedirect = Path; }
@@ -297,6 +311,8 @@ private:
int m_EffectiveHttpsPort = 0;
std::string m_ExternalHost;
metrics::Meter m_RequestMeter;
+ metrics::HyperLogLog<12> m_ClientAddresses; // ~4 KiB, ~1.6% error — sufficient for client counting
+ metrics::HyperLogLog<12> m_ClientSessions;
std::string m_DefaultRedirect;
std::atomic<uint64_t> m_ActiveWebSocketConnections{0};
std::atomic<uint64_t> m_WsFramesReceived{0};
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp
index a2cae8762..7972777b8 100644
--- a/src/zenhttp/servers/httpasio.cpp
+++ b/src/zenhttp/servers/httpasio.cpp
@@ -1330,14 +1330,36 @@ HttpServerConnectionT<SocketType>::HandleRequest()
{
auto RemoteEndpoint = m_Socket->remote_endpoint();
IsLocalConnection = m_Socket->local_endpoint().address() == RemoteEndpoint.address();
- RemoteAddress = RemoteEndpoint.address().to_string();
+ auto Addr = RemoteEndpoint.address();
+ RemoteAddress = Addr.to_string();
+ if (Addr.is_v4())
+ {
+ auto Bytes = Addr.to_v4().to_bytes();
+ m_Server.m_HttpServer->MarkClientAddress(Bytes.data(), Bytes.size());
+ }
+ else
+ {
+ auto Bytes = Addr.to_v6().to_bytes();
+ m_Server.m_HttpServer->MarkClientAddress(Bytes.data(), Bytes.size());
+ }
}
#if ZEN_USE_OPENSSL
else if constexpr (std::is_same_v<SocketType, SslSocket>)
{
auto RemoteEndpoint = m_Socket->lowest_layer().remote_endpoint();
IsLocalConnection = m_Socket->lowest_layer().local_endpoint().address() == RemoteEndpoint.address();
- RemoteAddress = RemoteEndpoint.address().to_string();
+ auto Addr = RemoteEndpoint.address();
+ RemoteAddress = Addr.to_string();
+ if (Addr.is_v4())
+ {
+ auto Bytes = Addr.to_v4().to_bytes();
+ m_Server.m_HttpServer->MarkClientAddress(Bytes.data(), Bytes.size());
+ }
+ else
+ {
+ auto Bytes = Addr.to_v6().to_bytes();
+ m_Server.m_HttpServer->MarkClientAddress(Bytes.data(), Bytes.size());
+ }
}
#endif
else
@@ -1345,6 +1367,8 @@ HttpServerConnectionT<SocketType>::HandleRequest()
RemoteAddress = "unix";
}
+ m_Server.m_HttpServer->MarkSessionId(m_RequestData.SessionId());
+
HttpAsioServerRequest Request(m_RequestData,
*Service,
m_RequestData.Body(),
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index 9fe9a2254..2cad97725 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -2020,6 +2020,23 @@ HttpSysTransaction::InvokeRequestHandler(HttpService& Service, IoBuffer Payload)
m_HttpServer.MarkRequest();
+ // Track distinct client addresses
+ {
+ const SOCKADDR* SockAddr = HttpRequest()->Address.pRemoteAddress;
+ if (SockAddr->sa_family == AF_INET)
+ {
+ const SOCKADDR_IN* V4 = reinterpret_cast<const SOCKADDR_IN*>(SockAddr);
+ m_HttpServer.MarkClientAddress(&V4->sin_addr, sizeof(V4->sin_addr));
+ }
+ else if (SockAddr->sa_family == AF_INET6)
+ {
+ const SOCKADDR_IN6* V6 = reinterpret_cast<const SOCKADDR_IN6*>(SockAddr);
+ m_HttpServer.MarkClientAddress(&V6->sin6_addr, sizeof(V6->sin6_addr));
+ }
+ }
+
+ m_HttpServer.MarkSessionId(ThisRequest.SessionId());
+
// Default request handling
# if ZEN_WITH_OTEL
diff --git a/src/zentelemetry/hyperloglog.cpp b/src/zentelemetry/hyperloglog.cpp
new file mode 100644
index 000000000..3a06fc59f
--- /dev/null
+++ b/src/zentelemetry/hyperloglog.cpp
@@ -0,0 +1,127 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zentelemetry/hyperloglog.h>
+
+namespace zen {
+void
+hyperloglog_forcelink()
+{
+}
+} // namespace zen
+
+#if ZEN_WITH_TESTS
+
+# include <zencore/testing.h>
+
+# include <fmt/format.h>
+
+# include <set>
+# include <string>
+
+namespace zen::tests {
+
+TEST_SUITE_BEGIN("telemetry.hyperloglog");
+
+TEST_CASE("hyperloglog.empty")
+{
+ metrics::HyperLogLog<> Hll;
+ CHECK_EQ(Hll.Count(), 0);
+}
+
+TEST_CASE("hyperloglog.single_element")
+{
+ metrics::HyperLogLog<> Hll;
+ Hll.Add("hello");
+ CHECK(Hll.Count() >= 1);
+ CHECK(Hll.Count() <= 2);
+}
+
+TEST_CASE("hyperloglog.duplicates")
+{
+ metrics::HyperLogLog<> Hll;
+ for (int i = 0; i < 1000; ++i)
+ {
+ Hll.Add("same_value");
+ }
+ CHECK_EQ(Hll.Count(), 1);
+}
+
+TEST_CASE("hyperloglog.estimate_accuracy")
+{
+ // With precision 14, expected error is ~0.81%
+ // Use a generous margin (5%) for the test to be reliable
+ constexpr uint64_t N = 100000;
+
+ metrics::HyperLogLog<14> Hll;
+ for (uint64_t i = 0; i < N; ++i)
+ {
+ std::string Value = fmt::format("item_{}", i);
+ Hll.Add(Value);
+ }
+
+ uint64_t Estimate = Hll.Count();
+ double Error = std::abs(static_cast<double>(Estimate) - static_cast<double>(N)) / static_cast<double>(N);
+
+ CHECK(Error < 0.05);
+}
+
+TEST_CASE("hyperloglog.small_cardinality")
+{
+ metrics::HyperLogLog<> Hll;
+ for (int i = 0; i < 10; ++i)
+ {
+ std::string Value = fmt::format("key_{}", i);
+ Hll.Add(Value);
+ }
+
+ uint64_t Estimate = Hll.Count();
+ CHECK(Estimate >= 8);
+ CHECK(Estimate <= 12);
+}
+
+TEST_CASE("hyperloglog.clear")
+{
+ metrics::HyperLogLog<> Hll;
+ for (int i = 0; i < 100; ++i)
+ {
+ std::string Value = fmt::format("v_{}", i);
+ Hll.Add(Value);
+ }
+ CHECK(Hll.Count() > 0);
+
+ Hll.Clear();
+ CHECK_EQ(Hll.Count(), 0);
+}
+
+TEST_CASE("hyperloglog.merge")
+{
+ constexpr uint64_t N = 50000;
+
+ metrics::HyperLogLog<14> A;
+ metrics::HyperLogLog<14> B;
+
+ // A gets even numbers, B gets odd numbers
+ for (uint64_t i = 0; i < N; ++i)
+ {
+ std::string Value = fmt::format("item_{}", i * 2);
+ A.Add(Value);
+ }
+ for (uint64_t i = 0; i < N; ++i)
+ {
+ std::string Value = fmt::format("item_{}", i * 2 + 1);
+ B.Add(Value);
+ }
+
+ A.Merge(B);
+
+ uint64_t Estimate = A.Count();
+ double Error = std::abs(static_cast<double>(Estimate) - static_cast<double>(N * 2)) / static_cast<double>(N * 2);
+
+ CHECK(Error < 0.05);
+}
+
+TEST_SUITE_END();
+
+} // namespace zen::tests
+
+#endif // ZEN_WITH_TESTS
diff --git a/src/zentelemetry/include/zentelemetry/hyperloglog.h b/src/zentelemetry/include/zentelemetry/hyperloglog.h
new file mode 100644
index 000000000..2daf75a43
--- /dev/null
+++ b/src/zentelemetry/include/zentelemetry/hyperloglog.h
@@ -0,0 +1,165 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zentelemetry.h"
+
+#include <zencore/intmath.h>
+#include <zencore/xxhash.h>
+
+#include <array>
+#include <atomic>
+#include <cstdint>
+#include <string_view>
+
+namespace zen::metrics {
+
+/** HyperLogLog cardinality estimator.
+ *
+ * Estimates the number of distinct elements in a stream using O(2^Precision)
+ * bytes of memory. The relative error is approximately 1.04 / sqrt(2^Precision).
+ *
+ * @tparam Precision Number of bits used for register indexing (4..16).
+ * Higher precision = more memory but lower error.
+ * Default 14 gives 16384 registers (~16 KiB) and ~0.81% error.
+ *
+ * All operations are thread-safe. Add() uses relaxed atomic compare-exchange
+ * so concurrent updates are lock-free but may occasionally lose a race (which
+ * only means a slightly delayed register update, not a correctness issue).
+ */
+template<uint8_t Precision = 14>
+class HyperLogLog
+{
+ static_assert(Precision >= 4 && Precision <= 16, "Precision must be between 4 and 16");
+
+ static constexpr uint32_t RegisterCount = 1u << Precision;
+ static constexpr uint64_t IndexMask = RegisterCount - 1;
+
+public:
+ HyperLogLog() { Clear(); }
+
+ /** Add a raw byte sequence to the sketch. */
+ void Add(const void* Data, size_t Size)
+ {
+ XXH3_128 Hash = XXH3_128::HashMemory(Data, Size);
+ AddHash(Hash);
+ }
+
+ /** Add a string value to the sketch. */
+ void Add(std::string_view Value) { Add(Value.data(), Value.size()); }
+
+ /** Add a pre-computed hash to the sketch. */
+ void AddHash(const XXH3_128& Hash)
+ {
+ // Use the first 8 bytes of the hash as our 64-bit value
+ uint64_t H;
+ memcpy(&H, Hash.Hash, sizeof(H));
+
+ uint32_t Index = static_cast<uint32_t>(H >> (64 - Precision)) & IndexMask;
+ uint64_t W = H << Precision;
+
+ // Count leading zeros of W + 1 (the +1 ensures we never store 0)
+ uint8_t Rank = static_cast<uint8_t>(CountLeadingZeros64(W) + 1);
+
+ // Atomically update the register to the maximum observed rank
+ uint8_t Current = m_Registers[Index].load(std::memory_order_relaxed);
+ while (Rank > Current)
+ {
+ if (m_Registers[Index].compare_exchange_weak(Current, Rank, std::memory_order_relaxed))
+ {
+ break;
+ }
+ }
+ }
+
+ /** Estimate the number of distinct elements added. */
+ double Estimate() const
+ {
+ // Compute the raw harmonic mean estimate
+ double Sum = 0.0;
+ uint32_t ZeroCount = 0;
+
+ for (uint32_t i = 0; i < RegisterCount; ++i)
+ {
+ uint8_t Val = m_Registers[i].load(std::memory_order_relaxed);
+ Sum += 1.0 / static_cast<double>(1ull << Val);
+ if (Val == 0)
+ {
+ ++ZeroCount;
+ }
+ }
+
+ double Alpha = AlphaM();
+ double RawEstimate = Alpha * RegisterCount * RegisterCount / Sum;
+
+ // Small range correction using LinearCounting
+ if (RawEstimate <= 2.5 * RegisterCount && ZeroCount > 0)
+ {
+ return RegisterCount * std::log(static_cast<double>(RegisterCount) / ZeroCount);
+ }
+
+ return RawEstimate;
+ }
+
+ /** Return the estimate as an integer. */
+ uint64_t Count() const
+ {
+ double E = Estimate();
+ return E < 0.5 ? 0 : static_cast<uint64_t>(E + 0.5);
+ }
+
+ /** Reset all registers to zero. */
+ void Clear()
+ {
+ for (uint32_t i = 0; i < RegisterCount; ++i)
+ {
+ m_Registers[i].store(0, std::memory_order_relaxed);
+ }
+ }
+
+ /** Merge another HyperLogLog sketch into this one (union). */
+ void Merge(const HyperLogLog& Other)
+ {
+ for (uint32_t i = 0; i < RegisterCount; ++i)
+ {
+ uint8_t OtherVal = Other.m_Registers[i].load(std::memory_order_relaxed);
+ uint8_t Current = m_Registers[i].load(std::memory_order_relaxed);
+ while (OtherVal > Current)
+ {
+ if (m_Registers[i].compare_exchange_weak(Current, OtherVal, std::memory_order_relaxed))
+ {
+ break;
+ }
+ }
+ }
+ }
+
+private:
+ std::array<std::atomic<uint8_t>, RegisterCount> m_Registers;
+
+ static double AlphaM()
+ {
+ if constexpr (Precision == 4)
+ {
+ return 0.673;
+ }
+ else if constexpr (Precision == 5)
+ {
+ return 0.697;
+ }
+ else if constexpr (Precision == 6)
+ {
+ return 0.709;
+ }
+ else
+ {
+ return 0.7213 / (1.0 + 1.079 / RegisterCount);
+ }
+ }
+};
+
+} // namespace zen::metrics
+
+namespace zen {
+void hyperloglog_forcelink();
+} // namespace zen
diff --git a/src/zentelemetry/zentelemetry.cpp b/src/zentelemetry/zentelemetry.cpp
index ed6ad13b9..63af8a450 100644
--- a/src/zentelemetry/zentelemetry.cpp
+++ b/src/zentelemetry/zentelemetry.cpp
@@ -2,6 +2,7 @@
#include "zentelemetry/zentelemetry.h"
+#include "zentelemetry/hyperloglog.h"
#include "zentelemetry/otlptrace.h"
#include "zentelemetry/stats.h"
@@ -11,6 +12,7 @@ void
zentelemetry_forcelinktests()
{
zen::stats_forcelink();
+ zen::hyperloglog_forcelink();
#if ZEN_WITH_OTEL
zen::otel::otlptrace_forcelink();
#endif