diff options
| author | Kush Thaker <[email protected]> | 2024-08-06 20:48:13 +0530 |
|---|---|---|
| committer | Kush Thaker <[email protected]> | 2024-08-06 20:48:13 +0530 |
| commit | 1336da8aae05a0acdb3e03561fa7378f238d3eda (patch) | |
| tree | 483144b7ae6a63635e2e3753168f34f5109c5a9c /apps | |
| parent | Merge branch 'kush/be-queue' of https://github.com/Dhravya/supermemory into k... (diff) | |
| download | supermemory-1336da8aae05a0acdb3e03561fa7378f238d3eda.tar.xz supermemory-1336da8aae05a0acdb3e03561fa7378f238d3eda.zip | |
Fix job errors not reflecting in D1; add delays on document insert in vectorize to help with OpenAI rate limits
Diffstat (limited to 'apps')
| -rw-r--r-- | apps/cf-ai-backend/src/helper.ts | 60 | ||||
| -rw-r--r-- | apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts | 2 | ||||
| -rw-r--r-- | apps/cf-ai-backend/src/queueConsumer/index.ts | 28 | ||||
| -rw-r--r-- | apps/cf-ai-backend/wrangler.toml | 2 | ||||
| -rw-r--r-- | apps/web/wrangler.toml | 5 |
5 files changed, 54 insertions, 43 deletions
diff --git a/apps/cf-ai-backend/src/helper.ts b/apps/cf-ai-backend/src/helper.ts index eadd9c21..998fd4b5 100644 --- a/apps/cf-ai-backend/src/helper.ts +++ b/apps/cf-ai-backend/src/helper.ts @@ -140,6 +140,7 @@ export async function batchCreateChunksAndEmbeddings({ //! If a user saves it through the extension, we don't want other users to be able to see it. // Requests from the extension should ALWAYS have a unique ID with the USERiD in it. // I cannot stress this enough, important for security. + const ourID = `${body.url}#supermemory-web`; const random = seededRandom(ourID); const uuid = @@ -262,21 +263,30 @@ export async function batchCreateChunksAndEmbeddings({ }, {}); const ids = []; - const preparedDocuments = chunks.chunks.map((chunk, i) => { + console.log("Page hit moving on to the for loop"); + for (let i = 0; i < chunks.chunks.length; i++) { + const chunk = chunks.chunks[i]; const id = `${uuid}-${i}`; ids.push(id); - return { + const document = { pageContent: chunk, metadata: { - content: chunk, ...commonMetaData, ...spaceMetadata, }, }; - }); + const docs = await store.addDocuments([document], { ids: [id] }); + console.log("Docs added:", docs); + // Wait for a second after every 20 documents for open ai rate limit + console.log( + "This is the 20th thing in the list?", + (i + 1) % 20 === 0, + ); + if ((i + 1) % 20 === 0) { + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } - const docs = await store.addDocuments(preparedDocuments, { ids: ids }); - console.log("Docs added:", docs); const { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID } = env; await bulkInsertKv( { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID }, @@ -299,21 +309,29 @@ export async function batchCreateChunksAndEmbeddings({ }, {}); const ids = []; - const preparedDocuments = chunks.chunks.map((chunk, i) => { + for (let i = 0; i < chunks.chunks.length; i++) { + const chunk = chunks.chunks[i]; const id = `${uuid}-${i}`; ids.push(id); - return { + const document = { 
pageContent: chunk, metadata: { - content: chunk, ...commonMetaData, ...spaceMetadata, }, }; - }); + const docs = await store.addDocuments([document], { ids: [id] }); + console.log("Docs added:", docs); + // Wait for a second after every 20 documents for open ai rate limit + console.log( + "This is the 20th thing in the list?", + (i + 1) % 20 === 0, + ); + if ((i + 1) % 20 === 0) { + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } - const docs = await store.addDocuments(preparedDocuments, { ids: ids }); - console.log("Docs added:", docs); const { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID } = env; await bulkInsertKv( { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID }, @@ -335,20 +353,27 @@ export async function batchCreateChunksAndEmbeddings({ }, {}); const ids = []; - const preparedDocuments = chunks.chunks.map((chunk, i) => { + for (let i = 0; i < chunks.chunks.length; i++) { + const chunk = chunks.chunks[i]; const id = `${uuid}-${i}`; ids.push(id); - return { + const document = { pageContent: chunk, metadata: { ...commonMetaData, ...spaceMetadata, }, }; - }); + const docs = await store.addDocuments([document], { ids: [id] }); + console.log("Docs added:", docs); + // Wait for a second after every 20 documents for open ai rate limit + console.log("This is the 20th thing in the list?", (i + 1) % 20 === 0); + if ((i + 1) % 20 === 0) { + console.log("-----------waiting atm"); + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } - const docs = await store.addDocuments(preparedDocuments, { ids: ids }); - console.log("Docs added:", docs); const { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID } = env; await bulkInsertKv( { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID }, @@ -356,5 +381,6 @@ export async function batchCreateChunksAndEmbeddings({ ); } } + return; } diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts index f967736e..9a50d701 
100644 --- a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts @@ -29,12 +29,12 @@ export async function processPage(input: { ), ); } - console.log("[This is the page content]", pageContent); const metadataResult = await getMetaData(input.url); if (isErr(metadataResult)) { throw metadataResult.error; } const metadata = metadataResult.value; + console.log("[this is the metadata]", metadata); return Ok({ pageContent, metadata }); } catch (e) { console.error("[Page Processing Error]", e); diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts index 8ca23739..393f1fbf 100644 --- a/apps/cf-ai-backend/src/queueConsumer/index.ts +++ b/apps/cf-ai-backend/src/queueConsumer/index.ts @@ -45,7 +45,7 @@ const calculateExponentialBackoff = ( return baseDelaySeconds ** attempts; }; -const BASE_DELAY_SECONDS = 1.5; +const BASE_DELAY_SECONDS = 5; export async function queue( batch: MessageBatch<{ content: string; @@ -102,6 +102,7 @@ export async function queue( .set({ attempts: existingJob.value[0].attempts + 1, updatedAt: new Date(), + status: "Processing", }) .where(eq(jobs.id, jobId)), d1ErrorFactory, @@ -248,7 +249,7 @@ export async function queue( if (isErr(vectorResult)) { await db .update(jobs) - .set({ error: vectorResult.error }) + .set({ error: vectorResult.error.message, status: "error" }) .where(eq(jobs.id, jobId)); message.retry({ delaySeconds: calculateExponentialBackoff( @@ -288,7 +289,7 @@ export async function queue( if (isErr(insertResponse)) { await db .update(jobs) - .set({ error: insertResponse.error }) + .set({ error: insertResponse.error.message, status: "error" }) .where(eq(jobs.id, jobId)); message.retry({ delaySeconds: calculateExponentialBackoff( @@ -340,7 +341,7 @@ export async function queue( } } catch (e) { console.error("Error in simulated transaction", e.message); - console.log("Rooling back changes"); + message.retry({ 
delaySeconds: calculateExponentialBackoff( message.attempts, @@ -366,23 +367,6 @@ export async function queue( /* To do: -1. Abstract and shitft the entrie creatememory function to the queue consumer --> Hopefully done -2. Make the front end use that instead of whatever khichidi is going on right now -3. remove getMetada form the lib file as it's not being used anywhere else -4. Figure out the limit stuff ( server action for that seems fine because no use in limiting after they already in the queue rigth? ) -5. Figure out the initQuery stuff ( ;( ) --> This is a bad way of doing stuff :0 -6. How do I hande the content already exists wala use case? --> Also how do I figure out limits? - - - -8. Wrap the d1 thing in a transaction and then write to vectorize if d1 is sucessful if it's not then just error out ( if d1 fails dlq, recoverable failure --> retry ) - -Firt write to d1 in a transaction ( sotredContent + sapces ) --> write to vectorize --> vectorize failes --> reset d1 alternatively first we can also do the vectorise stuff if that suceeds then do the d1 stuff in a batch right? - - -DEBUG: -What's hapenning: -1. The stuff in the d1 is updating but nothing is hapenning in the vectorize for some reason - +Figure out rate limits!! 
*/ diff --git a/apps/cf-ai-backend/wrangler.toml b/apps/cf-ai-backend/wrangler.toml index 665ca593..b5d67eb6 100644 --- a/apps/cf-ai-backend/wrangler.toml +++ b/apps/cf-ai-backend/wrangler.toml @@ -44,7 +44,7 @@ mode = "smart" [[queues.consumers]] queue = "embedchunks-queue" max_batch_size = 100 - max_retries = 10 + max_retries = 3 dead_letter_queue = "embedchunks-dlq" diff --git a/apps/web/wrangler.toml b/apps/web/wrangler.toml index a6232450..54b27325 100644 --- a/apps/web/wrangler.toml +++ b/apps/web/wrangler.toml @@ -26,8 +26,9 @@ bucket_name = "dev-r2-anycontext" [[d1_databases]] binding = "DATABASE" -database_name = "dev-d1-anycontext" -database_id = "fc562605-157a-4f60-b439-2a24ffed5b4c" +database_name = "supermemlocal" +database_id = "0f93c990-72fb-489c-8563-57a7bb18dc43" + [[env.production.d1_databases]] binding = "DATABASE" |