about summary refs log tree commit diff
path: root/apps/backend/src/workflow/index.ts
diff options
context:
space:
mode:
Diffstat (limited to 'apps/backend/src/workflow/index.ts')
-rw-r--r--apps/backend/src/workflow/index.ts334
1 file changed, 223 insertions, 111 deletions
diff --git a/apps/backend/src/workflow/index.ts b/apps/backend/src/workflow/index.ts
index 41c73015..fe1f4143 100644
--- a/apps/backend/src/workflow/index.ts
+++ b/apps/backend/src/workflow/index.ts
@@ -6,17 +6,106 @@ import {
import { Env, WorkflowParams } from "../types";
import { fetchContent } from "../utils/fetchers";
import chunkText from "../utils/chunkers";
-import { database, eq, inArray } from "@supermemory/db";
+import { database, eq, inArray, and, or, sql } from "@supermemory/db";
import {
ChunkInsert,
contentToSpace,
documents,
spaces,
+ chunk,
+ Document,
} from "@supermemory/db/schema";
import { embedMany } from "ai";
import { openai } from "../providers";
-import { chunk } from "@supermemory/db/schema";
import { NonRetryableError } from "cloudflare:workflows";
+import { createHash } from "crypto";
+
+// Helper function to generate content hash
+const generateHash = (content: string) => {
+ return createHash("sha256").update(content).digest("hex");
+};
+
+interface ChunkUpdate {
+ oldChunk?: typeof chunk.$inferSelect;
+ newContent?: string;
+ orderInDocument: number;
+ needsUpdate: boolean;
+}
+
+// Helper function to determine which chunks need updates
+const analyzeContentChanges = async (
+ oldContent: string,
+ newContent: string,
+ existingChunks: (typeof chunk.$inferSelect)[],
+ chunkSize: number = 768
+): Promise<ChunkUpdate[]> => {
+ // First, chunk the new content with size limits
+ const newChunks = chunkText(newContent, chunkSize);
+ const updates: ChunkUpdate[] = [];
+
+ // Map existing chunks for quick lookup
+ const existingChunksMap = new Map(
+ existingChunks.map((c) => [c.orderInDocument, c])
+ );
+
+ // Track which old chunks have been processed
+ const processedOldChunks = new Set<number>();
+
+ // Process new chunks and match with old ones
+ let currentOrder = 0;
+ for (const newChunkText of newChunks) {
+ const oldChunk = existingChunksMap.get(currentOrder);
+ const newChunkHash = generateHash(newChunkText);
+
+ if (oldChunk) {
+ processedOldChunks.add(currentOrder);
+ }
+
+ // If the new chunk is too large, we need to split it
+ if (newChunkText.length > chunkSize) {
+ // Re-chunk this specific piece to ensure it fits
+ const subChunks = chunkText(newChunkText, chunkSize);
+
+ // Add each sub-chunk as a separate update
+ for (let i = 0; i < subChunks.length; i++) {
+ const subChunk = subChunks[i];
+ const subChunkHash = generateHash(subChunk);
+
+ updates.push({
+ oldChunk: i === 0 ? oldChunk : undefined, // Only use the old chunk for the first sub-chunk
+ newContent: subChunk,
+ orderInDocument: currentOrder + i,
+ needsUpdate: true, // Always need to update since we split the chunk
+ });
+ }
+
+ currentOrder += subChunks.length;
+ } else {
+ // Normal case - chunk fits within size limit
+ updates.push({
+ oldChunk,
+ newContent: newChunkText,
+ orderInDocument: currentOrder,
+ needsUpdate: !oldChunk || oldChunk.contentHash !== newChunkHash,
+ });
+ currentOrder++;
+ }
+ }
+
+ // Handle any remaining old chunks that weren't processed
+ for (const [order, oldChunk] of existingChunksMap) {
+ if (!processedOldChunks.has(order)) {
+ updates.push({
+ oldChunk,
+ orderInDocument: order,
+ needsUpdate: true, // Mark for deletion since it wasn't used in new content
+ });
+ }
+ }
+
+ // Sort updates by order to ensure proper sequence
+ return updates.sort((a, b) => a.orderInDocument - b.orderInDocument);
+};
// TODO: handle errors properly here.
@@ -24,16 +113,17 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
async run(event: WorkflowEvent<WorkflowParams>, step: WorkflowStep) {
// Step 0: Check if user has reached memory limit
await step.do("check memory limit", async () => {
- const existingMemories = await database(this.env.HYPERDRIVE.connectionString)
+ const existingMemories = await database(
+ this.env.HYPERDRIVE.connectionString
+ )
.select()
.from(documents)
.where(eq(documents.userId, event.payload.userId));
if (existingMemories.length >= 2000) {
- await database(this.env.HYPERDRIVE.connectionString)
- .delete(documents)
- .where(eq(documents.uuid, event.payload.uuid));
- throw new NonRetryableError("You have reached the maximum limit of 2000 memories");
+ throw new NonRetryableError(
+ "You have reached the maximum limit of 2000 memories"
+ );
}
});
@@ -53,29 +143,59 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
throw new NonRetryableError("The content is too big (maximum 20 pages)");
}
- const chunked = await step.do("chunk content", async () =>
- chunkText(rawContent.contentToVectorize, 768)
- );
+ // Generate content hash
+ const contentHash = generateHash(rawContent.contentToVectorize);
+
+ // Step 2: Check for existing document by URL
+ const existingDocument = await step.do(
+ "check existing document",
+ async () => {
+ if (!event.payload.url) return null;
- // Step 2: Create the document in the database.
- const document = await step.do("create document", async () => {
- try {
- // First check if document exists
- const existingDoc = await database(this.env.HYPERDRIVE.connectionString)
+ console.log(
+ "[Workflow] Checking for existing document with URL:",
+ event.payload.url
+ );
+ const docs = await database(this.env.HYPERDRIVE.connectionString)
.select()
.from(documents)
- .where(eq(documents.uuid, event.payload.uuid))
+ .where(
+ and(
+ eq(documents.userId, event.payload.userId),
+ eq(documents.url, event.payload.url),
+ sql`${documents.url} IS NOT NULL`
+ )
+ )
.limit(1);
- return await database(this.env.HYPERDRIVE.connectionString)
- .insert(documents)
- .values({
- userId: event.payload.userId,
- type: event.payload.type,
- uuid: event.payload.uuid,
- ...(event.payload.url && { url: event.payload.url }),
+ if (docs[0]) {
+ console.log("[Workflow] Found existing document:", {
+ id: docs[0].id,
+ uuid: docs[0].uuid,
+ url: docs[0].url,
+ });
+ } else {
+ console.log("[Workflow] No existing document found for URL");
+ }
+
+ return docs[0] || null;
+ }
+ );
+
+ // Step 3: Update or create document
+ const document = await step.do("update or create document", async () => {
+ const db = database(this.env.HYPERDRIVE.connectionString);
+
+ if (existingDocument) {
+ console.log("[Workflow] Updating existing document:", {
+ id: existingDocument.id,
+ uuid: existingDocument.uuid,
+ });
+ // Update existing document
+ await db
+ .update(documents)
+ .set({
title: rawContent.title,
- content: rawContent.contentToSave,
description:
"description" in rawContent
? (rawContent.description ?? "")
@@ -85,62 +205,56 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
? (rawContent.image ?? "")
: (event.payload.prefetched?.ogImage ?? undefined),
raw: rawContent.contentToVectorize,
+ content: rawContent.contentToSave,
+ contentHash,
isSuccessfullyProcessed: false,
updatedAt: new Date(),
- ...(event.payload.createdAt && {
- createdAt: new Date(event.payload.createdAt),
- }),
})
- .onConflictDoUpdate({
- target: documents.uuid,
- set: {
- title: rawContent.title,
- content: rawContent.contentToSave,
- description:
- "description" in rawContent
- ? (rawContent.description ?? "")
- : (event.payload.prefetched?.description ?? undefined),
- ogImage:
- "image" in rawContent
- ? (rawContent.image ?? "")
- : (event.payload.prefetched?.ogImage ?? undefined),
- raw: rawContent.contentToVectorize,
- isSuccessfullyProcessed: false,
- updatedAt: new Date(),
- },
- })
- .returning();
- } catch (error) {
- console.log("here's the error", error);
- // Check if error is a unique constraint violation
- if (
- error instanceof Error &&
- error.message.includes("document_url_user_id_idx")
- ) {
- // Document already exists for this user, stop workflow
- await database(this.env.HYPERDRIVE.connectionString)
- .delete(documents)
- .where(eq(documents.uuid, event.payload.uuid));
- throw new NonRetryableError("Document already exists for this user");
- }
- if (
- error instanceof Error &&
- error.message.includes("document_raw_user_idx")
- ) {
- await database(this.env.HYPERDRIVE.connectionString)
- .delete(documents)
- .where(eq(documents.uuid, event.payload.uuid));
- throw new NonRetryableError("The exact same document already exists");
- }
- throw error; // Re-throw other errors
+ .where(eq(documents.id, existingDocument.id));
+ console.log("[Workflow] Document updated successfully");
+
+ return [existingDocument];
}
- });
- if (!document || document.length === 0) {
- throw new Error(
- "Failed to create/update document - no document returned"
+ console.log(
+ "[Workflow] Updating document with UUID:",
+ event.payload.uuid
);
- }
+ // Create new document
+ const updated = await db
+ .update(documents)
+ .set({
+ title: rawContent.title,
+ description:
+ "description" in rawContent
+ ? (rawContent.description ?? "")
+ : (event.payload.prefetched?.description ?? undefined),
+ ogImage:
+ "image" in rawContent
+ ? (rawContent.image ?? "")
+ : (event.payload.prefetched?.ogImage ?? undefined),
+ content: rawContent.contentToSave,
+ contentHash,
+ isSuccessfullyProcessed: false,
+ updatedAt: new Date(),
+ })
+ .where(eq(documents.uuid, event.payload.uuid))
+ .returning();
+ console.log("[Workflow] Document update result:", {
+ updatedId: updated[0]?.id,
+ updatedUuid: updated[0]?.uuid,
+ });
+ return updated;
+ });
+
+ // Step 4: Process content
+ console.log("[Workflow] Processing content for document:", {
+ id: document[0].id,
+ uuid: document[0].uuid,
+ });
+ const chunked = await step.do("chunk content", async () =>
+ chunkText(rawContent.contentToVectorize, 768)
+ );
const model = openai(this.env, this.env.OPEN_AI_API_KEY).embedding(
"text-embedding-3-large",
@@ -149,7 +263,7 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
}
);
- // Step 3: Create chunks from the content.
+ // Create embeddings for chunks
const embeddings = await step.do(
"create embeddings",
{
@@ -167,52 +281,60 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
values: chunked,
}
);
-
return embeddings;
}
);
- // Step 4: Prepare chunk data
- const chunkInsertData: ChunkInsert[] = await step.do(
- "prepare chunk data",
- async () =>
- chunked.map((chunk, index) => ({
+ // Step 5: Update chunks
+ await step.do("update chunks", async () => {
+ const db = database(this.env.HYPERDRIVE.connectionString);
+
+ // Delete existing chunks if any
+ await db.delete(chunk).where(eq(chunk.documentId, document[0].id));
+
+ // Insert new chunks
+ const chunkInsertData: ChunkInsert[] = chunked.map(
+ (chunkText, index) => ({
documentId: document[0].id,
- textContent: chunk,
+ textContent: chunkText,
+ contentHash: generateHash(chunkText),
orderInDocument: index,
embeddings: embeddings[index],
- }))
- );
-
- console.log(chunkInsertData);
+ })
+ );
- // Step 5: Insert chunks
- if (chunkInsertData.length > 0) {
- await step.do("insert chunks", async () =>
- database(this.env.HYPERDRIVE.connectionString).transaction(
- async (trx) => {
- await trx.insert(chunk).values(chunkInsertData);
+ if (chunkInsertData.length > 0) {
+ await db.transaction(async (trx) => {
+ for (const chunkData of chunkInsertData) {
+ await trx
+ .insert(chunk)
+ .values(chunkData)
+ .onConflictDoNothing({ target: chunk.contentHash });
}
- )
- );
- }
+ });
+ }
+ });
- // step 6: add content to spaces
+ // Step 6: Mark document as processed
+ await step.do("mark document as processed", async () => {
+ await database(this.env.HYPERDRIVE.connectionString)
+ .update(documents)
+ .set({ isSuccessfullyProcessed: true })
+ .where(eq(documents.id, document[0].id));
+ });
+
+ // Step 7: Add content to spaces if specified
if (event.payload.spaces) {
await step.do("add content to spaces", async () => {
await database(this.env.HYPERDRIVE.connectionString).transaction(
async (trx) => {
- // First get the space IDs from the UUIDs
const spaceIds = await trx
.select({ id: spaces.id })
.from(spaces)
.where(inArray(spaces.uuid, event.payload.spaces ?? []));
- if (spaceIds.length === 0) {
- return;
- }
+ if (spaceIds.length === 0) return;
- // Then insert the content-space mappings using the actual space IDs
await trx.insert(contentToSpace).values(
spaceIds.map((space) => ({
contentId: document[0].id,
@@ -223,15 +345,5 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
);
});
}
-
- // Step 7: Mark the document as successfully processed
- await step.do("mark document as successfully processed", async () => {
- await database(this.env.HYPERDRIVE.connectionString)
- .update(documents)
- .set({
- isSuccessfullyProcessed: true,
- })
- .where(eq(documents.id, document[0].id));
- });
}
}