diff options
| author | Fuwn <[email protected]> | 2026-02-07 01:42:57 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-07 01:42:57 -0800 |
| commit | 5c5b1993edd890a80870ee05607ac5f088191d4e (patch) | |
| tree | a721b76bcd49ba10826c53efc87302c7a689512f /services/worker/internal/webhook/webhook.go | |
| download | asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.tar.xz asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.zip | |
feat: asa.news RSS reader with developer tier, REST API, and webhooks
Full-stack RSS reader SaaS: Supabase + Next.js + Go worker.
Includes three subscription tiers (free/pro/developer), API key auth,
read-only REST API, webhook push notifications, Stripe billing with
proration, and PWA support.
Diffstat (limited to 'services/worker/internal/webhook/webhook.go')
| -rw-r--r-- | services/worker/internal/webhook/webhook.go | 333 |
1 files changed, 333 insertions, 0 deletions
diff --git a/services/worker/internal/webhook/webhook.go b/services/worker/internal/webhook/webhook.go new file mode 100644 index 0000000..c812820 --- /dev/null +++ b/services/worker/internal/webhook/webhook.go @@ -0,0 +1,333 @@ +package webhook + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "github.com/Fuwn/asa-news/internal/fetcher" + "github.com/Fuwn/asa-news/internal/model" + "github.com/jackc/pgx/v5/pgxpool" + "log/slog" + "net/http" + "time" +) + +type Dispatcher struct { + databaseConnectionPool *pgxpool.Pool + httpClient *http.Client + logger *slog.Logger +} + +type webhookSubscriber struct { + UserIdentifier string + WebhookURL string + WebhookSecret *string +} + +type EntryPayload struct { + EntryIdentifier string `json:"entryIdentifier"` + FeedIdentifier string `json:"feedIdentifier"` + GUID string `json:"guid"` + URL *string `json:"url"` + Title *string `json:"title"` + Author *string `json:"author"` + Summary *string `json:"summary"` + PublishedAt *string `json:"publishedAt"` + EnclosureURL *string `json:"enclosureUrl"` + EnclosureType *string `json:"enclosureType"` +} + +type WebhookPayload struct { + Event string `json:"event"` + Timestamp string `json:"timestamp"` + Entries []EntryPayload `json:"entries"` +} + +func NewDispatcher( + databaseConnectionPool *pgxpool.Pool, + logger *slog.Logger, +) *Dispatcher { + return &Dispatcher{ + databaseConnectionPool: databaseConnectionPool, + httpClient: &http.Client{ + Timeout: 10 * time.Second, + }, + logger: logger, + } +} + +func (webhookDispatcher *Dispatcher) DispatchForFeed( + dispatchContext context.Context, + feedIdentifier string, + entries []model.FeedEntry, +) { + subscribers, queryError := webhookDispatcher.findWebhookSubscribers(dispatchContext, feedIdentifier) + + if queryError != nil { + webhookDispatcher.logger.Error( + "failed to query webhook subscribers", + "feed_identifier", feedIdentifier, + "error", queryError, + ) + + return + } + + if len(subscribers) == 0 { + return + } + + entryPayloads := make([]EntryPayload, 0, len(entries)) + + for _, entry := range entries { + var publishedAtString *string + + if entry.PublishedAt != nil { + formattedTime := entry.PublishedAt.Format(time.RFC3339) + publishedAtString = &formattedTime + } + + entryPayloads = append(entryPayloads, EntryPayload{ + EntryIdentifier: entry.FeedIdentifier, + FeedIdentifier: entry.FeedIdentifier, + GUID: entry.GUID, + URL: entry.URL, + Title: entry.Title, + Author: entry.Author, + Summary: entry.Summary, + PublishedAt: publishedAtString, + EnclosureURL: entry.EnclosureURL, + EnclosureType: entry.EnclosureType, + }) + } + + payload := WebhookPayload{ + Event: "entries.created", + Timestamp: time.Now().UTC().Format(time.RFC3339), + Entries: entryPayloads, + } + + for _, subscriber := range subscribers { + webhookDispatcher.deliverWebhook(dispatchContext, subscriber, payload) + } +} + +func (webhookDispatcher *Dispatcher) findWebhookSubscribers( + queryContext context.Context, + feedIdentifier string, +) ([]webhookSubscriber, error) { + subscriberQuery := ` + SELECT up.id, up.webhook_url, up.webhook_secret + FROM subscriptions s + JOIN user_profiles up ON up.id = s.user_id + WHERE s.feed_id = $1 + AND up.tier = 'developer' + AND up.webhook_enabled = true + AND up.webhook_url IS NOT NULL + ` + + rows, queryError := webhookDispatcher.databaseConnectionPool.Query( + queryContext, + subscriberQuery, + feedIdentifier, + ) + + if queryError != nil { + return nil, fmt.Errorf("failed to query webhook subscribers: %w", queryError) + } + + defer rows.Close() + + subscribers := make([]webhookSubscriber, 0) + + for rows.Next() { + var subscriber webhookSubscriber + + scanError := rows.Scan( + &subscriber.UserIdentifier, + &subscriber.WebhookURL, + &subscriber.WebhookSecret, + ) + + if scanError != nil { + return nil, fmt.Errorf("failed to scan subscriber row: %w", scanError) + } + + subscribers = append(subscribers, subscriber) + } + + return subscribers, rows.Err() +} + +func (webhookDispatcher *Dispatcher) deliverWebhook( + deliveryContext context.Context, + subscriber webhookSubscriber, + payload WebhookPayload, +) { + urlValidationError := fetcher.ValidateFeedURL(subscriber.WebhookURL) + + if urlValidationError != nil { + webhookDispatcher.logger.Warn( + "webhook URL failed SSRF validation", + "user_identifier", subscriber.UserIdentifier, + "webhook_url", subscriber.WebhookURL, + "error", urlValidationError, + ) + + return + } + + payloadBytes, marshalError := json.Marshal(payload) + + if marshalError != nil { + webhookDispatcher.logger.Error( + "failed to marshal webhook payload", + "user_identifier", subscriber.UserIdentifier, + "error", marshalError, + ) + + return + } + + retryDelays := []time.Duration{0, 1 * time.Second, 4 * time.Second, 16 * time.Second} + var lastError error + + for attemptIndex, delay := range retryDelays { + if delay > 0 { + select { + case <-time.After(delay): + case <-deliveryContext.Done(): + return + } + } + + deliveryError := webhookDispatcher.sendWebhookRequest( + deliveryContext, + subscriber, + payloadBytes, + ) + + if deliveryError == nil { + if attemptIndex > 0 { + webhookDispatcher.logger.Info( + "webhook delivered after retry", + "user_identifier", subscriber.UserIdentifier, + "attempt", attemptIndex+1, + ) + } + + webhookDispatcher.resetConsecutiveFailures(deliveryContext, subscriber.UserIdentifier) + + return + } + + lastError = deliveryError + + webhookDispatcher.logger.Warn( + "webhook delivery attempt failed", + "user_identifier", subscriber.UserIdentifier, + "attempt", attemptIndex+1, + "error", deliveryError, + ) + } + + webhookDispatcher.logger.Error( + "webhook delivery failed after all retries", + "user_identifier", subscriber.UserIdentifier, + "error", lastError, + ) + + webhookDispatcher.incrementConsecutiveFailures(deliveryContext, subscriber.UserIdentifier) +} + +func (webhookDispatcher *Dispatcher) sendWebhookRequest( + requestContext context.Context, + subscriber webhookSubscriber, + payloadBytes []byte, +) error { + request, requestCreationError := http.NewRequestWithContext( + requestContext, + http.MethodPost, + subscriber.WebhookURL, + bytes.NewReader(payloadBytes), + ) + + if requestCreationError != nil { + return fmt.Errorf("failed to create webhook request: %w", requestCreationError) + } + + request.Header.Set("Content-Type", "application/json") + request.Header.Set("User-Agent", "asa.news Webhook/1.0") + + if subscriber.WebhookSecret != nil && *subscriber.WebhookSecret != "" { + mac := hmac.New(sha256.New, []byte(*subscriber.WebhookSecret)) + mac.Write(payloadBytes) + signature := hex.EncodeToString(mac.Sum(nil)) + request.Header.Set("X-Asa-Signature-256", "sha256="+signature) + } + + response, requestError := webhookDispatcher.httpClient.Do(request) + + if requestError != nil { + return fmt.Errorf("webhook request failed: %w", requestError) + } + + defer response.Body.Close() + + if response.StatusCode >= 200 && response.StatusCode < 300 { + return nil + } + + return fmt.Errorf("webhook returned status %d", response.StatusCode) +} + +func (webhookDispatcher *Dispatcher) resetConsecutiveFailures( + updateContext context.Context, + userIdentifier string, +) { + _, updateError := webhookDispatcher.databaseConnectionPool.Exec( + updateContext, + "UPDATE user_profiles SET webhook_consecutive_failures = 0 WHERE id = $1", + userIdentifier, + ) + + if updateError != nil { + webhookDispatcher.logger.Error( + "failed to reset webhook consecutive failures", + "user_identifier", userIdentifier, + "error", updateError, + ) + } +} + +func (webhookDispatcher *Dispatcher) incrementConsecutiveFailures( + updateContext context.Context, + userIdentifier string, +) { + maximumConsecutiveFailures := 10 + + _, updateError := webhookDispatcher.databaseConnectionPool.Exec( + updateContext, + `UPDATE user_profiles + SET webhook_consecutive_failures = webhook_consecutive_failures + 1, + webhook_enabled = CASE + WHEN webhook_consecutive_failures + 1 >= $2 THEN false + ELSE webhook_enabled + END + WHERE id = $1`, + userIdentifier, + maximumConsecutiveFailures, + ) + + if updateError != nil { + webhookDispatcher.logger.Error( + "failed to increment webhook consecutive failures", + "user_identifier", userIdentifier, + "error", updateError, + ) + } +} |