diff options
Diffstat (limited to 'src/trigger/notifications.ts')
| -rw-r--r-- | src/trigger/notifications.ts | 156 |
1 files changed, 121 insertions, 35 deletions
diff --git a/src/trigger/notifications.ts b/src/trigger/notifications.ts index 77b8713f..e36f913f 100644 --- a/src/trigger/notifications.ts +++ b/src/trigger/notifications.ts @@ -1,38 +1,124 @@ -import { schedules, envvars } from '@trigger.dev/sdk/v3'; -import * as webpush from 'web-push'; -import { createClient } from '@supabase/supabase-js'; +import { createClient } from "@supabase/supabase-js"; +import { envvars, schedules } from "@trigger.dev/sdk"; +import * as webpush from "web-push"; + +const isExpiredSubscriptionError = (error: unknown) => { + const statusCode = + typeof error === "object" && + error !== null && + "statusCode" in error && + typeof error.statusCode === "number" + ? error.statusCode + : null; + + return statusCode === 404 || statusCode === 410; +}; + +const subscriptionEndpoint = (subscription: unknown) => + typeof subscription === "object" && + subscription !== null && + "endpoint" in subscription && + typeof subscription.endpoint === "string" && + subscription.endpoint.length + ? subscription.endpoint + : null; export const notificationsTask = schedules.task({ - id: 'notifications', - run: async (_payload, { ctx }) => { - const environment = ctx.environment.slug; - const triggerProjectReference = ctx.project.ref; - const getUserSubscriptions = async () => { - const { data, error } = await createClient( - (await envvars.retrieve(triggerProjectReference, environment, 'SUPABASE_URL')).value, - (await envvars.retrieve(triggerProjectReference, environment, 'SUPABASE_ANON_KEY')).value - ) - .from('user_notifications') - .select('*'); - - if (error) return []; - - return data; - }; - - webpush.setVapidDetails( - (await envvars.retrieve(triggerProjectReference, environment, 'VAPID_SUBJECT')).value, - (await envvars.retrieve(triggerProjectReference, environment, 'VAPID_PUBLIC_KEY')).value, - (await envvars.retrieve(triggerProjectReference, environment, 'VAPID_PRIVATE_KEY')).value - ); - - for (const subscription of await getUserSubscriptions()) - try { - await webpush.sendNotification(subscription['subscription'], '.'); - } catch (error) { - console.error(error); - } - - return {}; - } + id: "notifications", + run: async (_payload, { ctx: taskContext }) => { + const environment = taskContext.environment.slug; + const triggerProjectReference = taskContext.project.ref; + const supabase = createClient( + ( + await envvars.retrieve( + triggerProjectReference, + environment, + "SUPABASE_URL", + ) + ).value, + ( + await envvars.retrieve( + triggerProjectReference, + environment, + "SUPABASE_SERVICE_ROLE_KEY", + ) + ).value, + ); + const getUserSubscriptions = async () => { + const { data, error } = await supabase + .from("user_notifications") + .select("*"); + + if (error) return []; + + return data; + }; + const deleteUserSubscription = async ( + userId: number, + fingerprint: string, + ) => { + await supabase + .from("user_notifications") + .delete() + .eq("user_id", userId) + .eq("fingerprint", fingerprint); + }; + + const transientErrors: unknown[] = []; + const seenEndpoints = new Set<string>(); + + webpush.setVapidDetails( + ( + await envvars.retrieve( + triggerProjectReference, + environment, + "VAPID_SUBJECT", + ) + ).value, + ( + await envvars.retrieve( + triggerProjectReference, + environment, + "VAPID_PUBLIC_KEY", + ) + ).value, + ( + await envvars.retrieve( + triggerProjectReference, + environment, + "VAPID_PRIVATE_KEY", + ) + ).value, + ); + + for (const subscription of await getUserSubscriptions()) { + const endpoint = subscriptionEndpoint(subscription.subscription); + + if (endpoint) { + if (seenEndpoints.has(endpoint)) continue; + + seenEndpoints.add(endpoint); + } + + try { + await webpush.sendNotification(subscription.subscription, "."); + } catch (error) { + if (isExpiredSubscriptionError(error)) + await deleteUserSubscription( + subscription.user_id, + subscription.fingerprint, + ); + else transientErrors.push(error); + + console.error(error); + } + } + + if (transientErrors.length > 0) + throw new Error("Transient push delivery failures occurred", { + cause: transientErrors[0], + }); + + return {}; + }, }); |