aboutsummaryrefslogtreecommitdiff
path: root/src/queries/sql/sessions/saveSessionData.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/queries/sql/sessions/saveSessionData.ts')
-rw-r--r--src/queries/sql/sessions/saveSessionData.ts112
1 file changed, 112 insertions, 0 deletions
diff --git a/src/queries/sql/sessions/saveSessionData.ts b/src/queries/sql/sessions/saveSessionData.ts
new file mode 100644
index 0000000..7409317
--- /dev/null
+++ b/src/queries/sql/sessions/saveSessionData.ts
@@ -0,0 +1,112 @@
+import clickhouse from '@/lib/clickhouse';
+import { DATA_TYPE } from '@/lib/constants';
+import { uuid } from '@/lib/crypto';
+import { flattenJSON, getStringValue } from '@/lib/data';
+import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';
+import kafka from '@/lib/kafka';
+import prisma from '@/lib/prisma';
+import type { DynamicData } from '@/lib/types';
+
/**
 * Arguments for saving key/value session data for a website.
 */
export interface SaveSessionDataArgs {
  /** ID of the website the session belongs to. */
  websiteId: string;
  /** ID of the session the data is attached to. */
  sessionId: string;
  /** Arbitrary key/value payload; nested objects are flattened (via flattenJSON) before storage. */
  sessionData: DynamicData;
  /** Optional external/visitor identifier stored alongside each data row. */
  distinctId?: string;
  /** Optional creation timestamp stored alongside each data row. */
  createdAt?: Date;
}
+
+export async function saveSessionData(data: SaveSessionDataArgs) {
+ return runQuery({
+ [PRISMA]: () => relationalQuery(data),
+ [CLICKHOUSE]: () => clickhouseQuery(data),
+ });
+}
+
+export async function relationalQuery({
+ websiteId,
+ sessionId,
+ sessionData,
+ distinctId,
+ createdAt,
+}: SaveSessionDataArgs) {
+ const { client } = prisma;
+
+ const jsonKeys = flattenJSON(sessionData);
+
+ const flattenedData = jsonKeys.map(a => ({
+ id: uuid(),
+ websiteId,
+ sessionId,
+ dataKey: a.key,
+ stringValue: getStringValue(a.value, a.dataType),
+ numberValue: a.dataType === DATA_TYPE.number ? a.value : null,
+ dateValue: a.dataType === DATA_TYPE.date ? new Date(a.value) : null,
+ dataType: a.dataType,
+ distinctId,
+ createdAt,
+ }));
+
+ const existing = await client.sessionData.findMany({
+ where: {
+ sessionId,
+ },
+ select: {
+ id: true,
+ sessionId: true,
+ dataKey: true,
+ },
+ });
+
+ for (const data of flattenedData) {
+ const { sessionId, dataKey, ...props } = data;
+ const record = existing.find(e => e.sessionId === sessionId && e.dataKey === dataKey);
+
+ if (record) {
+ await client.sessionData.update({
+ where: {
+ id: record.id,
+ },
+ data: {
+ ...props,
+ },
+ });
+ } else {
+ await client.sessionData.create({
+ data,
+ });
+ }
+ }
+}
+
+async function clickhouseQuery({
+ websiteId,
+ sessionId,
+ sessionData,
+ distinctId,
+ createdAt,
+}: SaveSessionDataArgs) {
+ const { insert, getUTCString } = clickhouse;
+ const { sendMessage } = kafka;
+
+ const jsonKeys = flattenJSON(sessionData);
+
+ const messages = jsonKeys.map(({ key, value, dataType }) => {
+ return {
+ website_id: websiteId,
+ session_id: sessionId,
+ data_key: key,
+ data_type: dataType,
+ string_value: getStringValue(value, dataType),
+ number_value: dataType === DATA_TYPE.number ? value : null,
+ date_value: dataType === DATA_TYPE.date ? getUTCString(value) : null,
+ distinct_id: distinctId,
+ created_at: getUTCString(createdAt),
+ };
+ });
+
+ if (kafka.enabled) {
+ await sendMessage('session_data', messages);
+ } else {
+ await insert('session_data', messages);
+ }
+}