From 6e1d53e28a056e429c54e1e6af45eaa7939daa41 Mon Sep 17 00:00:00 2001 From: Kush Thaker Date: Wed, 31 Jul 2024 10:56:40 +0530 Subject: queues so far Co-authored-by: Dhravya Shah --- .../src/queueConsumer/chunkers/chonker.ts | 47 +++++ .../src/queueConsumer/chunkers/chunkPageOrNotes.ts | 13 ++ .../src/queueConsumer/chunkers/chunkTweet.ts | 65 +++++++ .../src/queueConsumer/helpers/initQuery.ts | 58 ++++++ .../src/queueConsumer/helpers/processNotes.ts | 36 ++++ .../src/queueConsumer/helpers/processPage.ts | 42 +++++ .../src/queueConsumer/helpers/processTweet.ts | 81 ++++++++ apps/cf-ai-backend/src/queueConsumer/index.ts | 204 +++++++++++++++++++++ .../src/queueConsumer/utils/get-metadata.ts | 57 ++++++ .../src/queueConsumer/utils/typeDecider.ts | 34 ++++ 10 files changed, 637 insertions(+) create mode 100644 apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts create mode 100644 apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts create mode 100644 apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts create mode 100644 apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts create mode 100644 apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts create mode 100644 apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts create mode 100644 apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts create mode 100644 apps/cf-ai-backend/src/queueConsumer/index.ts create mode 100644 apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts create mode 100644 apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts (limited to 'apps/cf-ai-backend/src/queueConsumer') diff --git a/apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts b/apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts new file mode 100644 index 00000000..18788dab --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts @@ -0,0 +1,47 @@ +import nlp from "compromise"; + +/** + * Split text into chunks of specified max size with some overlap for continuity. + */ +export default function chunkText( + text: string, + maxChunkSize: number, + overlap: number = 0.2, +): string[] { + const sentences = nlp(text).sentences().out("array"); + const chunks = []; + let currentChunk: string[] = []; + let currentSize = 0; + + for (let i = 0; i < sentences.length; i++) { + const sentence = sentences[i]; + currentChunk.push(sentence); + currentSize += sentence.length; + + if (currentSize >= maxChunkSize) { + // Calculate overlap + const overlapSize = Math.floor(currentChunk.length * overlap); + const chunkText = currentChunk.join(" "); + chunks.push({ + text: chunkText, + start: i - currentChunk.length + 1, + end: i, + }); + + // Prepare the next chunk with overlap + currentChunk = currentChunk.slice(-overlapSize); + currentSize = currentChunk.reduce((sum, s) => sum + s.length, 0); + } + } + + if (currentChunk.length > 0) { + const chunkText = currentChunk.join(" "); + chunks.push({ + text: chunkText, + start: sentences.length - currentChunk.length, + end: sentences.length, + }); + } + + return chunks.map((chunk) => chunk.text); +} diff --git a/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts new file mode 100644 index 00000000..0da01c3f --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts @@ -0,0 +1,13 @@ +import chunkText from "./chonker"; +import { PageOrNoteChunks } from "../../types"; +export function chunkPage(pageContent: string): PageOrNoteChunks { + const chunks = chunkText(pageContent, 1536); + + return { type: "page", chunks: chunks }; +} + +export function chunkNote(noteContent: string): PageOrNoteChunks { + const chunks = chunkText(noteContent, 1536); + + return { type: "note", chunks: chunks }; +} diff --git a/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts new file mode 100644 index 00000000..f4dd2e16 --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts @@ -0,0 +1,65 @@ +import { TweetChunks } from "../../types"; +import chunkText from "./chonker"; +import { getRawTweet } from "@repo/shared-types/utils"; + +interface Tweet { + id: string; + text: string; + links: Array; + images: Array; + videos: Array; +} +interface Metadata { + tweetId: string; + tweetLinks: any[]; + tweetVids: any[]; + tweetImages: any[]; +} + +export interface ThreadTweetData { + chunkedTweet: string[]; + metadata: Metadata; +} + +export function chunkThread(threadText: string): TweetChunks { + const thread = JSON.parse(threadText); + if (typeof thread == "string") { + console.log("DA WORKER FAILED DO SOMEHTING FIX DA WROKER"); + const rawTweet = getRawTweet(thread); + const parsedTweet: any = JSON.parse(rawTweet); + + const chunkedTweet = chunkText(parsedTweet.text, 1536); + const metadata: Metadata = { + tweetId: parsedTweet.id_str, + tweetLinks: parsedTweet.entities.urls.map((url: any) => url.expanded_url), + tweetVids: + parsedTweet.extended_entities?.media + .filter((media: any) => media.type === "video") + .map((media: any) => media.video_info!.variants[0].url) || [], + tweetImages: + parsedTweet.extended_entities?.media + .filter((media: any) => media.type === "photo") + .map((media: any) => media.media_url_https!) || [], + }; + + const chunks = [{ chunkedTweet: chunkedTweet, metadata }]; + + return { type: "tweet", chunks }; + } else { + console.log(JSON.stringify(thread)); + const chunkedTweets = thread.map((tweet: Tweet) => { + const chunkedTweet = chunkText(tweet.text, 1536); + + const metadata = { + tweetId: tweet.id, + tweetLinks: tweet.links, + tweetVids: tweet.videos, + tweetImages: tweet.images, + }; + + return { chunkedTweet, metadata }; + }); + + return { type: "tweet", chunks: chunkedTweets }; + } +} diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts new file mode 100644 index 00000000..a7d85c23 --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts @@ -0,0 +1,58 @@ +import { Env } from "../../types"; +import { OpenAIEmbeddings } from "../../utils/OpenAIEmbedder"; +import { CloudflareVectorizeStore } from "@langchain/cloudflare"; +import { createOpenAI } from "@ai-sdk/openai"; +import { createGoogleGenerativeAI } from "@ai-sdk/google"; +import { createAnthropic } from "@ai-sdk/anthropic"; + +export async function initQQuery( + env: Env, + model: string = "gpt-4o", +) { + const embeddings = new OpenAIEmbeddings({ + apiKey: env.OPENAI_API_KEY, + modelName: "text-embedding-3-small", + }); + + const store = new CloudflareVectorizeStore(embeddings, { + index: env.VECTORIZE_INDEX, + }); + + let selectedModel: + | ReturnType> + | ReturnType> + | ReturnType>; + + switch (model) { + case "claude-3-opus": + const anthropic = createAnthropic({ + apiKey: env.ANTHROPIC_API_KEY, + baseURL: + "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/anthropic", + }); + selectedModel = anthropic.chat("claude-3-opus-20240229"); + console.log("Selected model: ", selectedModel); + break; + case "gemini-1.5-pro": + const googleai = createGoogleGenerativeAI({ + apiKey: env.GOOGLE_AI_API_KEY, + baseURL: + "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/google-vertex-ai", + }); + selectedModel = googleai.chat("models/gemini-1.5-pro-latest"); + console.log("Selected model: ", selectedModel); + break; + case "gpt-4o": + default: + const openai = createOpenAI({ + apiKey: env.OPENAI_API_KEY, + baseURL: + "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/openai", + compatibility: "strict", + }); + selectedModel = openai.chat("gpt-4o-mini"); + break; + } + + return { store, model: selectedModel }; +} \ No newline at end of file diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts new file mode 100644 index 00000000..466690cc --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts @@ -0,0 +1,36 @@ +import { Result, Ok, Err } from "../../errors/results"; +import { BaseError } from "../../errors/baseError"; +import { Metadata } from "../utils/get-metadata"; + +class ProcessNotesError extends BaseError { + constructor(message?: string, source?: string) { + super("[Note Processing Error]", message, source); + } +} + +type ProcessNoteResult = { + noteContent: { noteId: number; noteContent: string }; + metadata: Metadata; +}; + +export function processNote( + content: string, +): Result { + try { + const pageContent = content; + const noteId = new Date().getTime(); + + const metadata = { + baseUrl: `https://supermemory.ai/note/${noteId}`, + description: `Note created at ${new Date().toLocaleString()}`, + image: "https://supermemory.ai/logo.png", + title: `${pageContent.slice(0, 20)} ${pageContent.length > 20 ? "..." : ""}`, + }; + + const noteContent = { noteId: noteId, noteContent: pageContent }; + return Ok({ noteContent, metadata }); + } catch (e) { + console.error("[Note Processing Error]", e); + return Err(new ProcessNotesError((e as Error).message, "processNote")); + } +} diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts new file mode 100644 index 00000000..6b28c975 --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts @@ -0,0 +1,42 @@ +import { Result, Ok, Err, isErr } from "../../errors/results"; +import { BaseError } from "../../errors/baseError"; +import { getMetaData, Metadata } from "../utils/get-metadata"; + +class ProcessPageError extends BaseError { + constructor(message?: string, source?: string) { + super("[Page Proceessing Error]", message, source); + } +} + +type PageProcessResult = { pageContent: string; metadata: Metadata }; + +export async function processPage( + url: string, +): Promise> { + try { + const response = await fetch("https://md.dhr.wtf/?url=" + url, { + headers: { + Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY, + }, + }); + const pageContent = await response.text(); + if (!pageContent) { + return Err( + new ProcessPageError( + "Failed to get response form markdowner", + "processPage", + ), + ); + } + console.log("[This is the page content]", pageContent); + const metadataResult = await getMetaData(url); + if (isErr(metadataResult)) { + throw metadataResult.error; + } + const metadata = metadataResult.value; + return Ok({ pageContent, metadata }); + } catch (e) { + console.error("[Page Processing Error]", e); + return Err(new ProcessPageError((e as Error).message, "processPage")); + } +} diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts new file mode 100644 index 00000000..ef5d9f5b --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts @@ -0,0 +1,81 @@ +import { Tweet } from "react-tweet/api"; +import { Result, Ok, Err, isErr } from "../../errors/results"; +import { BaseError } from "../../errors/baseError"; +import { getMetaData, Metadata } from "../utils/get-metadata"; +import { tweetToMd } from "@repo/shared-types/utils"; // can I do this? + +class ProcessTweetError extends BaseError { + constructor(message?: string, source?: string) { + super("[Tweet Proceessing Error]", message, source); + } +} + +type GetTweetResult = Tweet; + +export const getTweetData = async ( + tweetID: string, +): Promise> => { + try { + console.log("is fetch defined here?"); + const url = `https://cdn.syndication.twimg.com/tweet-result?id=${tweetID}&lang=en&features=tfw_timeline_list%3A%3Btfw_follower_count_sunset%3Atrue%3Btfw_tweet_edit_backend%3Aon%3Btfw_refsrc_session%3Aon%3Btfw_fosnr_soft_interventions_enabled%3Aon%3Btfw_show_birdwatch_pivots_enabled%3Aon%3Btfw_show_business_verified_badge%3Aon%3Btfw_duplicate_scribes_to_settings%3Aon%3Btfw_use_profile_image_shape_enabled%3Aon%3Btfw_show_blue_verified_badge%3Aon%3Btfw_legacy_timeline_sunset%3Atrue%3Btfw_show_gov_verified_badge%3Aon%3Btfw_show_business_affiliate_badge%3Aon%3Btfw_tweet_edit_frontend%3Aon&token=4c2mmul6mnh`; + + const resp = await fetch(url, { + headers: { + "User-Agent": + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3", + Accept: "application/json", + "Accept-Language": "en-US,en;q=0.5", + "Accept-Encoding": "gzip, deflate, br", + Connection: "keep-alive", + "Upgrade-Insecure-Requests": "1", + "Cache-Control": "max-age=0", + TE: "Trailers", + }, + }); + console.log(resp.status); + + const data = (await resp.json()) as Tweet; + + return Ok(data); + } catch (e) { + console.error("[Tweet Proceessing Error]", e); + return Err(new ProcessTweetError(e, "getTweetData")); + } +}; + +export const getThreadData = async ( + tweetUrl: string, + cf_thread_endpoint: string, + authKey: string, +): Promise> => { + const threadRequest = await fetch(cf_thread_endpoint, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: authKey, + }, + body: JSON.stringify({ url: tweetUrl }), + }); + if (threadRequest.status !== 200) { + return Err( + new ProcessTweetError( + `Failed to fetch the thread: ${tweetUrl}, Reason: ${threadRequest.statusText}`, + "getThreadData", + ), + ); + } + + const thread = await threadRequest.text(); + console.log("[thread response]"); + + if (thread.trim().length === 2) { + console.log("Thread is an empty array"); + return Err( + new ProcessTweetError( + "[THREAD FETCHING SERVICE] Got no content form thread worker", + "getThreadData", + ), + ); + } + return Ok(thread); +}; diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts new file mode 100644 index 00000000..8657603d --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/index.ts @@ -0,0 +1,204 @@ +import { Env, PageOrNoteChunks, TweetChunks, vectorObj } from "../types"; +import { typeDecider } from "./utils/typeDecider"; +import { isErr, wrap } from "../errors/results"; +import { processNote } from "./helpers/processNotes"; +import { processPage } from "./helpers/processPage"; +import { getThreadData, getTweetData } from "./helpers/processTweet"; +import { tweetToMd } from "@repo/shared-types/utils"; +import { initQQuery } from "./helpers/initQuery"; +import { chunkNote, chunkPage } from "./chunkers/chunkPageOrNotes"; +import { chunkThread } from "./chunkers/chunkTweet"; +import { batchCreateChunksAndEmbeddings } from "../helper"; +import { z } from "zod"; +import { Metadata } from "./utils/get-metadata"; +import { BaseError } from "../errors/baseError"; +import { database } from "../db"; +import { storedContent, space, contentToSpace } from "../db/schema"; +import { and, eq, inArray, sql } from "drizzle-orm"; + +class VectorInsertError extends BaseError { + constructor(message?: string, source?: string) { + super("[Vector Insert Error]", message, source); + } +} +const vectorErrorFactory = (err: Error) => new VectorInsertError(err.message); + +class D1InsertError extends BaseError { + constructor(message?: string, source?: string) { + super("[D1 Insert Error]", message, source); + } +} + +export async function queue( + batch: MessageBatch<{ content: string; space: Array; user: string }>, + env: Env, +): Promise { + console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN); + for (let message of batch.messages) { + console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN); + console.log("is thie even running?", message.body); + const body = message.body; + console.log("v got shit in the queue", body); + + const typeResult = typeDecider(body.content); + + if (isErr(typeResult)) { + throw typeResult.error; + } + console.log(typeResult.value); + const type = typeResult.value; + + let pageContent: string; + let vectorData: string; + let metadata: Metadata; + let storeToSpaces = body.space; + let chunks: TweetChunks | PageOrNoteChunks; + let noteId = 0; + switch (type) { + case "note": { + console.log("note hit"); + const note = processNote(body.content); + if (isErr(note)) { + throw note.error; + } + pageContent = note.value.noteContent.noteContent; + noteId = note.value.noteContent.noteId; + metadata = note.value.metadata; + vectorData = pageContent; + chunks = chunkNote(pageContent); + break; + } + case "page": { + console.log("page hit"); + const page = await processPage(body.content); + if (isErr(page)) { + throw page.error; + } + pageContent = page.value.pageContent; + metadata = page.value.metadata; + vectorData = pageContent; + chunks = chunkPage(pageContent); + break; + } + + case "tweet": { + console.log("tweet hit"); + console.log(body.content.split("/").pop()); + const tweet = await getTweetData(body.content.split("/").pop()); + console.log(tweet); + const thread = await getThreadData( + body.content, + env.THREAD_CF_WORKER, + env.THREAD_CF_AUTH, + ); + + if (isErr(tweet)) { + throw tweet.error; + } + pageContent = tweetToMd(tweet.value); + console.log(pageContent); + metadata = { + baseUrl: body.content, + description: tweet.value.text.slice(0, 200), + image: tweet.value.user.profile_image_url_https, + title: `Tweet by ${tweet.value.user.name}`, + }; + if (isErr(thread)) { + console.log("Thread worker is down!"); + vectorData = JSON.stringify(pageContent); + console.error(thread.error); + } else { + vectorData = thread.value; + } + chunks = chunkThread(vectorData); + break; + } + } + + // see what's up with the storedToSpaces in this block + const { store } = await initQQuery(env); + + type body = z.infer; + + const Chunkbody: body = { + pageContent: pageContent, + spaces: storeToSpaces.map((spaceId) => spaceId.toString()), + user: body.user, + type: type, + url: metadata.baseUrl, + description: metadata.description, + title: metadata.description, + }; + const vectorResult = await wrap( + batchCreateChunksAndEmbeddings({ + store: store, + body: Chunkbody, + chunks: chunks, + env: env, + }), + vectorErrorFactory, + ); + + if (isErr(vectorResult)) { + throw vectorResult.error; + } + const saveToDbUrl = + (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) + + "#supermemory-user-" + + body.user; + let contentId: number; + const db = database(env); + const insertResponse = await db + .insert(storedContent) + .values({ + content: pageContent as string, + title: metadata.title, + description: metadata.description, + url: saveToDbUrl, + baseUrl: saveToDbUrl, + image: metadata.image, + savedAt: new Date(), + userId: body.user, + type: type, + noteId: noteId, + }) + .returning({ id: storedContent.id }); + + if (!insertResponse[0]?.id) { + throw new D1InsertError( + "something went worng when inserting to database", + "inresertResponse", + ); + } + contentId = insertResponse[0]?.id; + if (storeToSpaces.length > 0) { + // Adding the many-to-many relationship between content and spaces + const spaceData = await db + .select() + .from(space) + .where(and(inArray(space.id, storeToSpaces), eq(space.user, body.user))) + .all(); + + await Promise.all( + spaceData.map(async (s) => { + await db + .insert(contentToSpace) + .values({ contentId: contentId, spaceId: s.id }); + + await db.update(space).set({ numItems: s.numItems + 1 }); + }), + ); + } + } +} + +/* +To do: +1. Abstract and shitft the entrie creatememory function to the queue consumer --> Hopefully done +2. Make the front end use that instead of whatever khichidi is going on right now +3. remove getMetada form the lib file as it's not being used anywhere else +4. Figure out the limit stuff ( server action for that seems fine because no use in limiting after they already in the queue rigth? ) +5. Figure out the initQuery stuff ( ;( ) --> This is a bad way of doing stuff :0 +6. How do I hande the content already exists wala use case? +7. Figure out retry and not add shit to the vectirze over and over again on failure +*/ diff --git a/apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts b/apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts new file mode 100644 index 00000000..95916506 --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts @@ -0,0 +1,57 @@ +import * as cheerio from "cheerio"; +import { Result, Ok, Err } from "../../errors/results"; +import { BaseError } from "../../errors/baseError"; + +class GetMetadataError extends BaseError { + constructor(message?: string, source?: string) { + super("[Fetch Metadata Error]", message, source); + } +} +export type Metadata = { + title: string; + description: string; + image: string; + baseUrl: string; +}; +// TODO: THIS SHOULD PROBABLY ALSO FETCH THE OG-IMAGE +export async function getMetaData( + url: string, +): Promise> { + try { + const response = await fetch(url); + const html = await response.text(); + + const $ = cheerio.load(html); + + // Extract the base URL + const baseUrl = url; + + // Extract title + const title = $("title").text().trim(); + + const description = $("meta[name=description]").attr("content") ?? ""; + + const _favicon = + $("link[rel=icon]").attr("href") ?? "https://supermemory.dhr.wtf/web.svg"; + + let favicon = + _favicon.trim().length > 0 + ? _favicon.trim() + : "https://supermemory.dhr.wtf/web.svg"; + if (favicon.startsWith("/")) { + favicon = baseUrl + favicon; + } else if (favicon.startsWith("./")) { + favicon = baseUrl + favicon.slice(1); + } + + return Ok({ + title, + description, + image: favicon, + baseUrl, + }); + } catch (e) { + console.error("[Metadata Fetch Error]", e); + return Err(new GetMetadataError((e as Error).message, "getMetaData")); + } +} diff --git a/apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts b/apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts new file mode 100644 index 00000000..037ab40c --- /dev/null +++ b/apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts @@ -0,0 +1,34 @@ +import { Result, Ok, Err } from "../../errors/results"; +import { BaseError } from "../../errors/baseError"; + +export type contentType = "page" | "tweet" | "note"; + +class GetTypeError extends BaseError { + constructor(message?: string, source?: string) { + super("[Decide Type Error]", message, source); + } +} +export const typeDecider = ( + content: string, +): Result => { + try { + // if the content is a URL, then it's a page. if its a URL with https://x.com/user/status/123, then it's a tweet. else, it's a note. + // do strict checking with regex + if ( + content.match(/https?:\/\/(x\.com|twitter\.com)\/[\w]+\/[\w]+\/[\d]+/) + ) { + return Ok("tweet"); + } else if ( + content.match( + /^(https?:\/\/)?(www\.)?[a-z0-9]+([-.]{1}[a-z0-9]+)*\.[a-z]{2,5}(\/.*)?$/i, + ) + ) { + return Ok("page"); + } else { + return Ok("note"); + } + } catch (e) { + console.error("[Decide Type Error]", e); + return Err(new GetTypeError((e as Error).message, "typeDecider")); + } +}; -- cgit v1.2.3 From 241276be588312aec4f9e09a23c7951379238da8 Mon Sep 17 00:00:00 2001 From: Kush Thaker Date: Wed, 31 Jul 2024 11:37:54 +0530 Subject: db schema in packages --- apps/cf-ai-backend/src/queueConsumer/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'apps/cf-ai-backend/src/queueConsumer') diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts index 8657603d..8b9064cb 100644 --- a/apps/cf-ai-backend/src/queueConsumer/index.ts +++ b/apps/cf-ai-backend/src/queueConsumer/index.ts @@ -13,7 +13,7 @@ import { z } from "zod"; import { Metadata } from "./utils/get-metadata"; import { BaseError } from "../errors/baseError"; import { database } from "../db"; -import { storedContent, space, contentToSpace } from "../db/schema"; +import { storedContent, space, contentToSpace } from "@repo/db/schema"; import { and, eq, inArray, sql } from "drizzle-orm"; class VectorInsertError extends BaseError { -- cgit v1.2.3 From e4fd7f5aacc3c9f7f000e1858248d49aa4d3410e Mon Sep 17 00:00:00 2001 From: Kush Thaker Date: Mon, 5 Aug 2024 21:25:11 +0530 Subject: move limit to backend and thread service binding --- .../src/queueConsumer/helpers/initQuery.ts | 58 ---- .../src/queueConsumer/helpers/processPage.ts | 13 +- .../src/queueConsumer/helpers/processTweet.ts | 73 +++-- apps/cf-ai-backend/src/queueConsumer/index.ts | 336 ++++++++++++++++----- 4 files changed, 307 insertions(+), 173 deletions(-) delete mode 100644 apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts (limited to 'apps/cf-ai-backend/src/queueConsumer') diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts deleted file mode 100644 index a7d85c23..00000000 --- a/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { Env } from "../../types"; -import { OpenAIEmbeddings } from "../../utils/OpenAIEmbedder"; -import { CloudflareVectorizeStore } from "@langchain/cloudflare"; -import { createOpenAI } from "@ai-sdk/openai"; -import { createGoogleGenerativeAI } from "@ai-sdk/google"; -import { createAnthropic } from "@ai-sdk/anthropic"; - -export async function initQQuery( - env: Env, - model: string = "gpt-4o", -) { - const embeddings = new OpenAIEmbeddings({ - apiKey: env.OPENAI_API_KEY, - modelName: "text-embedding-3-small", - }); - - const store = new CloudflareVectorizeStore(embeddings, { - index: env.VECTORIZE_INDEX, - }); - - let selectedModel: - | ReturnType> - | ReturnType> - | ReturnType>; - - switch (model) { - case "claude-3-opus": - const anthropic = createAnthropic({ - apiKey: env.ANTHROPIC_API_KEY, - baseURL: - "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/anthropic", - }); - selectedModel = anthropic.chat("claude-3-opus-20240229"); - console.log("Selected model: ", selectedModel); - break; - case "gemini-1.5-pro": - const googleai = createGoogleGenerativeAI({ - apiKey: env.GOOGLE_AI_API_KEY, - baseURL: - "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/google-vertex-ai", - }); - selectedModel = googleai.chat("models/gemini-1.5-pro-latest"); - console.log("Selected model: ", selectedModel); - break; - case "gpt-4o": - default: - const openai = createOpenAI({ - apiKey: env.OPENAI_API_KEY, - baseURL: - "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/openai", - compatibility: "strict", - }); - selectedModel = openai.chat("gpt-4o-mini"); - break; - } - - return { store, model: selectedModel }; -} \ No newline at end of file diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts index 6b28c975..f967736e 100644 --- a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts @@ -10,13 +10,14 @@ class ProcessPageError extends BaseError { type PageProcessResult = { pageContent: string; metadata: Metadata }; -export async function processPage( - url: string, -): Promise> { +export async function processPage(input: { + url: string; + securityKey: string; +}): Promise> { try { - const response = await fetch("https://md.dhr.wtf/?url=" + url, { + const response = await fetch("https://md.dhr.wtf/?url=" + input.url, { headers: { - Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY, + Authorization: "Bearer " + input.securityKey, }, }); const pageContent = await response.text(); @@ -29,7 +30,7 @@ export async function processPage( ); } console.log("[This is the page content]", pageContent); - const metadataResult = await getMetaData(url); + const metadataResult = await getMetaData(input.url); if (isErr(metadataResult)) { throw metadataResult.error; } diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts index ef5d9f5b..8d83f2dc 100644 --- a/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts @@ -3,6 +3,7 @@ import { Result, Ok, Err, isErr } from "../../errors/results"; import { BaseError } from "../../errors/baseError"; import { getMetaData, Metadata } from "../utils/get-metadata"; import { tweetToMd } from "@repo/shared-types/utils"; // can I do this? +import { Env } from "../../types"; class ProcessTweetError extends BaseError { constructor(message?: string, source?: string) { @@ -43,39 +44,45 @@ export const getTweetData = async ( } }; -export const getThreadData = async ( - tweetUrl: string, - cf_thread_endpoint: string, - authKey: string, -): Promise> => { - const threadRequest = await fetch(cf_thread_endpoint, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: authKey, - }, - body: JSON.stringify({ url: tweetUrl }), - }); - if (threadRequest.status !== 200) { - return Err( - new ProcessTweetError( - `Failed to fetch the thread: ${tweetUrl}, Reason: ${threadRequest.statusText}`, - "getThreadData", - ), - ); - } - - const thread = await threadRequest.text(); - console.log("[thread response]"); +export const getThreadData = async (input: { + tweetUrl: string; + env: Env; +}): Promise> => { + try { + // const threadRequest = await fetch(input.cf_thread_endpoint, { + // method: "POST", + // headers: { + // "Content-Type": "application/json", + // Authorization: input.authKey, + // }, + // body: JSON.stringify({ url: input.tweetUrl }), + // }); + // if (threadRequest.status !== 200) { + // console.log(await threadRequest.text()); + // console.log(input.tweetUrl); + // return Err( + // new ProcessTweetError( + // `Failed to fetch the thread: ${input.tweetUrl}, Reason: ${threadRequest.statusText}`, + // "getThreadData", + // ), + // ); + // } + //@ts-ignore + const thread = await input.env.THREAD.processTweets(input.tweetUrl); + console.log("[thread response]", thread); - if (thread.trim().length === 2) { - console.log("Thread is an empty array"); - return Err( - new ProcessTweetError( - "[THREAD FETCHING SERVICE] Got no content form thread worker", - "getThreadData", - ), - ); + if (!thread.length) { + console.log("Thread is an empty array"); + return Err( + new ProcessTweetError( + "[THREAD FETCHING SERVICE] Got no content form thread worker", + "getThreadData", + ), + ); + } + return Ok(thread); + } catch (e) { + console.error("[Thread Processing Error]", e); + return Err(new ProcessTweetError((e as Error).message, "getThreadData")); } - return Ok(thread); }; diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts index 8b9064cb..8ca23739 100644 --- a/apps/cf-ai-backend/src/queueConsumer/index.ts +++ b/apps/cf-ai-backend/src/queueConsumer/index.ts @@ -5,15 +5,21 @@ import { processNote } from "./helpers/processNotes"; import { processPage } from "./helpers/processPage"; import { getThreadData, getTweetData } from "./helpers/processTweet"; import { tweetToMd } from "@repo/shared-types/utils"; -import { initQQuery } from "./helpers/initQuery"; import { chunkNote, chunkPage } from "./chunkers/chunkPageOrNotes"; import { chunkThread } from "./chunkers/chunkTweet"; -import { batchCreateChunksAndEmbeddings } from "../helper"; +import { batchCreateChunksAndEmbeddings, initQuery } from "../helper"; import { z } from "zod"; import { Metadata } from "./utils/get-metadata"; import { BaseError } from "../errors/baseError"; import { database } from "../db"; -import { storedContent, space, contentToSpace } from "@repo/db/schema"; +import { + storedContent, + space, + contentToSpace, + users, + jobs, + Job, +} from "@repo/db/schema"; import { and, eq, inArray, sql } from "drizzle-orm"; class VectorInsertError extends BaseError { @@ -29,24 +35,99 @@ class D1InsertError extends BaseError { } } +const d1ErrorFactory = (err: Error, source: string) => + new D1InsertError(err.message, source); + +const calculateExponentialBackoff = ( + attempts: number, + baseDelaySeconds: number, +) => { + return baseDelaySeconds ** attempts; +}; + +const BASE_DELAY_SECONDS = 1.5; export async function queue( - batch: MessageBatch<{ content: string; space: Array; user: string }>, + batch: MessageBatch<{ + content: string; + space: Array; + user: string; + type: string; + }>, env: Env, ): Promise { + const db = database(env); console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN); for (let message of batch.messages) { console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN); console.log("is thie even running?", message.body); const body = message.body; - console.log("v got shit in the queue", body); - const typeResult = typeDecider(body.content); + const type = body.type; + const userExists = await wrap( + db.select().from(users).where(eq(users.id, body.user)).limit(1), + d1ErrorFactory, + "Error when trying to verify user", + ); + + if (isErr(userExists)) { + throw userExists.error; + } + + //check if this is a retry job.. by checking if the combination of the userId and the url already exists on the queue + let jobId; + const existingJob = await wrap( + db + .select() + .from(jobs) + .where( + and( + eq(jobs.userId, userExists.value[0].id), + eq(jobs.url, body.content), + ), + ) + .limit(1), + d1ErrorFactory, + "Error when checking for existing job", + ); + + if (isErr(existingJob)) { + throw existingJob.error; + } - if (isErr(typeResult)) { - throw typeResult.error; + if (existingJob.value.length > 0) { + jobId = existingJob.value[0].id; + await wrap( + db + .update(jobs) + .set({ + attempts: existingJob.value[0].attempts + 1, + updatedAt: new Date(), + }) + .where(eq(jobs.id, jobId)), + d1ErrorFactory, + "Error when updating job attempts", + ); + } else { + const job = await wrap( + db + .insert(jobs) + .values({ + userId: userExists.value[0].id as string, + url: body.content, + status: "Processing", + attempts: 1, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning({ jobId: jobs.id }), + d1ErrorFactory, + "Error When inserting into jobs table", + ); + if (isErr(job)) { + throw job.error; + } + jobId = job.value[0].jobId; } - console.log(typeResult.value); - const type = typeResult.value; let pageContent: string; let vectorData: string; @@ -70,8 +151,12 @@ export async function queue( } case "page": { console.log("page hit"); - const page = await processPage(body.content); + const page = await processPage({ + url: body.content, + securityKey: env.MD_SEC_KEY, + }); if (isErr(page)) { + console.log("there is a page error here"); throw page.error; } pageContent = page.value.pageContent; @@ -83,15 +168,13 @@ export async function queue( case "tweet": { console.log("tweet hit"); - console.log(body.content.split("/").pop()); const tweet = await getTweetData(body.content.split("/").pop()); - console.log(tweet); - const thread = await getThreadData( - body.content, - env.THREAD_CF_WORKER, - env.THREAD_CF_AUTH, - ); - + console.log(env.THREAD_CF_WORKER, env.THREAD_CF_AUTH); + const thread = await getThreadData({ + tweetUrl: body.content, + env: env, + }); + console.log("[This is the thread]", thread); if (isErr(tweet)) { throw tweet.error; } @@ -108,6 +191,7 @@ export async function queue( vectorData = JSON.stringify(pageContent); console.error(thread.error); } else { + console.log("thread worker is fine"); vectorData = thread.value; } chunks = chunkThread(vectorData); @@ -115,8 +199,27 @@ export async function queue( } } + //add to mem0, abstract + + // const mem0Response = fetch('https://api.mem0.ai/v1/memories/', { + // method: 'POST', + // headers: { + // 'Content-Type': 'application/json', + // Authorization: `Token ${process.env.MEM0_API_KEY}`, + // }, + // body: JSON.stringify({ + // messages: [ + // { + // role: 'user', + // content: query, + // }, + // ], + // user_id: user?.user?.email, + // }), + // }); + // see what's up with the storedToSpaces in this block - const { store } = await initQQuery(env); + const { store } = await initQuery(env); type body = z.infer; @@ -129,66 +232,135 @@ export async function queue( description: metadata.description, title: metadata.description, }; - const vectorResult = await wrap( - batchCreateChunksAndEmbeddings({ - store: store, - body: Chunkbody, - chunks: chunks, - env: env, - }), - vectorErrorFactory, - ); - if (isErr(vectorResult)) { - throw vectorResult.error; - } - const saveToDbUrl = - (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) + - "#supermemory-user-" + - body.user; - let contentId: number; - const db = database(env); - const insertResponse = await db - .insert(storedContent) - .values({ - content: pageContent as string, - title: metadata.title, - description: metadata.description, - url: saveToDbUrl, - baseUrl: saveToDbUrl, - image: metadata.image, - savedAt: new Date(), - userId: body.user, - type: type, - noteId: noteId, - }) - .returning({ id: storedContent.id }); - - if (!insertResponse[0]?.id) { - throw new D1InsertError( - "something went worng when inserting to database", - "inresertResponse", + try { + const vectorResult = await wrap( + batchCreateChunksAndEmbeddings({ + store: store, + body: Chunkbody, + chunks: chunks, + env: env, + }), + vectorErrorFactory, + "Error when Inserting into vector database", ); - } - contentId = insertResponse[0]?.id; - if (storeToSpaces.length > 0) { - // Adding the many-to-many relationship between content and spaces - const spaceData = await db - .select() - .from(space) - .where(and(inArray(space.id, storeToSpaces), eq(space.user, body.user))) - .all(); - await Promise.all( - spaceData.map(async (s) => { - await db - .insert(contentToSpace) - .values({ contentId: contentId, spaceId: s.id }); + if (isErr(vectorResult)) { + await db + .update(jobs) + .set({ error: vectorResult.error }) + .where(eq(jobs.id, jobId)); + message.retry({ + delaySeconds: calculateExponentialBackoff( + message.attempts, + BASE_DELAY_SECONDS, + ), + }); + throw vectorResult.error; + } - await db.update(space).set({ numItems: s.numItems + 1 }); - }), + const saveToDbUrl = + (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) + + "#supermemory-user-" + + body.user; + let contentId: number; + + const insertResponse = await wrap( + db + .insert(storedContent) + .values({ + content: pageContent as string, + title: metadata.title, + description: metadata.description, + url: saveToDbUrl, + baseUrl: saveToDbUrl, + image: metadata.image, + savedAt: new Date(), + userId: body.user, + type: type, + noteId: noteId, + }) + .returning({ id: storedContent.id }), + d1ErrorFactory, + "Error when inserting into storedContent", + ); + + if (isErr(insertResponse)) { + await db + .update(jobs) + .set({ error: insertResponse.error }) + .where(eq(jobs.id, jobId)); + message.retry({ + delaySeconds: calculateExponentialBackoff( + message.attempts, + BASE_DELAY_SECONDS, + ), + }); + throw insertResponse.error; + } + console.log(JSON.stringify(insertResponse)); + contentId = insertResponse[0]?.id; + console.log("this is the content Id", contentId); + if (storeToSpaces.length > 0) { + // Adding the many-to-many relationship between content and spaces + const spaceData = await wrap( + db + .select() + .from(space) + .where( + and(inArray(space.id, storeToSpaces), eq(space.user, body.user)), + ) + .all(), + d1ErrorFactory, + "Error when getting data from spaces", + ); + + if (isErr(spaceData)) { + throw spaceData.error; + } + try { + await Promise.all( + spaceData.value.map(async (s) => { + try { + await db + .insert(contentToSpace) + .values({ contentId: contentId, spaceId: s.id }); + + await db.update(space).set({ numItems: s.numItems + 1 }); + } catch (e) { + console.error(`Error updating space ${s.id}:`, e); + throw e; + } + }), + ); + } catch (e) { + console.error("Error in updateSpacesWithContent:", e); + throw new Error(`Failed to update spaces: ${e.message}`); + } + } + } catch (e) { + console.error("Error in simulated transaction", e.message); + console.log("Rooling back changes"); + message.retry({ + delaySeconds: calculateExponentialBackoff( + message.attempts, + BASE_DELAY_SECONDS, + ), + }); + throw new D1InsertError( + "Error when inserting into d1", + "D1 stuff after the vectorize", ); } + + // After the d1 and vectories suceeds then finally update the jobs table to indicate that the job has completed + + await db + .update(jobs) + .set({ status: "Processed" }) + .where(eq(jobs.id, jobId)); + + return; } } @@ -199,6 +371,18 @@ To do: 3. remove getMetada form the lib file as it's not being used anywhere else 4. Figure out the limit stuff ( server action for that seems fine because no use in limiting after they already in the queue rigth? ) 5. Figure out the initQuery stuff ( ;( ) --> This is a bad way of doing stuff :0 -6. How do I hande the content already exists wala use case? -7. Figure out retry and not add shit to the vectirze over and over again on failure +6. How do I hande the content already exists wala use case? --> Also how do I figure out limits? + + + +8. Wrap the d1 thing in a transaction and then write to vectorize if d1 is sucessful if it's not then just error out ( if d1 fails dlq, recoverable failure --> retry ) + +Firt write to d1 in a transaction ( sotredContent + sapces ) --> write to vectorize --> vectorize failes --> reset d1 alternatively first we can also do the vectorise stuff if that suceeds then do the d1 stuff in a batch right? + + +DEBUG: +What's hapenning: +1. The stuff in the d1 is updating but nothing is hapenning in the vectorize for some reason + + */ -- cgit v1.2.3 From 1336da8aae05a0acdb3e03561fa7378f238d3eda Mon Sep 17 00:00:00 2001 From: Kush Thaker Date: Tue, 6 Aug 2024 20:48:13 +0530 Subject: Fix job errors not reflecting in D1; add delays on document insert in vectorize to help with open ai rate limits --- .../src/queueConsumer/helpers/processPage.ts | 2 +- apps/cf-ai-backend/src/queueConsumer/index.ts | 28 +++++----------------- 2 files changed, 7 insertions(+), 23 deletions(-) (limited to 'apps/cf-ai-backend/src/queueConsumer') diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts index f967736e..9a50d701 100644 --- a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts @@ -29,12 +29,12 @@ export async function processPage(input: { ), ); } - console.log("[This is the page content]", pageContent); const metadataResult = await getMetaData(input.url); if (isErr(metadataResult)) { throw metadataResult.error; } const metadata = metadataResult.value; + console.log("[this is the metadata]", metadata); return Ok({ pageContent, metadata }); } catch (e) { console.error("[Page Processing Error]", e); diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts index 8ca23739..393f1fbf 100644 --- a/apps/cf-ai-backend/src/queueConsumer/index.ts +++ b/apps/cf-ai-backend/src/queueConsumer/index.ts @@ -45,7 +45,7 @@ const calculateExponentialBackoff = ( return baseDelaySeconds ** attempts; }; -const BASE_DELAY_SECONDS = 1.5; +const BASE_DELAY_SECONDS = 5; export async function queue( batch: MessageBatch<{ content: string; @@ -102,6 +102,7 @@ export async function queue( .set({ attempts: existingJob.value[0].attempts + 1, updatedAt: new Date(), + status: "Processing", }) .where(eq(jobs.id, jobId)), d1ErrorFactory, @@ -248,7 +249,7 @@ export async function queue( if (isErr(vectorResult)) { await db .update(jobs) - .set({ error: vectorResult.error }) + .set({ error: vectorResult.error.message, status: "error" }) .where(eq(jobs.id, jobId)); message.retry({ delaySeconds: calculateExponentialBackoff( @@ -288,7 +289,7 @@ export async function queue( if (isErr(insertResponse)) { await db .update(jobs) - .set({ error: insertResponse.error }) + .set({ error: insertResponse.error.message, status: "error" }) .where(eq(jobs.id, jobId)); message.retry({ delaySeconds: calculateExponentialBackoff( @@ -340,7 +341,7 @@ export async function queue( } } catch (e) { console.error("Error in simulated transaction", e.message); - console.log("Rooling back changes"); + message.retry({ delaySeconds: calculateExponentialBackoff( message.attempts, @@ -366,23 +367,6 @@ export async function queue( /* To do: -1. Abstract and shitft the entrie creatememory function to the queue consumer --> Hopefully done -2. Make the front end use that instead of whatever khichidi is going on right now -3. remove getMetada form the lib file as it's not being used anywhere else -4. Figure out the limit stuff ( server action for that seems fine because no use in limiting after they already in the queue rigth? ) -5. Figure out the initQuery stuff ( ;( ) --> This is a bad way of doing stuff :0 -6. How do I hande the content already exists wala use case? --> Also how do I figure out limits? - - - -8. Wrap the d1 thing in a transaction and then write to vectorize if d1 is sucessful if it's not then just error out ( if d1 fails dlq, recoverable failure --> retry ) - -Firt write to d1 in a transaction ( sotredContent + sapces ) --> write to vectorize --> vectorize failes --> reset d1 alternatively first we can also do the vectorise stuff if that suceeds then do the d1 stuff in a batch right? - - -DEBUG: -What's hapenning: -1. The stuff in the d1 is updating but nothing is hapenning in the vectorize for some reason - +Figure out rate limits!! */ -- cgit v1.2.3