aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKush Thaker <[email protected]>2024-08-05 21:25:11 +0530
committerKush Thaker <[email protected]>2024-08-05 21:25:11 +0530
commite4fd7f5aacc3c9f7f000e1858248d49aa4d3410e (patch)
tree4f1af30951860629ef376c37bc995a65dda8b813
parentdb schema in packages (diff)
downloadsupermemory-e4fd7f5aacc3c9f7f000e1858248d49aa4d3410e.tar.xz
supermemory-e4fd7f5aacc3c9f7f000e1858248d49aa4d3410e.zip
move limit to backend and thread service binding
-rw-r--r--apps/cf-ai-backend/src/errors/baseError.ts2
-rw-r--r--apps/cf-ai-backend/src/errors/results.ts19
-rw-r--r--apps/cf-ai-backend/src/helper.ts23
-rw-r--r--apps/cf-ai-backend/src/index.ts117
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts58
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts13
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts73
-rw-r--r--apps/cf-ai-backend/src/queueConsumer/index.ts336
-rw-r--r--apps/cf-ai-backend/src/types.ts3
-rw-r--r--apps/cf-ai-backend/wrangler.toml6
-rw-r--r--apps/web/app/actions/doers.ts526
-rw-r--r--apps/web/app/api/store/helper.ts18
-rw-r--r--apps/web/lib/constants.ts6
-rw-r--r--packages/db/schema.ts10
-rw-r--r--packages/shared-types/index.ts9
15 files changed, 701 insertions, 518 deletions
diff --git a/apps/cf-ai-backend/src/errors/baseError.ts b/apps/cf-ai-backend/src/errors/baseError.ts
index 2723d45b..0dcc2203 100644
--- a/apps/cf-ai-backend/src/errors/baseError.ts
+++ b/apps/cf-ai-backend/src/errors/baseError.ts
@@ -7,7 +7,7 @@ export class BaseHttpError extends Error {
this.status = status;
this.message = message;
Object.setPrototypeOf(this, new.target.prototype); // Restore prototype chain
- }
+ }
}
diff --git a/apps/cf-ai-backend/src/errors/results.ts b/apps/cf-ai-backend/src/errors/results.ts
index 87ea0c63..ccce1396 100644
--- a/apps/cf-ai-backend/src/errors/results.ts
+++ b/apps/cf-ai-backend/src/errors/results.ts
@@ -14,15 +14,18 @@ export const Err = <E extends BaseError>(error: E): Result<never, E> => {
export async function wrap<T, E extends BaseError>(
p: Promise<T>,
- errorFactory: (err: Error) => E,
-): Promise<Result<T, E>> {
+ errorFactory: (err: Error, source: string) => E,
+ source: string = "unspecified"
+ ): Promise<Result<T, E>> {
try {
- return Ok(await p);
+ return Ok(await p);
} catch (e) {
- return Err(errorFactory(e as Error));
+ return Err(errorFactory(e as Error, source));
}
-}
+ }
-export function isErr<T, E extends Error>(result: Result<T, E>): result is { ok: false; error: E } {
- return !result.ok;
- } \ No newline at end of file
+export function isErr<T, E extends Error>(
+ result: Result<T, E>,
+): result is { ok: false; error: E } {
+ return !result.ok;
+}
diff --git a/apps/cf-ai-backend/src/helper.ts b/apps/cf-ai-backend/src/helper.ts
index 1568996a..eadd9c21 100644
--- a/apps/cf-ai-backend/src/helper.ts
+++ b/apps/cf-ai-backend/src/helper.ts
@@ -9,17 +9,14 @@ import { z } from "zod";
import { seededRandom } from "./utils/seededRandom";
import { bulkInsertKv } from "./utils/kvBulkInsert";
-export async function initQuery(
- c: Context<{ Bindings: Env }>,
- model: string = "gpt-4o",
-) {
+export async function initQuery(env: Env, model: string = "gpt-4o") {
const embeddings = new OpenAIEmbeddings({
- apiKey: c.env.OPENAI_API_KEY,
+ apiKey: env.OPENAI_API_KEY,
modelName: "text-embedding-3-small",
});
const store = new CloudflareVectorizeStore(embeddings, {
- index: c.env.VECTORIZE_INDEX,
+ index: env.VECTORIZE_INDEX,
});
let selectedModel:
@@ -30,7 +27,7 @@ export async function initQuery(
switch (model) {
case "claude-3-opus":
const anthropic = createAnthropic({
- apiKey: c.env.ANTHROPIC_API_KEY,
+ apiKey: env.ANTHROPIC_API_KEY,
baseURL:
"https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/anthropic",
});
@@ -39,7 +36,7 @@ export async function initQuery(
break;
case "gemini-1.5-pro":
const googleai = createGoogleGenerativeAI({
- apiKey: c.env.GOOGLE_AI_API_KEY,
+ apiKey: env.GOOGLE_AI_API_KEY,
baseURL:
"https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/google-vertex-ai",
});
@@ -49,7 +46,7 @@ export async function initQuery(
case "gpt-4o":
default:
const openai = createOpenAI({
- apiKey: c.env.OPENAI_API_KEY,
+ apiKey: env.OPENAI_API_KEY,
baseURL:
"https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/openai",
compatibility: "strict",
@@ -204,7 +201,7 @@ export async function batchCreateChunksAndEmbeddings({
const commonMetaData = {
type: body.type ?? "tweet",
title: body.title?.slice(0, 50) ?? "",
- description: body.description ?? "",
+ description: body.description?.slice(0, 50) ?? "",
url: body.url,
[sanitizeKey(`user-${body.user}`)]: 1,
};
@@ -255,7 +252,7 @@ export async function batchCreateChunksAndEmbeddings({
const commonMetaData = {
type: body.type ?? "page",
title: body.title?.slice(0, 50) ?? "",
- description: body.description ?? "",
+ description: body.description?.slice(0, 50) ?? "",
url: body.url,
[sanitizeKey(`user-${body.user}`)]: 1,
};
@@ -292,7 +289,7 @@ export async function batchCreateChunksAndEmbeddings({
const commonMetaData = {
title: body.title?.slice(0, 50) ?? "",
type: body.type ?? "page",
- description: body.description ?? "",
+ description: body.description?.slice(0, 50) ?? "",
url: body.url,
[sanitizeKey(`user-${body.user}`)]: 1,
};
@@ -328,7 +325,7 @@ export async function batchCreateChunksAndEmbeddings({
const commonMetaData = {
type: body.type ?? "image",
title: body.title,
- description: body.description ?? "",
+ description: body.description?.slice(0, 50) ?? "",
url: body.url,
[sanitizeKey(`user-${body.user}`)]: 1,
};
diff --git a/apps/cf-ai-backend/src/index.ts b/apps/cf-ai-backend/src/index.ts
index 4949fab3..a46d0080 100644
--- a/apps/cf-ai-backend/src/index.ts
+++ b/apps/cf-ai-backend/src/index.ts
@@ -24,12 +24,18 @@ import { zValidator } from "@hono/zod-validator";
import chunkText from "./queueConsumer/chunkers/chonker";
import { systemPrompt, template } from "./prompts/prompt1";
import { swaggerUI } from "@hono/swagger-ui";
+import { database } from "./db";
+import { storedContent } from "@repo/db/schema";
+import { sql, and, eq } from "drizzle-orm";
+import { LIMITS } from "@repo/shared-types";
+import { typeDecider } from "./queueConsumer/utils/typeDecider";
// import { chunkThread } from "./utils/chunkTweet";
import {
chunkNote,
chunkPage,
} from "./queueConsumer/chunkers/chunkPageOrNotes";
import { queue } from "./queueConsumer";
+import { isErr } from "./errors/results";
const app = new Hono<{ Bindings: Env }>();
@@ -68,42 +74,75 @@ app.get("/api/health", (c) => {
app.post("/api/add", zValidator("json", vectorBody), async (c) => {
try {
- // console.log("api/add hit!!!!");
const body = c.req.valid("json");
- const spaceNumbers = body.spaces.map((s: string) => Number(s));
- await c.env.EMBEDCHUNKS_QUEUE.send({
- content: body.url,
- user: body.user,
- space: spaceNumbers,
- });
-
- // const { store } = await initQuery(c);
+ //This is something I don't like
+ // console.log("api/add hit!!!!");
+ //Have to do limit on this also duplicate check here
+ const db = database(c.env);
+ const typeResult = typeDecider(body.url);
+
+ const saveToDbUrl =
+ (body.url.split("#supermemory-user-")[0] ?? body.url) + // Why does this have to be a split from #supermemory-user?
+ "#supermemory-user-" +
+ body.user;
+
+ console.log(
+ "---------------------------------------------------------------------------------------------------------------------------------------------",
+ saveToDbUrl,
+ );
+ const alreadyExist = await db
+ .select()
+ .from(storedContent)
+ .where(eq(storedContent.baseUrl, saveToDbUrl));
+ console.log(
+ "------------------------------------------------",
+ JSON.stringify(alreadyExist),
+ );
- // console.log(body.spaces);
- // let chunks: TweetChunks | PageOrNoteChunks;
- // // remove everything in <raw> tags
- // // const newPageContent = body.pageContent?.replace(/<raw>.*?<\/raw>/g, "");
+ if (alreadyExist.length > 0) {
+ console.log(
+ "------------------------------------------------------------------------------------------------I exist------------------------",
+ );
+ return c.json({ status: "error", message: "the content already exists" });
+ }
- // switch (body.type) {
- // case "tweet":
- // chunks = chunkThread(body.pageContent);
- // break;
+ if (isErr(typeResult)) {
+ throw typeResult.error;
+ }
+ // limiting in the backend
+ const type = typeResult.value;
+ const countResult = await db
+ .select({
+ count: sql<number>`count(*)`.mapWith(Number),
+ })
+ .from(storedContent)
+ .where(
+ and(eq(storedContent.userId, body.user), eq(storedContent.type, type)),
+ );
- // case "page":
- // chunks = chunkPage(body.pageContent);
- // break;
+ const currentCount = countResult[0]?.count || 0;
+ const totalLimit = LIMITS[type as keyof typeof LIMITS];
+ const remainingLimit = totalLimit - currentCount;
+ const items = 1;
+ const isWithinLimit = items <= remainingLimit;
- // case "note":
- // chunks = chunkNote(body.pageContent);
- // break;
- // }
+ // unique constraint check
- // await batchCreateChunksAndEmbeddings({
- // store,
- // body,
- // chunks: chunks,
- // env: c,
- // });
+ if (isWithinLimit) {
+ const spaceNumbers = body.spaces.map((s: string) => Number(s));
+ await c.env.EMBEDCHUNKS_QUEUE.send({
+ content: body.url,
+ user: body.user,
+ space: spaceNumbers,
+ type: type,
+ });
+ } else {
+ return c.json({
+ status: "error",
+ message:
+ "You have exceed the current limit for this type of document, please try removing something form memories ",
+ });
+ }
return c.json({ status: "ok" });
} catch (error) {
@@ -137,7 +176,7 @@ app.post(
async (c) => {
const body = c.req.valid("form");
- const { store } = await initQuery(c);
+ const { store } = await initQuery(c.env);
if (!(body.images || body["images[]"])) {
return c.json({ status: "error", message: "No images found" }, 400);
@@ -203,7 +242,7 @@ app.get(
async (c) => {
const query = c.req.valid("query");
- const { model } = await initQuery(c);
+ const { model } = await initQuery(c.env);
const response = await streamText({ model, prompt: query.query });
const r = response.toTextStreamResponse();
@@ -221,7 +260,7 @@ app.get(
[`user-${user}`]: 1,
};
- const { store } = await initQuery(c);
+ const { store } = await initQuery(c.env);
const queryAsVector = await store.embeddings.embedQuery(query);
const resp = await c.env.VECTORIZE_INDEX.query(queryAsVector, {
@@ -273,7 +312,7 @@ app.post(
const { query, user } = c.req.valid("query");
const { chatHistory } = c.req.valid("json");
- const { store, model } = await initQuery(c);
+ const { store, model } = await initQuery(c.env);
let task: "add" | "chat" = "chat";
let thingToAdd: "page" | "image" | "text" | undefined = undefined;
@@ -448,7 +487,7 @@ app.post(
const spaces = query.spaces?.split(",") ?? [undefined];
// Get the AI model maker and vector store
- const { model, store } = await initQuery(c, query.model);
+ const { model, store } = await initQuery(c.env, query.model);
if (!body.sources) {
const filter: VectorizeVectorMetadataFilter = {
@@ -591,6 +630,8 @@ app.post(
}
}
+ // Search mem0
+
const preparedContext = body.sources.normalizedData.map(
({ metadata, score, normalizedScore }) => ({
context: `Website title: ${metadata!.title}\nDescription: ${metadata!.description}\nURL: ${metadata!.url}\nContent: ${metadata!.text}`,
@@ -601,7 +642,7 @@ app.post(
const initialMessages: CoreMessage[] = [
{ role: "user", content: systemPrompt },
- { role: "assistant", content: "Hello, how can I help?" },
+ { role: "assistant", content: "Hello, how can I help?" }, // parse and add memory JSON here
];
const prompt = template({
@@ -637,7 +678,7 @@ app.delete(
async (c) => {
const { websiteUrl, user } = c.req.valid("query");
- const { store } = await initQuery(c);
+ const { store } = await initQuery(c.env);
await deleteDocument({ url: websiteUrl, user, c, store });
@@ -657,7 +698,7 @@ app.get(
),
async (c) => {
const { context, request } = c.req.valid("query");
- const { model } = await initQuery(c);
+ const { model } = await initQuery(c.env);
const response = await streamText({
model,
diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts
deleted file mode 100644
index a7d85c23..00000000
--- a/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts
+++ /dev/null
@@ -1,58 +0,0 @@
-import { Env } from "../../types";
-import { OpenAIEmbeddings } from "../../utils/OpenAIEmbedder";
-import { CloudflareVectorizeStore } from "@langchain/cloudflare";
-import { createOpenAI } from "@ai-sdk/openai";
-import { createGoogleGenerativeAI } from "@ai-sdk/google";
-import { createAnthropic } from "@ai-sdk/anthropic";
-
-export async function initQQuery(
- env: Env,
- model: string = "gpt-4o",
-) {
- const embeddings = new OpenAIEmbeddings({
- apiKey: env.OPENAI_API_KEY,
- modelName: "text-embedding-3-small",
- });
-
- const store = new CloudflareVectorizeStore(embeddings, {
- index: env.VECTORIZE_INDEX,
- });
-
- let selectedModel:
- | ReturnType<ReturnType<typeof createOpenAI>>
- | ReturnType<ReturnType<typeof createGoogleGenerativeAI>>
- | ReturnType<ReturnType<typeof createAnthropic>>;
-
- switch (model) {
- case "claude-3-opus":
- const anthropic = createAnthropic({
- apiKey: env.ANTHROPIC_API_KEY,
- baseURL:
- "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/anthropic",
- });
- selectedModel = anthropic.chat("claude-3-opus-20240229");
- console.log("Selected model: ", selectedModel);
- break;
- case "gemini-1.5-pro":
- const googleai = createGoogleGenerativeAI({
- apiKey: env.GOOGLE_AI_API_KEY,
- baseURL:
- "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/google-vertex-ai",
- });
- selectedModel = googleai.chat("models/gemini-1.5-pro-latest");
- console.log("Selected model: ", selectedModel);
- break;
- case "gpt-4o":
- default:
- const openai = createOpenAI({
- apiKey: env.OPENAI_API_KEY,
- baseURL:
- "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/openai",
- compatibility: "strict",
- });
- selectedModel = openai.chat("gpt-4o-mini");
- break;
- }
-
- return { store, model: selectedModel };
-} \ No newline at end of file
diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
index 6b28c975..f967736e 100644
--- a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
+++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts
@@ -10,13 +10,14 @@ class ProcessPageError extends BaseError {
type PageProcessResult = { pageContent: string; metadata: Metadata };
-export async function processPage(
- url: string,
-): Promise<Result<PageProcessResult, ProcessPageError>> {
+export async function processPage(input: {
+ url: string;
+ securityKey: string;
+}): Promise<Result<PageProcessResult, ProcessPageError>> {
try {
- const response = await fetch("https://md.dhr.wtf/?url=" + url, {
+ const response = await fetch("https://md.dhr.wtf/?url=" + input.url, {
headers: {
- Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY,
+ Authorization: "Bearer " + input.securityKey,
},
});
const pageContent = await response.text();
@@ -29,7 +30,7 @@ export async function processPage(
);
}
console.log("[This is the page content]", pageContent);
- const metadataResult = await getMetaData(url);
+ const metadataResult = await getMetaData(input.url);
if (isErr(metadataResult)) {
throw metadataResult.error;
}
diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts
index ef5d9f5b..8d83f2dc 100644
--- a/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts
+++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts
@@ -3,6 +3,7 @@ import { Result, Ok, Err, isErr } from "../../errors/results";
import { BaseError } from "../../errors/baseError";
import { getMetaData, Metadata } from "../utils/get-metadata";
import { tweetToMd } from "@repo/shared-types/utils"; // can I do this?
+import { Env } from "../../types";
class ProcessTweetError extends BaseError {
constructor(message?: string, source?: string) {
@@ -43,39 +44,45 @@ export const getTweetData = async (
}
};
-export const getThreadData = async (
- tweetUrl: string,
- cf_thread_endpoint: string,
- authKey: string,
-): Promise<Result<string, ProcessTweetError>> => {
- const threadRequest = await fetch(cf_thread_endpoint, {
- method: "POST",
- headers: {
- "Content-Type": "application/json",
- Authorization: authKey,
- },
- body: JSON.stringify({ url: tweetUrl }),
- });
- if (threadRequest.status !== 200) {
- return Err(
- new ProcessTweetError(
- `Failed to fetch the thread: ${tweetUrl}, Reason: ${threadRequest.statusText}`,
- "getThreadData",
- ),
- );
- }
-
- const thread = await threadRequest.text();
- console.log("[thread response]");
+export const getThreadData = async (input: {
+ tweetUrl: string;
+ env: Env;
+}): Promise<Result<any, ProcessTweetError>> => {
+ try {
+ // const threadRequest = await fetch(input.cf_thread_endpoint, {
+ // method: "POST",
+ // headers: {
+ // "Content-Type": "application/json",
+ // Authorization: input.authKey,
+ // },
+ // body: JSON.stringify({ url: input.tweetUrl }),
+ // });
+ // if (threadRequest.status !== 200) {
+ // console.log(await threadRequest.text());
+ // console.log(input.tweetUrl);
+ // return Err(
+ // new ProcessTweetError(
+ // `Failed to fetch the thread: ${input.tweetUrl}, Reason: ${threadRequest.statusText}`,
+ // "getThreadData",
+ // ),
+ // );
+ // }
+ //@ts-ignore
+ const thread = await input.env.THREAD.processTweets(input.tweetUrl);
+ console.log("[thread response]", thread);
- if (thread.trim().length === 2) {
- console.log("Thread is an empty array");
- return Err(
- new ProcessTweetError(
- "[THREAD FETCHING SERVICE] Got no content form thread worker",
- "getThreadData",
- ),
- );
+ if (!thread.length) {
+ console.log("Thread is an empty array");
+ return Err(
+ new ProcessTweetError(
+ "[THREAD FETCHING SERVICE] Got no content form thread worker",
+ "getThreadData",
+ ),
+ );
+ }
+ return Ok(thread);
+ } catch (e) {
+ console.error("[Thread Processing Error]", e);
+ return Err(new ProcessTweetError((e as Error).message, "getThreadData"));
}
- return Ok(thread);
};
diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts
index 8b9064cb..8ca23739 100644
--- a/apps/cf-ai-backend/src/queueConsumer/index.ts
+++ b/apps/cf-ai-backend/src/queueConsumer/index.ts
@@ -5,15 +5,21 @@ import { processNote } from "./helpers/processNotes";
import { processPage } from "./helpers/processPage";
import { getThreadData, getTweetData } from "./helpers/processTweet";
import { tweetToMd } from "@repo/shared-types/utils";
-import { initQQuery } from "./helpers/initQuery";
import { chunkNote, chunkPage } from "./chunkers/chunkPageOrNotes";
import { chunkThread } from "./chunkers/chunkTweet";
-import { batchCreateChunksAndEmbeddings } from "../helper";
+import { batchCreateChunksAndEmbeddings, initQuery } from "../helper";
import { z } from "zod";
import { Metadata } from "./utils/get-metadata";
import { BaseError } from "../errors/baseError";
import { database } from "../db";
-import { storedContent, space, contentToSpace } from "@repo/db/schema";
+import {
+ storedContent,
+ space,
+ contentToSpace,
+ users,
+ jobs,
+ Job,
+} from "@repo/db/schema";
import { and, eq, inArray, sql } from "drizzle-orm";
class VectorInsertError extends BaseError {
@@ -29,24 +35,99 @@ class D1InsertError extends BaseError {
}
}
+const d1ErrorFactory = (err: Error, source: string) =>
+ new D1InsertError(err.message, source);
+
+const calculateExponentialBackoff = (
+ attempts: number,
+ baseDelaySeconds: number,
+) => {
+ return baseDelaySeconds ** attempts;
+};
+
+const BASE_DELAY_SECONDS = 1.5;
export async function queue(
- batch: MessageBatch<{ content: string; space: Array<number>; user: string }>,
+ batch: MessageBatch<{
+ content: string;
+ space: Array<number>;
+ user: string;
+ type: string;
+ }>,
env: Env,
): Promise<void> {
+ const db = database(env);
console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN);
for (let message of batch.messages) {
console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN);
console.log("is thie even running?", message.body);
const body = message.body;
- console.log("v got shit in the queue", body);
- const typeResult = typeDecider(body.content);
+ const type = body.type;
+ const userExists = await wrap(
+ db.select().from(users).where(eq(users.id, body.user)).limit(1),
+ d1ErrorFactory,
+ "Error when trying to verify user",
+ );
+
+ if (isErr(userExists)) {
+ throw userExists.error;
+ }
+
+ // Check whether this is a retry job by looking for an existing (userId, url) combination in the jobs table
+ let jobId;
+ const existingJob = await wrap(
+ db
+ .select()
+ .from(jobs)
+ .where(
+ and(
+ eq(jobs.userId, userExists.value[0].id),
+ eq(jobs.url, body.content),
+ ),
+ )
+ .limit(1),
+ d1ErrorFactory,
+ "Error when checking for existing job",
+ );
+
+ if (isErr(existingJob)) {
+ throw existingJob.error;
+ }
- if (isErr(typeResult)) {
- throw typeResult.error;
+ if (existingJob.value.length > 0) {
+ jobId = existingJob.value[0].id;
+ await wrap(
+ db
+ .update(jobs)
+ .set({
+ attempts: existingJob.value[0].attempts + 1,
+ updatedAt: new Date(),
+ })
+ .where(eq(jobs.id, jobId)),
+ d1ErrorFactory,
+ "Error when updating job attempts",
+ );
+ } else {
+ const job = await wrap(
+ db
+ .insert(jobs)
+ .values({
+ userId: userExists.value[0].id as string,
+ url: body.content,
+ status: "Processing",
+ attempts: 1,
+ createdAt: new Date(),
+ updatedAt: new Date(),
+ })
+ .returning({ jobId: jobs.id }),
+ d1ErrorFactory,
+ "Error When inserting into jobs table",
+ );
+ if (isErr(job)) {
+ throw job.error;
+ }
+ jobId = job.value[0].jobId;
}
- console.log(typeResult.value);
- const type = typeResult.value;
let pageContent: string;
let vectorData: string;
@@ -70,8 +151,12 @@ export async function queue(
}
case "page": {
console.log("page hit");
- const page = await processPage(body.content);
+ const page = await processPage({
+ url: body.content,
+ securityKey: env.MD_SEC_KEY,
+ });
if (isErr(page)) {
+ console.log("there is a page error here");
throw page.error;
}
pageContent = page.value.pageContent;
@@ -83,15 +168,13 @@ export async function queue(
case "tweet": {
console.log("tweet hit");
- console.log(body.content.split("/").pop());
const tweet = await getTweetData(body.content.split("/").pop());
- console.log(tweet);
- const thread = await getThreadData(
- body.content,
- env.THREAD_CF_WORKER,
- env.THREAD_CF_AUTH,
- );
-
+ console.log(env.THREAD_CF_WORKER, env.THREAD_CF_AUTH);
+ const thread = await getThreadData({
+ tweetUrl: body.content,
+ env: env,
+ });
+ console.log("[This is the thread]", thread);
if (isErr(tweet)) {
throw tweet.error;
}
@@ -108,6 +191,7 @@ export async function queue(
vectorData = JSON.stringify(pageContent);
console.error(thread.error);
} else {
+ console.log("thread worker is fine");
vectorData = thread.value;
}
chunks = chunkThread(vectorData);
@@ -115,8 +199,27 @@ export async function queue(
}
}
+ //add to mem0, abstract
+
+ // const mem0Response = fetch('https://api.mem0.ai/v1/memories/', {
+ // method: 'POST',
+ // headers: {
+ // 'Content-Type': 'application/json',
+ // Authorization: `Token ${process.env.MEM0_API_KEY}`,
+ // },
+ // body: JSON.stringify({
+ // messages: [
+ // {
+ // role: 'user',
+ // content: query,
+ // },
+ // ],
+ // user_id: user?.user?.email,
+ // }),
+ // });
+
// see what's up with the storedToSpaces in this block
- const { store } = await initQQuery(env);
+ const { store } = await initQuery(env);
type body = z.infer<typeof vectorObj>;
@@ -129,66 +232,135 @@ export async function queue(
description: metadata.description,
title: metadata.description,
};
- const vectorResult = await wrap(
- batchCreateChunksAndEmbeddings({
- store: store,
- body: Chunkbody,
- chunks: chunks,
- env: env,
- }),
- vectorErrorFactory,
- );
- if (isErr(vectorResult)) {
- throw vectorResult.error;
- }
- const saveToDbUrl =
- (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) +
- "#supermemory-user-" +
- body.user;
- let contentId: number;
- const db = database(env);
- const insertResponse = await db
- .insert(storedContent)
- .values({
- content: pageContent as string,
- title: metadata.title,
- description: metadata.description,
- url: saveToDbUrl,
- baseUrl: saveToDbUrl,
- image: metadata.image,
- savedAt: new Date(),
- userId: body.user,
- type: type,
- noteId: noteId,
- })
- .returning({ id: storedContent.id });
-
- if (!insertResponse[0]?.id) {
- throw new D1InsertError(
- "something went worng when inserting to database",
- "inresertResponse",
+ try {
+ const vectorResult = await wrap(
+ batchCreateChunksAndEmbeddings({
+ store: store,
+ body: Chunkbody,
+ chunks: chunks,
+ env: env,
+ }),
+ vectorErrorFactory,
+ "Error when Inserting into vector database",
);
- }
- contentId = insertResponse[0]?.id;
- if (storeToSpaces.length > 0) {
- // Adding the many-to-many relationship between content and spaces
- const spaceData = await db
- .select()
- .from(space)
- .where(and(inArray(space.id, storeToSpaces), eq(space.user, body.user)))
- .all();
- await Promise.all(
- spaceData.map(async (s) => {
- await db
- .insert(contentToSpace)
- .values({ contentId: contentId, spaceId: s.id });
+ if (isErr(vectorResult)) {
+ await db
+ .update(jobs)
+ .set({ error: vectorResult.error })
+ .where(eq(jobs.id, jobId));
+ message.retry({
+ delaySeconds: calculateExponentialBackoff(
+ message.attempts,
+ BASE_DELAY_SECONDS,
+ ),
+ });
+ throw vectorResult.error;
+ }
- await db.update(space).set({ numItems: s.numItems + 1 });
- }),
+ const saveToDbUrl =
+ (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) +
+ "#supermemory-user-" +
+ body.user;
+ let contentId: number;
+
+ const insertResponse = await wrap(
+ db
+ .insert(storedContent)
+ .values({
+ content: pageContent as string,
+ title: metadata.title,
+ description: metadata.description,
+ url: saveToDbUrl,
+ baseUrl: saveToDbUrl,
+ image: metadata.image,
+ savedAt: new Date(),
+ userId: body.user,
+ type: type,
+ noteId: noteId,
+ })
+ .returning({ id: storedContent.id }),
+ d1ErrorFactory,
+ "Error when inserting into storedContent",
+ );
+
+ if (isErr(insertResponse)) {
+ await db
+ .update(jobs)
+ .set({ error: insertResponse.error })
+ .where(eq(jobs.id, jobId));
+ message.retry({
+ delaySeconds: calculateExponentialBackoff(
+ message.attempts,
+ BASE_DELAY_SECONDS,
+ ),
+ });
+ throw insertResponse.error;
+ }
+ console.log(JSON.stringify(insertResponse));
+ contentId = insertResponse[0]?.id;
+ console.log("this is the content Id", contentId);
+ if (storeToSpaces.length > 0) {
+ // Adding the many-to-many relationship between content and spaces
+ const spaceData = await wrap(
+ db
+ .select()
+ .from(space)
+ .where(
+ and(inArray(space.id, storeToSpaces), eq(space.user, body.user)),
+ )
+ .all(),
+ d1ErrorFactory,
+ "Error when getting data from spaces",
+ );
+
+ if (isErr(spaceData)) {
+ throw spaceData.error;
+ }
+ try {
+ await Promise.all(
+ spaceData.value.map(async (s) => {
+ try {
+ await db
+ .insert(contentToSpace)
+ .values({ contentId: contentId, spaceId: s.id });
+
+ await db.update(space).set({ numItems: s.numItems + 1 });
+ } catch (e) {
+ console.error(`Error updating space ${s.id}:`, e);
+ throw e;
+ }
+ }),
+ );
+ } catch (e) {
+ console.error("Error in updateSpacesWithContent:", e);
+ throw new Error(`Failed to update spaces: ${e.message}`);
+ }
+ }
+ } catch (e) {
+ console.error("Error in simulated transaction", e.message);
+ console.log("Rooling back changes");
+ message.retry({
+ delaySeconds: calculateExponentialBackoff(
+ message.attempts,
+ BASE_DELAY_SECONDS,
+ ),
+ });
+ throw new D1InsertError(
+ "Error when inserting into d1",
+ "D1 stuff after the vectorize",
);
}
+
+ // After both the D1 and vectorize writes succeed, finally update the jobs table to mark the job as completed
+
+ await db
+ .update(jobs)
+ .set({ status: "Processed" })
+ .where(eq(jobs.id, jobId));
+
+ return;
}
}
@@ -199,6 +371,18 @@ To do:
3. remove getMetada form the lib file as it's not being used anywhere else
4. Figure out the limit stuff ( server action for that seems fine because no use in limiting after they already in the queue rigth? )
5. Figure out the initQuery stuff ( ;( ) --> This is a bad way of doing stuff :0
-6. How do I hande the content already exists wala use case?
-7. Figure out retry and not add shit to the vectirze over and over again on failure
+6. How do I handle the "content already exists" use case? --> Also how do I figure out limits?
+
+
+
+8. Wrap the D1 writes in a transaction, then write to vectorize only if D1 is successful; if it's not, just error out ( if D1 fails: DLQ, recoverable failure --> retry )
+
+First write to D1 in a transaction ( storedContent + spaces ) --> write to vectorize --> if vectorize fails --> reset D1. Alternatively, first do the vectorize step and, if that succeeds, then do the D1 writes in a batch, right?
+
+
+DEBUG:
+What's happening:
+1. The stuff in the D1 is updating but nothing is happening in the vectorize for some reason
+
+
*/
diff --git a/apps/cf-ai-backend/src/types.ts b/apps/cf-ai-backend/src/types.ts
index 5294e0a1..e4f13f1b 100644
--- a/apps/cf-ai-backend/src/types.ts
+++ b/apps/cf-ai-backend/src/types.ts
@@ -18,14 +18,17 @@ export type Env = {
MYBROWSER: unknown;
ANTHROPIC_API_KEY: string;
THREAD_CF_AUTH: string;
+ THREAD: { processTweets: () => Promise<Array<any>> };
THREAD_CF_WORKER: string;
NODE_ENV: string;
+ MD_SEC_KEY: string;
};
export interface JobData {
content: string;
space: Array<number>;
user: string;
+ type: string
}
export interface TweetData {
diff --git a/apps/cf-ai-backend/wrangler.toml b/apps/cf-ai-backend/wrangler.toml
index ca8e2b1d..665ca593 100644
--- a/apps/cf-ai-backend/wrangler.toml
+++ b/apps/cf-ai-backend/wrangler.toml
@@ -3,8 +3,12 @@ main = "src/index.ts"
compatibility_date = "2024-02-23"
node_compat = true
-tail_consumers = [{service = "new-cf-ai-backend-tail"}]
+# tail_consumers = [{service = "new-cf-ai-backend-tail"}]
+[[services]]
+binding = "THREAD"
+service = "tweet-thread"
+entrypoint = "ThreadWorker"
# [env.preview]
[[vectorize]]
diff --git a/apps/web/app/actions/doers.ts b/apps/web/app/actions/doers.ts
index f17032b9..500a8608 100644
--- a/apps/web/app/actions/doers.ts
+++ b/apps/web/app/actions/doers.ts
@@ -17,7 +17,7 @@ import { auth } from "../../server/auth";
import { Tweet } from "react-tweet/api";
// import { getMetaData } from "@/lib/get-metadata";
import { and, eq, inArray, sql } from "drizzle-orm";
-import { LIMITS } from "@/lib/constants";
+import { LIMITS } from "@repo/shared-types";
import { ChatHistory } from "@repo/shared-types";
import { decipher } from "@/server/encrypt";
import { redirect } from "next/navigation";
@@ -104,25 +104,6 @@ const typeDecider = (content: string): "page" | "tweet" | "note" => {
}
};
-export const limit = async (
- userId: string,
- type = "page",
- items: number = 1,
-) => {
- const countResult = await db
- .select({
- count: sql<number>`count(*)`.mapWith(Number),
- })
- .from(storedContent)
- .where(and(eq(storedContent.userId, userId), eq(storedContent.type, type)));
-
- const currentCount = countResult[0]?.count || 0;
- const totalLimit = LIMITS[type as keyof typeof LIMITS];
- const remainingLimit = totalLimit - currentCount;
-
- return items <= remainingLimit;
-};
-
export const addUserToSpace = async (userEmail: string, spaceId: number) => {
const data = await auth();
@@ -197,7 +178,6 @@ export const createMemory = async (input: {
return { error: "Not authenticated", success: false };
}
-
// make the backend reqeust for the queue here
const vectorSaveResponses = await fetch(
`${process.env.BACKEND_BASE_URL}/api/add`,
@@ -214,250 +194,262 @@ export const createMemory = async (input: {
},
},
);
+ const response = (await vectorSaveResponses.json()) as {
+ status: string;
+ message?: string;
+ };
+
+ if (response.status !== "ok") {
+ return {
+ success: false,
+ data: 0,
+ error: response.message,
+ };
+ }
-// const type = typeDecider(input.content);
-
-// let pageContent = input.content;
-// let metadata: Awaited<ReturnType<typeof getMetaData>>;
-// let vectorData: string;
-
-// if (!(await limit(data.user.id, type))) {
-// return {
-// success: false,
-// data: 0,
-// error: `You have exceeded the limit of ${LIMITS[type as keyof typeof LIMITS]} ${type}s.`,
-// };
-// }
-
-// let noteId = 0;
-
-// if (type === "page") {
-// const response = await fetch("https://md.dhr.wtf/?url=" + input.content, {
-// headers: {
-// Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY,
-// },
-// });
-// pageContent = await response.text();
-// vectorData = pageContent;
-// try {
-// metadata = await getMetaData(input.content);
-// } catch (e) {
-// return {
-// success: false,
-// error: "Failed to fetch metadata for the page. Please try again later.",
-// };
-// }
-// } else if (type === "tweet") {
-// //Request the worker for the entire thread
-
-// let thread: string;
-// let errorOccurred: boolean = false;
-
-// try {
-// const cf_thread_endpoint = process.env.THREAD_CF_WORKER;
-// const authKey = process.env.THREAD_CF_AUTH;
-// const threadRequest = await fetch(cf_thread_endpoint, {
-// method: "POST",
-// headers: {
-// "Content-Type": "application/json",
-// Authorization: authKey,
-// },
-// body: JSON.stringify({ url: input.content }),
-// });
-
-// if (threadRequest.status !== 200) {
-// throw new Error(
-// `Failed to fetch the thread: ${input.content}, Reason: ${threadRequest.statusText}`,
-// );
-// }
-
-// thread = await threadRequest.text();
-// if (thread.trim().length === 2) {
-// console.log("Thread is an empty array");
-// throw new Error(
-// "[THREAD FETCHING SERVICE] Got no content form thread worker",
-// );
-// }
-// } catch (e) {
-// console.log("[THREAD FETCHING SERVICE] Failed to fetch the thread", e);
-// errorOccurred = true;
-// }
-
-// const tweet = await getTweetData(input.content.split("/").pop() as string);
-
-// pageContent = tweetToMd(tweet);
-// console.log("THis ishte page content!!", pageContent);
-// //@ts-ignore
-// vectorData = errorOccurred ? JSON.stringify(pageContent) : thread;
-// metadata = {
-// baseUrl: input.content,
-// description: tweet.text.slice(0, 200),
-// image: tweet.user.profile_image_url_https,
-// title: `Tweet by ${tweet.user.name}`,
-// };
-// } else if (type === "note") {
-// pageContent = input.content;
-// vectorData = pageContent;
-// noteId = new Date().getTime();
-// metadata = {
-// baseUrl: `https://supermemory.ai/note/${noteId}`,
-// description: `Note created at ${new Date().toLocaleString()}`,
-// image: "https://supermemory.ai/logo.png",
-// title: `${pageContent.slice(0, 20)} ${pageContent.length > 20 ? "..." : ""}`,
-// };
-// } else {
-// return {
-// success: false,
-// data: 0,
-// error: "Invalid type",
-// };
-// }
-
-// let storeToSpaces = input.spaces;
-
-// if (!storeToSpaces) {
-// storeToSpaces = [];
-// }
-
-// const vectorSaveResponse = await fetch(
-// `${process.env.BACKEND_BASE_URL}/api/add`,
-// {
-// method: "POST",
-// body: JSON.stringify({
-// pageContent: vectorData,
-// title: metadata.title,
-// description: metadata.description,
-// url: metadata.baseUrl,
-// spaces: storeToSpaces.map((spaceId) => spaceId.toString()),
-// user: data.user.id,
-// type,
-// }),
-// headers: {
-// "Content-Type": "application/json",
-// Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY,
-// },
-// },
-// );
-
-// if (!vectorSaveResponse.ok) {
-// const errorData = await vectorSaveResponse.text();
-// console.error(errorData);
-// return {
-// success: false,
-// data: 0,
-// error: `Failed to save to vector store. Backend returned error: ${errorData}`,
-// };
-// }
-
-// let contentId: number;
-
-// const response = (await vectorSaveResponse.json()) as {
-// status: string;
-// chunkedInput: string;
-// message?: string;
-// };
-
-// try {
-// if (response.status !== "ok") {
-// if (response.status === "error") {
-// return {
-// success: false,
-// data: 0,
-// error: response.message,
-// };
-// } else {
-// return {
-// success: false,
-// data: 0,
-// error: `Failed to save to vector store. Backend returned error: ${response.message}`,
-// };
-// }
-// }
-// } catch (e) {
-// return {
-// success: false,
-// data: 0,
-// error: `Failed to save to vector store. Backend returned error: ${e}`,
-// };
-// }
-
-// const saveToDbUrl =
-// (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) +
-// "#supermemory-user-" +
-// data.user.id;
-
-// // Insert into database
-// try {
-// const insertResponse = await db
-// .insert(storedContent)
-// .values({
-// content: pageContent,
-// title: metadata.title,
-// description: metadata.description,
-// url: saveToDbUrl,
-// baseUrl: saveToDbUrl,
-// image: metadata.image,
-// savedAt: new Date(),
-// userId: data.user.id,
-// type,
-// noteId,
-// })
-// .returning({ id: storedContent.id });
-// revalidatePath("/memories");
-// revalidatePath("/home");
-
-// if (!insertResponse[0]?.id) {
-// return {
-// success: false,
-// data: 0,
-// error: "Something went wrong while saving the document to the database",
-// };
-// }
-
-// contentId = insertResponse[0]?.id;
-// } catch (e) {
-// const error = e as Error;
-// console.log("Error: ", error.message);
-
-// if (
-// error.message.includes(
-// "D1_ERROR: UNIQUE constraint failed: storedContent.baseUrl",
-// )
-// ) {
-// return {
-// success: false,
-// data: 0,
-// error: "Content already exists",
-// };
-// }
-
-// return {
-// success: false,
-// data: 0,
-// error: "Failed to save to database with error: " + error.message,
-// };
-// }
-
-// if (storeToSpaces.length > 0) {
-// // Adding the many-to-many relationship between content and spaces
-// const spaceData = await db
-// .select()
-// .from(space)
-// .where(
-// and(inArray(space.id, storeToSpaces), eq(space.user, data.user.id)),
-// )
-// .all();
-
-// await Promise.all(
-// spaceData.map(async (s) => {
-// await db
-// .insert(contentToSpace)
-// .values({ contentId: contentId, spaceId: s.id });
-
-// await db.update(space).set({ numItems: s.numItems + 1 });
-// }),
-// );
-// }
+ // const type = typeDecider(input.content);
+
+ // let pageContent = input.content;
+ // let metadata: Awaited<ReturnType<typeof getMetaData>>;
+ // let vectorData: string;
+
+ // if (!(await limit(data.user.id, type))) {
+ // return {
+ // success: false,
+ // data: 0,
+ // error: `You have exceeded the limit of ${LIMITS[type as keyof typeof LIMITS]} ${type}s.`,
+ // };
+	// } // TODO: decide how this per-type limit check should be enforced in the backend
+
+ // let noteId = 0;
+
+ // if (type === "page") {
+ // const response = await fetch("https://md.dhr.wtf/?url=" + input.content, {
+ // headers: {
+ // Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY,
+ // },
+ // });
+ // pageContent = await response.text();
+ // vectorData = pageContent;
+ // try {
+ // metadata = await getMetaData(input.content);
+ // } catch (e) {
+ // return {
+ // success: false,
+ // error: "Failed to fetch metadata for the page. Please try again later.",
+ // };
+ // }
+ // } else if (type === "tweet") {
+ // //Request the worker for the entire thread
+
+ // let thread: string;
+ // let errorOccurred: boolean = false;
+
+ // try {
+ // const cf_thread_endpoint = process.env.THREAD_CF_WORKER;
+ // const authKey = process.env.THREAD_CF_AUTH;
+ // const threadRequest = await fetch(cf_thread_endpoint, {
+ // method: "POST",
+ // headers: {
+ // "Content-Type": "application/json",
+ // Authorization: authKey,
+ // },
+ // body: JSON.stringify({ url: input.content }),
+ // });
+
+ // if (threadRequest.status !== 200) {
+ // throw new Error(
+ // `Failed to fetch the thread: ${input.content}, Reason: ${threadRequest.statusText}`,
+ // );
+ // }
+
+ // thread = await threadRequest.text();
+ // if (thread.trim().length === 2) {
+ // console.log("Thread is an empty array");
+ // throw new Error(
+	// "[THREAD FETCHING SERVICE] Got no content from thread worker",
+ // );
+ // }
+ // } catch (e) {
+ // console.log("[THREAD FETCHING SERVICE] Failed to fetch the thread", e);
+ // errorOccurred = true;
+ // }
+
+ // const tweet = await getTweetData(input.content.split("/").pop() as string);
+
+ // pageContent = tweetToMd(tweet);
+	// console.log("This is the page content!!", pageContent);
+ // //@ts-ignore
+ // vectorData = errorOccurred ? JSON.stringify(pageContent) : thread;
+ // metadata = {
+ // baseUrl: input.content,
+ // description: tweet.text.slice(0, 200),
+ // image: tweet.user.profile_image_url_https,
+ // title: `Tweet by ${tweet.user.name}`,
+ // };
+ // } else if (type === "note") {
+ // pageContent = input.content;
+ // vectorData = pageContent;
+ // noteId = new Date().getTime();
+ // metadata = {
+ // baseUrl: `https://supermemory.ai/note/${noteId}`,
+ // description: `Note created at ${new Date().toLocaleString()}`,
+ // image: "https://supermemory.ai/logo.png",
+ // title: `${pageContent.slice(0, 20)} ${pageContent.length > 20 ? "..." : ""}`,
+ // };
+ // } else {
+ // return {
+ // success: false,
+ // data: 0,
+ // error: "Invalid type",
+ // };
+ // }
+
+ // let storeToSpaces = input.spaces;
+
+ // if (!storeToSpaces) {
+ // storeToSpaces = [];
+ // }
+
+ // const vectorSaveResponse = await fetch(
+ // `${process.env.BACKEND_BASE_URL}/api/add`,
+ // {
+ // method: "POST",
+ // body: JSON.stringify({
+ // pageContent: vectorData,
+ // title: metadata.title,
+ // description: metadata.description,
+ // url: metadata.baseUrl,
+ // spaces: storeToSpaces.map((spaceId) => spaceId.toString()),
+ // user: data.user.id,
+ // type,
+ // }),
+ // headers: {
+ // "Content-Type": "application/json",
+ // Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY,
+ // },
+ // },
+ // );
+
+ // if (!vectorSaveResponse.ok) {
+ // const errorData = await vectorSaveResponse.text();
+ // console.error(errorData);
+ // return {
+ // success: false,
+ // data: 0,
+ // error: `Failed to save to vector store. Backend returned error: ${errorData}`,
+ // };
+ // }
+
+ // let contentId: number;
+
+ // const response = (await vectorSaveResponse.json()) as {
+ // status: string;
+ // chunkedInput: string;
+ // message?: string;
+ // };
+
+ // try {
+ // if (response.status !== "ok") {
+ // if (response.status === "error") {
+ // return {
+ // success: false,
+ // data: 0,
+ // error: response.message,
+ // };
+ // } else {
+ // return {
+ // success: false,
+ // data: 0,
+ // error: `Failed to save to vector store. Backend returned error: ${response.message}`,
+ // };
+ // }
+ // }
+ // } catch (e) {
+ // return {
+ // success: false,
+ // data: 0,
+ // error: `Failed to save to vector store. Backend returned error: ${e}`,
+ // };
+ // }
+
+ // const saveToDbUrl =
+ // (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) +
+ // "#supermemory-user-" +
+ // data.user.id;
+
+ // // Insert into database
+ // try {
+ // const insertResponse = await db
+ // .insert(storedContent)
+ // .values({
+ // content: pageContent,
+ // title: metadata.title,
+ // description: metadata.description,
+ // url: saveToDbUrl,
+ // baseUrl: saveToDbUrl,
+ // image: metadata.image,
+ // savedAt: new Date(),
+ // userId: data.user.id,
+ // type,
+ // noteId,
+ // })
+ // .returning({ id: storedContent.id });
+ // revalidatePath("/memories");
+ // revalidatePath("/home");
+
+ // if (!insertResponse[0]?.id) {
+ // return {
+ // success: false,
+ // data: 0,
+ // error: "Something went wrong while saving the document to the database",
+ // };
+ // }
+
+ // contentId = insertResponse[0]?.id;
+ // } catch (e) {
+ // const error = e as Error;
+ // console.log("Error: ", error.message);
+
+ // if (
+ // error.message.includes(
+ // "D1_ERROR: UNIQUE constraint failed: storedContent.baseUrl",
+ // )
+ // ) {
+ // return {
+ // success: false,
+ // data: 0,
+ // error: "Content already exists",
+ // };
+ // }
+
+ // return {
+ // success: false,
+ // data: 0,
+ // error: "Failed to save to database with error: " + error.message,
+ // };
+ // }
+
+ // if (storeToSpaces.length > 0) {
+ // // Adding the many-to-many relationship between content and spaces
+ // const spaceData = await db
+ // .select()
+ // .from(space)
+ // .where(
+ // and(inArray(space.id, storeToSpaces), eq(space.user, data.user.id)),
+ // )
+ // .all();
+
+ // await Promise.all(
+ // spaceData.map(async (s) => {
+ // await db
+ // .insert(contentToSpace)
+ // .values({ contentId: contentId, spaceId: s.id });
+
+ // await db.update(space).set({ numItems: s.numItems + 1 });
+ // }),
+ // );
+ // }
return {
success: true,
@@ -475,7 +467,6 @@ export const createChatThread = async (
return { error: "Not authenticated", success: false };
}
-
const thread = await db
.insert(chatThreads)
.values({
@@ -836,8 +827,9 @@ export async function getQuerySuggestions() {
};
}
- const fullQuery = (content?.map((c) => `${c.title} \n\n${c.content}`) ?? [])
- .join(" ");
+ const fullQuery = (
+ content?.map((c) => `${c.title} \n\n${c.content}`) ?? []
+ ).join(" ");
const suggestionsCall = (await env.AI.run(
// @ts-ignore
diff --git a/apps/web/app/api/store/helper.ts b/apps/web/app/api/store/helper.ts
index 2dc42125..6ab7fb23 100644
--- a/apps/web/app/api/store/helper.ts
+++ b/apps/web/app/api/store/helper.ts
@@ -2,21 +2,21 @@ import { z } from "zod";
import { db } from "@/server/db";
import { contentToSpace, space, storedContent } from "@repo/db/schema";
import { and, eq, inArray } from "drizzle-orm";
-import { LIMITS } from "@/lib/constants";
-import { limit } from "@/app/actions/doers";
+// import { LIMITS } from "@repo/shared-types";
+// import { limit } from "@/app/actions/doers";
import { type AddFromAPIType } from "@repo/shared-types";
export const createMemoryFromAPI = async (input: {
data: AddFromAPIType;
userId: string;
}) => {
- if (!(await limit(input.userId, input.data.type))) {
- return {
- success: false,
- data: 0,
- error: `You have exceeded the limit of ${LIMITS[input.data.type as keyof typeof LIMITS]} ${input.data.type}s.`,
- };
- }
+ // if (!(await limit(input.userId, input.data.type))) {
+ // return {
+ // success: false,
+ // data: 0,
+ // error: `You have exceeded the limit of ${LIMITS[input.data.type as keyof typeof LIMITS]} ${input.data.type}s.`,
+ // };
+ // }
const vectorSaveResponse = await fetch(
`${process.env.BACKEND_BASE_URL}/api/add`,
diff --git a/apps/web/lib/constants.ts b/apps/web/lib/constants.ts
index 241a6a1d..73f9a83d 100644
--- a/apps/web/lib/constants.ts
+++ b/apps/web/lib/constants.ts
@@ -1,9 +1,3 @@
-export const LIMITS = {
- page: 100,
- tweet: 1000,
- note: 1000,
-};
-
export const codeLanguageSubset = [
"python",
"javascript",
diff --git a/packages/db/schema.ts b/packages/db/schema.ts
index 11711997..70860066 100644
--- a/packages/db/schema.ts
+++ b/packages/db/schema.ts
@@ -256,8 +256,14 @@ export const jobs = createTable(
attempts: integer("attempts").notNull().default(0),
lastAttemptAt: integer("lastAttemptAt"),
error: blob("error"),
- createdAt: integer("createdAt").notNull(),
- updatedAt: integer("updatedAt").notNull(),
+ createdAt: int("createdAt", { mode: "timestamp" })
+ .notNull()
+ .notNull()
+ .default(new Date()),
+ updatedAt: int("updatedAt", { mode: "timestamp" })
+ .notNull()
+ .notNull()
+ .default(new Date()),
},
(job) => ({
userIdx: index("jobs_userId_idx").on(job.userId),
diff --git a/packages/shared-types/index.ts b/packages/shared-types/index.ts
index a9933b84..e49cf8e0 100644
--- a/packages/shared-types/index.ts
+++ b/packages/shared-types/index.ts
@@ -1,5 +1,14 @@
import { z } from "zod";
+
+export const LIMITS = {
+ page: 100,
+ tweet: 1000,
+ note: 1000,
+};
+
+
+
export const SourceZod = z.object({
type: z.string(),
source: z.string(),