diff options
| author | Dhravya Shah <[email protected]> | 2024-08-06 11:20:29 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-08-06 11:20:29 -0700 |
| commit | 7fc39cd770e4b2f55c6fdae1fa02fe0a66a93f6d (patch) | |
| tree | 82e6a03099b50441c2fe9a9bf8e8ddf7afa293e5 /apps/cf-ai-backend/src/queueConsumer | |
| parent | Merge pull request #219 from Deepakchowdavarapu/readme-issue (diff) | |
| parent | updated kv and queues (diff) | |
| download | supermemory-7fc39cd770e4b2f55c6fdae1fa02fe0a66a93f6d.tar.xz supermemory-7fc39cd770e4b2f55c6fdae1fa02fe0a66a93f6d.zip | |
Merge pull request #193 from supermemoryai/kush/be-queue
Kush/be queue
Diffstat (limited to 'apps/cf-ai-backend/src/queueConsumer')
9 files changed, 703 insertions, 5 deletions
diff --git a/apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts b/apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts new file mode 100644 index 00000000..18788dab --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts @@ -0,0 +1,47 @@ +import nlp from "compromise"; + +/** + * Split text into chunks of specified max size with some overlap for continuity. + */ +export default function chunkText( + text: string, + maxChunkSize: number, + overlap: number = 0.2, +): string[] { + const sentences = nlp(text).sentences().out("array"); + const chunks = []; + let currentChunk: string[] = []; + let currentSize = 0; + + for (let i = 0; i < sentences.length; i++) { + const sentence = sentences[i]; + currentChunk.push(sentence); + currentSize += sentence.length; + + if (currentSize >= maxChunkSize) { + // Calculate overlap + const overlapSize = Math.floor(currentChunk.length * overlap); + const chunkText = currentChunk.join(" "); + chunks.push({ + text: chunkText, + start: i - currentChunk.length + 1, + end: i, + }); + + // Prepare the next chunk with overlap + currentChunk = currentChunk.slice(-overlapSize); + currentSize = currentChunk.reduce((sum, s) => sum + s.length, 0); + } + } + + if (currentChunk.length > 0) { + const chunkText = currentChunk.join(" "); + chunks.push({ + text: chunkText, + start: sentences.length - currentChunk.length, + end: sentences.length, + }); + } + + return chunks.map((chunk) => chunk.text); +} diff --git a/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts new file mode 100644 index 00000000..0da01c3f --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts @@ -0,0 +1,13 @@ +import chunkText from "./chonker"; +import { PageOrNoteChunks } from "../../types"; +export function chunkPage(pageContent: string): PageOrNoteChunks { + const chunks = chunkText(pageContent, 1536); + + return { type: "page", chunks: chunks }; +} + +export function chunkNote(noteContent: string): PageOrNoteChunks { + const chunks = chunkText(noteContent, 1536); + + return { type: "note", chunks: chunks }; +} diff --git a/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts index ae1b18c6..46a56410 100644 --- a/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts +++ b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts @@ -22,10 +22,18 @@ export interface ThreadTweetData { } export function chunkThread(threadText: string): TweetChunks { - const thread = JSON.parse(threadText); - if (typeof thread == "string") { - console.log("DA WORKER FAILED DO SOMEHTING FIX DA WROKER"); + let thread = threadText; + + try { + thread = JSON.parse(threadText); + } catch (e) { + console.log("error: thread is not json.", e); + } + + if (typeof threadText == "string") { + console.log("DA WORKER FAILED DO SOMEHTING FIX DA WROKER", thread); const rawTweet = getRawTweet(thread); + console.log(rawTweet); const parsedTweet: any = JSON.parse(rawTweet); const chunkedTweet = chunkText(parsedTweet.text, 1536); @@ -48,8 +56,8 @@ export function chunkThread(threadText: string): TweetChunks { return { type: "tweet", chunks }; } else { - console.log(JSON.stringify(thread)); - const chunkedTweets = thread.map((tweet: Tweet) => { + console.log("thread in else statement", JSON.stringify(thread)); + const chunkedTweets = (thread as any).map((tweet: Tweet) => { const chunkedTweet = chunkText(tweet.text, 1536); const metadata = { diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts new file mode 100644 index 00000000..466690cc --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts @@ -0,0 +1,36 @@ +import { Result, Ok, Err } from "../../errors/results"; +import { BaseError } from "../../errors/baseError"; +import { Metadata } from "../utils/get-metadata"; + +class ProcessNotesError extends BaseError { + constructor(message?: string, source?: string) { + super("[Note Processing Error]", message, source); + } +} + +type ProcessNoteResult = { + noteContent: { noteId: number; noteContent: string }; + metadata: Metadata; +}; + +export function processNote( + content: string, +): Result<ProcessNoteResult, ProcessNotesError> { + try { + const pageContent = content; + const noteId = new Date().getTime(); + + const metadata = { + baseUrl: `https://supermemory.ai/note/${noteId}`, + description: `Note created at ${new Date().toLocaleString()}`, + image: "https://supermemory.ai/logo.png", + title: `${pageContent.slice(0, 20)} ${pageContent.length > 20 ? "..." : ""}`, + }; + + const noteContent = { noteId: noteId, noteContent: pageContent }; + return Ok({ noteContent, metadata }); + } catch (e) { + console.error("[Note Processing Error]", e); + return Err(new ProcessNotesError((e as Error).message, "processNote")); + } +} diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts new file mode 100644 index 00000000..9a50d701 --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts @@ -0,0 +1,43 @@ +import { Result, Ok, Err, isErr } from "../../errors/results"; +import { BaseError } from "../../errors/baseError"; +import { getMetaData, Metadata } from "../utils/get-metadata"; + +class ProcessPageError extends BaseError { + constructor(message?: string, source?: string) { + super("[Page Proceessing Error]", message, source); + } +} + +type PageProcessResult = { pageContent: string; metadata: Metadata }; + +export async function processPage(input: { + url: string; + securityKey: string; +}): Promise<Result<PageProcessResult, ProcessPageError>> { + try { + const response = await fetch("https://md.dhr.wtf/?url=" + input.url, { + headers: { + Authorization: "Bearer " + input.securityKey, + }, + }); + const pageContent = await response.text(); + if (!pageContent) { + return Err( + new ProcessPageError( + "Failed to get response form markdowner", + "processPage", + ), + ); + } + const metadataResult = await getMetaData(input.url); + if (isErr(metadataResult)) { + throw metadataResult.error; + } + const metadata = metadataResult.value; + console.log("[this is the metadata]", metadata); + return Ok({ pageContent, metadata }); + } catch (e) { + console.error("[Page Processing Error]", e); + return Err(new ProcessPageError((e as Error).message, "processPage")); + } +} diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts new file mode 100644 index 00000000..8d83f2dc --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts @@ -0,0 +1,88 @@ +import { Tweet } from "react-tweet/api"; +import { Result, Ok, Err, isErr } from "../../errors/results"; +import { BaseError } from "../../errors/baseError"; +import { getMetaData, Metadata } from "../utils/get-metadata"; +import { tweetToMd } from "@repo/shared-types/utils"; // can I do this? +import { Env } from "../../types"; + +class ProcessTweetError extends BaseError { + constructor(message?: string, source?: string) { + super("[Tweet Proceessing Error]", message, source); + } +} + +type GetTweetResult = Tweet; + +export const getTweetData = async ( + tweetID: string, +): Promise<Result<GetTweetResult, ProcessTweetError>> => { + try { + console.log("is fetch defined here?"); + const url = `https://cdn.syndication.twimg.com/tweet-result?id=${tweetID}&lang=en&features=tfw_timeline_list%3A%3Btfw_follower_count_sunset%3Atrue%3Btfw_tweet_edit_backend%3Aon%3Btfw_refsrc_session%3Aon%3Btfw_fosnr_soft_interventions_enabled%3Aon%3Btfw_show_birdwatch_pivots_enabled%3Aon%3Btfw_show_business_verified_badge%3Aon%3Btfw_duplicate_scribes_to_settings%3Aon%3Btfw_use_profile_image_shape_enabled%3Aon%3Btfw_show_blue_verified_badge%3Aon%3Btfw_legacy_timeline_sunset%3Atrue%3Btfw_show_gov_verified_badge%3Aon%3Btfw_show_business_affiliate_badge%3Aon%3Btfw_tweet_edit_frontend%3Aon&token=4c2mmul6mnh`; + + const resp = await fetch(url, { + headers: { + "User-Agent": + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3", + Accept: "application/json", + "Accept-Language": "en-US,en;q=0.5", + "Accept-Encoding": "gzip, deflate, br", + Connection: "keep-alive", + "Upgrade-Insecure-Requests": "1", + "Cache-Control": "max-age=0", + TE: "Trailers", + }, + }); + console.log(resp.status); + + const data = (await resp.json()) as Tweet; + + return Ok(data); + } catch (e) { + console.error("[Tweet Proceessing Error]", e); + return Err(new ProcessTweetError(e, "getTweetData")); + } +}; + +export const getThreadData = async (input: { + tweetUrl: string; + env: Env; +}): Promise<Result<any, ProcessTweetError>> => { + try { + // const threadRequest = await fetch(input.cf_thread_endpoint, { + // method: "POST", + // headers: { + // "Content-Type": "application/json", + // Authorization: input.authKey, + // }, + // body: JSON.stringify({ url: input.tweetUrl }), + // }); + // if (threadRequest.status !== 200) { + // console.log(await threadRequest.text()); + // console.log(input.tweetUrl); + // return Err( + // new ProcessTweetError( + // `Failed to fetch the thread: ${input.tweetUrl}, Reason: ${threadRequest.statusText}`, + // "getThreadData", + // ), + // ); + // } + //@ts-ignore + const thread = await input.env.THREAD.processTweets(input.tweetUrl); + console.log("[thread response]", thread); + + if (!thread.length) { + console.log("Thread is an empty array"); + return Err( + new ProcessTweetError( + "[THREAD FETCHING SERVICE] Got no content form thread worker", + "getThreadData", + ), + ); + } + return Ok(thread); + } catch (e) { + console.error("[Thread Processing Error]", e); + return Err(new ProcessTweetError((e as Error).message, "getThreadData")); + } +}; diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts new file mode 100644 index 00000000..393f1fbf --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/index.ts @@ -0,0 +1,372 @@ +import { Env, PageOrNoteChunks, TweetChunks, vectorObj } from "../types"; +import { typeDecider } from "./utils/typeDecider"; +import { isErr, wrap } from "../errors/results"; +import { processNote } from "./helpers/processNotes"; +import { processPage } from "./helpers/processPage"; +import { getThreadData, getTweetData } from "./helpers/processTweet"; +import { tweetToMd } from "@repo/shared-types/utils"; +import { chunkNote, chunkPage } from "./chunkers/chunkPageOrNotes"; +import { chunkThread } from "./chunkers/chunkTweet"; +import { batchCreateChunksAndEmbeddings, initQuery } from "../helper"; +import { z } from "zod"; +import { Metadata } from "./utils/get-metadata"; +import { BaseError } from "../errors/baseError"; +import { database } from "../db"; +import { + storedContent, + space, + contentToSpace, + users, + jobs, + Job, +} from "@repo/db/schema"; +import { and, eq, inArray, sql } from "drizzle-orm"; + +class VectorInsertError extends BaseError { + constructor(message?: string, source?: string) { + super("[Vector Insert Error]", message, source); + } +} +const vectorErrorFactory = (err: Error) => new VectorInsertError(err.message); + +class D1InsertError extends BaseError { + constructor(message?: string, source?: string) { + super("[D1 Insert Error]", message, source); + } +} + +const d1ErrorFactory = (err: Error, source: string) => + new D1InsertError(err.message, source); + +const calculateExponentialBackoff = ( + attempts: number, + baseDelaySeconds: number, +) => { + return baseDelaySeconds ** attempts; +}; + +const BASE_DELAY_SECONDS = 5; +export async function queue( + batch: MessageBatch<{ + content: string; + space: Array<number>; + user: string; + type: string; + }>, + env: Env, +): Promise<void> { + const db = database(env); + console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN); + for (let message of batch.messages) { + console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN); + console.log("is thie even running?", message.body); + const body = message.body; + + const type = body.type; + const userExists = await wrap( + db.select().from(users).where(eq(users.id, body.user)).limit(1), + d1ErrorFactory, + "Error when trying to verify user", + ); + + if (isErr(userExists)) { + throw userExists.error; + } + + //check if this is a retry job.. by checking if the combination of the userId and the url already exists on the queue + let jobId; + const existingJob = await wrap( + db + .select() + .from(jobs) + .where( + and( + eq(jobs.userId, userExists.value[0].id), + eq(jobs.url, body.content), + ), + ) + .limit(1), + d1ErrorFactory, + "Error when checking for existing job", + ); + + if (isErr(existingJob)) { + throw existingJob.error; + } + + if (existingJob.value.length > 0) { + jobId = existingJob.value[0].id; + await wrap( + db + .update(jobs) + .set({ + attempts: existingJob.value[0].attempts + 1, + updatedAt: new Date(), + status: "Processing", + }) + .where(eq(jobs.id, jobId)), + d1ErrorFactory, + "Error when updating job attempts", + ); + } else { + const job = await wrap( + db + .insert(jobs) + .values({ + userId: userExists.value[0].id as string, + url: body.content, + status: "Processing", + attempts: 1, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning({ jobId: jobs.id }), + d1ErrorFactory, + "Error When inserting into jobs table", + ); + if (isErr(job)) { + throw job.error; + } + jobId = job.value[0].jobId; + } + + let pageContent: string; + let vectorData: string; + let metadata: Metadata; + let storeToSpaces = body.space; + let chunks: TweetChunks | PageOrNoteChunks; + let noteId = 0; + switch (type) { + case "note": { + console.log("note hit"); + const note = processNote(body.content); + if (isErr(note)) { + throw note.error; + } + pageContent = note.value.noteContent.noteContent; + noteId = note.value.noteContent.noteId; + metadata = note.value.metadata; + vectorData = pageContent; + chunks = chunkNote(pageContent); + break; + } + case "page": { + console.log("page hit"); + const page = await processPage({ + url: body.content, + securityKey: env.MD_SEC_KEY, + }); + if (isErr(page)) { + console.log("there is a page error here"); + throw page.error; + } + pageContent = page.value.pageContent; + metadata = page.value.metadata; + vectorData = pageContent; + chunks = chunkPage(pageContent); + break; + } + + case "tweet": { + console.log("tweet hit"); + const tweet = await getTweetData(body.content.split("/").pop()); + console.log(env.THREAD_CF_WORKER, env.THREAD_CF_AUTH); + const thread = await getThreadData({ + tweetUrl: body.content, + env: env, + }); + console.log("[This is the thread]", thread); + if (isErr(tweet)) { + throw tweet.error; + } + pageContent = tweetToMd(tweet.value); + console.log(pageContent); + metadata = { + baseUrl: body.content, + description: tweet.value.text.slice(0, 200), + image: tweet.value.user.profile_image_url_https, + title: `Tweet by ${tweet.value.user.name}`, + }; + if (isErr(thread)) { + console.log("Thread worker is down!"); + vectorData = JSON.stringify(pageContent); + console.error(thread.error); + } else { + console.log("thread worker is fine"); + vectorData = thread.value; + } + chunks = chunkThread(vectorData); + break; + } + } + + //add to mem0, abstract + + // const mem0Response = fetch('https://api.mem0.ai/v1/memories/', { + // method: 'POST', + // headers: { + // 'Content-Type': 'application/json', + // Authorization: `Token ${process.env.MEM0_API_KEY}`, + // }, + // body: JSON.stringify({ + // messages: [ + // { + // role: 'user', + // content: query, + // }, + // ], + // user_id: user?.user?.email, + // }), + // }); + + // see what's up with the storedToSpaces in this block + const { store } = await initQuery(env); + + type body = z.infer<typeof vectorObj>; + + const Chunkbody: body = { + pageContent: pageContent, + spaces: storeToSpaces.map((spaceId) => spaceId.toString()), + user: body.user, + type: type, + url: metadata.baseUrl, + description: metadata.description, + title: metadata.description, + }; + + try { + const vectorResult = await wrap( + batchCreateChunksAndEmbeddings({ + store: store, + body: Chunkbody, + chunks: chunks, + env: env, + }), + vectorErrorFactory, + "Error when Inserting into vector database", + ); + + if (isErr(vectorResult)) { + await db + .update(jobs) + .set({ error: vectorResult.error.message, status: "error" }) + .where(eq(jobs.id, jobId)); + message.retry({ + delaySeconds: calculateExponentialBackoff( + message.attempts, + BASE_DELAY_SECONDS, + ), + }); + throw vectorResult.error; + } + + const saveToDbUrl = + (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) + + "#supermemory-user-" + + body.user; + let contentId: number; + + const insertResponse = await wrap( + db + .insert(storedContent) + .values({ + content: pageContent as string, + title: metadata.title, + description: metadata.description, + url: saveToDbUrl, + baseUrl: saveToDbUrl, + image: metadata.image, + savedAt: new Date(), + userId: body.user, + type: type, + noteId: noteId, + }) + .returning({ id: storedContent.id }), + d1ErrorFactory, + "Error when inserting into storedContent", + ); + + if (isErr(insertResponse)) { + await db + .update(jobs) + .set({ error: insertResponse.error.message, status: "error" }) + .where(eq(jobs.id, jobId)); + message.retry({ + delaySeconds: calculateExponentialBackoff( + message.attempts, + BASE_DELAY_SECONDS, + ), + }); + throw insertResponse.error; + } + console.log(JSON.stringify(insertResponse)); + contentId = insertResponse[0]?.id; + console.log("this is the content Id", contentId); + if (storeToSpaces.length > 0) { + // Adding the many-to-many relationship between content and spaces + const spaceData = await wrap( + db + .select() + .from(space) + .where( + and(inArray(space.id, storeToSpaces), eq(space.user, body.user)), + ) + .all(), + d1ErrorFactory, + "Error when getting data from spaces", + ); + + if (isErr(spaceData)) { + throw spaceData.error; + } + try { + await Promise.all( + spaceData.value.map(async (s) => { + try { + await db + .insert(contentToSpace) + .values({ contentId: contentId, spaceId: s.id }); + + await db.update(space).set({ numItems: s.numItems + 1 }); + } catch (e) { + console.error(`Error updating space ${s.id}:`, e); + throw e; + } + }), + ); + } catch (e) { + console.error("Error in updateSpacesWithContent:", e); + throw new Error(`Failed to update spaces: ${e.message}`); + } + } + } catch (e) { + console.error("Error in simulated transaction", e.message); + + message.retry({ + delaySeconds: calculateExponentialBackoff( + message.attempts, + BASE_DELAY_SECONDS, + ), + }); + throw new D1InsertError( + "Error when inserting into d1", + "D1 stuff after the vectorize", + ); + } + + // After the d1 and vectories suceeds then finally update the jobs table to indicate that the job has completed + + await db + .update(jobs) + .set({ status: "Processed" }) + .where(eq(jobs.id, jobId)); + + return; + } +} + +/* +To do: +Figure out rate limits!! + +*/ diff --git a/apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts b/apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts new file mode 100644 index 00000000..95916506 --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts @@ -0,0 +1,57 @@ +import * as cheerio from "cheerio"; +import { Result, Ok, Err } from "../../errors/results"; +import { BaseError } from "../../errors/baseError"; + +class GetMetadataError extends BaseError { + constructor(message?: string, source?: string) { + super("[Fetch Metadata Error]", message, source); + } +} +export type Metadata = { + title: string; + description: string; + image: string; + baseUrl: string; +}; +// TODO: THIS SHOULD PROBABLY ALSO FETCH THE OG-IMAGE +export async function getMetaData( + url: string, +): Promise<Result<Metadata, GetMetadataError>> { + try { + const response = await fetch(url); + const html = await response.text(); + + const $ = cheerio.load(html); + + // Extract the base URL + const baseUrl = url; + + // Extract title + const title = $("title").text().trim(); + + const description = $("meta[name=description]").attr("content") ?? ""; + + const _favicon = + $("link[rel=icon]").attr("href") ?? "https://supermemory.dhr.wtf/web.svg"; + + let favicon = + _favicon.trim().length > 0 + ? _favicon.trim() + : "https://supermemory.dhr.wtf/web.svg"; + if (favicon.startsWith("/")) { + favicon = baseUrl + favicon; + } else if (favicon.startsWith("./")) { + favicon = baseUrl + favicon.slice(1); + } + + return Ok({ + title, + description, + image: favicon, + baseUrl, + }); + } catch (e) { + console.error("[Metadata Fetch Error]", e); + return Err(new GetMetadataError((e as Error).message, "getMetaData")); + } +} diff --git a/apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts b/apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts new file mode 100644 index 00000000..037ab40c --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts @@ -0,0 +1,34 @@ +import { Result, Ok, Err } from "../../errors/results"; +import { BaseError } from "../../errors/baseError"; + +export type contentType = "page" | "tweet" | "note"; + +class GetTypeError extends BaseError { + constructor(message?: string, source?: string) { + super("[Decide Type Error]", message, source); + } +} +export const typeDecider = ( + content: string, +): Result<contentType, GetTypeError> => { + try { + // if the content is a URL, then it's a page. if its a URL with https://x.com/user/status/123, then it's a tweet. else, it's a note. + // do strict checking with regex + if ( + content.match(/https?:\/\/(x\.com|twitter\.com)\/[\w]+\/[\w]+\/[\d]+/) + ) { + return Ok("tweet"); + } else if ( + content.match( + /^(https?:\/\/)?(www\.)?[a-z0-9]+([-.]{1}[a-z0-9]+)*\.[a-z]{2,5}(\/.*)?$/i, + ) + ) { + return Ok("page"); + } else { + return Ok("note"); + } + } catch (e) { + console.error("[Decide Type Error]", e); + return Err(new GetTypeError((e as Error).message, "typeDecider")); + } +}; |