diff options
Diffstat (limited to 'apps/backend/src/workflow/index.ts')
| -rw-r--r-- | apps/backend/src/workflow/index.ts | 334 |
1 files changed, 223 insertions, 111 deletions
diff --git a/apps/backend/src/workflow/index.ts b/apps/backend/src/workflow/index.ts index 41c73015..fe1f4143 100644 --- a/apps/backend/src/workflow/index.ts +++ b/apps/backend/src/workflow/index.ts @@ -6,17 +6,106 @@ import { import { Env, WorkflowParams } from "../types"; import { fetchContent } from "../utils/fetchers"; import chunkText from "../utils/chunkers"; -import { database, eq, inArray } from "@supermemory/db"; +import { database, eq, inArray, and, or, sql } from "@supermemory/db"; import { ChunkInsert, contentToSpace, documents, spaces, + chunk, + Document, } from "@supermemory/db/schema"; import { embedMany } from "ai"; import { openai } from "../providers"; -import { chunk } from "@supermemory/db/schema"; import { NonRetryableError } from "cloudflare:workflows"; +import { createHash } from "crypto"; + +// Helper function to generate content hash +const generateHash = (content: string) => { + return createHash("sha256").update(content).digest("hex"); +}; + +interface ChunkUpdate { + oldChunk?: typeof chunk.$inferSelect; + newContent?: string; + orderInDocument: number; + needsUpdate: boolean; +} + +// Helper function to determine which chunks need updates +const analyzeContentChanges = async ( + oldContent: string, + newContent: string, + existingChunks: (typeof chunk.$inferSelect)[], + chunkSize: number = 768 +): Promise<ChunkUpdate[]> => { + // First, chunk the new content with size limits + const newChunks = chunkText(newContent, chunkSize); + const updates: ChunkUpdate[] = []; + + // Map existing chunks for quick lookup + const existingChunksMap = new Map( + existingChunks.map((c) => [c.orderInDocument, c]) + ); + + // Track which old chunks have been processed + const processedOldChunks = new Set<number>(); + + // Process new chunks and match with old ones + let currentOrder = 0; + for (const newChunkText of newChunks) { + const oldChunk = existingChunksMap.get(currentOrder); + const newChunkHash = generateHash(newChunkText); + + if (oldChunk) { + processedOldChunks.add(currentOrder); + } + + // If the new chunk is too large, we need to split it + if (newChunkText.length > chunkSize) { + // Re-chunk this specific piece to ensure it fits + const subChunks = chunkText(newChunkText, chunkSize); + + // Add each sub-chunk as a separate update + for (let i = 0; i < subChunks.length; i++) { + const subChunk = subChunks[i]; + const subChunkHash = generateHash(subChunk); + + updates.push({ + oldChunk: i === 0 ? oldChunk : undefined, // Only use the old chunk for the first sub-chunk + newContent: subChunk, + orderInDocument: currentOrder + i, + needsUpdate: true, // Always need to update since we split the chunk + }); + } + + currentOrder += subChunks.length; + } else { + // Normal case - chunk fits within size limit + updates.push({ + oldChunk, + newContent: newChunkText, + orderInDocument: currentOrder, + needsUpdate: !oldChunk || oldChunk.contentHash !== newChunkHash, + }); + currentOrder++; + } + } + + // Handle any remaining old chunks that weren't processed + for (const [order, oldChunk] of existingChunksMap) { + if (!processedOldChunks.has(order)) { + updates.push({ + oldChunk, + orderInDocument: order, + needsUpdate: true, // Mark for deletion since it wasn't used in new content + }); + } + } + + // Sort updates by order to ensure proper sequence + return updates.sort((a, b) => a.orderInDocument - b.orderInDocument); +}; // TODO: handle errors properly here. @@ -24,16 +113,17 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { async run(event: WorkflowEvent<WorkflowParams>, step: WorkflowStep) { // Step 0: Check if user has reached memory limit await step.do("check memory limit", async () => { - const existingMemories = await database(this.env.HYPERDRIVE.connectionString) + const existingMemories = await database( + this.env.HYPERDRIVE.connectionString + ) .select() .from(documents) .where(eq(documents.userId, event.payload.userId)); if (existingMemories.length >= 2000) { - await database(this.env.HYPERDRIVE.connectionString) - .delete(documents) - .where(eq(documents.uuid, event.payload.uuid)); - throw new NonRetryableError("You have reached the maximum limit of 2000 memories"); + throw new NonRetryableError( + "You have reached the maximum limit of 2000 memories" + ); } }); @@ -53,29 +143,59 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { throw new NonRetryableError("The content is too big (maximum 20 pages)"); } - const chunked = await step.do("chunk content", async () => - chunkText(rawContent.contentToVectorize, 768) - ); + // Generate content hash + const contentHash = generateHash(rawContent.contentToVectorize); + + // Step 2: Check for existing document by URL + const existingDocument = await step.do( + "check existing document", + async () => { + if (!event.payload.url) return null; - // Step 2: Create the document in the database. - const document = await step.do("create document", async () => { - try { - // First check if document exists - const existingDoc = await database(this.env.HYPERDRIVE.connectionString) + console.log( + "[Workflow] Checking for existing document with URL:", + event.payload.url + ); + const docs = await database(this.env.HYPERDRIVE.connectionString) .select() .from(documents) - .where(eq(documents.uuid, event.payload.uuid)) + .where( + and( + eq(documents.userId, event.payload.userId), + eq(documents.url, event.payload.url), + sql`${documents.url} IS NOT NULL` + ) + ) .limit(1); - return await database(this.env.HYPERDRIVE.connectionString) - .insert(documents) - .values({ - userId: event.payload.userId, - type: event.payload.type, - uuid: event.payload.uuid, - ...(event.payload.url && { url: event.payload.url }), + if (docs[0]) { + console.log("[Workflow] Found existing document:", { + id: docs[0].id, + uuid: docs[0].uuid, + url: docs[0].url, + }); + } else { + console.log("[Workflow] No existing document found for URL"); + } + + return docs[0] || null; + } + ); + + // Step 3: Update or create document + const document = await step.do("update or create document", async () => { + const db = database(this.env.HYPERDRIVE.connectionString); + + if (existingDocument) { + console.log("[Workflow] Updating existing document:", { + id: existingDocument.id, + uuid: existingDocument.uuid, + }); + // Update existing document + await db + .update(documents) + .set({ title: rawContent.title, - content: rawContent.contentToSave, description: "description" in rawContent ? (rawContent.description ?? "") @@ -85,62 +205,56 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { ? (rawContent.image ?? "") : (event.payload.prefetched?.ogImage ?? undefined), raw: rawContent.contentToVectorize, + content: rawContent.contentToSave, + contentHash, isSuccessfullyProcessed: false, updatedAt: new Date(), - ...(event.payload.createdAt && { - createdAt: new Date(event.payload.createdAt), - }), }) - .onConflictDoUpdate({ - target: documents.uuid, - set: { - title: rawContent.title, - content: rawContent.contentToSave, - description: - "description" in rawContent - ? (rawContent.description ?? "") - : (event.payload.prefetched?.description ?? undefined), - ogImage: - "image" in rawContent - ? (rawContent.image ?? "") - : (event.payload.prefetched?.ogImage ?? undefined), - raw: rawContent.contentToVectorize, - isSuccessfullyProcessed: false, - updatedAt: new Date(), - }, - }) - .returning(); - } catch (error) { - console.log("here's the error", error); - // Check if error is a unique constraint violation - if ( - error instanceof Error && - error.message.includes("document_url_user_id_idx") - ) { - // Document already exists for this user, stop workflow - await database(this.env.HYPERDRIVE.connectionString) - .delete(documents) - .where(eq(documents.uuid, event.payload.uuid)); - throw new NonRetryableError("Document already exists for this user"); - } - if ( - error instanceof Error && - error.message.includes("document_raw_user_idx") - ) { - await database(this.env.HYPERDRIVE.connectionString) - .delete(documents) - .where(eq(documents.uuid, event.payload.uuid)); - throw new NonRetryableError("The exact same document already exists"); - } - throw error; // Re-throw other errors + .where(eq(documents.id, existingDocument.id)); + console.log("[Workflow] Document updated successfully"); + + return [existingDocument]; } - }); - if (!document || document.length === 0) { - throw new Error( - "Failed to create/update document - no document returned" + console.log( + "[Workflow] Updating document with UUID:", + event.payload.uuid ); - } + // Create new document + const updated = await db + .update(documents) + .set({ + title: rawContent.title, + description: + "description" in rawContent + ? (rawContent.description ?? "") + : (event.payload.prefetched?.description ?? undefined), + ogImage: + "image" in rawContent + ? (rawContent.image ?? "") + : (event.payload.prefetched?.ogImage ?? undefined), + content: rawContent.contentToSave, + contentHash, + isSuccessfullyProcessed: false, + updatedAt: new Date(), + }) + .where(eq(documents.uuid, event.payload.uuid)) + .returning(); + console.log("[Workflow] Document update result:", { + updatedId: updated[0]?.id, + updatedUuid: updated[0]?.uuid, + }); + return updated; + }); + + // Step 4: Process content + console.log("[Workflow] Processing content for document:", { + id: document[0].id, + uuid: document[0].uuid, + }); + const chunked = await step.do("chunk content", async () => + chunkText(rawContent.contentToVectorize, 768) + ); const model = openai(this.env, this.env.OPEN_AI_API_KEY).embedding( "text-embedding-3-large", @@ -149,7 +263,7 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { } ); - // Step 3: Create chunks from the content. + // Create embeddings for chunks const embeddings = await step.do( "create embeddings", { @@ -167,52 +281,60 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { values: chunked, } ); - return embeddings; } ); - // Step 4: Prepare chunk data - const chunkInsertData: ChunkInsert[] = await step.do( - "prepare chunk data", - async () => - chunked.map((chunk, index) => ({ + // Step 5: Update chunks + await step.do("update chunks", async () => { + const db = database(this.env.HYPERDRIVE.connectionString); + + // Delete existing chunks if any + await db.delete(chunk).where(eq(chunk.documentId, document[0].id)); + + // Insert new chunks + const chunkInsertData: ChunkInsert[] = chunked.map( + (chunkText, index) => ({ documentId: document[0].id, - textContent: chunk, + textContent: chunkText, + contentHash: generateHash(chunkText), orderInDocument: index, embeddings: embeddings[index], - })) - ); - - console.log(chunkInsertData); + }) + ); - // Step 5: Insert chunks - if (chunkInsertData.length > 0) { - await step.do("insert chunks", async () => - database(this.env.HYPERDRIVE.connectionString).transaction( - async (trx) => { - await trx.insert(chunk).values(chunkInsertData); + if (chunkInsertData.length > 0) { + await db.transaction(async (trx) => { + for (const chunkData of chunkInsertData) { + await trx + .insert(chunk) + .values(chunkData) + .onConflictDoNothing({ target: chunk.contentHash }); } - ) - ); - } + }); + } + }); - // step 6: add content to spaces + // Step 6: Mark document as processed + await step.do("mark document as processed", async () => { + await database(this.env.HYPERDRIVE.connectionString) + .update(documents) + .set({ isSuccessfullyProcessed: true }) + .where(eq(documents.id, document[0].id)); + }); + + // Step 7: Add content to spaces if specified if (event.payload.spaces) { await step.do("add content to spaces", async () => { await database(this.env.HYPERDRIVE.connectionString).transaction( async (trx) => { - // First get the space IDs from the UUIDs const spaceIds = await trx .select({ id: spaces.id }) .from(spaces) .where(inArray(spaces.uuid, event.payload.spaces ?? [])); - if (spaceIds.length === 0) { - return; - } + if (spaceIds.length === 0) return; - // Then insert the content-space mappings using the actual space IDs await trx.insert(contentToSpace).values( spaceIds.map((space) => ({ contentId: document[0].id, @@ -223,15 +345,5 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { ); }); } - - // Step 7: Mark the document as successfully processed - await step.do("mark document as successfully processed", async () => { - await database(this.env.HYPERDRIVE.connectionString) - .update(documents) - .set({ - isSuccessfullyProcessed: true, - }) - .where(eq(documents.id, document[0].id)); - }); } } |