diff options
| author | Dhravya Shah <[email protected]> | 2025-01-23 19:05:33 -0700 |
|---|---|---|
| committer | Dhravya Shah <[email protected]> | 2025-01-23 19:05:33 -0700 |
| commit | 12b26382094c0faf27bfb94ec50564e966a22993 (patch) | |
| tree | 1841c3a6d59dd662e3c065422fe01ba4937008be /apps/backend/src | |
| parent | import tools: CSV and markdown (obsidian) (diff) | |
| download | supermemory-chunking-and-retrieval.tar.xz supermemory-chunking-and-retrieval.zip | |
De duplication and updating chunks (branch: chunking-and-retrieval)
Diffstat (limited to 'apps/backend/src')
| -rw-r--r-- | apps/backend/src/auth.ts | 2 | ||||
| -rw-r--r-- | apps/backend/src/index.tsx | 4 | ||||
| -rw-r--r-- | apps/backend/src/routes/actions.ts | 169 | ||||
| -rw-r--r-- | apps/backend/src/utils/extractor.ts | 4 | ||||
| -rw-r--r-- | apps/backend/src/workflow/index.ts | 334 |
5 files changed, 348 insertions, 165 deletions
diff --git a/apps/backend/src/auth.ts b/apps/backend/src/auth.ts index b66bba39..206624d3 100644 --- a/apps/backend/src/auth.ts +++ b/apps/backend/src/auth.ts @@ -93,7 +93,6 @@ export const auth = async ( }; const session = await getSessionFromRequest(c.req.raw, context); - console.log("Session", session); c.set("session", session); if (session?.user?.id) { @@ -129,7 +128,6 @@ export const auth = async ( user = Array.isArray(user) ? user[0] : user; c.set("user", user); - console.log("User", user); } } } diff --git a/apps/backend/src/index.tsx b/apps/backend/src/index.tsx index cb63e53e..dea96448 100644 --- a/apps/backend/src/index.tsx +++ b/apps/backend/src/index.tsx @@ -48,6 +48,10 @@ export const app = new Hono<{ Variables: Variables; Bindings: Env }>() .use("/api/*", (c, next) => { const user = c.get("user"); + if (c.env.NODE_ENV === "development") { + return next(); + } + // RATELIMITS const rateLimitConfig = { // Endpoints that bypass rate limiting diff --git a/apps/backend/src/routes/actions.ts b/apps/backend/src/routes/actions.ts index 4de1d339..28124347 100644 --- a/apps/backend/src/routes/actions.ts +++ b/apps/backend/src/routes/actions.ts @@ -717,40 +717,6 @@ const actions = new Hono<{ Variables: Variables; Bindings: Env }>() const db = database(c.env.HYPERDRIVE.connectionString); - // Calculate document hash early to enable faster duplicate detection - const content = body.prefetched?.contentToVectorize || body.content; - const encoder = new TextEncoder(); - const data = encoder.encode(content); - const hashBuffer = await crypto.subtle.digest("SHA-256", data); - const hashArray = Array.from(new Uint8Array(hashBuffer)); - const documentHash = hashArray - .map((b) => b.toString(16).padStart(2, "0")) - .join(""); - - // Check for duplicates using hash - const existingDocs = await db - .select() - .from(documents) - .where( - and( - eq(documents.userId, user.id), - or( - eq(documents.contentHash, documentHash), - and( - eq(documents.type, type.value), - 
or(eq(documents.url, body.content), eq(documents.raw, content)) - ) - ) - ) - ); - - if (existingDocs.length > 0) { - return c.json( - { error: `That ${type.value} already exists in your memories` }, - 409 - ); - } - // Check space permissions if spaces are specified if (body.spaces && body.spaces.length > 0) { const spacePermissions = await Promise.all( @@ -828,38 +794,90 @@ const actions = new Hono<{ Variables: Variables; Bindings: Env }>() ? body.content : `https://supermemory.ai/content/${contentId}`; - // Insert into documents table with hash + // Insert minimal document record try { - await db.insert(documents).values({ - uuid: contentId, - userId: user.id, - type: type.value, + // Check if document with same URL exists + console.log( + "[Add] Checking for existing document with URL:", + indexedUrl + ); + const existingDoc = await db + .select() + .from(documents) + .where( + and( + eq(documents.userId, user.id), + eq(documents.url, indexedUrl), + sql`${documents.url} IS NOT NULL` + ) + ) + .limit(1); + + let documentId = contentId; + + if (existingDoc.length > 0) { + console.log("[Add] Found existing document:", { + id: existingDoc[0].id, + uuid: existingDoc[0].uuid, + url: existingDoc[0].url, + }); + documentId = existingDoc[0].uuid; + // Update the raw content of existing document + console.log("[Add] Updating existing document content"); + await db + .update(documents) + .set({ + raw: + (body.prefetched ?? body.content) + + "\n\n" + + body.spaces?.join(" "), + updatedAt: new Date(), + }) + .where(eq(documents.id, existingDoc[0].id)); + console.log("[Add] Document updated successfully"); + } else { + console.log("[Add] No existing document found, creating new one"); + // Insert new document + await db.insert(documents).values({ + uuid: contentId, + userId: user.id, + type: type.value, + url: indexedUrl, + raw: + (body.prefetched ?? 
body.content) + + "\n\n" + + body.spaces?.join(" "), + }); + console.log("[Add] New document created successfully"); + } + + console.log("[Add] Starting workflow with params:", { + documentId, url: indexedUrl, - title: body.prefetched?.title, - description: body.prefetched?.description, - ogImage: body.prefetched?.ogImage, - contentHash: documentHash, - raw: - (body.prefetched ?? body.content) + "\n\n" + body.spaces?.join(" "), + isUpdate: existingDoc.length > 0, }); - + // Start the workflow which will handle everything else await c.env.CONTENT_WORKFLOW.create({ params: { userId: user.id, content: body.content, spaces: body.spaces, type: type.value, - uuid: contentId, + uuid: documentId, url: indexedUrl, prefetched: body.prefetched, }, - id: contentId, + id: documentId, }); return c.json({ - message: "Content added successfully", - id: contentId, + message: + existingDoc.length > 0 + ? "Content update started" + : "Content processing started", + id: documentId, type: type.value, + updated: existingDoc.length > 0, }); } catch (error) { console.error("[Add Content Error]", error); @@ -1054,6 +1072,57 @@ const actions = new Hono<{ Variables: Variables; Bindings: Env }>() ? 
content : `https://supermemory.ai/content/${contentId}`; + // Check for existing document with same URL + const existingDoc = await db + .select() + .from(documents) + .where( + and( + eq(documents.userId, user.id), + eq(documents.url, url), + sql`${documents.url} IS NOT NULL` + ) + ) + .limit(1); + + if (existingDoc.length > 0) { + // Update existing document + await db + .update(documents) + .set({ + title, + contentHash: documentHash, + raw: content + "\n\n" + spaces?.join(" "), + updatedAt: new Date(), + }) + .where(eq(documents.id, existingDoc[0].id)); + + // Create workflow for updating + await c.env.CONTENT_WORKFLOW.create({ + params: { + userId: user.id, + content, + spaces, + type: type.value, + uuid: existingDoc[0].uuid, + url, + }, + id: existingDoc[0].uuid, + }); + + succeeded++; + sendMessage({ + progress: Math.round((processed / total) * 100), + status: "updated", + title: typeof item === "string" ? item : item.title, + processed, + total, + succeeded, + failed, + }); + continue; + } + // Insert into documents table await db.insert(documents).values({ uuid: contentId, diff --git a/apps/backend/src/utils/extractor.ts b/apps/backend/src/utils/extractor.ts index 9bf76181..8201cd28 100644 --- a/apps/backend/src/utils/extractor.ts +++ b/apps/backend/src/utils/extractor.ts @@ -2,7 +2,7 @@ import { Env } from "../types"; export const extractPageContent = async (content: string, env: Env) => { console.log("content", content); - const resp = await fetch(`https://md.dhr.wtf?url=${content}`); + const resp = await fetch(`https://md.dhr.wtf?url=${content}?nocache`); if (!resp.ok) { throw new Error( @@ -10,7 +10,7 @@ export const extractPageContent = async (content: string, env: Env) => { ); } - const metadataResp = await fetch(`https://md.dhr.wtf/metadata?url=${content}`); + const metadataResp = await fetch(`https://md.dhr.wtf/metadata?url=${content}?nocache`); if (!metadataResp.ok) { throw new Error( diff --git a/apps/backend/src/workflow/index.ts 
b/apps/backend/src/workflow/index.ts index 41c73015..fe1f4143 100644 --- a/apps/backend/src/workflow/index.ts +++ b/apps/backend/src/workflow/index.ts @@ -6,17 +6,106 @@ import { import { Env, WorkflowParams } from "../types"; import { fetchContent } from "../utils/fetchers"; import chunkText from "../utils/chunkers"; -import { database, eq, inArray } from "@supermemory/db"; +import { database, eq, inArray, and, or, sql } from "@supermemory/db"; import { ChunkInsert, contentToSpace, documents, spaces, + chunk, + Document, } from "@supermemory/db/schema"; import { embedMany } from "ai"; import { openai } from "../providers"; -import { chunk } from "@supermemory/db/schema"; import { NonRetryableError } from "cloudflare:workflows"; +import { createHash } from "crypto"; + +// Helper function to generate content hash +const generateHash = (content: string) => { + return createHash("sha256").update(content).digest("hex"); +}; + +interface ChunkUpdate { + oldChunk?: typeof chunk.$inferSelect; + newContent?: string; + orderInDocument: number; + needsUpdate: boolean; +} + +// Helper function to determine which chunks need updates +const analyzeContentChanges = async ( + oldContent: string, + newContent: string, + existingChunks: (typeof chunk.$inferSelect)[], + chunkSize: number = 768 +): Promise<ChunkUpdate[]> => { + // First, chunk the new content with size limits + const newChunks = chunkText(newContent, chunkSize); + const updates: ChunkUpdate[] = []; + + // Map existing chunks for quick lookup + const existingChunksMap = new Map( + existingChunks.map((c) => [c.orderInDocument, c]) + ); + + // Track which old chunks have been processed + const processedOldChunks = new Set<number>(); + + // Process new chunks and match with old ones + let currentOrder = 0; + for (const newChunkText of newChunks) { + const oldChunk = existingChunksMap.get(currentOrder); + const newChunkHash = generateHash(newChunkText); + + if (oldChunk) { + processedOldChunks.add(currentOrder); + } + + 
// If the new chunk is too large, we need to split it + if (newChunkText.length > chunkSize) { + // Re-chunk this specific piece to ensure it fits + const subChunks = chunkText(newChunkText, chunkSize); + + // Add each sub-chunk as a separate update + for (let i = 0; i < subChunks.length; i++) { + const subChunk = subChunks[i]; + const subChunkHash = generateHash(subChunk); + + updates.push({ + oldChunk: i === 0 ? oldChunk : undefined, // Only use the old chunk for the first sub-chunk + newContent: subChunk, + orderInDocument: currentOrder + i, + needsUpdate: true, // Always need to update since we split the chunk + }); + } + + currentOrder += subChunks.length; + } else { + // Normal case - chunk fits within size limit + updates.push({ + oldChunk, + newContent: newChunkText, + orderInDocument: currentOrder, + needsUpdate: !oldChunk || oldChunk.contentHash !== newChunkHash, + }); + currentOrder++; + } + } + + // Handle any remaining old chunks that weren't processed + for (const [order, oldChunk] of existingChunksMap) { + if (!processedOldChunks.has(order)) { + updates.push({ + oldChunk, + orderInDocument: order, + needsUpdate: true, // Mark for deletion since it wasn't used in new content + }); + } + } + + // Sort updates by order to ensure proper sequence + return updates.sort((a, b) => a.orderInDocument - b.orderInDocument); +}; // TODO: handle errors properly here. 
@@ -24,16 +113,17 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { async run(event: WorkflowEvent<WorkflowParams>, step: WorkflowStep) { // Step 0: Check if user has reached memory limit await step.do("check memory limit", async () => { - const existingMemories = await database(this.env.HYPERDRIVE.connectionString) + const existingMemories = await database( + this.env.HYPERDRIVE.connectionString + ) .select() .from(documents) .where(eq(documents.userId, event.payload.userId)); if (existingMemories.length >= 2000) { - await database(this.env.HYPERDRIVE.connectionString) - .delete(documents) - .where(eq(documents.uuid, event.payload.uuid)); - throw new NonRetryableError("You have reached the maximum limit of 2000 memories"); + throw new NonRetryableError( + "You have reached the maximum limit of 2000 memories" + ); } }); @@ -53,29 +143,59 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { throw new NonRetryableError("The content is too big (maximum 20 pages)"); } - const chunked = await step.do("chunk content", async () => - chunkText(rawContent.contentToVectorize, 768) - ); + // Generate content hash + const contentHash = generateHash(rawContent.contentToVectorize); + + // Step 2: Check for existing document by URL + const existingDocument = await step.do( + "check existing document", + async () => { + if (!event.payload.url) return null; - // Step 2: Create the document in the database. 
- const document = await step.do("create document", async () => { - try { - // First check if document exists - const existingDoc = await database(this.env.HYPERDRIVE.connectionString) + console.log( + "[Workflow] Checking for existing document with URL:", + event.payload.url + ); + const docs = await database(this.env.HYPERDRIVE.connectionString) .select() .from(documents) - .where(eq(documents.uuid, event.payload.uuid)) + .where( + and( + eq(documents.userId, event.payload.userId), + eq(documents.url, event.payload.url), + sql`${documents.url} IS NOT NULL` + ) + ) .limit(1); - return await database(this.env.HYPERDRIVE.connectionString) - .insert(documents) - .values({ - userId: event.payload.userId, - type: event.payload.type, - uuid: event.payload.uuid, - ...(event.payload.url && { url: event.payload.url }), + if (docs[0]) { + console.log("[Workflow] Found existing document:", { + id: docs[0].id, + uuid: docs[0].uuid, + url: docs[0].url, + }); + } else { + console.log("[Workflow] No existing document found for URL"); + } + + return docs[0] || null; + } + ); + + // Step 3: Update or create document + const document = await step.do("update or create document", async () => { + const db = database(this.env.HYPERDRIVE.connectionString); + + if (existingDocument) { + console.log("[Workflow] Updating existing document:", { + id: existingDocument.id, + uuid: existingDocument.uuid, + }); + // Update existing document + await db + .update(documents) + .set({ title: rawContent.title, - content: rawContent.contentToSave, description: "description" in rawContent ? (rawContent.description ?? "") @@ -85,62 +205,56 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { ? (rawContent.image ?? "") : (event.payload.prefetched?.ogImage ?? 
undefined), raw: rawContent.contentToVectorize, + content: rawContent.contentToSave, + contentHash, isSuccessfullyProcessed: false, updatedAt: new Date(), - ...(event.payload.createdAt && { - createdAt: new Date(event.payload.createdAt), - }), }) - .onConflictDoUpdate({ - target: documents.uuid, - set: { - title: rawContent.title, - content: rawContent.contentToSave, - description: - "description" in rawContent - ? (rawContent.description ?? "") - : (event.payload.prefetched?.description ?? undefined), - ogImage: - "image" in rawContent - ? (rawContent.image ?? "") - : (event.payload.prefetched?.ogImage ?? undefined), - raw: rawContent.contentToVectorize, - isSuccessfullyProcessed: false, - updatedAt: new Date(), - }, - }) - .returning(); - } catch (error) { - console.log("here's the error", error); - // Check if error is a unique constraint violation - if ( - error instanceof Error && - error.message.includes("document_url_user_id_idx") - ) { - // Document already exists for this user, stop workflow - await database(this.env.HYPERDRIVE.connectionString) - .delete(documents) - .where(eq(documents.uuid, event.payload.uuid)); - throw new NonRetryableError("Document already exists for this user"); - } - if ( - error instanceof Error && - error.message.includes("document_raw_user_idx") - ) { - await database(this.env.HYPERDRIVE.connectionString) - .delete(documents) - .where(eq(documents.uuid, event.payload.uuid)); - throw new NonRetryableError("The exact same document already exists"); - } - throw error; // Re-throw other errors + .where(eq(documents.id, existingDocument.id)); + console.log("[Workflow] Document updated successfully"); + + return [existingDocument]; } - }); - if (!document || document.length === 0) { - throw new Error( - "Failed to create/update document - no document returned" + console.log( + "[Workflow] Updating document with UUID:", + event.payload.uuid ); - } + // Create new document + const updated = await db + .update(documents) + .set({ + 
title: rawContent.title, + description: + "description" in rawContent + ? (rawContent.description ?? "") + : (event.payload.prefetched?.description ?? undefined), + ogImage: + "image" in rawContent + ? (rawContent.image ?? "") + : (event.payload.prefetched?.ogImage ?? undefined), + content: rawContent.contentToSave, + contentHash, + isSuccessfullyProcessed: false, + updatedAt: new Date(), + }) + .where(eq(documents.uuid, event.payload.uuid)) + .returning(); + console.log("[Workflow] Document update result:", { + updatedId: updated[0]?.id, + updatedUuid: updated[0]?.uuid, + }); + return updated; + }); + + // Step 4: Process content + console.log("[Workflow] Processing content for document:", { + id: document[0].id, + uuid: document[0].uuid, + }); + const chunked = await step.do("chunk content", async () => + chunkText(rawContent.contentToVectorize, 768) + ); const model = openai(this.env, this.env.OPEN_AI_API_KEY).embedding( "text-embedding-3-large", @@ -149,7 +263,7 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { } ); - // Step 3: Create chunks from the content. 
+ // Create embeddings for chunks const embeddings = await step.do( "create embeddings", { @@ -167,52 +281,60 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { values: chunked, } ); - return embeddings; } ); - // Step 4: Prepare chunk data - const chunkInsertData: ChunkInsert[] = await step.do( - "prepare chunk data", - async () => - chunked.map((chunk, index) => ({ + // Step 5: Update chunks + await step.do("update chunks", async () => { + const db = database(this.env.HYPERDRIVE.connectionString); + + // Delete existing chunks if any + await db.delete(chunk).where(eq(chunk.documentId, document[0].id)); + + // Insert new chunks + const chunkInsertData: ChunkInsert[] = chunked.map( + (chunkText, index) => ({ documentId: document[0].id, - textContent: chunk, + textContent: chunkText, + contentHash: generateHash(chunkText), orderInDocument: index, embeddings: embeddings[index], - })) - ); - - console.log(chunkInsertData); + }) + ); - // Step 5: Insert chunks - if (chunkInsertData.length > 0) { - await step.do("insert chunks", async () => - database(this.env.HYPERDRIVE.connectionString).transaction( - async (trx) => { - await trx.insert(chunk).values(chunkInsertData); + if (chunkInsertData.length > 0) { + await db.transaction(async (trx) => { + for (const chunkData of chunkInsertData) { + await trx + .insert(chunk) + .values(chunkData) + .onConflictDoNothing({ target: chunk.contentHash }); } - ) - ); - } + }); + } + }); - // step 6: add content to spaces + // Step 6: Mark document as processed + await step.do("mark document as processed", async () => { + await database(this.env.HYPERDRIVE.connectionString) + .update(documents) + .set({ isSuccessfullyProcessed: true }) + .where(eq(documents.id, document[0].id)); + }); + + // Step 7: Add content to spaces if specified if (event.payload.spaces) { await step.do("add content to spaces", async () => { await database(this.env.HYPERDRIVE.connectionString).transaction( async (trx) => { - // First 
get the space IDs from the UUIDs const spaceIds = await trx .select({ id: spaces.id }) .from(spaces) .where(inArray(spaces.uuid, event.payload.spaces ?? [])); - if (spaceIds.length === 0) { - return; - } + if (spaceIds.length === 0) return; - // Then insert the content-space mappings using the actual space IDs await trx.insert(contentToSpace).values( spaceIds.map((space) => ({ contentId: document[0].id, @@ -223,15 +345,5 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> { ); }); } - - // Step 7: Mark the document as successfully processed - await step.do("mark document as successfully processed", async () => { - await database(this.env.HYPERDRIVE.connectionString) - .update(documents) - .set({ - isSuccessfullyProcessed: true, - }) - .where(eq(documents.id, document[0].id)); - }); } } |