| field | value | date |
|---|---|---|
| author | Mahesh Sanikommmu <[email protected]> | 2025-08-16 18:50:10 -0700 |
| committer | Mahesh Sanikommmu <[email protected]> | 2025-08-16 18:50:10 -0700 |
| commit | 39003aff23d64ff1d96074d71521f6023c9bec01 (patch) | |
| tree | 3f870c04b3dce315bba1b21aa2da158494e71774 /apps/backend/src/workflow | |
| parent | Merge pull request #355 from supermemoryai/archive (diff) | |
| download | supermemory-39003aff23d64ff1d96074d71521f6023c9bec01.tar.xz, supermemory-39003aff23d64ff1d96074d71521f6023c9bec01.zip | |
New Version of Supermemory Consumer App
Diffstat (limited to 'apps/backend/src/workflow')
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | apps/backend/src/workflow/index.ts | 217 |

1 file changed, 0 insertions, 217 deletions
```diff
diff --git a/apps/backend/src/workflow/index.ts b/apps/backend/src/workflow/index.ts
deleted file mode 100644
index 8efcfacc..00000000
--- a/apps/backend/src/workflow/index.ts
+++ /dev/null
@@ -1,217 +0,0 @@
-import {
-	WorkflowEntrypoint,
-	WorkflowStep,
-	WorkflowEvent,
-} from "cloudflare:workers";
-import { Env, WorkflowParams } from "../types";
-import { fetchContent } from "../utils/fetchers";
-import chunkText from "../utils/chunkers";
-import { database, eq, inArray } from "@supermemory/db";
-import {
-	ChunkInsert,
-	contentToSpace,
-	documents,
-	spaces,
-} from "@supermemory/db/schema";
-import { embedMany } from "ai";
-import { openai } from "../providers";
-import { chunk } from "@supermemory/db/schema";
-import { NonRetryableError } from "cloudflare:workflows";
-
-// TODO: handle errors properly here.
-
-export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
-	async run(event: WorkflowEvent<WorkflowParams>, step: WorkflowStep) {
-		// Step 0: Check if user has reached memory limit
-		await step.do("check memory limit", async () => {
-			const existingMemories = await database(
-				this.env.HYPERDRIVE.connectionString
-			)
-				.select()
-				.from(documents)
-				.where(eq(documents.userId, event.payload.userId));
-
-			if (existingMemories.length >= 2000) {
-				await database(this.env.HYPERDRIVE.connectionString)
-					.delete(documents)
-					.where(eq(documents.uuid, event.payload.uuid));
-				throw new NonRetryableError(
-					"You have reached the maximum limit of 2000 memories"
-				);
-			}
-		});
-
-		// Step 1: Get and format the content.
-		const rawContent =
-			event.payload.prefetched ??
-			(await step.do(
-				"fetch content",
-				async () => await fetchContent(event.payload, this.env, step)
-			));
-
-		// check that the rawcontent is not too big
-		if (rawContent.contentToVectorize.length > 100000) {
-			await database(this.env.HYPERDRIVE.connectionString)
-				.delete(documents)
-				.where(eq(documents.uuid, event.payload.uuid));
-			throw new NonRetryableError("The content is too big (maximum 20 pages)");
-		}
-
-		const chunked = await step.do("chunk content", async () =>
-			chunkText(rawContent.contentToVectorize, 768)
-		);
-
-		// Step 2: Create the document in the database.
-		const document = await step.do("create document", async () => {
-			try {
-				// First check if document exists
-				const existingDoc = await database(this.env.HYPERDRIVE.connectionString)
-					.select()
-					.from(documents)
-					.where(eq(documents.uuid, event.payload.uuid))
-					.limit(1);
-
-				return await database(this.env.HYPERDRIVE.connectionString)
-					.insert(documents)
-					.values({
-						userId: event.payload.userId,
-						type: event.payload.type,
-						uuid: event.payload.uuid,
-						...(event.payload.url && { url: event.payload.url }),
-						title: rawContent.title,
-						content: rawContent.contentToSave,
-						description:
-							"description" in rawContent
-								? (rawContent.description ?? "")
-								: (event.payload.prefetched?.description ?? undefined),
-						ogImage:
-							"image" in rawContent
-								? (rawContent.image ?? "")
-								: (event.payload.prefetched?.ogImage ?? undefined),
-						raw: rawContent.contentToVectorize,
-						isSuccessfullyProcessed: false,
-						updatedAt: new Date(),
-						...(event.payload.createdAt && {
-							createdAt: new Date(event.payload.createdAt),
-						}),
-					})
-					.onConflictDoUpdate({
-						target: documents.uuid,
-						set: {
-							title: rawContent.title,
-							content: rawContent.contentToSave,
-							description:
-								"description" in rawContent
-									? (rawContent.description ?? "")
-									: (event.payload.prefetched?.description ?? undefined),
-							ogImage:
-								"image" in rawContent
-									? (rawContent.image ?? "")
-									: (event.payload.prefetched?.ogImage ?? undefined),
-							raw: rawContent.contentToVectorize,
-							isSuccessfullyProcessed: false,
-							updatedAt: new Date(),
-						},
-					})
-					.returning();
-			} catch (error) {
-				console.log("here's the error", error);
-				// Check if error is a unique constraint violation
-				if (
-					error instanceof Error &&
-					error.message.includes("document_url_user_id_idx")
-				) {
-					// Document already exists for this user, stop workflow
-					await database(this.env.HYPERDRIVE.connectionString)
-						.delete(documents)
-						.where(eq(documents.uuid, event.payload.uuid));
-					throw new NonRetryableError("Document already exists for this user");
-				}
-				if (
-					error instanceof Error &&
-					error.message.includes("document_raw_user_idx")
-				) {
-					await database(this.env.HYPERDRIVE.connectionString)
-						.delete(documents)
-						.where(eq(documents.uuid, event.payload.uuid));
-					throw new NonRetryableError("The exact same document already exists");
-				}
-				throw error; // Re-throw other errors
-			}
-		});
-
-		if (!document || document.length === 0) {
-			throw new Error(
-				"Failed to create/update document - no document returned"
-			);
-		}
-
-		// Step 3: Generate embeddings
-		const { data: embeddings } = await this.env.AI.run(
-			"@cf/baai/bge-base-en-v1.5",
-			{
-				text: chunked,
-			}
-		);
-
-		// Step 4: Prepare chunk data
-		const chunkInsertData: ChunkInsert[] = await step.do(
-			"prepare chunk data",
-			async () =>
-				chunked.map((chunk, index) => ({
-					documentId: document[0].id,
-					textContent: chunk,
-					orderInDocument: index,
-					embeddings: embeddings[index],
-				}))
-		);
-
-		// Step 5: Insert chunks
-		if (chunkInsertData.length > 0) {
-			await step.do("insert chunks", async () =>
-				database(this.env.HYPERDRIVE.connectionString).transaction(
-					async (trx) => {
-						await trx.insert(chunk).values(chunkInsertData);
-					}
-				)
-			);
-		}
-
-		// step 6: add content to spaces
-		if (event.payload.spaces) {
-			await step.do("add content to spaces", async () => {
-				await database(this.env.HYPERDRIVE.connectionString).transaction(
-					async (trx) => {
-						// First get the space IDs from the UUIDs
-						const spaceIds = await trx
-							.select({ id: spaces.id })
-							.from(spaces)
-							.where(inArray(spaces.uuid, event.payload.spaces ?? []));
-
-						if (spaceIds.length === 0) {
-							return;
-						}
-
-						// Then insert the content-space mappings using the actual space IDs
-						await trx.insert(contentToSpace).values(
-							spaceIds.map((space) => ({
-								contentId: document[0].id,
-								spaceId: space.id,
-							}))
-						);
-					}
-				);
-			});
-		}
-
-		// Step 7: Mark the document as successfully processed
-		await step.do("mark document as successfully processed", async () => {
-			await database(this.env.HYPERDRIVE.connectionString)
-				.update(documents)
-				.set({
-					isSuccessfullyProcessed: true,
-				})
-				.where(eq(documents.id, document[0].id));
-		});
-	}
-}
```
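The deleted file defined a Cloudflare Workflow: each `step.do()` call is a durable checkpoint that the runtime retries independently and skips on resume, which is why most phases of the pipeline are wrapped in one, and why permanent failures (memory limit, oversized content, duplicate document) are signalled with `NonRetryableError` rather than a plain `Error`. The code that bound and triggered `ContentWorkflow` is not part of this diff; the following is a minimal sketch of the usual wiring, with the `CONTENT_WORKFLOW` binding name, `Params` shape, and fetch handler all being illustrative assumptions rather than code from this repository:

```ts
// Hypothetical wiring sketch. Assumes a wrangler config with:
//   [[workflows]]
//   name = "content-workflow"
//   binding = "CONTENT_WORKFLOW"
//   class_name = "ContentWorkflow"
import {
	WorkflowEntrypoint,
	WorkflowEvent,
	WorkflowStep,
} from "cloudflare:workers";

type Params = { userId: string; uuid: string; type: string; url?: string };

interface Env {
	CONTENT_WORKFLOW: Workflow; // binding type from @cloudflare/workers-types
}

export class ContentWorkflow extends WorkflowEntrypoint<Env, Params> {
	async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
		// Each step.do() result is persisted; on retry or eviction the
		// engine resumes after the last completed step instead of
		// re-running the whole pipeline.
		const title = await step.do("fetch content", async () => "example");
		await step.do("persist", async () => {
			console.log(`persisting "${title}" for ${event.payload.userId}`);
		});
	}
}

export default {
	async fetch(_req: Request, env: Env): Promise<Response> {
		// Kick off one durable workflow instance per document.
		const instance = await env.CONTENT_WORKFLOW.create({
			id: crypto.randomUUID(),
			params: { userId: "u_123", uuid: crypto.randomUUID(), type: "url" },
		});
		return Response.json({ id: instance.id, status: await instance.status() });
	},
};
```

Under these semantics, a transient database failure after "create document" would resume at the chunk-insertion steps rather than re-fetching the source, which is the main payoff of keeping each phase in its own step.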
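Note also that Step 3 generates embeddings through the Workers AI binding (`this.env.AI.run`) rather than the `embedMany`/`openai` imports at the top of the file, which go unused here. A hedged sketch of that embed-and-pair pattern, assuming a standard `env.AI` binding and the 768-dimensional output of `bge-base-en-v1.5` (the `embedChunks` helper is illustrative, not from the repository):

```ts
interface Env {
	AI: Ai; // Workers AI binding type from @cloudflare/workers-types
}

// Embed a batch of chunk texts and pair each chunk with its vector,
// mirroring Steps 3-4 of the deleted workflow.
async function embedChunks(env: Env, chunked: string[]) {
	// Workers AI accepts a batch of strings and returns one 768-dim
	// vector per input, in input order.
	const { data: embeddings } = await env.AI.run("@cf/baai/bge-base-en-v1.5", {
		text: chunked,
	});

	return chunked.map((textContent, orderInDocument) => ({
		textContent,
		orderInDocument,
		embeddings: embeddings[orderInDocument], // number[] of length 768
	}));
}
```

Because the model returns vectors in input order, pairing by array index (as Step 4 does) is safe; a length mismatch between `chunked` and `embeddings` would indicate a failed batch and is worth asserting before insert.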