aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKush Thaker <[email protected]>2024-08-06 20:48:13 +0530
committerKush Thaker <[email protected]>2024-08-06 20:48:13 +0530
commit1336da8aae05a0acdb3e03561fa7378f238d3eda (patch)
tree483144b7ae6a63635e2e3753168f34f5109c5a9c
parentMerge branch 'kush/be-queue' of https://github.com/Dhravya/supermemory into k... (diff)
downloadsupermemory-1336da8aae05a0acdb3e03561fa7378f238d3eda.tar.xz
supermemory-1336da8aae05a0acdb3e03561fa7378f238d3eda.zip
Fix job errors not reflecting in D1; add delays on document insert in Vectorize to help with OpenAI rate limits
-rw-r--r--apps/cf-ai-backend/src/helper.ts60
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts2
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/index.ts28
-rw-r--r--apps/cf-ai-backend/wrangler.toml2
-rw-r--r--apps/web/wrangler.toml5
5 files changed, 54 insertions, 43 deletions
diff --git a/apps/cf-ai-backend/src/helper.ts b/apps/cf-ai-backend/src/helper.ts
index eadd9c21..998fd4b5 100644
--- a/apps/cf-ai-backend/src/helper.ts
+++ b/apps/cf-ai-backend/src/helper.ts
@@ -140,6 +140,7 @@ export async function batchCreateChunksAndEmbeddings({
//! If a user saves it through the extension, we don't want other users to be able to see it.
// Requests from the extension should ALWAYS have a unique ID with the USERiD in it.
// I cannot stress this enough, important for security.
+
const ourID = `${body.url}#supermemory-web`;
const random = seededRandom(ourID);
const uuid =
@@ -262,21 +263,30 @@ export async function batchCreateChunksAndEmbeddings({
}, {});
const ids = [];
- const preparedDocuments = chunks.chunks.map((chunk, i) => {
+ console.log("Page hit moving on to the for loop");
+ for (let i = 0; i < chunks.chunks.length; i++) {
+ const chunk = chunks.chunks[i];
const id = `${uuid}-${i}`;
ids.push(id);
- return {
+ const document = {
pageContent: chunk,
metadata: {
- content: chunk,
...commonMetaData,
...spaceMetadata,
},
};
- });
+ const docs = await store.addDocuments([document], { ids: [id] });
+ console.log("Docs added:", docs);
+			// Wait one second after every 20 documents to stay under the OpenAI rate limit
+ console.log(
+ "This is the 20th thing in the list?",
+ (i + 1) % 20 === 0,
+ );
+ if ((i + 1) % 20 === 0) {
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ }
+ }
- const docs = await store.addDocuments(preparedDocuments, { ids: ids });
- console.log("Docs added:", docs);
const { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID } = env;
await bulkInsertKv(
{ CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID },
@@ -299,21 +309,29 @@ export async function batchCreateChunksAndEmbeddings({
}, {});
const ids = [];
- const preparedDocuments = chunks.chunks.map((chunk, i) => {
+ for (let i = 0; i < chunks.chunks.length; i++) {
+ const chunk = chunks.chunks[i];
const id = `${uuid}-${i}`;
ids.push(id);
- return {
+ const document = {
pageContent: chunk,
metadata: {
- content: chunk,
...commonMetaData,
...spaceMetadata,
},
};
- });
+ const docs = await store.addDocuments([document], { ids: [id] });
+ console.log("Docs added:", docs);
+			// Wait one second after every 20 documents to stay under the OpenAI rate limit
+ console.log(
+ "This is the 20th thing in the list?",
+ (i + 1) % 20 === 0,
+ );
+ if ((i + 1) % 20 === 0) {
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ }
+ }
- const docs = await store.addDocuments(preparedDocuments, { ids: ids });
- console.log("Docs added:", docs);
const { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID } = env;
await bulkInsertKv(
{ CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID },
@@ -335,20 +353,27 @@ export async function batchCreateChunksAndEmbeddings({
}, {});
const ids = [];
- const preparedDocuments = chunks.chunks.map((chunk, i) => {
+ for (let i = 0; i < chunks.chunks.length; i++) {
+ const chunk = chunks.chunks[i];
const id = `${uuid}-${i}`;
ids.push(id);
- return {
+ const document = {
pageContent: chunk,
metadata: {
...commonMetaData,
...spaceMetadata,
},
};
- });
+ const docs = await store.addDocuments([document], { ids: [id] });
+ console.log("Docs added:", docs);
+			// Wait one second after every 20 documents to stay under the OpenAI rate limit
+ console.log("This is the 20th thing in the list?", (i + 1) % 20 === 0);
+ if ((i + 1) % 20 === 0) {
+ console.log("-----------waiting atm");
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ }
+ }
- const docs = await store.addDocuments(preparedDocuments, { ids: ids });
- console.log("Docs added:", docs);
const { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID } = env;
await bulkInsertKv(
{ CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID },
@@ -356,5 +381,6 @@ export async function batchCreateChunksAndEmbeddings({
);
}
}
+
return;
}
diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
index f967736e..9a50d701 100644
--- a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
+++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
@@ -29,12 +29,12 @@ export async function processPage(input: {
),
);
}
- console.log("[This is the page content]", pageContent);
const metadataResult = await getMetaData(input.url);
if (isErr(metadataResult)) {
throw metadataResult.error;
}
const metadata = metadataResult.value;
+ console.log("[this is the metadata]", metadata);
return Ok({ pageContent, metadata });
} catch (e) {
console.error("[Page Processing Error]", e);
diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts
index 8ca23739..393f1fbf 100644
--- a/apps/cf-ai-backend/src/queueConsumer/index.ts
+++ b/apps/cf-ai-backend/src/queueConsumer/index.ts
@@ -45,7 +45,7 @@ const calculateExponentialBackoff = (
return baseDelaySeconds ** attempts;
};
-const BASE_DELAY_SECONDS = 1.5;
+const BASE_DELAY_SECONDS = 5;
export async function queue(
batch: MessageBatch<{
content: string;
@@ -102,6 +102,7 @@ export async function queue(
.set({
attempts: existingJob.value[0].attempts + 1,
updatedAt: new Date(),
+ status: "Processing",
})
.where(eq(jobs.id, jobId)),
d1ErrorFactory,
@@ -248,7 +249,7 @@ export async function queue(
if (isErr(vectorResult)) {
await db
.update(jobs)
- .set({ error: vectorResult.error })
+ .set({ error: vectorResult.error.message, status: "error" })
.where(eq(jobs.id, jobId));
message.retry({
delaySeconds: calculateExponentialBackoff(
@@ -288,7 +289,7 @@ export async function queue(
if (isErr(insertResponse)) {
await db
.update(jobs)
- .set({ error: insertResponse.error })
+ .set({ error: insertResponse.error.message, status: "error" })
.where(eq(jobs.id, jobId));
message.retry({
delaySeconds: calculateExponentialBackoff(
@@ -340,7 +341,7 @@ export async function queue(
}
} catch (e) {
console.error("Error in simulated transaction", e.message);
- console.log("Rooling back changes");
+
message.retry({
delaySeconds: calculateExponentialBackoff(
message.attempts,
@@ -366,23 +367,6 @@ export async function queue(
/*
To do:
-1. Abstract and shitft the entrie creatememory function to the queue consumer --> Hopefully done
-2. Make the front end use that instead of whatever khichidi is going on right now
-3. remove getMetada form the lib file as it's not being used anywhere else
-4. Figure out the limit stuff ( server action for that seems fine because no use in limiting after they already in the queue rigth? )
-5. Figure out the initQuery stuff ( ;( ) --> This is a bad way of doing stuff :0
-6. How do I hande the content already exists wala use case? --> Also how do I figure out limits?
-
-
-
-8. Wrap the d1 thing in a transaction and then write to vectorize if d1 is sucessful if it's not then just error out ( if d1 fails dlq, recoverable failure --> retry )
-
-Firt write to d1 in a transaction ( sotredContent + sapces ) --> write to vectorize --> vectorize failes --> reset d1 alternatively first we can also do the vectorise stuff if that suceeds then do the d1 stuff in a batch right?
-
-
-DEBUG:
-What's hapenning:
-1. The stuff in the d1 is updating but nothing is hapenning in the vectorize for some reason
-
+Figure out rate limits!!
*/
diff --git a/apps/cf-ai-backend/wrangler.toml b/apps/cf-ai-backend/wrangler.toml
index 665ca593..b5d67eb6 100644
--- a/apps/cf-ai-backend/wrangler.toml
+++ b/apps/cf-ai-backend/wrangler.toml
@@ -44,7 +44,7 @@ mode = "smart"
[[queues.consumers]]
queue = "embedchunks-queue"
max_batch_size = 100
- max_retries = 10
+ max_retries = 3
dead_letter_queue = "embedchunks-dlq"
diff --git a/apps/web/wrangler.toml b/apps/web/wrangler.toml
index a6232450..54b27325 100644
--- a/apps/web/wrangler.toml
+++ b/apps/web/wrangler.toml
@@ -26,8 +26,9 @@ bucket_name = "dev-r2-anycontext"
[[d1_databases]]
binding = "DATABASE"
-database_name = "dev-d1-anycontext"
-database_id = "fc562605-157a-4f60-b439-2a24ffed5b4c"
+database_name = "supermemlocal"
+database_id = "0f93c990-72fb-489c-8563-57a7bb18dc43"
+
[[env.production.d1_databases]]
binding = "DATABASE"