aboutsummaryrefslogtreecommitdiff
path: root/src/lib/kafka.ts
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-01-24 13:09:50 +0000
committerFuwn <[email protected]>2026-01-24 13:09:50 +0000
commit396acf3bbbe00a192cb0ea0a9ccf91b1d8d2850b (patch)
treeb9df4ca6a70db45cfffbae6fdd7252e20fb8e93c /src/lib/kafka.ts
downloadumami-main.tar.xz
umami-main.zip
Initial commit (HEAD, main)
Created from https://vercel.com/new
Diffstat (limited to 'src/lib/kafka.ts')
-rw-r--r-- src/lib/kafka.ts | 112
1 file changed, 112 insertions, 0 deletions
diff --git a/src/lib/kafka.ts b/src/lib/kafka.ts
new file mode 100644
index 0000000..1d60e1f
--- /dev/null
+++ b/src/lib/kafka.ts
@@ -0,0 +1,112 @@
+import type * as tls from 'node:tls';
+import debug from 'debug';
+import { Kafka, logLevel, type Producer, type RecordMetadata, type SASLOptions } from 'kafkajs';
+import { serializeError } from 'serialize-error';
+import { KAFKA, KAFKA_PRODUCER } from '@/lib/db';
+
// Namespaced debug logger for this module (enable with DEBUG=umami:kafka).
const log = debug('umami:kafka');
// Broker connection timeout, in milliseconds.
const CONNECT_TIMEOUT = 5000;
// Per-send timeout, in milliseconds.
const SEND_TIMEOUT = 3000;
// acks=1: wait for the partition leader's acknowledgement only.
// NOTE(review): confirm this durability level is intentional for analytics events.
const ACKS = 1;

// Lazily initialized by connect(); both are undefined until first use.
let kafka: Kafka;
let producer: Producer;
// Kafka support is on only when both connection env vars are configured.
const enabled = Boolean(process.env.KAFKA_URL && process.env.KAFKA_BROKER);
+
+function getClient() {
+ const { username, password } = new URL(process.env.KAFKA_URL);
+ const brokers = process.env.KAFKA_BROKER.split(',');
+ const mechanism =
+ (process.env.KAFKA_SASL_MECHANISM as 'plain' | 'scram-sha-256' | 'scram-sha-512') || 'plain';
+
+ const ssl: { ssl?: tls.ConnectionOptions | boolean; sasl?: SASLOptions } =
+ username && password
+ ? {
+ ssl: {
+ rejectUnauthorized: false,
+ },
+ sasl: {
+ mechanism,
+ username,
+ password,
+ },
+ }
+ : {};
+
+ const client: Kafka = new Kafka({
+ clientId: 'umami',
+ brokers: brokers,
+ connectionTimeout: CONNECT_TIMEOUT,
+ logLevel: logLevel.ERROR,
+ ...ssl,
+ });
+
+ if (process.env.NODE_ENV !== 'production') {
+ globalThis[KAFKA] = client;
+ }
+
+ log('Kafka initialized');
+
+ return client;
+}
+
+async function getProducer(): Promise<Producer> {
+ const producer = kafka.producer();
+ await producer.connect();
+
+ if (process.env.NODE_ENV !== 'production') {
+ globalThis[KAFKA_PRODUCER] = producer;
+ }
+
+ log('Kafka producer initialized');
+
+ return producer;
+}
+
+async function sendMessage(
+ topic: string,
+ message: Record<string, string | number> | Record<string, string | number>[],
+): Promise<RecordMetadata[]> {
+ try {
+ await connect();
+
+ return producer.send({
+ topic,
+ messages: Array.isArray(message)
+ ? message.map(a => {
+ return { value: JSON.stringify(a) };
+ })
+ : [
+ {
+ value: JSON.stringify(message),
+ },
+ ],
+ timeout: SEND_TIMEOUT,
+ acks: ACKS,
+ });
+ } catch (e) {
+ // eslint-disable-next-line no-console
+ console.log('KAFKA ERROR:', serializeError(e));
+ }
+}
+
+async function connect(): Promise<Kafka> {
+ if (!kafka) {
+ kafka = process.env.KAFKA_URL && process.env.KAFKA_BROKER && (globalThis[KAFKA] || getClient());
+
+ if (kafka) {
+ producer = globalThis[KAFKA_PRODUCER] || (await getProducer());
+ }
+ }
+
+ return kafka;
+}
+
+export default {
+ enabled,
+ client: kafka,
+ producer,
+ log,
+ connect,
+ sendMessage,
+};