about summary refs log tree commit diff
path: root/apps/backend/src
diff options
context:
space:
mode:
author    Dhravya Shah <[email protected]>  2025-01-23 19:05:33 -0700
committer Dhravya Shah <[email protected]>  2025-01-23 19:05:33 -0700
commit    12b26382094c0faf27bfb94ec50564e966a22993 (patch)
tree      1841c3a6d59dd662e3c065422fe01ba4937008be /apps/backend/src
parent    import tools: CSV and markdown (obsidian) (diff)
download  supermemory-chunking-and-retrieval.tar.xz
supermemory-chunking-and-retrieval.zip
De duplication and updating chunks (branch: chunking-and-retrieval)
Diffstat (limited to 'apps/backend/src')
-rw-r--r--  apps/backend/src/auth.ts             |   2
-rw-r--r--  apps/backend/src/index.tsx           |   4
-rw-r--r--  apps/backend/src/routes/actions.ts   | 169
-rw-r--r--  apps/backend/src/utils/extractor.ts  |   4
-rw-r--r--  apps/backend/src/workflow/index.ts   | 334
5 files changed, 348 insertions, 165 deletions
diff --git a/apps/backend/src/auth.ts b/apps/backend/src/auth.ts
index b66bba39..206624d3 100644
--- a/apps/backend/src/auth.ts
+++ b/apps/backend/src/auth.ts
@@ -93,7 +93,6 @@ export const auth = async (
};
const session = await getSessionFromRequest(c.req.raw, context);
- console.log("Session", session);
c.set("session", session);
if (session?.user?.id) {
@@ -129,7 +128,6 @@ export const auth = async (
user = Array.isArray(user) ? user[0] : user;
c.set("user", user);
- console.log("User", user);
}
}
}
diff --git a/apps/backend/src/index.tsx b/apps/backend/src/index.tsx
index cb63e53e..dea96448 100644
--- a/apps/backend/src/index.tsx
+++ b/apps/backend/src/index.tsx
@@ -48,6 +48,10 @@ export const app = new Hono<{ Variables: Variables; Bindings: Env }>()
.use("/api/*", (c, next) => {
const user = c.get("user");
+ if (c.env.NODE_ENV === "development") {
+ return next();
+ }
+
// RATELIMITS
const rateLimitConfig = {
// Endpoints that bypass rate limiting
diff --git a/apps/backend/src/routes/actions.ts b/apps/backend/src/routes/actions.ts
index 4de1d339..28124347 100644
--- a/apps/backend/src/routes/actions.ts
+++ b/apps/backend/src/routes/actions.ts
@@ -717,40 +717,6 @@ const actions = new Hono<{ Variables: Variables; Bindings: Env }>()
const db = database(c.env.HYPERDRIVE.connectionString);
- // Calculate document hash early to enable faster duplicate detection
- const content = body.prefetched?.contentToVectorize || body.content;
- const encoder = new TextEncoder();
- const data = encoder.encode(content);
- const hashBuffer = await crypto.subtle.digest("SHA-256", data);
- const hashArray = Array.from(new Uint8Array(hashBuffer));
- const documentHash = hashArray
- .map((b) => b.toString(16).padStart(2, "0"))
- .join("");
-
- // Check for duplicates using hash
- const existingDocs = await db
- .select()
- .from(documents)
- .where(
- and(
- eq(documents.userId, user.id),
- or(
- eq(documents.contentHash, documentHash),
- and(
- eq(documents.type, type.value),
- or(eq(documents.url, body.content), eq(documents.raw, content))
- )
- )
- )
- );
-
- if (existingDocs.length > 0) {
- return c.json(
- { error: `That ${type.value} already exists in your memories` },
- 409
- );
- }
-
// Check space permissions if spaces are specified
if (body.spaces && body.spaces.length > 0) {
const spacePermissions = await Promise.all(
@@ -828,38 +794,90 @@ const actions = new Hono<{ Variables: Variables; Bindings: Env }>()
? body.content
: `https://supermemory.ai/content/${contentId}`;
- // Insert into documents table with hash
+ // Insert minimal document record
try {
- await db.insert(documents).values({
- uuid: contentId,
- userId: user.id,
- type: type.value,
+ // Check if document with same URL exists
+ console.log(
+ "[Add] Checking for existing document with URL:",
+ indexedUrl
+ );
+ const existingDoc = await db
+ .select()
+ .from(documents)
+ .where(
+ and(
+ eq(documents.userId, user.id),
+ eq(documents.url, indexedUrl),
+ sql`${documents.url} IS NOT NULL`
+ )
+ )
+ .limit(1);
+
+ let documentId = contentId;
+
+ if (existingDoc.length > 0) {
+ console.log("[Add] Found existing document:", {
+ id: existingDoc[0].id,
+ uuid: existingDoc[0].uuid,
+ url: existingDoc[0].url,
+ });
+ documentId = existingDoc[0].uuid;
+ // Update the raw content of existing document
+ console.log("[Add] Updating existing document content");
+ await db
+ .update(documents)
+ .set({
+ raw:
+ (body.prefetched ?? body.content) +
+ "\n\n" +
+ body.spaces?.join(" "),
+ updatedAt: new Date(),
+ })
+ .where(eq(documents.id, existingDoc[0].id));
+ console.log("[Add] Document updated successfully");
+ } else {
+ console.log("[Add] No existing document found, creating new one");
+ // Insert new document
+ await db.insert(documents).values({
+ uuid: contentId,
+ userId: user.id,
+ type: type.value,
+ url: indexedUrl,
+ raw:
+ (body.prefetched ?? body.content) +
+ "\n\n" +
+ body.spaces?.join(" "),
+ });
+ console.log("[Add] New document created successfully");
+ }
+
+ console.log("[Add] Starting workflow with params:", {
+ documentId,
url: indexedUrl,
- title: body.prefetched?.title,
- description: body.prefetched?.description,
- ogImage: body.prefetched?.ogImage,
- contentHash: documentHash,
- raw:
- (body.prefetched ?? body.content) + "\n\n" + body.spaces?.join(" "),
+ isUpdate: existingDoc.length > 0,
});
-
+ // Start the workflow which will handle everything else
await c.env.CONTENT_WORKFLOW.create({
params: {
userId: user.id,
content: body.content,
spaces: body.spaces,
type: type.value,
- uuid: contentId,
+ uuid: documentId,
url: indexedUrl,
prefetched: body.prefetched,
},
- id: contentId,
+ id: documentId,
});
return c.json({
- message: "Content added successfully",
- id: contentId,
+ message:
+ existingDoc.length > 0
+ ? "Content update started"
+ : "Content processing started",
+ id: documentId,
type: type.value,
+ updated: existingDoc.length > 0,
});
} catch (error) {
console.error("[Add Content Error]", error);
@@ -1054,6 +1072,57 @@ const actions = new Hono<{ Variables: Variables; Bindings: Env }>()
? content
: `https://supermemory.ai/content/${contentId}`;
+ // Check for existing document with same URL
+ const existingDoc = await db
+ .select()
+ .from(documents)
+ .where(
+ and(
+ eq(documents.userId, user.id),
+ eq(documents.url, url),
+ sql`${documents.url} IS NOT NULL`
+ )
+ )
+ .limit(1);
+
+ if (existingDoc.length > 0) {
+ // Update existing document
+ await db
+ .update(documents)
+ .set({
+ title,
+ contentHash: documentHash,
+ raw: content + "\n\n" + spaces?.join(" "),
+ updatedAt: new Date(),
+ })
+ .where(eq(documents.id, existingDoc[0].id));
+
+ // Create workflow for updating
+ await c.env.CONTENT_WORKFLOW.create({
+ params: {
+ userId: user.id,
+ content,
+ spaces,
+ type: type.value,
+ uuid: existingDoc[0].uuid,
+ url,
+ },
+ id: existingDoc[0].uuid,
+ });
+
+ succeeded++;
+ sendMessage({
+ progress: Math.round((processed / total) * 100),
+ status: "updated",
+ title: typeof item === "string" ? item : item.title,
+ processed,
+ total,
+ succeeded,
+ failed,
+ });
+ continue;
+ }
+
// Insert into documents table
await db.insert(documents).values({
uuid: contentId,
diff --git a/apps/backend/src/utils/extractor.ts b/apps/backend/src/utils/extractor.ts
index 9bf76181..8201cd28 100644
--- a/apps/backend/src/utils/extractor.ts
+++ b/apps/backend/src/utils/extractor.ts
@@ -2,7 +2,7 @@ import { Env } from "../types";
export const extractPageContent = async (content: string, env: Env) => {
console.log("content", content);
- const resp = await fetch(`https://md.dhr.wtf?url=${content}`);
+ const resp = await fetch(`https://md.dhr.wtf?url=${content}?nocache`);
if (!resp.ok) {
throw new Error(
@@ -10,7 +10,7 @@ export const extractPageContent = async (content: string, env: Env) => {
);
}
- const metadataResp = await fetch(`https://md.dhr.wtf/metadata?url=${content}`);
+ const metadataResp = await fetch(`https://md.dhr.wtf/metadata?url=${content}?nocache`);
if (!metadataResp.ok) {
throw new Error(
diff --git a/apps/backend/src/workflow/index.ts b/apps/backend/src/workflow/index.ts
index 41c73015..fe1f4143 100644
--- a/apps/backend/src/workflow/index.ts
+++ b/apps/backend/src/workflow/index.ts
@@ -6,17 +6,106 @@ import {
import { Env, WorkflowParams } from "../types";
import { fetchContent } from "../utils/fetchers";
import chunkText from "../utils/chunkers";
-import { database, eq, inArray } from "@supermemory/db";
+import { database, eq, inArray, and, or, sql } from "@supermemory/db";
import {
ChunkInsert,
contentToSpace,
documents,
spaces,
+ chunk,
+ Document,
} from "@supermemory/db/schema";
import { embedMany } from "ai";
import { openai } from "../providers";
-import { chunk } from "@supermemory/db/schema";
import { NonRetryableError } from "cloudflare:workflows";
+import { createHash } from "crypto";
+
+// Helper function to generate content hash
+const generateHash = (content: string) => {
+ return createHash("sha256").update(content).digest("hex");
+};
+
+interface ChunkUpdate {
+ oldChunk?: typeof chunk.$inferSelect;
+ newContent?: string;
+ orderInDocument: number;
+ needsUpdate: boolean;
+}
+
+// Helper function to determine which chunks need updates
+const analyzeContentChanges = async (
+ oldContent: string,
+ newContent: string,
+ existingChunks: (typeof chunk.$inferSelect)[],
+ chunkSize: number = 768
+): Promise<ChunkUpdate[]> => {
+ // First, chunk the new content with size limits
+ const newChunks = chunkText(newContent, chunkSize);
+ const updates: ChunkUpdate[] = [];
+
+ // Map existing chunks for quick lookup
+ const existingChunksMap = new Map(
+ existingChunks.map((c) => [c.orderInDocument, c])
+ );
+
+ // Track which old chunks have been processed
+ const processedOldChunks = new Set<number>();
+
+ // Process new chunks and match with old ones
+ let currentOrder = 0;
+ for (const newChunkText of newChunks) {
+ const oldChunk = existingChunksMap.get(currentOrder);
+ const newChunkHash = generateHash(newChunkText);
+
+ if (oldChunk) {
+ processedOldChunks.add(currentOrder);
+ }
+
+ // If the new chunk is too large, we need to split it
+ if (newChunkText.length > chunkSize) {
+ // Re-chunk this specific piece to ensure it fits
+ const subChunks = chunkText(newChunkText, chunkSize);
+
+ // Add each sub-chunk as a separate update
+ for (let i = 0; i < subChunks.length; i++) {
+ const subChunk = subChunks[i];
+ const subChunkHash = generateHash(subChunk);
+
+ updates.push({
+ oldChunk: i === 0 ? oldChunk : undefined, // Only use the old chunk for the first sub-chunk
+ newContent: subChunk,
+ orderInDocument: currentOrder + i,
+ needsUpdate: true, // Always need to update since we split the chunk
+ });
+ }
+
+ currentOrder += subChunks.length;
+ } else {
+ // Normal case - chunk fits within size limit
+ updates.push({
+ oldChunk,
+ newContent: newChunkText,
+ orderInDocument: currentOrder,
+ needsUpdate: !oldChunk || oldChunk.contentHash !== newChunkHash,
+ });
+ currentOrder++;
+ }
+ }
+
+ // Handle any remaining old chunks that weren't processed
+ for (const [order, oldChunk] of existingChunksMap) {
+ if (!processedOldChunks.has(order)) {
+ updates.push({
+ oldChunk,
+ orderInDocument: order,
+ needsUpdate: true, // Mark for deletion since it wasn't used in new content
+ });
+ }
+ }
+
+ // Sort updates by order to ensure proper sequence
+ return updates.sort((a, b) => a.orderInDocument - b.orderInDocument);
+};
// TODO: handle errors properly here.
@@ -24,16 +113,17 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
async run(event: WorkflowEvent<WorkflowParams>, step: WorkflowStep) {
// Step 0: Check if user has reached memory limit
await step.do("check memory limit", async () => {
- const existingMemories = await database(this.env.HYPERDRIVE.connectionString)
+ const existingMemories = await database(
+ this.env.HYPERDRIVE.connectionString
+ )
.select()
.from(documents)
.where(eq(documents.userId, event.payload.userId));
if (existingMemories.length >= 2000) {
- await database(this.env.HYPERDRIVE.connectionString)
- .delete(documents)
- .where(eq(documents.uuid, event.payload.uuid));
- throw new NonRetryableError("You have reached the maximum limit of 2000 memories");
+ throw new NonRetryableError(
+ "You have reached the maximum limit of 2000 memories"
+ );
}
});
@@ -53,29 +143,59 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
throw new NonRetryableError("The content is too big (maximum 20 pages)");
}
- const chunked = await step.do("chunk content", async () =>
- chunkText(rawContent.contentToVectorize, 768)
- );
+ // Generate content hash
+ const contentHash = generateHash(rawContent.contentToVectorize);
+
+ // Step 2: Check for existing document by URL
+ const existingDocument = await step.do(
+ "check existing document",
+ async () => {
+ if (!event.payload.url) return null;
- // Step 2: Create the document in the database.
- const document = await step.do("create document", async () => {
- try {
- // First check if document exists
- const existingDoc = await database(this.env.HYPERDRIVE.connectionString)
+ console.log(
+ "[Workflow] Checking for existing document with URL:",
+ event.payload.url
+ );
+ const docs = await database(this.env.HYPERDRIVE.connectionString)
.select()
.from(documents)
- .where(eq(documents.uuid, event.payload.uuid))
+ .where(
+ and(
+ eq(documents.userId, event.payload.userId),
+ eq(documents.url, event.payload.url),
+ sql`${documents.url} IS NOT NULL`
+ )
+ )
.limit(1);
- return await database(this.env.HYPERDRIVE.connectionString)
- .insert(documents)
- .values({
- userId: event.payload.userId,
- type: event.payload.type,
- uuid: event.payload.uuid,
- ...(event.payload.url && { url: event.payload.url }),
+ if (docs[0]) {
+ console.log("[Workflow] Found existing document:", {
+ id: docs[0].id,
+ uuid: docs[0].uuid,
+ url: docs[0].url,
+ });
+ } else {
+ console.log("[Workflow] No existing document found for URL");
+ }
+
+ return docs[0] || null;
+ }
+ );
+
+ // Step 3: Update or create document
+ const document = await step.do("update or create document", async () => {
+ const db = database(this.env.HYPERDRIVE.connectionString);
+
+ if (existingDocument) {
+ console.log("[Workflow] Updating existing document:", {
+ id: existingDocument.id,
+ uuid: existingDocument.uuid,
+ });
+ // Update existing document
+ await db
+ .update(documents)
+ .set({
title: rawContent.title,
- content: rawContent.contentToSave,
description:
"description" in rawContent
? (rawContent.description ?? "")
@@ -85,62 +205,56 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
? (rawContent.image ?? "")
: (event.payload.prefetched?.ogImage ?? undefined),
raw: rawContent.contentToVectorize,
+ content: rawContent.contentToSave,
+ contentHash,
isSuccessfullyProcessed: false,
updatedAt: new Date(),
- ...(event.payload.createdAt && {
- createdAt: new Date(event.payload.createdAt),
- }),
})
- .onConflictDoUpdate({
- target: documents.uuid,
- set: {
- title: rawContent.title,
- content: rawContent.contentToSave,
- description:
- "description" in rawContent
- ? (rawContent.description ?? "")
- : (event.payload.prefetched?.description ?? undefined),
- ogImage:
- "image" in rawContent
- ? (rawContent.image ?? "")
- : (event.payload.prefetched?.ogImage ?? undefined),
- raw: rawContent.contentToVectorize,
- isSuccessfullyProcessed: false,
- updatedAt: new Date(),
- },
- })
- .returning();
- } catch (error) {
- console.log("here's the error", error);
- // Check if error is a unique constraint violation
- if (
- error instanceof Error &&
- error.message.includes("document_url_user_id_idx")
- ) {
- // Document already exists for this user, stop workflow
- await database(this.env.HYPERDRIVE.connectionString)
- .delete(documents)
- .where(eq(documents.uuid, event.payload.uuid));
- throw new NonRetryableError("Document already exists for this user");
- }
- if (
- error instanceof Error &&
- error.message.includes("document_raw_user_idx")
- ) {
- await database(this.env.HYPERDRIVE.connectionString)
- .delete(documents)
- .where(eq(documents.uuid, event.payload.uuid));
- throw new NonRetryableError("The exact same document already exists");
- }
- throw error; // Re-throw other errors
+ .where(eq(documents.id, existingDocument.id));
+ console.log("[Workflow] Document updated successfully");
+
+ return [existingDocument];
}
- });
- if (!document || document.length === 0) {
- throw new Error(
- "Failed to create/update document - no document returned"
+ console.log(
+ "[Workflow] Updating document with UUID:",
+ event.payload.uuid
);
- }
+ // Create new document
+ const updated = await db
+ .update(documents)
+ .set({
+ title: rawContent.title,
+ description:
+ "description" in rawContent
+ ? (rawContent.description ?? "")
+ : (event.payload.prefetched?.description ?? undefined),
+ ogImage:
+ "image" in rawContent
+ ? (rawContent.image ?? "")
+ : (event.payload.prefetched?.ogImage ?? undefined),
+ content: rawContent.contentToSave,
+ contentHash,
+ isSuccessfullyProcessed: false,
+ updatedAt: new Date(),
+ })
+ .where(eq(documents.uuid, event.payload.uuid))
+ .returning();
+ console.log("[Workflow] Document update result:", {
+ updatedId: updated[0]?.id,
+ updatedUuid: updated[0]?.uuid,
+ });
+ return updated;
+ });
+
+ // Step 4: Process content
+ console.log("[Workflow] Processing content for document:", {
+ id: document[0].id,
+ uuid: document[0].uuid,
+ });
+ const chunked = await step.do("chunk content", async () =>
+ chunkText(rawContent.contentToVectorize, 768)
+ );
const model = openai(this.env, this.env.OPEN_AI_API_KEY).embedding(
"text-embedding-3-large",
@@ -149,7 +263,7 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
}
);
- // Step 3: Create chunks from the content.
+ // Create embeddings for chunks
const embeddings = await step.do(
"create embeddings",
{
@@ -167,52 +281,60 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
values: chunked,
}
);
-
return embeddings;
}
);
- // Step 4: Prepare chunk data
- const chunkInsertData: ChunkInsert[] = await step.do(
- "prepare chunk data",
- async () =>
- chunked.map((chunk, index) => ({
+ // Step 5: Update chunks
+ await step.do("update chunks", async () => {
+ const db = database(this.env.HYPERDRIVE.connectionString);
+
+ // Delete existing chunks if any
+ await db.delete(chunk).where(eq(chunk.documentId, document[0].id));
+
+ // Insert new chunks
+ const chunkInsertData: ChunkInsert[] = chunked.map(
+ (chunkText, index) => ({
documentId: document[0].id,
- textContent: chunk,
+ textContent: chunkText,
+ contentHash: generateHash(chunkText),
orderInDocument: index,
embeddings: embeddings[index],
- }))
- );
-
- console.log(chunkInsertData);
+ })
+ );
- // Step 5: Insert chunks
- if (chunkInsertData.length > 0) {
- await step.do("insert chunks", async () =>
- database(this.env.HYPERDRIVE.connectionString).transaction(
- async (trx) => {
- await trx.insert(chunk).values(chunkInsertData);
+ if (chunkInsertData.length > 0) {
+ await db.transaction(async (trx) => {
+ for (const chunkData of chunkInsertData) {
+ await trx
+ .insert(chunk)
+ .values(chunkData)
+ .onConflictDoNothing({ target: chunk.contentHash });
}
- )
- );
- }
+ });
+ }
+ });
- // step 6: add content to spaces
+ // Step 6: Mark document as processed
+ await step.do("mark document as processed", async () => {
+ await database(this.env.HYPERDRIVE.connectionString)
+ .update(documents)
+ .set({ isSuccessfullyProcessed: true })
+ .where(eq(documents.id, document[0].id));
+ });
+
+ // Step 7: Add content to spaces if specified
if (event.payload.spaces) {
await step.do("add content to spaces", async () => {
await database(this.env.HYPERDRIVE.connectionString).transaction(
async (trx) => {
- // First get the space IDs from the UUIDs
const spaceIds = await trx
.select({ id: spaces.id })
.from(spaces)
.where(inArray(spaces.uuid, event.payload.spaces ?? []));
- if (spaceIds.length === 0) {
- return;
- }
+ if (spaceIds.length === 0) return;
- // Then insert the content-space mappings using the actual space IDs
await trx.insert(contentToSpace).values(
spaceIds.map((space) => ({
contentId: document[0].id,
@@ -223,15 +345,5 @@ export class ContentWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
);
});
}
-
- // Step 7: Mark the document as successfully processed
- await step.do("mark document as successfully processed", async () => {
- await database(this.env.HYPERDRIVE.connectionString)
- .update(documents)
- .set({
- isSuccessfullyProcessed: true,
- })
- .where(eq(documents.id, document[0].id));
- });
}
}