aboutsummaryrefslogtreecommitdiff
path: root/src/trigger/notifications.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/trigger/notifications.ts')
-rw-r--r--src/trigger/notifications.ts156
1 files changed, 121 insertions, 35 deletions
diff --git a/src/trigger/notifications.ts b/src/trigger/notifications.ts
index 77b8713f..e36f913f 100644
--- a/src/trigger/notifications.ts
+++ b/src/trigger/notifications.ts
@@ -1,38 +1,124 @@
-import { schedules, envvars } from '@trigger.dev/sdk/v3';
-import * as webpush from 'web-push';
-import { createClient } from '@supabase/supabase-js';
+import { createClient } from "@supabase/supabase-js";
+import { envvars, schedules } from "@trigger.dev/sdk";
+import * as webpush from "web-push";
+
+const isExpiredSubscriptionError = (error: unknown) => {
+ const statusCode =
+ typeof error === "object" &&
+ error !== null &&
+ "statusCode" in error &&
+ typeof error.statusCode === "number"
+ ? error.statusCode
+ : null;
+
+ return statusCode === 404 || statusCode === 410;
+};
+
+const subscriptionEndpoint = (subscription: unknown) =>
+ typeof subscription === "object" &&
+ subscription !== null &&
+ "endpoint" in subscription &&
+ typeof subscription.endpoint === "string" &&
+ subscription.endpoint.length
+ ? subscription.endpoint
+ : null;
export const notificationsTask = schedules.task({
- id: 'notifications',
- run: async (_payload, { ctx }) => {
- const environment = ctx.environment.slug;
- const triggerProjectReference = ctx.project.ref;
- const getUserSubscriptions = async () => {
- const { data, error } = await createClient(
- (await envvars.retrieve(triggerProjectReference, environment, 'SUPABASE_URL')).value,
- (await envvars.retrieve(triggerProjectReference, environment, 'SUPABASE_ANON_KEY')).value
- )
- .from('user_notifications')
- .select('*');
-
- if (error) return [];
-
- return data;
- };
-
- webpush.setVapidDetails(
- (await envvars.retrieve(triggerProjectReference, environment, 'VAPID_SUBJECT')).value,
- (await envvars.retrieve(triggerProjectReference, environment, 'VAPID_PUBLIC_KEY')).value,
- (await envvars.retrieve(triggerProjectReference, environment, 'VAPID_PRIVATE_KEY')).value
- );
-
- for (const subscription of await getUserSubscriptions())
- try {
- await webpush.sendNotification(subscription['subscription'], '.');
- } catch (error) {
- console.error(error);
- }
-
- return {};
- }
+ id: "notifications",
+ run: async (_payload, { ctx: taskContext }) => {
+ const environment = taskContext.environment.slug;
+ const triggerProjectReference = taskContext.project.ref;
+ const supabase = createClient(
+ (
+ await envvars.retrieve(
+ triggerProjectReference,
+ environment,
+ "SUPABASE_URL",
+ )
+ ).value,
+ (
+ await envvars.retrieve(
+ triggerProjectReference,
+ environment,
+ "SUPABASE_SERVICE_ROLE_KEY",
+ )
+ ).value,
+ );
+ const getUserSubscriptions = async () => {
+ const { data, error } = await supabase
+ .from("user_notifications")
+ .select("*");
+
+ if (error) return [];
+
+ return data;
+ };
+ const deleteUserSubscription = async (
+ userId: number,
+ fingerprint: string,
+ ) => {
+ await supabase
+ .from("user_notifications")
+ .delete()
+ .eq("user_id", userId)
+ .eq("fingerprint", fingerprint);
+ };
+
+ const transientErrors: unknown[] = [];
+ const seenEndpoints = new Set<string>();
+
+ webpush.setVapidDetails(
+ (
+ await envvars.retrieve(
+ triggerProjectReference,
+ environment,
+ "VAPID_SUBJECT",
+ )
+ ).value,
+ (
+ await envvars.retrieve(
+ triggerProjectReference,
+ environment,
+ "VAPID_PUBLIC_KEY",
+ )
+ ).value,
+ (
+ await envvars.retrieve(
+ triggerProjectReference,
+ environment,
+ "VAPID_PRIVATE_KEY",
+ )
+ ).value,
+ );
+
+ for (const subscription of await getUserSubscriptions()) {
+ const endpoint = subscriptionEndpoint(subscription.subscription);
+
+ if (endpoint) {
+ if (seenEndpoints.has(endpoint)) continue;
+
+ seenEndpoints.add(endpoint);
+ }
+
+ try {
+ await webpush.sendNotification(subscription.subscription, ".");
+ } catch (error) {
+ if (isExpiredSubscriptionError(error))
+ await deleteUserSubscription(
+ subscription.user_id,
+ subscription.fingerprint,
+ );
+ else transientErrors.push(error);
+
+ console.error(error);
+ }
+ }
+
+ if (transientErrors.length > 0)
+ throw new Error("Transient push delivery failures occurred", {
+ cause: transientErrors[0],
+ });
+
+ return {};
+ },
});