aboutsummaryrefslogtreecommitdiff
path: root/apps/cf-ai-backend/src/queueConsumer
diff options
context:
space:
mode:
authorKush Thaker <[email protected]>2024-08-06 20:48:13 +0530
committerKush Thaker <[email protected]>2024-08-06 20:48:13 +0530
commit1336da8aae05a0acdb3e03561fa7378f238d3eda (patch)
tree483144b7ae6a63635e2e3753168f34f5109c5a9c /apps/cf-ai-backend/src/queueConsumer
parentMerge branch 'kush/be-queue' of https://github.com/Dhravya/supermemory into k... (diff)
downloadsupermemory-1336da8aae05a0acdb3e03561fa7378f238d3eda.tar.xz
supermemory-1336da8aae05a0acdb3e03561fa7378f238d3eda.zip
Fix job errors not reflecting in D1; add delays on document insert in vectorize to help with open ai rate limits
Diffstat (limited to 'apps/cf-ai-backend/src/queueConsumer')
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts2
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/index.ts28
2 files changed, 7 insertions, 23 deletions
diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
index f967736e..9a50d701 100644
--- a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
+++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
@@ -29,12 +29,12 @@ export async function processPage(input: {
),
);
}
- console.log("[This is the page content]", pageContent);
const metadataResult = await getMetaData(input.url);
if (isErr(metadataResult)) {
throw metadataResult.error;
}
const metadata = metadataResult.value;
+ console.log("[this is the metadata]", metadata);
return Ok({ pageContent, metadata });
} catch (e) {
console.error("[Page Processing Error]", e);
diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts
index 8ca23739..393f1fbf 100644
--- a/apps/cf-ai-backend/src/queueConsumer/index.ts
+++ b/apps/cf-ai-backend/src/queueConsumer/index.ts
@@ -45,7 +45,7 @@ const calculateExponentialBackoff = (
return baseDelaySeconds ** attempts;
};
-const BASE_DELAY_SECONDS = 1.5;
+const BASE_DELAY_SECONDS = 5;
export async function queue(
batch: MessageBatch<{
content: string;
@@ -102,6 +102,7 @@ export async function queue(
.set({
attempts: existingJob.value[0].attempts + 1,
updatedAt: new Date(),
+ status: "Processing",
})
.where(eq(jobs.id, jobId)),
d1ErrorFactory,
@@ -248,7 +249,7 @@ export async function queue(
if (isErr(vectorResult)) {
await db
.update(jobs)
- .set({ error: vectorResult.error })
+ .set({ error: vectorResult.error.message, status: "error" })
.where(eq(jobs.id, jobId));
message.retry({
delaySeconds: calculateExponentialBackoff(
@@ -288,7 +289,7 @@ export async function queue(
if (isErr(insertResponse)) {
await db
.update(jobs)
- .set({ error: insertResponse.error })
+ .set({ error: insertResponse.error.message, status: "error" })
.where(eq(jobs.id, jobId));
message.retry({
delaySeconds: calculateExponentialBackoff(
@@ -340,7 +341,7 @@ export async function queue(
}
} catch (e) {
console.error("Error in simulated transaction", e.message);
- console.log("Rooling back changes");
+
message.retry({
delaySeconds: calculateExponentialBackoff(
message.attempts,
@@ -366,23 +367,6 @@ export async function queue(
/*
To do:
-1. Abstract and shitft the entrie creatememory function to the queue consumer --> Hopefully done
-2. Make the front end use that instead of whatever khichidi is going on right now
-3. remove getMetada form the lib file as it's not being used anywhere else
-4. Figure out the limit stuff ( server action for that seems fine because no use in limiting after they already in the queue rigth? )
-5. Figure out the initQuery stuff ( ;( ) --> This is a bad way of doing stuff :0
-6. How do I hande the content already exists wala use case? --> Also how do I figure out limits?
-
-
-
-8. Wrap the d1 thing in a transaction and then write to vectorize if d1 is sucessful if it's not then just error out ( if d1 fails dlq, recoverable failure --> retry )
-
-Firt write to d1 in a transaction ( sotredContent + sapces ) --> write to vectorize --> vectorize failes --> reset d1 alternatively first we can also do the vectorise stuff if that suceeds then do the d1 stuff in a batch right?
-
-
-DEBUG:
-What's hapenning:
-1. The stuff in the d1 is updating but nothing is hapenning in the vectorize for some reason
-
+Figure out rate limits!!
*/