summaryrefslogtreecommitdiff
path: root/services/worker/internal/webhook/webhook.go
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-07 01:42:57 -0800
committerFuwn <[email protected]>2026-02-07 01:42:57 -0800
commit5c5b1993edd890a80870ee05607ac5f088191d4e (patch)
treea721b76bcd49ba10826c53efc87302c7a689512f /services/worker/internal/webhook/webhook.go
downloadasa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.tar.xz
asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.zip
feat: asa.news RSS reader with developer tier, REST API, and webhooks
Full-stack RSS reader SaaS: Supabase + Next.js + Go worker. Includes three subscription tiers (free/pro/developer), API key auth, read-only REST API, webhook push notifications, Stripe billing with proration, and PWA support.
Diffstat (limited to 'services/worker/internal/webhook/webhook.go')
-rw-r--r--services/worker/internal/webhook/webhook.go333
1 files changed, 333 insertions, 0 deletions
diff --git a/services/worker/internal/webhook/webhook.go b/services/worker/internal/webhook/webhook.go
new file mode 100644
index 0000000..c812820
--- /dev/null
+++ b/services/worker/internal/webhook/webhook.go
@@ -0,0 +1,333 @@
+package webhook
+
+import (
+ "bytes"
+ "context"
+ "crypto/hmac"
+ "crypto/sha256"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "github.com/Fuwn/asa-news/internal/fetcher"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "log/slog"
+ "net/http"
+ "time"
+)
+
+type Dispatcher struct {
+ databaseConnectionPool *pgxpool.Pool
+ httpClient *http.Client
+ logger *slog.Logger
+}
+
+type webhookSubscriber struct {
+ UserIdentifier string
+ WebhookURL string
+ WebhookSecret *string
+}
+
+type EntryPayload struct {
+ EntryIdentifier string `json:"entryIdentifier"`
+ FeedIdentifier string `json:"feedIdentifier"`
+ GUID string `json:"guid"`
+ URL *string `json:"url"`
+ Title *string `json:"title"`
+ Author *string `json:"author"`
+ Summary *string `json:"summary"`
+ PublishedAt *string `json:"publishedAt"`
+ EnclosureURL *string `json:"enclosureUrl"`
+ EnclosureType *string `json:"enclosureType"`
+}
+
+type WebhookPayload struct {
+ Event string `json:"event"`
+ Timestamp string `json:"timestamp"`
+ Entries []EntryPayload `json:"entries"`
+}
+
+func NewDispatcher(
+ databaseConnectionPool *pgxpool.Pool,
+ logger *slog.Logger,
+) *Dispatcher {
+ return &Dispatcher{
+ databaseConnectionPool: databaseConnectionPool,
+ httpClient: &http.Client{
+ Timeout: 10 * time.Second,
+ },
+ logger: logger,
+ }
+}
+
+func (webhookDispatcher *Dispatcher) DispatchForFeed(
+ dispatchContext context.Context,
+ feedIdentifier string,
+ entries []model.FeedEntry,
+) {
+ subscribers, queryError := webhookDispatcher.findWebhookSubscribers(dispatchContext, feedIdentifier)
+
+ if queryError != nil {
+ webhookDispatcher.logger.Error(
+ "failed to query webhook subscribers",
+ "feed_identifier", feedIdentifier,
+ "error", queryError,
+ )
+
+ return
+ }
+
+ if len(subscribers) == 0 {
+ return
+ }
+
+ entryPayloads := make([]EntryPayload, 0, len(entries))
+
+ for _, entry := range entries {
+ var publishedAtString *string
+
+ if entry.PublishedAt != nil {
+ formattedTime := entry.PublishedAt.Format(time.RFC3339)
+ publishedAtString = &formattedTime
+ }
+
+ entryPayloads = append(entryPayloads, EntryPayload{
+ EntryIdentifier: entry.FeedIdentifier,
+ FeedIdentifier: entry.FeedIdentifier,
+ GUID: entry.GUID,
+ URL: entry.URL,
+ Title: entry.Title,
+ Author: entry.Author,
+ Summary: entry.Summary,
+ PublishedAt: publishedAtString,
+ EnclosureURL: entry.EnclosureURL,
+ EnclosureType: entry.EnclosureType,
+ })
+ }
+
+ payload := WebhookPayload{
+ Event: "entries.created",
+ Timestamp: time.Now().UTC().Format(time.RFC3339),
+ Entries: entryPayloads,
+ }
+
+ for _, subscriber := range subscribers {
+ webhookDispatcher.deliverWebhook(dispatchContext, subscriber, payload)
+ }
+}
+
+func (webhookDispatcher *Dispatcher) findWebhookSubscribers(
+ queryContext context.Context,
+ feedIdentifier string,
+) ([]webhookSubscriber, error) {
+ subscriberQuery := `
+ SELECT up.id, up.webhook_url, up.webhook_secret
+ FROM subscriptions s
+ JOIN user_profiles up ON up.id = s.user_id
+ WHERE s.feed_id = $1
+ AND up.tier = 'developer'
+ AND up.webhook_enabled = true
+ AND up.webhook_url IS NOT NULL
+ `
+
+ rows, queryError := webhookDispatcher.databaseConnectionPool.Query(
+ queryContext,
+ subscriberQuery,
+ feedIdentifier,
+ )
+
+ if queryError != nil {
+ return nil, fmt.Errorf("failed to query webhook subscribers: %w", queryError)
+ }
+
+ defer rows.Close()
+
+ subscribers := make([]webhookSubscriber, 0)
+
+ for rows.Next() {
+ var subscriber webhookSubscriber
+
+ scanError := rows.Scan(
+ &subscriber.UserIdentifier,
+ &subscriber.WebhookURL,
+ &subscriber.WebhookSecret,
+ )
+
+ if scanError != nil {
+ return nil, fmt.Errorf("failed to scan subscriber row: %w", scanError)
+ }
+
+ subscribers = append(subscribers, subscriber)
+ }
+
+ return subscribers, rows.Err()
+}
+
+func (webhookDispatcher *Dispatcher) deliverWebhook(
+ deliveryContext context.Context,
+ subscriber webhookSubscriber,
+ payload WebhookPayload,
+) {
+ urlValidationError := fetcher.ValidateFeedURL(subscriber.WebhookURL)
+
+ if urlValidationError != nil {
+ webhookDispatcher.logger.Warn(
+ "webhook URL failed SSRF validation",
+ "user_identifier", subscriber.UserIdentifier,
+ "webhook_url", subscriber.WebhookURL,
+ "error", urlValidationError,
+ )
+
+ return
+ }
+
+ payloadBytes, marshalError := json.Marshal(payload)
+
+ if marshalError != nil {
+ webhookDispatcher.logger.Error(
+ "failed to marshal webhook payload",
+ "user_identifier", subscriber.UserIdentifier,
+ "error", marshalError,
+ )
+
+ return
+ }
+
+ retryDelays := []time.Duration{0, 1 * time.Second, 4 * time.Second, 16 * time.Second}
+ var lastError error
+
+ for attemptIndex, delay := range retryDelays {
+ if delay > 0 {
+ select {
+ case <-time.After(delay):
+ case <-deliveryContext.Done():
+ return
+ }
+ }
+
+ deliveryError := webhookDispatcher.sendWebhookRequest(
+ deliveryContext,
+ subscriber,
+ payloadBytes,
+ )
+
+ if deliveryError == nil {
+ if attemptIndex > 0 {
+ webhookDispatcher.logger.Info(
+ "webhook delivered after retry",
+ "user_identifier", subscriber.UserIdentifier,
+ "attempt", attemptIndex+1,
+ )
+ }
+
+ webhookDispatcher.resetConsecutiveFailures(deliveryContext, subscriber.UserIdentifier)
+
+ return
+ }
+
+ lastError = deliveryError
+
+ webhookDispatcher.logger.Warn(
+ "webhook delivery attempt failed",
+ "user_identifier", subscriber.UserIdentifier,
+ "attempt", attemptIndex+1,
+ "error", deliveryError,
+ )
+ }
+
+ webhookDispatcher.logger.Error(
+ "webhook delivery failed after all retries",
+ "user_identifier", subscriber.UserIdentifier,
+ "error", lastError,
+ )
+
+ webhookDispatcher.incrementConsecutiveFailures(deliveryContext, subscriber.UserIdentifier)
+}
+
+func (webhookDispatcher *Dispatcher) sendWebhookRequest(
+ requestContext context.Context,
+ subscriber webhookSubscriber,
+ payloadBytes []byte,
+) error {
+ request, requestCreationError := http.NewRequestWithContext(
+ requestContext,
+ http.MethodPost,
+ subscriber.WebhookURL,
+ bytes.NewReader(payloadBytes),
+ )
+
+ if requestCreationError != nil {
+ return fmt.Errorf("failed to create webhook request: %w", requestCreationError)
+ }
+
+ request.Header.Set("Content-Type", "application/json")
+ request.Header.Set("User-Agent", "asa.news Webhook/1.0")
+
+ if subscriber.WebhookSecret != nil && *subscriber.WebhookSecret != "" {
+ mac := hmac.New(sha256.New, []byte(*subscriber.WebhookSecret))
+ mac.Write(payloadBytes)
+ signature := hex.EncodeToString(mac.Sum(nil))
+ request.Header.Set("X-Asa-Signature-256", "sha256="+signature)
+ }
+
+ response, requestError := webhookDispatcher.httpClient.Do(request)
+
+ if requestError != nil {
+ return fmt.Errorf("webhook request failed: %w", requestError)
+ }
+
+ defer response.Body.Close()
+
+ if response.StatusCode >= 200 && response.StatusCode < 300 {
+ return nil
+ }
+
+ return fmt.Errorf("webhook returned status %d", response.StatusCode)
+}
+
+func (webhookDispatcher *Dispatcher) resetConsecutiveFailures(
+ updateContext context.Context,
+ userIdentifier string,
+) {
+ _, updateError := webhookDispatcher.databaseConnectionPool.Exec(
+ updateContext,
+ "UPDATE user_profiles SET webhook_consecutive_failures = 0 WHERE id = $1",
+ userIdentifier,
+ )
+
+ if updateError != nil {
+ webhookDispatcher.logger.Error(
+ "failed to reset webhook consecutive failures",
+ "user_identifier", userIdentifier,
+ "error", updateError,
+ )
+ }
+}
+
+func (webhookDispatcher *Dispatcher) incrementConsecutiveFailures(
+ updateContext context.Context,
+ userIdentifier string,
+) {
+ maximumConsecutiveFailures := 10
+
+ _, updateError := webhookDispatcher.databaseConnectionPool.Exec(
+ updateContext,
+ `UPDATE user_profiles
+ SET webhook_consecutive_failures = webhook_consecutive_failures + 1,
+ webhook_enabled = CASE
+ WHEN webhook_consecutive_failures + 1 >= $2 THEN false
+ ELSE webhook_enabled
+ END
+ WHERE id = $1`,
+ userIdentifier,
+ maximumConsecutiveFailures,
+ )
+
+ if updateError != nil {
+ webhookDispatcher.logger.Error(
+ "failed to increment webhook consecutive failures",
+ "user_identifier", userIdentifier,
+ "error", updateError,
+ )
+ }
+}