diff options
| author | Kush Thaker <[email protected]> | 2024-08-05 21:25:11 +0530 |
|---|---|---|
| committer | Kush Thaker <[email protected]> | 2024-08-05 21:25:11 +0530 |
| commit | e4fd7f5aacc3c9f7f000e1858248d49aa4d3410e (patch) | |
| tree | 4f1af30951860629ef376c37bc995a65dda8b813 | |
| parent | db schema in packages (diff) | |
| download | supermemory-e4fd7f5aacc3c9f7f000e1858248d49aa4d3410e.tar.xz supermemory-e4fd7f5aacc3c9f7f000e1858248d49aa4d3410e.zip | |
move limit to backend and thread service binding
| -rw-r--r-- | apps/cf-ai-backend/src/errors/baseError.ts | 2 | ||||
| -rw-r--r-- | apps/cf-ai-backend/src/errors/results.ts | 19 | ||||
| -rw-r--r-- | apps/cf-ai-backend/src/helper.ts | 23 | ||||
| -rw-r--r-- | apps/cf-ai-backend/src/index.ts | 117 | ||||
| -rw-r--r-- | apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts | 58 | ||||
| -rw-r--r-- | apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts | 13 | ||||
| -rw-r--r-- | apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts | 73 | ||||
| -rw-r--r-- | apps/cf-ai-backend/src/queueConsumer/index.ts | 336 | ||||
| -rw-r--r-- | apps/cf-ai-backend/src/types.ts | 3 | ||||
| -rw-r--r-- | apps/cf-ai-backend/wrangler.toml | 6 | ||||
| -rw-r--r-- | apps/web/app/actions/doers.ts | 526 | ||||
| -rw-r--r-- | apps/web/app/api/store/helper.ts | 18 | ||||
| -rw-r--r-- | apps/web/lib/constants.ts | 6 | ||||
| -rw-r--r-- | packages/db/schema.ts | 10 | ||||
| -rw-r--r-- | packages/shared-types/index.ts | 9 |
15 files changed, 701 insertions, 518 deletions
diff --git a/apps/cf-ai-backend/src/errors/baseError.ts b/apps/cf-ai-backend/src/errors/baseError.ts index 2723d45b..0dcc2203 100644 --- a/apps/cf-ai-backend/src/errors/baseError.ts +++ b/apps/cf-ai-backend/src/errors/baseError.ts @@ -7,7 +7,7 @@ export class BaseHttpError extends Error { this.status = status; this.message = message; Object.setPrototypeOf(this, new.target.prototype); // Restore prototype chain - } + } } diff --git a/apps/cf-ai-backend/src/errors/results.ts b/apps/cf-ai-backend/src/errors/results.ts index 87ea0c63..ccce1396 100644 --- a/apps/cf-ai-backend/src/errors/results.ts +++ b/apps/cf-ai-backend/src/errors/results.ts @@ -14,15 +14,18 @@ export const Err = <E extends BaseError>(error: E): Result<never, E> => { export async function wrap<T, E extends BaseError>( p: Promise<T>, - errorFactory: (err: Error) => E, -): Promise<Result<T, E>> { + errorFactory: (err: Error, source: string) => E, + source: string = "unspecified" + ): Promise<Result<T, E>> { try { - return Ok(await p); + return Ok(await p); } catch (e) { - return Err(errorFactory(e as Error)); + return Err(errorFactory(e as Error, source)); } -} + } -export function isErr<T, E extends Error>(result: Result<T, E>): result is { ok: false; error: E } { - return !result.ok; - }
\ No newline at end of file +export function isErr<T, E extends Error>( + result: Result<T, E>, +): result is { ok: false; error: E } { + return !result.ok; +} diff --git a/apps/cf-ai-backend/src/helper.ts b/apps/cf-ai-backend/src/helper.ts index 1568996a..eadd9c21 100644 --- a/apps/cf-ai-backend/src/helper.ts +++ b/apps/cf-ai-backend/src/helper.ts @@ -9,17 +9,14 @@ import { z } from "zod"; import { seededRandom } from "./utils/seededRandom"; import { bulkInsertKv } from "./utils/kvBulkInsert"; -export async function initQuery( - c: Context<{ Bindings: Env }>, - model: string = "gpt-4o", -) { +export async function initQuery(env: Env, model: string = "gpt-4o") { const embeddings = new OpenAIEmbeddings({ - apiKey: c.env.OPENAI_API_KEY, + apiKey: env.OPENAI_API_KEY, modelName: "text-embedding-3-small", }); const store = new CloudflareVectorizeStore(embeddings, { - index: c.env.VECTORIZE_INDEX, + index: env.VECTORIZE_INDEX, }); let selectedModel: @@ -30,7 +27,7 @@ export async function initQuery( switch (model) { case "claude-3-opus": const anthropic = createAnthropic({ - apiKey: c.env.ANTHROPIC_API_KEY, + apiKey: env.ANTHROPIC_API_KEY, baseURL: "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/anthropic", }); @@ -39,7 +36,7 @@ export async function initQuery( break; case "gemini-1.5-pro": const googleai = createGoogleGenerativeAI({ - apiKey: c.env.GOOGLE_AI_API_KEY, + apiKey: env.GOOGLE_AI_API_KEY, baseURL: "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/google-vertex-ai", }); @@ -49,7 +46,7 @@ export async function initQuery( case "gpt-4o": default: const openai = createOpenAI({ - apiKey: c.env.OPENAI_API_KEY, + apiKey: env.OPENAI_API_KEY, baseURL: "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/openai", compatibility: "strict", @@ -204,7 +201,7 @@ export async function batchCreateChunksAndEmbeddings({ const commonMetaData = { type: body.type ?? "tweet", title: body.title?.slice(0, 50) ?? "", - description: body.description ?? "", + description: body.description?.slice(0, 50) ?? "", url: body.url, [sanitizeKey(`user-${body.user}`)]: 1, }; @@ -255,7 +252,7 @@ export async function batchCreateChunksAndEmbeddings({ const commonMetaData = { type: body.type ?? "page", title: body.title?.slice(0, 50) ?? "", - description: body.description ?? "", + description: body.description?.slice(0, 50) ?? "", url: body.url, [sanitizeKey(`user-${body.user}`)]: 1, }; @@ -292,7 +289,7 @@ export async function batchCreateChunksAndEmbeddings({ const commonMetaData = { title: body.title?.slice(0, 50) ?? "", type: body.type ?? "page", - description: body.description ?? "", + description: body.description?.slice(0, 50) ?? "", url: body.url, [sanitizeKey(`user-${body.user}`)]: 1, }; @@ -328,7 +325,7 @@ export async function batchCreateChunksAndEmbeddings({ const commonMetaData = { type: body.type ?? "image", title: body.title, - description: body.description ?? "", + description: body.description?.slice(0, 50) ?? "", url: body.url, [sanitizeKey(`user-${body.user}`)]: 1, }; diff --git a/apps/cf-ai-backend/src/index.ts b/apps/cf-ai-backend/src/index.ts index 4949fab3..a46d0080 100644 --- a/apps/cf-ai-backend/src/index.ts +++ b/apps/cf-ai-backend/src/index.ts @@ -24,12 +24,18 @@ import { zValidator } from "@hono/zod-validator"; import chunkText from "./queueConsumer/chunkers/chonker"; import { systemPrompt, template } from "./prompts/prompt1"; import { swaggerUI } from "@hono/swagger-ui"; +import { database } from "./db"; +import { storedContent } from "@repo/db/schema"; +import { sql, and, eq } from "drizzle-orm"; +import { LIMITS } from "@repo/shared-types"; +import { typeDecider } from "./queueConsumer/utils/typeDecider"; // import { chunkThread } from "./utils/chunkTweet"; import { chunkNote, chunkPage, } from "./queueConsumer/chunkers/chunkPageOrNotes"; import { queue } from "./queueConsumer"; +import { isErr } from "./errors/results"; const app = new Hono<{ Bindings: Env }>(); @@ -68,42 +74,75 @@ app.get("/api/health", (c) => { app.post("/api/add", zValidator("json", vectorBody), async (c) => { try { - // console.log("api/add hit!!!!"); const body = c.req.valid("json"); - const spaceNumbers = body.spaces.map((s: string) => Number(s)); - await c.env.EMBEDCHUNKS_QUEUE.send({ - content: body.url, - user: body.user, - space: spaceNumbers, - }); - - // const { store } = await initQuery(c); + //This is something I don't like + // console.log("api/add hit!!!!"); + //Have to do limit on this also duplicate check here + const db = database(c.env); + const typeResult = typeDecider(body.url); + + const saveToDbUrl = + (body.url.split("#supermemory-user-")[0] ?? body.url) + // Why does this have to be a split from #supermemory-user? + "#supermemory-user-" + + body.user; + + console.log( + "---------------------------------------------------------------------------------------------------------------------------------------------", + saveToDbUrl, + ); + const alreadyExist = await db + .select() + .from(storedContent) + .where(eq(storedContent.baseUrl, saveToDbUrl)); + console.log( + "------------------------------------------------", + JSON.stringify(alreadyExist), + ); - // console.log(body.spaces); - // let chunks: TweetChunks | PageOrNoteChunks; - // // remove everything in <raw> tags - // // const newPageContent = body.pageContent?.replace(/<raw>.*?<\/raw>/g, ""); + if (alreadyExist.length > 0) { + console.log( + "------------------------------------------------------------------------------------------------I exist------------------------", + ); + return c.json({ status: "error", message: "the content already exists" }); + } - // switch (body.type) { - // case "tweet": - // chunks = chunkThread(body.pageContent); - // break; + if (isErr(typeResult)) { + throw typeResult.error; + } + // limiting in the backend + const type = typeResult.value; + const countResult = await db + .select({ + count: sql<number>`count(*)`.mapWith(Number), + }) + .from(storedContent) + .where( + and(eq(storedContent.userId, body.user), eq(storedContent.type, type)), + ); - // case "page": - // chunks = chunkPage(body.pageContent); - // break; + const currentCount = countResult[0]?.count || 0; + const totalLimit = LIMITS[type as keyof typeof LIMITS]; + const remainingLimit = totalLimit - currentCount; + const items = 1; + const isWithinLimit = items <= remainingLimit; - // case "note": - // chunks = chunkNote(body.pageContent); - // break; - // } + // unique contraint check - // await batchCreateChunksAndEmbeddings({ - // store, - // body, - // chunks: chunks, - // env: c, - // }); + if (isWithinLimit) { + const spaceNumbers = body.spaces.map((s: string) => Number(s)); + await c.env.EMBEDCHUNKS_QUEUE.send({ + content: body.url, + user: body.user, + space: spaceNumbers, + type: type, + }); + } else { + return c.json({ + status: "error", + message: + "You have exceed the current limit for this type of document, please try removing something form memories ", + }); + } return c.json({ status: "ok" }); } catch (error) { @@ -137,7 +176,7 @@ app.post( async (c) => { const body = c.req.valid("form"); - const { store } = await initQuery(c); + const { store } = await initQuery(c.env); if (!(body.images || body["images[]"])) { return c.json({ status: "error", message: "No images found" }, 400); @@ -203,7 +242,7 @@ app.get( async (c) => { const query = c.req.valid("query"); - const { model } = await initQuery(c); + const { model } = await initQuery(c.env); const response = await streamText({ model, prompt: query.query }); const r = response.toTextStreamResponse(); @@ -221,7 +260,7 @@ app.get( [`user-${user}`]: 1, }; - const { store } = await initQuery(c); + const { store } = await initQuery(c.env); const queryAsVector = await store.embeddings.embedQuery(query); const resp = await c.env.VECTORIZE_INDEX.query(queryAsVector, { @@ -273,7 +312,7 @@ app.post( const { query, user } = c.req.valid("query"); const { chatHistory } = c.req.valid("json"); - const { store, model } = await initQuery(c); + const { store, model } = await initQuery(c.env); let task: "add" | "chat" = "chat"; let thingToAdd: "page" | "image" | "text" | undefined = undefined; @@ -448,7 +487,7 @@ app.post( const spaces = query.spaces?.split(",") ?? [undefined]; // Get the AI model maker and vector store - const { model, store } = await initQuery(c, query.model); + const { model, store } = await initQuery(c.env, query.model); if (!body.sources) { const filter: VectorizeVectorMetadataFilter = { @@ -591,6 +630,8 @@ app.post( } } + //Serach mem0 + const preparedContext = body.sources.normalizedData.map( ({ metadata, score, normalizedScore }) => ({ context: `Website title: ${metadata!.title}\nDescription: ${metadata!.description}\nURL: ${metadata!.url}\nContent: ${metadata!.text}`, @@ -601,7 +642,7 @@ app.post( const initialMessages: CoreMessage[] = [ { role: "user", content: systemPrompt }, - { role: "assistant", content: "Hello, how can I help?" }, + { role: "assistant", content: "Hello, how can I help?" }, // prase and add memory json here ]; const prompt = template({ @@ -637,7 +678,7 @@ app.delete( async (c) => { const { websiteUrl, user } = c.req.valid("query"); - const { store } = await initQuery(c); + const { store } = await initQuery(c.env); await deleteDocument({ url: websiteUrl, user, c, store }); @@ -657,7 +698,7 @@ app.get( ), async (c) => { const { context, request } = c.req.valid("query"); - const { model } = await initQuery(c); + const { model } = await initQuery(c.env); const response = await streamText({ model, diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts deleted file mode 100644 index a7d85c23..00000000 --- a/apps/cf-ai-backend/src/queueConsumer/helpers/initQuery.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { Env } from "../../types"; -import { OpenAIEmbeddings } from "../../utils/OpenAIEmbedder"; -import { CloudflareVectorizeStore } from "@langchain/cloudflare"; -import { createOpenAI } from "@ai-sdk/openai"; -import { createGoogleGenerativeAI } from "@ai-sdk/google"; -import { createAnthropic } from "@ai-sdk/anthropic"; - -export async function initQQuery( - env: Env, - model: string = "gpt-4o", -) { - const embeddings = new OpenAIEmbeddings({ - apiKey: env.OPENAI_API_KEY, - modelName: "text-embedding-3-small", - }); - - const store = new CloudflareVectorizeStore(embeddings, { - index: env.VECTORIZE_INDEX, - }); - - let selectedModel: - | ReturnType<ReturnType<typeof createOpenAI>> - | ReturnType<ReturnType<typeof createGoogleGenerativeAI>> - | ReturnType<ReturnType<typeof createAnthropic>>; - - switch (model) { - case "claude-3-opus": - const anthropic = createAnthropic({ - apiKey: env.ANTHROPIC_API_KEY, - baseURL: - "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/anthropic", - }); - selectedModel = anthropic.chat("claude-3-opus-20240229"); - console.log("Selected model: ", selectedModel); - break; - case "gemini-1.5-pro": - const googleai = createGoogleGenerativeAI({ - apiKey: env.GOOGLE_AI_API_KEY, - baseURL: - "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/google-vertex-ai", - }); - selectedModel = googleai.chat("models/gemini-1.5-pro-latest"); - console.log("Selected model: ", selectedModel); - break; - case "gpt-4o": - default: - const openai = createOpenAI({ - apiKey: env.OPENAI_API_KEY, - baseURL: - "https://gateway.ai.cloudflare.com/v1/47c2b4d598af9d423c06fc9f936226d5/supermemory/openai", - compatibility: "strict", - }); - selectedModel = openai.chat("gpt-4o-mini"); - break; - } - - return { store, model: selectedModel }; -}
\ No newline at end of file diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts index 6b28c975..f967736e 100644 --- a/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processPage.ts @@ -10,13 +10,14 @@ class ProcessPageError extends BaseError { type PageProcessResult = { pageContent: string; metadata: Metadata }; -export async function processPage( - url: string, -): Promise<Result<PageProcessResult, ProcessPageError>> { +export async function processPage(input: { + url: string; + securityKey: string; +}): Promise<Result<PageProcessResult, ProcessPageError>> { try { - const response = await fetch("https://md.dhr.wtf/?url=" + url, { + const response = await fetch("https://md.dhr.wtf/?url=" + input.url, { headers: { - Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY, + Authorization: "Bearer " + input.securityKey, }, }); const pageContent = await response.text(); @@ -29,7 +30,7 @@ export async function processPage( ); } console.log("[This is the page content]", pageContent); - const metadataResult = await getMetaData(url); + const metadataResult = await getMetaData(input.url); if (isErr(metadataResult)) { throw metadataResult.error; } diff --git a/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts index ef5d9f5b..8d83f2dc 100644 --- a/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts +++ b/apps/cf-ai-backend/src/queueConsumer/helpers/processTweet.ts @@ -3,6 +3,7 @@ import { Result, Ok, Err, isErr } from "../../errors/results"; import { BaseError } from "../../errors/baseError"; import { getMetaData, Metadata } from "../utils/get-metadata"; import { tweetToMd } from "@repo/shared-types/utils"; // can I do this? +import { Env } from "../../types"; class ProcessTweetError extends BaseError { constructor(message?: string, source?: string) { @@ -43,39 +44,45 @@ export const getTweetData = async ( } }; -export const getThreadData = async ( - tweetUrl: string, - cf_thread_endpoint: string, - authKey: string, -): Promise<Result<string, ProcessTweetError>> => { - const threadRequest = await fetch(cf_thread_endpoint, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: authKey, - }, - body: JSON.stringify({ url: tweetUrl }), - }); - if (threadRequest.status !== 200) { - return Err( - new ProcessTweetError( - `Failed to fetch the thread: ${tweetUrl}, Reason: ${threadRequest.statusText}`, - "getThreadData", - ), - ); - } - - const thread = await threadRequest.text(); - console.log("[thread response]"); +export const getThreadData = async (input: { + tweetUrl: string; + env: Env; +}): Promise<Result<any, ProcessTweetError>> => { + try { + // const threadRequest = await fetch(input.cf_thread_endpoint, { + // method: "POST", + // headers: { + // "Content-Type": "application/json", + // Authorization: input.authKey, + // }, + // body: JSON.stringify({ url: input.tweetUrl }), + // }); + // if (threadRequest.status !== 200) { + // console.log(await threadRequest.text()); + // console.log(input.tweetUrl); + // return Err( + // new ProcessTweetError( + // `Failed to fetch the thread: ${input.tweetUrl}, Reason: ${threadRequest.statusText}`, + // "getThreadData", + // ), + // ); + // } + //@ts-ignore + const thread = await input.env.THREAD.processTweets(input.tweetUrl); + console.log("[thread response]", thread); - if (thread.trim().length === 2) { - console.log("Thread is an empty array"); - return Err( - new ProcessTweetError( - "[THREAD FETCHING SERVICE] Got no content form thread worker", - "getThreadData", - ), - ); + if (!thread.length) { + console.log("Thread is an empty array"); + return Err( + new ProcessTweetError( + "[THREAD FETCHING SERVICE] Got no content form thread worker", + "getThreadData", + ), + ); + } + return Ok(thread); + } catch (e) { + console.error("[Thread Processing Error]", e); + return Err(new ProcessTweetError((e as Error).message, "getThreadData")); } - return Ok(thread); }; diff --git a/apps/cf-ai-backend/src/queueConsumer/index.ts b/apps/cf-ai-backend/src/queueConsumer/index.ts index 8b9064cb..8ca23739 100644 --- a/apps/cf-ai-backend/src/queueConsumer/index.ts +++ b/apps/cf-ai-backend/src/queueConsumer/index.ts @@ -5,15 +5,21 @@ import { processNote } from "./helpers/processNotes"; import { processPage } from "./helpers/processPage"; import { getThreadData, getTweetData } from "./helpers/processTweet"; import { tweetToMd } from "@repo/shared-types/utils"; -import { initQQuery } from "./helpers/initQuery"; import { chunkNote, chunkPage } from "./chunkers/chunkPageOrNotes"; import { chunkThread } from "./chunkers/chunkTweet"; -import { batchCreateChunksAndEmbeddings } from "../helper"; +import { batchCreateChunksAndEmbeddings, initQuery } from "../helper"; import { z } from "zod"; import { Metadata } from "./utils/get-metadata"; import { BaseError } from "../errors/baseError"; import { database } from "../db"; -import { storedContent, space, contentToSpace } from "@repo/db/schema"; +import { + storedContent, + space, + contentToSpace, + users, + jobs, + Job, +} from "@repo/db/schema"; import { and, eq, inArray, sql } from "drizzle-orm"; class VectorInsertError extends BaseError { @@ -29,24 +35,99 @@ class D1InsertError extends BaseError { } } +const d1ErrorFactory = (err: Error, source: string) => + new D1InsertError(err.message, source); + +const calculateExponentialBackoff = ( + attempts: number, + baseDelaySeconds: number, +) => { + return baseDelaySeconds ** attempts; +}; + +const BASE_DELAY_SECONDS = 1.5; export async function queue( - batch: MessageBatch<{ content: string; space: Array<number>; user: string }>, + batch: MessageBatch<{ + content: string; + space: Array<number>; + user: string; + type: string; + }>, env: Env, ): Promise<void> { + const db = database(env); console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN); for (let message of batch.messages) { console.log(env.CF_ACCOUNT_ID, env.CF_KV_AUTH_TOKEN); console.log("is thie even running?", message.body); const body = message.body; - console.log("v got shit in the queue", body); - const typeResult = typeDecider(body.content); + const type = body.type; + const userExists = await wrap( + db.select().from(users).where(eq(users.id, body.user)).limit(1), + d1ErrorFactory, + "Error when trying to verify user", + ); + + if (isErr(userExists)) { + throw userExists.error; + } + + //check if this is a retry job.. by checking if the combination of the userId and the url already exists on the queue + let jobId; + const existingJob = await wrap( + db + .select() + .from(jobs) + .where( + and( + eq(jobs.userId, userExists.value[0].id), + eq(jobs.url, body.content), + ), + ) + .limit(1), + d1ErrorFactory, + "Error when checking for existing job", + ); + + if (isErr(existingJob)) { + throw existingJob.error; + } - if (isErr(typeResult)) { - throw typeResult.error; + if (existingJob.value.length > 0) { + jobId = existingJob.value[0].id; + await wrap( + db + .update(jobs) + .set({ + attempts: existingJob.value[0].attempts + 1, + updatedAt: new Date(), + }) + .where(eq(jobs.id, jobId)), + d1ErrorFactory, + "Error when updating job attempts", + ); + } else { + const job = await wrap( + db + .insert(jobs) + .values({ + userId: userExists.value[0].id as string, + url: body.content, + status: "Processing", + attempts: 1, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning({ jobId: jobs.id }), + d1ErrorFactory, + "Error When inserting into jobs table", + ); + if (isErr(job)) { + throw job.error; + } + jobId = job.value[0].jobId; } - console.log(typeResult.value); - const type = typeResult.value; let pageContent: string; let vectorData: string; @@ -70,8 +151,12 @@ export async function queue( } case "page": { console.log("page hit"); - const page = await processPage(body.content); + const page = await processPage({ + url: body.content, + securityKey: env.MD_SEC_KEY, + }); if (isErr(page)) { + console.log("there is a page error here"); throw page.error; } pageContent = page.value.pageContent; @@ -83,15 +168,13 @@ export async function queue( case "tweet": { console.log("tweet hit"); - console.log(body.content.split("/").pop()); const tweet = await getTweetData(body.content.split("/").pop()); - console.log(tweet); - const thread = await getThreadData( - body.content, - env.THREAD_CF_WORKER, - env.THREAD_CF_AUTH, - ); - + console.log(env.THREAD_CF_WORKER, env.THREAD_CF_AUTH); + const thread = await getThreadData({ + tweetUrl: body.content, + env: env, + }); + console.log("[This is the thread]", thread); if (isErr(tweet)) { throw tweet.error; } @@ -108,6 +191,7 @@ export async function queue( vectorData = JSON.stringify(pageContent); console.error(thread.error); } else { + console.log("thread worker is fine"); vectorData = thread.value; } chunks = chunkThread(vectorData); @@ -115,8 +199,27 @@ export async function queue( } } + //add to mem0, abstract + + // const mem0Response = fetch('https://api.mem0.ai/v1/memories/', { + // method: 'POST', + // headers: { + // 'Content-Type': 'application/json', + // Authorization: `Token ${process.env.MEM0_API_KEY}`, + // }, + // body: JSON.stringify({ + // messages: [ + // { + // role: 'user', + // content: query, + // }, + // ], + // user_id: user?.user?.email, + // }), + // }); + // see what's up with the storedToSpaces in this block - const { store } = await initQQuery(env); + const { store } = await initQuery(env); type body = z.infer<typeof vectorObj>; @@ -129,66 +232,135 @@ export async function queue( description: metadata.description, title: metadata.description, }; - const vectorResult = await wrap( - batchCreateChunksAndEmbeddings({ - store: store, - body: Chunkbody, - chunks: chunks, - env: env, - }), - vectorErrorFactory, - ); - if (isErr(vectorResult)) { - throw vectorResult.error; - } - const saveToDbUrl = - (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) + - "#supermemory-user-" + - body.user; - let contentId: number; - const db = database(env); - const insertResponse = await db - .insert(storedContent) - .values({ - content: pageContent as string, - title: metadata.title, - description: metadata.description, - url: saveToDbUrl, - baseUrl: saveToDbUrl, - image: metadata.image, - savedAt: new Date(), - userId: body.user, - type: type, - noteId: noteId, - }) - .returning({ id: storedContent.id }); - - if (!insertResponse[0]?.id) { - throw new D1InsertError( - "something went worng when inserting to database", - "inresertResponse", + try { + const vectorResult = await wrap( + batchCreateChunksAndEmbeddings({ + store: store, + body: Chunkbody, + chunks: chunks, + env: env, + }), + vectorErrorFactory, + "Error when Inserting into vector database", ); - } - contentId = insertResponse[0]?.id; - if (storeToSpaces.length > 0) { - // Adding the many-to-many relationship between content and spaces - const spaceData = await db - .select() - .from(space) - .where(and(inArray(space.id, storeToSpaces), eq(space.user, body.user))) - .all(); - await Promise.all( - spaceData.map(async (s) => { - await db - .insert(contentToSpace) - .values({ contentId: contentId, spaceId: s.id }); + if (isErr(vectorResult)) { + await db + .update(jobs) + .set({ error: vectorResult.error }) + .where(eq(jobs.id, jobId)); + message.retry({ + delaySeconds: calculateExponentialBackoff( + message.attempts, + BASE_DELAY_SECONDS, + ), + }); + throw vectorResult.error; + } - await db.update(space).set({ numItems: s.numItems + 1 }); - }), + const saveToDbUrl = + (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) + + "#supermemory-user-" + + body.user; + let contentId: number; + + const insertResponse = await wrap( + db + .insert(storedContent) + .values({ + content: pageContent as string, + title: metadata.title, + description: metadata.description, + url: saveToDbUrl, + baseUrl: saveToDbUrl, + image: metadata.image, + savedAt: new Date(), + userId: body.user, + type: type, + noteId: noteId, + }) + .returning({ id: storedContent.id }), + d1ErrorFactory, + "Error when inserting into storedContent", + ); + + if (isErr(insertResponse)) { + await db + .update(jobs) + .set({ error: insertResponse.error }) + .where(eq(jobs.id, jobId)); + message.retry({ + delaySeconds: calculateExponentialBackoff( + message.attempts, + BASE_DELAY_SECONDS, + ), + }); + throw insertResponse.error; + } + console.log(JSON.stringify(insertResponse)); + contentId = insertResponse[0]?.id; + console.log("this is the content Id", contentId); + if (storeToSpaces.length > 0) { + // Adding the many-to-many relationship between content and spaces + const spaceData = await wrap( + db + .select() + .from(space) + .where( + and(inArray(space.id, storeToSpaces), eq(space.user, body.user)), + ) + .all(), + d1ErrorFactory, + "Error when getting data from spaces", + ); + + if (isErr(spaceData)) { + throw spaceData.error; + } + try { + await Promise.all( + spaceData.value.map(async (s) => { + try { + await db + .insert(contentToSpace) + .values({ contentId: contentId, spaceId: s.id }); + + await db.update(space).set({ numItems: s.numItems + 1 }); + } catch (e) { + console.error(`Error updating space ${s.id}:`, e); + throw e; + } + }), + ); + } catch (e) { + console.error("Error in updateSpacesWithContent:", e); + throw new Error(`Failed to update spaces: ${e.message}`); + } + } + } catch (e) { + console.error("Error in simulated transaction", e.message); + console.log("Rooling back changes"); + message.retry({ + delaySeconds: calculateExponentialBackoff( + message.attempts, + BASE_DELAY_SECONDS, + ), + }); + throw new D1InsertError( + "Error when inserting into d1", + "D1 stuff after the vectorize", ); } + + // After the d1 and vectories suceeds then finally update the jobs table to indicate that the job has completed + + await db + .update(jobs) + .set({ status: "Processed" }) + .where(eq(jobs.id, jobId)); + + return; } } @@ -199,6 +371,18 @@ To do: 3. remove getMetada form the lib file as it's not being used anywhere else 4. Figure out the limit stuff ( server action for that seems fine because no use in limiting after they already in the queue rigth? ) 5. Figure out the initQuery stuff ( ;( ) --> This is a bad way of doing stuff :0 -6. How do I hande the content already exists wala use case? -7. Figure out retry and not add shit to the vectirze over and over again on failure +6. How do I hande the content already exists wala use case? --> Also how do I figure out limits? + + + +8. Wrap the d1 thing in a transaction and then write to vectorize if d1 is sucessful if it's not then just error out ( if d1 fails dlq, recoverable failure --> retry ) + +Firt write to d1 in a transaction ( sotredContent + sapces ) --> write to vectorize --> vectorize failes --> reset d1 alternatively first we can also do the vectorise stuff if that suceeds then do the d1 stuff in a batch right? + + +DEBUG: +What's hapenning: +1. The stuff in the d1 is updating but nothing is hapenning in the vectorize for some reason + + */ diff --git a/apps/cf-ai-backend/src/types.ts b/apps/cf-ai-backend/src/types.ts index 5294e0a1..e4f13f1b 100644 --- a/apps/cf-ai-backend/src/types.ts +++ b/apps/cf-ai-backend/src/types.ts @@ -18,14 +18,17 @@ export type Env = { MYBROWSER: unknown; ANTHROPIC_API_KEY: string; THREAD_CF_AUTH: string; + THREAD: { processTweets: () => Promise<Array<any>> }; THREAD_CF_WORKER: string; NODE_ENV: string; + MD_SEC_KEY: string; }; export interface JobData { content: string; space: Array<number>; user: string; + type: string } export interface TweetData { diff --git a/apps/cf-ai-backend/wrangler.toml b/apps/cf-ai-backend/wrangler.toml index ca8e2b1d..665ca593 100644 --- a/apps/cf-ai-backend/wrangler.toml +++ b/apps/cf-ai-backend/wrangler.toml @@ -3,8 +3,12 @@ main = "src/index.ts" compatibility_date = "2024-02-23" node_compat = true -tail_consumers = [{service = "new-cf-ai-backend-tail"}] +# tail_consumers = [{service = "new-cf-ai-backend-tail"}] +[[services]] +binding = "THREAD" +service = "tweet-thread" +entrypoint = "ThreadWorker" # [env.preview] [[vectorize]] diff --git a/apps/web/app/actions/doers.ts b/apps/web/app/actions/doers.ts index f17032b9..500a8608 100644 --- a/apps/web/app/actions/doers.ts +++ b/apps/web/app/actions/doers.ts @@ -17,7 +17,7 @@ import { auth } from "../../server/auth"; import { Tweet } from "react-tweet/api"; // import { getMetaData } from "@/lib/get-metadata"; import { and, eq, inArray, sql } from "drizzle-orm"; -import { LIMITS } from "@/lib/constants"; +import { LIMITS } from "@repo/shared-types"; import { ChatHistory } from "@repo/shared-types"; import { decipher } from "@/server/encrypt"; import { redirect } from "next/navigation"; @@ -104,25 +104,6 @@ const typeDecider = (content: string): "page" | "tweet" | "note" => { } }; -export const limit = async ( - userId: string, - type = "page", - items: number = 1, -) => { - const countResult = await db - .select({ - count: sql<number>`count(*)`.mapWith(Number), - }) - .from(storedContent) - .where(and(eq(storedContent.userId, userId), eq(storedContent.type, type))); - - const currentCount = countResult[0]?.count || 0; - const totalLimit = LIMITS[type as keyof typeof LIMITS]; - const remainingLimit = totalLimit - currentCount; - - return items <= remainingLimit; -}; - export const addUserToSpace = async (userEmail: string, spaceId: number) => { const data = await auth(); @@ -197,7 +178,6 @@ export const createMemory = async (input: { return { error: "Not authenticated", success: false }; } - // make the backend reqeust for the queue here const vectorSaveResponses = await fetch( `${process.env.BACKEND_BASE_URL}/api/add`, @@ -214,250 +194,262 @@ export const createMemory = async (input: { }, }, ); + const response = (await vectorSaveResponses.json()) as { + status: string; + message?: string; + }; + + if (response.status !== "ok") { + return { + success: false, + data: 0, + error: response.message, + }; + } -// const type = typeDecider(input.content); - -// let pageContent = input.content; -// let metadata: Awaited<ReturnType<typeof getMetaData>>; -// let vectorData: string; - -// if (!(await limit(data.user.id, type))) { -// return { -// success: false, -// data: 0, -// error: `You have exceeded the limit of ${LIMITS[type as keyof typeof LIMITS]} ${type}s.`, -// }; -// } - -// let noteId = 0; - -// if (type === "page") { -// const response = await fetch("https://md.dhr.wtf/?url=" + input.content, { -// headers: { -// Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY, -// }, -// }); -// pageContent = await response.text(); -// vectorData = pageContent; -// try { -// metadata = await getMetaData(input.content); -// } catch (e) { -// return { -// success: false, -// error: "Failed to fetch metadata for the page. Please try again later.", -// }; -// } -// } else if (type === "tweet") { -// //Request the worker for the entire thread - -// let thread: string; -// let errorOccurred: boolean = false; - -// try { -// const cf_thread_endpoint = process.env.THREAD_CF_WORKER; -// const authKey = process.env.THREAD_CF_AUTH; -// const threadRequest = await fetch(cf_thread_endpoint, { -// method: "POST", -// headers: { -// "Content-Type": "application/json", -// Authorization: authKey, -// }, -// body: JSON.stringify({ url: input.content }), -// }); - -// if (threadRequest.status !== 200) { -// throw new Error( -// `Failed to fetch the thread: ${input.content}, Reason: ${threadRequest.statusText}`, -// ); -// } - -// thread = await threadRequest.text(); -// if (thread.trim().length === 2) { -// console.log("Thread is an empty array"); -// throw new Error( -// "[THREAD FETCHING SERVICE] Got no content form thread worker", -// ); -// } -// } catch (e) { -// console.log("[THREAD FETCHING SERVICE] Failed to fetch the thread", e); -// errorOccurred = true; -// } - -// const tweet = await getTweetData(input.content.split("/").pop() as string); - -// pageContent = tweetToMd(tweet); -// console.log("THis ishte page content!!", pageContent); -// //@ts-ignore -// vectorData = errorOccurred ? JSON.stringify(pageContent) : thread; -// metadata = { -// baseUrl: input.content, -// description: tweet.text.slice(0, 200), -// image: tweet.user.profile_image_url_https, -// title: `Tweet by ${tweet.user.name}`, -// }; -// } else if (type === "note") { -// pageContent = input.content; -// vectorData = pageContent; -// noteId = new Date().getTime(); -// metadata = { -// baseUrl: `https://supermemory.ai/note/${noteId}`, -// description: `Note created at ${new Date().toLocaleString()}`, -// image: "https://supermemory.ai/logo.png", -// title: `${pageContent.slice(0, 20)} ${pageContent.length > 20 ? "..." : ""}`, -// }; -// } else { -// return { -// success: false, -// data: 0, -// error: "Invalid type", -// }; -// } - -// let storeToSpaces = input.spaces; - -// if (!storeToSpaces) { -// storeToSpaces = []; -// } - -// const vectorSaveResponse = await fetch( -// `${process.env.BACKEND_BASE_URL}/api/add`, -// { -// method: "POST", -// body: JSON.stringify({ -// pageContent: vectorData, -// title: metadata.title, -// description: metadata.description, -// url: metadata.baseUrl, -// spaces: storeToSpaces.map((spaceId) => spaceId.toString()), -// user: data.user.id, -// type, -// }), -// headers: { -// "Content-Type": "application/json", -// Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY, -// }, -// }, -// ); - -// if (!vectorSaveResponse.ok) { -// const errorData = await vectorSaveResponse.text(); -// console.error(errorData); -// return { -// success: false, -// data: 0, -// error: `Failed to save to vector store. Backend returned error: ${errorData}`, -// }; -// } - -// let contentId: number; - -// const response = (await vectorSaveResponse.json()) as { -// status: string; -// chunkedInput: string; -// message?: string; -// }; - -// try { -// if (response.status !== "ok") { -// if (response.status === "error") { -// return { -// success: false, -// data: 0, -// error: response.message, -// }; -// } else { -// return { -// success: false, -// data: 0, -// error: `Failed to save to vector store. Backend returned error: ${response.message}`, -// }; -// } -// } -// } catch (e) { -// return { -// success: false, -// data: 0, -// error: `Failed to save to vector store. Backend returned error: ${e}`, -// }; -// } - -// const saveToDbUrl = -// (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) + -// "#supermemory-user-" + -// data.user.id; - -// // Insert into database -// try { -// const insertResponse = await db -// .insert(storedContent) -// .values({ -// content: pageContent, -// title: metadata.title, -// description: metadata.description, -// url: saveToDbUrl, -// baseUrl: saveToDbUrl, -// image: metadata.image, -// savedAt: new Date(), -// userId: data.user.id, -// type, -// noteId, -// }) -// .returning({ id: storedContent.id }); -// revalidatePath("/memories"); -// revalidatePath("/home"); - -// if (!insertResponse[0]?.id) { -// return { -// success: false, -// data: 0, -// error: "Something went wrong while saving the document to the database", -// }; -// } - -// contentId = insertResponse[0]?.id; -// } catch (e) { -// const error = e as Error; -// console.log("Error: ", error.message); - -// if ( -// error.message.includes( -// "D1_ERROR: UNIQUE constraint failed: storedContent.baseUrl", -// ) -// ) { -// return { -// success: false, -// data: 0, -// error: "Content already exists", -// }; -// } - -// return { -// success: false, -// data: 0, -// error: "Failed to save to database with error: " + error.message, -// }; -// } - -// if (storeToSpaces.length > 0) { -// // Adding the many-to-many relationship between content and spaces -// const spaceData = await db -// .select() -// .from(space) -// .where( -// and(inArray(space.id, storeToSpaces), eq(space.user, data.user.id)), -// ) -// .all(); - -// await Promise.all( -// spaceData.map(async (s) => { -// await db -// .insert(contentToSpace) -// .values({ contentId: contentId, spaceId: s.id }); - -// await db.update(space).set({ numItems: s.numItems + 1 }); -// }), -// ); -// } + // const type = typeDecider(input.content); + + // let pageContent = input.content; + // let metadata: Awaited<ReturnType<typeof getMetaData>>; + // let vectorData: string; + + // if (!(await limit(data.user.id, type))) { + // return { + // success: false, + // data: 0, + // error: `You have exceeded the limit of ${LIMITS[type as keyof typeof LIMITS]} ${type}s.`, + // }; + // } --> How would this fit in the backend??? + + // let noteId = 0; + + // if (type === "page") { + // const response = await fetch("https://md.dhr.wtf/?url=" + input.content, { + // headers: { + // Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY, + // }, + // }); + // pageContent = await response.text(); + // vectorData = pageContent; + // try { + // metadata = await getMetaData(input.content); + // } catch (e) { + // return { + // success: false, + // error: "Failed to fetch metadata for the page. Please try again later.", + // }; + // } + // } else if (type === "tweet") { + // //Request the worker for the entire thread + + // let thread: string; + // let errorOccurred: boolean = false; + + // try { + // const cf_thread_endpoint = process.env.THREAD_CF_WORKER; + // const authKey = process.env.THREAD_CF_AUTH; + // const threadRequest = await fetch(cf_thread_endpoint, { + // method: "POST", + // headers: { + // "Content-Type": "application/json", + // Authorization: authKey, + // }, + // body: JSON.stringify({ url: input.content }), + // }); + + // if (threadRequest.status !== 200) { + // throw new Error( + // `Failed to fetch the thread: ${input.content}, Reason: ${threadRequest.statusText}`, + // ); + // } + + // thread = await threadRequest.text(); + // if (thread.trim().length === 2) { + // console.log("Thread is an empty array"); + // throw new Error( + // "[THREAD FETCHING SERVICE] Got no content form thread worker", + // ); + // } + // } catch (e) { + // console.log("[THREAD FETCHING SERVICE] Failed to fetch the thread", e); + // errorOccurred = true; + // } + + // const tweet = await getTweetData(input.content.split("/").pop() as string); + + // pageContent = tweetToMd(tweet); + // console.log("THis ishte page content!!", pageContent); + // //@ts-ignore + // vectorData = errorOccurred ? JSON.stringify(pageContent) : thread; + // metadata = { + // baseUrl: input.content, + // description: tweet.text.slice(0, 200), + // image: tweet.user.profile_image_url_https, + // title: `Tweet by ${tweet.user.name}`, + // }; + // } else if (type === "note") { + // pageContent = input.content; + // vectorData = pageContent; + // noteId = new Date().getTime(); + // metadata = { + // baseUrl: `https://supermemory.ai/note/${noteId}`, + // description: `Note created at ${new Date().toLocaleString()}`, + // image: "https://supermemory.ai/logo.png", + // title: `${pageContent.slice(0, 20)} ${pageContent.length > 20 ? "..." : ""}`, + // }; + // } else { + // return { + // success: false, + // data: 0, + // error: "Invalid type", + // }; + // } + + // let storeToSpaces = input.spaces; + + // if (!storeToSpaces) { + // storeToSpaces = []; + // } + + // const vectorSaveResponse = await fetch( + // `${process.env.BACKEND_BASE_URL}/api/add`, + // { + // method: "POST", + // body: JSON.stringify({ + // pageContent: vectorData, + // title: metadata.title, + // description: metadata.description, + // url: metadata.baseUrl, + // spaces: storeToSpaces.map((spaceId) => spaceId.toString()), + // user: data.user.id, + // type, + // }), + // headers: { + // "Content-Type": "application/json", + // Authorization: "Bearer " + process.env.BACKEND_SECURITY_KEY, + // }, + // }, + // ); + + // if (!vectorSaveResponse.ok) { + // const errorData = await vectorSaveResponse.text(); + // console.error(errorData); + // return { + // success: false, + // data: 0, + // error: `Failed to save to vector store. Backend returned error: ${errorData}`, + // }; + // } + + // let contentId: number; + + // const response = (await vectorSaveResponse.json()) as { + // status: string; + // chunkedInput: string; + // message?: string; + // }; + + // try { + // if (response.status !== "ok") { + // if (response.status === "error") { + // return { + // success: false, + // data: 0, + // error: response.message, + // }; + // } else { + // return { + // success: false, + // data: 0, + // error: `Failed to save to vector store. Backend returned error: ${response.message}`, + // }; + // } + // } + // } catch (e) { + // return { + // success: false, + // data: 0, + // error: `Failed to save to vector store. Backend returned error: ${e}`, + // }; + // } + + // const saveToDbUrl = + // (metadata.baseUrl.split("#supermemory-user-")[0] ?? metadata.baseUrl) + + // "#supermemory-user-" + + // data.user.id; + + // // Insert into database + // try { + // const insertResponse = await db + // .insert(storedContent) + // .values({ + // content: pageContent, + // title: metadata.title, + // description: metadata.description, + // url: saveToDbUrl, + // baseUrl: saveToDbUrl, + // image: metadata.image, + // savedAt: new Date(), + // userId: data.user.id, + // type, + // noteId, + // }) + // .returning({ id: storedContent.id }); + // revalidatePath("/memories"); + // revalidatePath("/home"); + + // if (!insertResponse[0]?.id) { + // return { + // success: false, + // data: 0, + // error: "Something went wrong while saving the document to the database", + // }; + // } + + // contentId = insertResponse[0]?.id; + // } catch (e) { + // const error = e as Error; + // console.log("Error: ", error.message); + + // if ( + // error.message.includes( + // "D1_ERROR: UNIQUE constraint failed: storedContent.baseUrl", + // ) + // ) { + // return { + // success: false, + // data: 0, + // error: "Content already exists", + // }; + // } + + // return { + // success: false, + // data: 0, + // error: "Failed to save to database with error: " + error.message, + // }; + // } + + // if (storeToSpaces.length > 0) { + // // Adding the many-to-many relationship between content and spaces + // const spaceData = await db + // .select() + // .from(space) + // .where( + // and(inArray(space.id, storeToSpaces), eq(space.user, data.user.id)), + // ) + // .all(); + + // await Promise.all( + // spaceData.map(async (s) => { + // await db + // .insert(contentToSpace) + // .values({ contentId: contentId, spaceId: s.id }); + + // await db.update(space).set({ numItems: s.numItems + 1 }); + // }), + // ); + // } return { success: true, @@ -475,7 +467,6 @@ export const createChatThread = async ( return { error: "Not authenticated", success: false }; } - const thread = await db .insert(chatThreads) .values({ @@ -836,8 +827,9 @@ export async function getQuerySuggestions() { }; } - const fullQuery = (content?.map((c) => `${c.title} \n\n${c.content}`) ?? []) - .join(" "); + const fullQuery = ( + content?.map((c) => `${c.title} \n\n${c.content}`) ?? [] + ).join(" "); const suggestionsCall = (await env.AI.run( // @ts-ignore diff --git a/apps/web/app/api/store/helper.ts b/apps/web/app/api/store/helper.ts index 2dc42125..6ab7fb23 100644 --- a/apps/web/app/api/store/helper.ts +++ b/apps/web/app/api/store/helper.ts @@ -2,21 +2,21 @@ import { z } from "zod"; import { db } from "@/server/db"; import { contentToSpace, space, storedContent } from "@repo/db/schema"; import { and, eq, inArray } from "drizzle-orm"; -import { LIMITS } from "@/lib/constants"; -import { limit } from "@/app/actions/doers"; +// import { LIMITS } from "@repo/shared-types"; +// import { limit } from "@/app/actions/doers"; import { type AddFromAPIType } from "@repo/shared-types"; export const createMemoryFromAPI = async (input: { data: AddFromAPIType; userId: string; }) => { - if (!(await limit(input.userId, input.data.type))) { - return { - success: false, - data: 0, - error: `You have exceeded the limit of ${LIMITS[input.data.type as keyof typeof LIMITS]} ${input.data.type}s.`, - }; - } + // if (!(await limit(input.userId, input.data.type))) { + // return { + // success: false, + // data: 0, + // error: `You have exceeded the limit of ${LIMITS[input.data.type as keyof typeof LIMITS]} ${input.data.type}s.`, + // }; + // } const vectorSaveResponse = await fetch( `${process.env.BACKEND_BASE_URL}/api/add`, diff --git a/apps/web/lib/constants.ts b/apps/web/lib/constants.ts index 241a6a1d..73f9a83d 100644 --- a/apps/web/lib/constants.ts +++ b/apps/web/lib/constants.ts @@ -1,9 +1,3 @@ -export const LIMITS = { - page: 100, - tweet: 1000, - note: 1000, -}; - export const codeLanguageSubset = [ "python", "javascript", diff --git a/packages/db/schema.ts b/packages/db/schema.ts index 11711997..70860066 100644 --- a/packages/db/schema.ts +++ b/packages/db/schema.ts @@ -256,8 +256,14 @@ export const jobs = createTable( attempts: integer("attempts").notNull().default(0), lastAttemptAt: integer("lastAttemptAt"), error: blob("error"), - createdAt: integer("createdAt").notNull(), - updatedAt: integer("updatedAt").notNull(), + createdAt: int("createdAt", { mode: "timestamp" }) + .notNull() + .notNull() + .default(new Date()), + updatedAt: int("updatedAt", { mode: "timestamp" }) + .notNull() + .notNull() + .default(new Date()), }, (job) => ({ userIdx: index("jobs_userId_idx").on(job.userId), diff --git a/packages/shared-types/index.ts b/packages/shared-types/index.ts index a9933b84..e49cf8e0 100644 --- a/packages/shared-types/index.ts +++ b/packages/shared-types/index.ts @@ -1,5 +1,14 @@ import { z } from "zod"; + +export const LIMITS = { + page: 100, + tweet: 1000, + note: 1000, +}; + + + export const SourceZod = z.object({ type: z.string(), source: z.string(), |