aboutsummaryrefslogtreecommitdiff
path: root/apps/cf-ai-backend/src/queueConsumer
diff options
context:
space:
mode:
Diffstat (limited to 'apps/cf-ai-backend/src/queueConsumer')
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts47
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts13
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts18
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts36
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts43
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts88
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/index.ts372
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts57
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts34
9 files changed, 703 insertions, 5 deletions
diff --git a/apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts b/apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts
new file mode 100644
index 00000000..18788dab
--- /dev/null
+++ b/apps/cf-ai-backend/src/queueConsumer/chunkers/chonker.ts
@@ -0,0 +1,47 @@
+import nlp from "compromise";
+
+/**
+ * Split text into chunks of specified max size with some overlap for continuity.
+ */
+export default function chunkText(
+ text: string,
+ maxChunkSize: number,
+ overlap: number = 0.2,
+): string[] {
+ const sentences = nlp(text).sentences().out("array");
+ const chunks = [];
+ let currentChunk: string[] = [];
+ let currentSize = 0;
+
+ for (let i = 0; i < sentences.length; i++) {
+ const sentence = sentences[i];
+ currentChunk.push(sentence);
+ currentSize += sentence.length;
+
+ if (currentSize >= maxChunkSize) {
+ // Calculate overlap
+ const overlapSize = Math.floor(currentChunk.length * overlap);
+ const chunkText = currentChunk.join(" ");
+ chunks.push({
+ text: chunkText,
+ start: i - currentChunk.length + 1,
+ end: i,
+ });
+
+ // Prepare the next chunk with overlap
+ currentChunk = currentChunk.slice(-overlapSize);
+ currentSize = currentChunk.reduce((sum, s) => sum + s.length, 0);
+ }
+ }
+
+ if (currentChunk.length > 0) {
+ const chunkText = currentChunk.join(" ");
+ chunks.push({
+ text: chunkText,
+ start: sentences.length - currentChunk.length,
+ end: sentences.length,
+ });
+ }
+
+ return chunks.map((chunk) => chunk.text);
+}
diff --git a/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts
new file mode 100644
index 00000000..0da01c3f
--- /dev/null
+++ b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkPageOrNotes.ts
@@ -0,0 +1,13 @@
+import chunkText from "./chonker";
+import { PageOrNoteChunks } from "../../types";
+export function chunkPage(pageContent: string): PageOrNoteChunks {
+ const chunks = chunkText(pageContent, 1536);
+
+ return { type: "page", chunks: chunks };
+}
+
+export function chunkNote(noteContent: string): PageOrNoteChunks {
+ const chunks = chunkText(noteContent, 1536);
+
+ return { type: "note", chunks: chunks };
+}
diff --git a/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts
index ae1b18c6..46a56410 100644
--- a/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts
+++ b/apps/cf-ai-backend/src/queueConsumer/chunkers/chunkTweet.ts
@@ -22,10 +22,18 @@ export interface ThreadTweetData {
}
export function chunkThread(threadText: string): TweetChunks {
- const thread = JSON.parse(threadText);
- if (typeof thread == "string") {
- console.log("DA WORKER FAILED DO SOMEHTING FIX DA WROKER");
+ let thread = threadText;
+
+ try {
+ thread = JSON.parse(threadText);
+ } catch (e) {
+ console.log("error: thread is not json.", e);
+ }
+
+ if (typeof threadText == "string") {
+ console.log("DA WORKER FAILED DO SOMEHTING FIX DA WROKER", thread);
const rawTweet = getRawTweet(thread);
+ console.log(rawTweet);
const parsedTweet: any = JSON.parse(rawTweet);
const chunkedTweet = chunkText(parsedTweet.text, 1536);
@@ -48,8 +56,8 @@ export function chunkThread(threadText: string): TweetChunks {
return { type: "tweet", chunks };
} else {
- console.log(JSON.stringify(thread));
- const chunkedTweets = thread.map((tweet: Tweet) => {
+ console.log("thread in else statement", JSON.stringify(thread));
+ const chunkedTweets = (thread as any).map((tweet: Tweet) => {
const chunkedTweet = chunkText(tweet.text, 1536);
const metadata = {
diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts
new file mode 100644
index 00000000..466690cc
--- /dev/null
+++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processNotes.ts
@@ -0,0 +1,36 @@
+import { Result, Ok, Err } from "../../errors/results";
+import { BaseError } from "../../errors/baseError";
+import { Metadata } from "../utils/get-metadata";
+
// Error tag for any failure while turning raw note text into stored note
// content + metadata; `source` names the helper that failed.
class ProcessNotesError extends BaseError {
  constructor(message?: string, source?: string) {
    super("[Note Processing Error]", message, source);
  }
}
+
+type ProcessNoteResult = {
+ noteContent: { noteId: number; noteContent: string };
+ metadata: Metadata;
+};
+
+export function processNote(
+ content: string,
+): Result<ProcessNoteResult, ProcessNotesError> {
+ try {
+ const pageContent = content;
+ const noteId = new Date().getTime();
+
+ const metadata = {
+ baseUrl: `https://supermemory.ai/note/${noteId}`,
+ description: `Note created at ${new Date().toLocaleString()}`,
+ image: "https://supermemory.ai/logo.png",
+ title: `${pageContent.slice(0, 20)} ${pageContent.length > 20 ? "..." : ""}`,
+ };
+
+ const noteContent = { noteId: noteId, noteContent: pageContent };
+ return Ok({ noteContent, metadata });
+ } catch (e) {
+ console.error("[Note Processing Error]", e);
+ return Err(new ProcessNotesError((e as Error).message, "processNote"));
+ }
+}
diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
new file mode 100644
index 00000000..9a50d701
--- /dev/null
+++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
@@ -0,0 +1,43 @@
+import { Result, Ok, Err, isErr } from "../../errors/results";
+import { BaseError } from "../../errors/baseError";
+import { getMetaData, Metadata } from "../utils/get-metadata";
+
+class ProcessPageError extends BaseError {
+ constructor(message?: string, source?: string) {
+ super("[Page Proceessing Error]", message, source);
+ }
+}
+
+type PageProcessResult = { pageContent: string; metadata: Metadata };
+
+export async function processPage(input: {
+ url: string;
+ securityKey: string;
+}): Promise<Result<PageProcessResult, ProcessPageError>> {
+ try {
+ const response = await fetch("https://md.dhr.wtf/?url=" + input.url, {
+ headers: {
+ Authorization: "Bearer " + input.securityKey,
+ },
+ });
+ const pageContent = await response.text();
+ if (!pageContent) {
+ return Err(
+ new ProcessPageError(
+ "Failed to get response form markdowner",
+ "processPage",
+ ),
+ );
+ }
+ const metadataResult = await getMetaData(input.url);
+ if (isErr(metadataResult)) {
+ throw metadataResult.error;
+ }
+ const metadata = metadataResult.value;
+ console.log("[this is the metadata]", metadata);
+ return Ok({ pageContent, metadata });
+ } catch (e) {
+ console.error("[Page Processing Error]", e);
+ return Err(new ProcessPageError((e as Error).message, "processPage"));
+ }
+}
diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts
new file mode 100644
index 00000000..8d83f2dc
--- /dev/null
+++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts
@@ -0,0 +1,88 @@
+import { Tweet } from "react-tweet/api";
+import { Result, Ok, Err, isErr } from "../../errors/results";
+import { BaseError } from "../../errors/baseError";
+import { getMetaData, Metadata } from "../utils/get-metadata";
+import { tweetToMd } from "@repo/shared-types/utils"; // can I do this?
+import { Env } from "../../types";
+
+class ProcessTweetError extends BaseError {
+ constructor(message?: string, source?: string) {
+ super("[Tweet Proceessing Error]", message, source);
+ }
+}
+
+type GetTweetResult = Tweet;
+
+export const getTweetData = async (
+ tweetID: string,
+): Promise<Result<GetTweetResult, ProcessTweetError>> => {
+ try {
+ console.log("is fetch defined here?");
+ const url = `https://cdn.syndication.twimg.com/tweet-result?id=${tweetID}&lang=en&features=tfw_timeline_list%3A%3Btfw_follower_count_sunset%3Atrue%3Btfw_tweet_edit_backend%3Aon%3Btfw_refsrc_session%3Aon%3Btfw_fosnr_soft_interventions_enabled%3Aon%3Btfw_show_birdwatch_pivots_enabled%3Aon%3Btfw_show_business_verified_badge%3Aon%3Btfw_duplicate_scribes_to_settings%3Aon%3Btfw_use_profile_image_shape_enabled%3Aon%3Btfw_show_blue_verified_badge%3Aon%3Btfw_legacy_timeline_sunset%3Atrue%3Btfw_show_gov_verified_badge%3Aon%3Btfw_show_business_affiliate_badge%3Aon%3Btfw_tweet_edit_frontend%3Aon&token=4c2mmul6mnh`;
+
+ const resp = await fetch(url, {
+ headers: {
+ "User-Agent":
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
+ Accept: "application/json",
+ "Accept-Language": "en-US,en;q=0.5",
+ "Accept-Encoding": "gzip, deflate, br",
+ Connection: "keep-alive",
+ "Upgrade-Insecure-Requests": "1",
+ "Cache-Control": "max-age=0",
+ TE: "Trailers",
+ },
+ });
+ console.log(resp.status);
+
+ const data = (await resp.json()) as Tweet;
+
+ return Ok(data);
+ } catch (e) {
+ console.error("[Tweet Proceessing Error]", e);
+ return Err(new ProcessTweetError(e, "getTweetData"));
+ }
+};
+
+export const getThreadData = async (input: {
+ tweetUrl: string;
+ env: Env;
+}): Promise<Result<any, ProcessTweetError>> => {
+ try {
+ // const threadRequest = await fetch(input.cf_thread_endpoint, {
+ // method: "POST",
+ // headers: {
+ // "Content-Type": "application/json",
+ // Authorization: input.authKey,
+ // },
+ // body: JSON.stringify({ url: input.tweetUrl }),
+ // });
+ // if (threadRequest.status !== 200) {
+ // console.log(await threadRequest.text());
+ // console.log(input.tweetUrl);
+ // return Err(
+ // new ProcessTweetError(
+ // `Failed to fetch the thread: ${input.tweetUrl}, Reason: ${threadRequest.statusText}`,
+ // "getThreadData",
+ // ),
+ // );
+ // }
+ //@ts-ignore
+ const thread = await input.env.THREAD.processTweets(input.tweetUrl);
+ console.log("[thread response]", thread);
+
+ if (!thread.length) {
+ console.log("Thread is an empty array");
+ return Err(
+ new ProcessTweetError(
+ "[THREAD FETCHING SERVICE] Got no content form thread worker",
+ "getThreadData",
+ ),
+ );
+ }
+ return Ok(thread);
+ } catch (e) {
+ console.error("[Thread Processing Error]", e);
+ return Err(new ProcessTweetError((e as Error).message, "getThreadData"));
+ }
+};
diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts
new file mode 100644
index 00000000..393f1fbf
--- /dev/null
+++ b/apps/cf-ai-backend/src/queueConsumer/index.ts
@@ -0,0 +1,372 @@
+import { Env, PageOrNoteChunks, TweetChunks, vectorObj } from "../types";
+import { typeDecider } from "./utils/typeDecider";
+import { isErr, wrap } from "../errors/results";
+import { processNote } from "./helpers/processNotes";
+import { processPage } from "./helpers/processPage";
+import { getThreadData, getTweetData } from "./helpers/processTweet";
+import { tweetToMd } from "@repo/shared-types/utils";
+import { chunkNote, chunkPage } from "./chunkers/chunkPageOrNotes";
+import { chunkThread } from "./chunkers/chunkTweet";
+import { batchCreateChunksAndEmbeddings, initQuery } from "../helper";
+import { z } from "zod";
+import { Metadata } from "./utils/get-metadata";
+import { BaseError } from "../errors/baseError";
+import { database } from "../db";
+import {
+ storedContent,
+ space,
+ contentToSpace,
+ users,
+ jobs,
+ Job,
+} from "@repo/db/schema";
+import { and, eq, inArray, sql } from "drizzle-orm";
+
// Raised when writing embeddings to the vector store fails.
class VectorInsertError extends BaseError {
  constructor(message?: string, source?: string) {
    super("[Vector Insert Error]", message, source);
  }
}
// Adapter so `wrap()` can convert a thrown Error into a VectorInsertError.
const vectorErrorFactory = (err: Error) => new VectorInsertError(err.message);
+
// Raised when any D1 (SQL) read/write in the consumer fails.
class D1InsertError extends BaseError {
  constructor(message?: string, source?: string) {
    super("[D1 Insert Error]", message, source);
  }
}

// Adapter so `wrap()` can convert a thrown Error into a D1InsertError,
// tagging it with the call site via `source`.
const d1ErrorFactory = (err: Error, source: string) =>
  new D1InsertError(err.message, source);
+
+const calculateExponentialBackoff = (
+ attempts: number,
+ baseDelaySeconds: number,
+) => {
+ return baseDelaySeconds ** attempts;
+};
+
+const BASE_DELAY_SECONDS = 5;
+export async function queue(
+ batch: MessageBatch<{
+ content: string;
+ space: Array<number>;
+ user: string;
+ type: string;
+ }>,
+ env: Env,
+): Promise<void> {
+ const db = database(env);
+ console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN);
+ for (let message of batch.messages) {
+ console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN);
+ console.log("is thie even running?", message.body);
+ const body = message.body;
+
+ const type = body.type;
+ const userExists = await wrap(
+ db.select().from(users).where(eq(users.id, body.user)).limit(1),
+ d1ErrorFactory,
+ "Error when trying to verify user",
+ );
+
+ if (isErr(userExists)) {
+ throw userExists.error;
+ }
+
+ //check if this is a retry job.. by checking if the combination of the userId and the url already exists on the queue
+ let jobId;
+ const existingJob = await wrap(
+ db
+ .select()
+ .from(jobs)
+ .where(
+ and(
+ eq(jobs.userId, userExists.value[0].id),
+ eq(jobs.url, body.content),
+ ),
+ )
+ .limit(1),
+ d1ErrorFactory,
+ "Error when checking for existing job",
+ );
+
+ if (isErr(existingJob)) {
+ throw existingJob.error;
+ }
+
+ if (existingJob.value.length > 0) {
+ jobId = existingJob.value[0].id;
+ await wrap(
+ db
+ .update(jobs)
+ .set({
+ attempts: existingJob.value[0].attempts + 1,
+ updatedAt: new Date(),
+ status: "Processing",
+ })
+ .where(eq(jobs.id, jobId)),
+ d1ErrorFactory,
+ "Error when updating job attempts",
+ );
+ } else {
+ const job = await wrap(
+ db
+ .insert(jobs)
+ .values({
+ userId: userExists.value[0].id as string,
+ url: body.content,
+ status: "Processing",
+ attempts: 1,
+ createdAt: new Date(),
+ updatedAt: new Date(),
+ })
+ .returning({ jobId: jobs.id }),
+ d1ErrorFactory,
+ "Error When inserting into jobs table",
+ );
+ if (isErr(job)) {
+ throw job.error;
+ }
+ jobId = job.value[0].jobId;
+ }
+
+ let pageContent: string;
+ let vectorData: string;
+ let metadata: Metadata;
+ let storeToSpaces = body.space;
+ let chunks: TweetChunks | PageOrNoteChunks;
+ let noteId = 0;
+ switch (type) {
+ case "note": {
+ console.log("note hit");
+ const note = processNote(body.content);
+ if (isErr(note)) {
+ throw note.error;
+ }
+ pageContent = note.value.noteContent.noteContent;
+ noteId = note.value.noteContent.noteId;
+ metadata = note.value.metadata;
+ vectorData = pageContent;
+ chunks = chunkNote(pageContent);
+ break;
+ }
+ case "page": {
+ console.log("page hit");
+ const page = await processPage({
+ url: body.content,
+ securityKey: env.MD_SEC_KEY,
+ });
+ if (isErr(page)) {
+ console.log("there is a page error here");
+ throw page.error;
+ }
+ pageContent = page.value.pageContent;
+ metadata = page.value.metadata;
+ vectorData = pageContent;
+ chunks = chunkPage(pageContent);
+ break;
+ }
+
+ case "tweet": {
+ console.log("tweet hit");
+ const tweet = await getTweetData(body.content.split("/").pop());
+ console.log(env.THREAD_CF_WORKER, env.THREAD_CF_AUTH);
+ const thread = await getThreadData({
+ tweetUrl: body.content,
+ env: env,
+ });
+ console.log("[This is the thread]", thread);
+ if (isErr(tweet)) {
+ throw tweet.error;
+ }
+ pageContent = tweetToMd(tweet.value);
+ console.log(pageContent);
+ metadata = {
+ baseUrl: body.content,
+ description: tweet.value.text.slice(0, 200),
+ image: tweet.value.user.profile_image_url_https,
+ title: `Tweet by ${tweet.value.user.name}`,
+ };
+ if (isErr(thread)) {
+ console.log("Thread worker is down!");
+ vectorData = JSON.stringify(pageContent);
+ console.error(thread.error);
+ } else {
+ console.log("thread worker is fine");
+ vectorData = thread.value;
+ }
+ chunks = chunkThread(vectorData);
+ break;
+ }
+ }
+
+ //add to mem0, abstract
+
+ // const mem0Response = fetch('https://api.mem0.ai/v1/memories/', {
+ // method: 'POST',
+ // headers: {
+ // 'Content-Type': 'application/json',
+ // Authorization: `Token ${process.env.MEM0_API_KEY}`,
+ // },
+ // body: JSON.stringify({
+ // messages: [
+ // {
+ // role: 'user',
+ // content: query,
+ // },
+ // ],
+ // user_id: user?.user?.email,
+ // }),
+ // });
+
+ // see what's up with the storedToSpaces in this block
+ const { store } = await initQuery(env);
+
+ type body = z.infer<typeof vectorObj>;
+
+ const Chunkbody: body = {
+ pageContent: pageContent,
+ spaces: storeToSpaces.map((spaceId) => spaceId.toString()),
+ user: body.user,
+ type: type,
+ url: metadata.baseUrl,
+ description: metadata.description,
+ title: metadata.description,
+ };
+
+ try {
+ const vectorResult = await wrap(
+ batchCreateChunksAndEmbeddings({
+ store: store,
+ body: Chunkbody,
+ chunks: chunks,
+ env: env,
+ }),
+ vectorErrorFactory,
+ "Error when Inserting into vector database",
+ );
+
+ if (isErr(vectorResult)) {
+ await db
+ .update(jobs)
+ .set({ error: vectorResult.error.message, status: "error" })
+ .where(eq(jobs.id, jobId));
+ message.retry({
+ delaySeconds: calculateExponentialBackoff(
+ message.attempts,
+ BASE_DELAY_SECONDS,
+ ),
+ });
+ throw vectorResult.error;
+ }
+
+ const saveToDbUrl =
+ (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) +
+ "#supermemory-user-" +
+ body.user;
+ let contentId: number;
+
+ const insertResponse = await wrap(
+ db
+ .insert(storedContent)
+ .values({
+ content: pageContent as string,
+ title: metadata.title,
+ description: metadata.description,
+ url: saveToDbUrl,
+ baseUrl: saveToDbUrl,
+ image: metadata.image,
+ savedAt: new Date(),
+ userId: body.user,
+ type: type,
+ noteId: noteId,
+ })
+ .returning({ id: storedContent.id }),
+ d1ErrorFactory,
+ "Error when inserting into storedContent",
+ );
+
+ if (isErr(insertResponse)) {
+ await db
+ .update(jobs)
+ .set({ error: insertResponse.error.message, status: "error" })
+ .where(eq(jobs.id, jobId));
+ message.retry({
+ delaySeconds: calculateExponentialBackoff(
+ message.attempts,
+ BASE_DELAY_SECONDS,
+ ),
+ });
+ throw insertResponse.error;
+ }
+ console.log(JSON.stringify(insertResponse));
+ contentId = insertResponse[0]?.id;
+ console.log("this is the content Id", contentId);
+ if (storeToSpaces.length > 0) {
+ // Adding the many-to-many relationship between content and spaces
+ const spaceData = await wrap(
+ db
+ .select()
+ .from(space)
+ .where(
+ and(inArray(space.id, storeToSpaces), eq(space.user, body.user)),
+ )
+ .all(),
+ d1ErrorFactory,
+ "Error when getting data from spaces",
+ );
+
+ if (isErr(spaceData)) {
+ throw spaceData.error;
+ }
+ try {
+ await Promise.all(
+ spaceData.value.map(async (s) => {
+ try {
+ await db
+ .insert(contentToSpace)
+ .values({ contentId: contentId, spaceId: s.id });
+
+ await db.update(space).set({ numItems: s.numItems + 1 });
+ } catch (e) {
+ console.error(`Error updating space ${s.id}:`, e);
+ throw e;
+ }
+ }),
+ );
+ } catch (e) {
+ console.error("Error in updateSpacesWithContent:", e);
+ throw new Error(`Failed to update spaces: ${e.message}`);
+ }
+ }
+ } catch (e) {
+ console.error("Error in simulated transaction", e.message);
+
+ message.retry({
+ delaySeconds: calculateExponentialBackoff(
+ message.attempts,
+ BASE_DELAY_SECONDS,
+ ),
+ });
+ throw new D1InsertError(
+ "Error when inserting into d1",
+ "D1 stuff after the vectorize",
+ );
+ }
+
+ // After the d1 and vectories suceeds then finally update the jobs table to indicate that the job has completed
+
+ await db
+ .update(jobs)
+ .set({ status: "Processed" })
+ .where(eq(jobs.id, jobId));
+
+ return;
+ }
+}
+
+/*
+To do:
+Figure out rate limits!!
+
+*/
diff --git a/apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts b/apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts
new file mode 100644
index 00000000..95916506
--- /dev/null
+++ b/apps/cf-ai-backend/src/queueConsumer/utils/get-metadata.ts
@@ -0,0 +1,57 @@
+import * as cheerio from "cheerio";
+import { Result, Ok, Err } from "../../errors/results";
+import { BaseError } from "../../errors/baseError";
+
// Error tag for failures while fetching or parsing page metadata.
class GetMetadataError extends BaseError {
  constructor(message?: string, source?: string) {
    super("[Fetch Metadata Error]", message, source);
  }
}
// Scraped page metadata used for display cards and storage.
export type Metadata = {
  title: string; // contents of <title>, trimmed (may be empty)
  description: string; // <meta name=description>, "" when absent
  image: string; // favicon / preview image URL
  baseUrl: string; // the URL the metadata was scraped from
};
+// TODO: THIS SHOULD PROBABLY ALSO FETCH THE OG-IMAGE
+export async function getMetaData(
+ url: string,
+): Promise<Result<Metadata, GetMetadataError>> {
+ try {
+ const response = await fetch(url);
+ const html = await response.text();
+
+ const $ = cheerio.load(html);
+
+ // Extract the base URL
+ const baseUrl = url;
+
+ // Extract title
+ const title = $("title").text().trim();
+
+ const description = $("meta[name=description]").attr("content") ?? "";
+
+ const _favicon =
+ $("link[rel=icon]").attr("href") ?? "https://supermemory.dhr.wtf/web.svg";
+
+ let favicon =
+ _favicon.trim().length > 0
+ ? _favicon.trim()
+ : "https://supermemory.dhr.wtf/web.svg";
+ if (favicon.startsWith("/")) {
+ favicon = baseUrl + favicon;
+ } else if (favicon.startsWith("./")) {
+ favicon = baseUrl + favicon.slice(1);
+ }
+
+ return Ok({
+ title,
+ description,
+ image: favicon,
+ baseUrl,
+ });
+ } catch (e) {
+ console.error("[Metadata Fetch Error]", e);
+ return Err(new GetMetadataError((e as Error).message, "getMetaData"));
+ }
+}
diff --git a/apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts b/apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts
new file mode 100644
index 00000000..037ab40c
--- /dev/null
+++ b/apps/cf-ai-backend/src/queueConsumer/utils/typeDecider.ts
@@ -0,0 +1,34 @@
+import { Result, Ok, Err } from "../../errors/results";
+import { BaseError } from "../../errors/baseError";
+
// The three ingestion categories the queue consumer knows how to process.
export type contentType = "page" | "tweet" | "note";

// Error tag for failures while classifying raw content.
class GetTypeError extends BaseError {
  constructor(message?: string, source?: string) {
    super("[Decide Type Error]", message, source);
  }
}
+export const typeDecider = (
+ content: string,
+): Result<contentType, GetTypeError> => {
+ try {
+ // if the content is a URL, then it's a page. if its a URL with https://x.com/user/status/123, then it's a tweet. else, it's a note.
+ // do strict checking with regex
+ if (
+ content.match(/https?:\/\/(x\.com|twitter\.com)\/[\w]+\/[\w]+\/[\d]+/)
+ ) {
+ return Ok("tweet");
+ } else if (
+ content.match(
+ /^(https?:\/\/)?(www\.)?[a-z0-9]+([-.]{1}[a-z0-9]+)*\.[a-z]{2,5}(\/.*)?$/i,
+ )
+ ) {
+ return Ok("page");
+ } else {
+ return Ok("note");
+ }
+ } catch (e) {
+ console.error("[Decide Type Error]", e);
+ return Err(new GetTypeError((e as Error).message, "typeDecider"));
+ }
+};