diff options
| author | Fuwn <[email protected]> | 2026-01-24 13:09:50 +0000 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-01-24 13:09:50 +0000 |
| commit | 396acf3bbbe00a192cb0ea0a9ccf91b1d8d2850b (patch) | |
| tree | b9df4ca6a70db45cfffbae6fdd7252e20fb8e93c /src/lib/kafka.ts | |
| download | umami-main.tar.xz umami-main.zip | |
Created from https://vercel.com/new
Diffstat (limited to 'src/lib/kafka.ts')
| -rw-r--r-- | src/lib/kafka.ts | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/src/lib/kafka.ts b/src/lib/kafka.ts new file mode 100644 index 0000000..1d60e1f --- /dev/null +++ b/src/lib/kafka.ts @@ -0,0 +1,112 @@ +import type * as tls from 'node:tls'; +import debug from 'debug'; +import { Kafka, logLevel, type Producer, type RecordMetadata, type SASLOptions } from 'kafkajs'; +import { serializeError } from 'serialize-error'; +import { KAFKA, KAFKA_PRODUCER } from '@/lib/db'; + +const log = debug('umami:kafka'); +const CONNECT_TIMEOUT = 5000; +const SEND_TIMEOUT = 3000; +const ACKS = 1; + +let kafka: Kafka; +let producer: Producer; +const enabled = Boolean(process.env.KAFKA_URL && process.env.KAFKA_BROKER); + +function getClient() { + const { username, password } = new URL(process.env.KAFKA_URL); + const brokers = process.env.KAFKA_BROKER.split(','); + const mechanism = + (process.env.KAFKA_SASL_MECHANISM as 'plain' | 'scram-sha-256' | 'scram-sha-512') || 'plain'; + + const ssl: { ssl?: tls.ConnectionOptions | boolean; sasl?: SASLOptions } = + username && password + ? { + ssl: { + rejectUnauthorized: false, + }, + sasl: { + mechanism, + username, + password, + }, + } + : {}; + + const client: Kafka = new Kafka({ + clientId: 'umami', + brokers: brokers, + connectionTimeout: CONNECT_TIMEOUT, + logLevel: logLevel.ERROR, + ...ssl, + }); + + if (process.env.NODE_ENV !== 'production') { + globalThis[KAFKA] = client; + } + + log('Kafka initialized'); + + return client; +} + +async function getProducer(): Promise<Producer> { + const producer = kafka.producer(); + await producer.connect(); + + if (process.env.NODE_ENV !== 'production') { + globalThis[KAFKA_PRODUCER] = producer; + } + + log('Kafka producer initialized'); + + return producer; +} + +async function sendMessage( + topic: string, + message: Record<string, string | number> | Record<string, string | number>[], +): Promise<RecordMetadata[]> { + try { + await connect(); + + return producer.send({ + topic, + messages: Array.isArray(message) + ? message.map(a => { + return { value: JSON.stringify(a) }; + }) + : [ + { + value: JSON.stringify(message), + }, + ], + timeout: SEND_TIMEOUT, + acks: ACKS, + }); + } catch (e) { + // eslint-disable-next-line no-console + console.log('KAFKA ERROR:', serializeError(e)); + } +} + +async function connect(): Promise<Kafka> { + if (!kafka) { + kafka = process.env.KAFKA_URL && process.env.KAFKA_BROKER && (globalThis[KAFKA] || getClient()); + + if (kafka) { + producer = globalThis[KAFKA_PRODUCER] || (await getProducer()); + } + } + + return kafka; +} + +export default { + enabled, + client: kafka, + producer, + log, + connect, + sendMessage, +}; |