diff options
Diffstat (limited to 'src/queries/sql/sessions/saveSessionData.ts')
| -rw-r--r-- | src/queries/sql/sessions/saveSessionData.ts | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/src/queries/sql/sessions/saveSessionData.ts b/src/queries/sql/sessions/saveSessionData.ts new file mode 100644 index 0000000..7409317 --- /dev/null +++ b/src/queries/sql/sessions/saveSessionData.ts @@ -0,0 +1,112 @@ +import clickhouse from '@/lib/clickhouse'; +import { DATA_TYPE } from '@/lib/constants'; +import { uuid } from '@/lib/crypto'; +import { flattenJSON, getStringValue } from '@/lib/data'; +import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db'; +import kafka from '@/lib/kafka'; +import prisma from '@/lib/prisma'; +import type { DynamicData } from '@/lib/types'; + +export interface SaveSessionDataArgs { + websiteId: string; + sessionId: string; + sessionData: DynamicData; + distinctId?: string; + createdAt?: Date; +} + +export async function saveSessionData(data: SaveSessionDataArgs) { + return runQuery({ + [PRISMA]: () => relationalQuery(data), + [CLICKHOUSE]: () => clickhouseQuery(data), + }); +} + +export async function relationalQuery({ + websiteId, + sessionId, + sessionData, + distinctId, + createdAt, +}: SaveSessionDataArgs) { + const { client } = prisma; + + const jsonKeys = flattenJSON(sessionData); + + const flattenedData = jsonKeys.map(a => ({ + id: uuid(), + websiteId, + sessionId, + dataKey: a.key, + stringValue: getStringValue(a.value, a.dataType), + numberValue: a.dataType === DATA_TYPE.number ? a.value : null, + dateValue: a.dataType === DATA_TYPE.date ? new Date(a.value) : null, + dataType: a.dataType, + distinctId, + createdAt, + })); + + const existing = await client.sessionData.findMany({ + where: { + sessionId, + }, + select: { + id: true, + sessionId: true, + dataKey: true, + }, + }); + + for (const data of flattenedData) { + const { sessionId, dataKey, ...props } = data; + const record = existing.find(e => e.sessionId === sessionId && e.dataKey === dataKey); + + if (record) { + await client.sessionData.update({ + where: { + id: record.id, + }, + data: { + ...props, + }, + }); + } else { + await client.sessionData.create({ + data, + }); + } + } +} + +async function clickhouseQuery({ + websiteId, + sessionId, + sessionData, + distinctId, + createdAt, +}: SaveSessionDataArgs) { + const { insert, getUTCString } = clickhouse; + const { sendMessage } = kafka; + + const jsonKeys = flattenJSON(sessionData); + + const messages = jsonKeys.map(({ key, value, dataType }) => { + return { + website_id: websiteId, + session_id: sessionId, + data_key: key, + data_type: dataType, + string_value: getStringValue(value, dataType), + number_value: dataType === DATA_TYPE.number ? value : null, + date_value: dataType === DATA_TYPE.date ? getUTCString(value) : null, + distinct_id: distinctId, + created_at: getUTCString(createdAt), + }; + }); + + if (kafka.enabled) { + await sendMessage('session_data', messages); + } else { + await insert('session_data', messages); + } +} |