diff options
| author | Fuwn <[email protected]> | 2026-02-07 01:42:57 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-07 01:42:57 -0800 |
| commit | 5c5b1993edd890a80870ee05607ac5f088191d4e (patch) | |
| tree | a721b76bcd49ba10826c53efc87302c7a689512f /services/worker/internal | |
| download | asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.tar.xz asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.zip | |
feat: asa.news RSS reader with developer tier, REST API, and webhooks
Full-stack RSS reader SaaS: Supabase + Next.js + Go worker.
Includes three subscription tiers (free/pro/developer), API key auth,
read-only REST API, webhook push notifications, Stripe billing with
proration, and PWA support.
Diffstat (limited to 'services/worker/internal')
| -rw-r--r-- | services/worker/internal/configuration/configuration.go | 110 | ||||
| -rw-r--r-- | services/worker/internal/database/database.go | 39 | ||||
| -rw-r--r-- | services/worker/internal/fetcher/authentication.go | 43 | ||||
| -rw-r--r-- | services/worker/internal/fetcher/errors.go | 145 | ||||
| -rw-r--r-- | services/worker/internal/fetcher/fetcher.go | 116 | ||||
| -rw-r--r-- | services/worker/internal/fetcher/ssrf_protection.go | 77 | ||||
| -rw-r--r-- | services/worker/internal/health/health.go | 117 | ||||
| -rw-r--r-- | services/worker/internal/model/feed.go | 41 | ||||
| -rw-r--r-- | services/worker/internal/model/queue.go | 5 | ||||
| -rw-r--r-- | services/worker/internal/parser/parser.go | 234 | ||||
| -rw-r--r-- | services/worker/internal/pool/pool.go | 60 | ||||
| -rw-r--r-- | services/worker/internal/scheduler/refresh.go | 196 | ||||
| -rw-r--r-- | services/worker/internal/scheduler/scheduler.go | 283 | ||||
| -rw-r--r-- | services/worker/internal/webhook/webhook.go | 333 | ||||
| -rw-r--r-- | services/worker/internal/writer/writer.go | 222 |
15 files changed, 2021 insertions, 0 deletions
diff --git a/services/worker/internal/configuration/configuration.go b/services/worker/internal/configuration/configuration.go new file mode 100644 index 0000000..84a5995 --- /dev/null +++ b/services/worker/internal/configuration/configuration.go @@ -0,0 +1,110 @@ +package configuration + +import ( + "fmt" + "os" + "strconv" + "time" +) + +type Configuration struct { + DatabaseURL string + WorkerConcurrency int + PollInterval time.Duration + FetchTimeout time.Duration + QueuePollInterval time.Duration + BatchSize int + HealthPort int + EncryptionKey string + LogLevel string + LogJSON bool +} + +func Load() (Configuration, error) { + databaseURL := os.Getenv("DATABASE_URL") + + if databaseURL == "" { + return Configuration{}, fmt.Errorf("DATABASE_URL is required") + } + + workerConcurrency := getEnvironmentInteger("WORKER_CONCURRENCY", 10) + pollInterval := getEnvironmentDuration("POLL_INTERVAL", 30*time.Second) + fetchTimeout := getEnvironmentDuration("FETCH_TIMEOUT", 30*time.Second) + queuePollInterval := getEnvironmentDuration("QUEUE_POLL_INTERVAL", 5*time.Second) + batchSize := getEnvironmentInteger("BATCH_SIZE", 50) + healthPort := getEnvironmentInteger("HEALTH_PORT", 8080) + encryptionKey := os.Getenv("ENCRYPTION_KEY") + logLevel := getEnvironmentString("LOG_LEVEL", "info") + logJSON := getEnvironmentBoolean("LOG_JSON", false) + + return Configuration{ + DatabaseURL: databaseURL, + WorkerConcurrency: workerConcurrency, + PollInterval: pollInterval, + FetchTimeout: fetchTimeout, + QueuePollInterval: queuePollInterval, + BatchSize: batchSize, + HealthPort: healthPort, + EncryptionKey: encryptionKey, + LogLevel: logLevel, + LogJSON: logJSON, + }, nil +} + +func getEnvironmentString(key string, defaultValue string) string { + value := os.Getenv(key) + + if value == "" { + return defaultValue + } + + return value +} + +func getEnvironmentInteger(key string, defaultValue int) int { + raw := os.Getenv(key) + + if raw == "" { + return defaultValue + } + + parsed, parseError := strconv.Atoi(raw) + + if parseError != nil { + return defaultValue + } + + return parsed +} + +func getEnvironmentDuration(key string, defaultValue time.Duration) time.Duration { + raw := os.Getenv(key) + + if raw == "" { + return defaultValue + } + + parsed, parseError := time.ParseDuration(raw) + + if parseError != nil { + return defaultValue + } + + return parsed +} + +func getEnvironmentBoolean(key string, defaultValue bool) bool { + raw := os.Getenv(key) + + if raw == "" { + return defaultValue + } + + parsed, parseError := strconv.ParseBool(raw) + + if parseError != nil { + return defaultValue + } + + return parsed +} diff --git a/services/worker/internal/database/database.go b/services/worker/internal/database/database.go new file mode 100644 index 0000000..2b2900f --- /dev/null +++ b/services/worker/internal/database/database.go @@ -0,0 +1,39 @@ +package database + +import ( + "context" + "fmt" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "time" +) + +func CreateConnectionPool(parentContext context.Context, databaseURL string) (*pgxpool.Pool, error) { + poolConfiguration, parseError := pgxpool.ParseConfig(databaseURL) + + if parseError != nil { + return nil, fmt.Errorf("failed to parse database URL: %w", parseError) + } + + poolConfiguration.MaxConns = 25 + poolConfiguration.MinConns = 5 + poolConfiguration.MaxConnLifetime = 30 * time.Minute + poolConfiguration.MaxConnIdleTime = 5 * time.Minute + poolConfiguration.HealthCheckPeriod = 30 * time.Second + poolConfiguration.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeExec + connectionPool, connectionError := pgxpool.NewWithConfig(parentContext, poolConfiguration) + + if connectionError != nil { + return nil, fmt.Errorf("failed to create connection pool: %w", connectionError) + } + + pingError := connectionPool.Ping(parentContext) + + if pingError != nil { + connectionPool.Close() + + return nil, fmt.Errorf("failed to ping database: %w", pingError) + } + + return connectionPool, nil +} diff --git a/services/worker/internal/fetcher/authentication.go b/services/worker/internal/fetcher/authentication.go new file mode 100644 index 0000000..ba10196 --- /dev/null +++ b/services/worker/internal/fetcher/authentication.go @@ -0,0 +1,43 @@ +package fetcher + +import ( + "encoding/base64" + "fmt" + "net/http" + "net/url" +) + +type AuthenticationConfiguration struct { + AuthenticationType string + AuthenticationValue string +} + +func ApplyAuthentication(request *http.Request, authenticationConfig AuthenticationConfiguration) error { + switch authenticationConfig.AuthenticationType { + case "bearer": + request.Header.Set("Authorization", "Bearer "+authenticationConfig.AuthenticationValue) + case "basic": + encodedCredentials := base64.StdEncoding.EncodeToString([]byte(authenticationConfig.AuthenticationValue)) + + request.Header.Set("Authorization", "Basic "+encodedCredentials) + case "query_param": + existingURL, parseError := url.Parse(request.URL.String()) + + if parseError != nil { + return fmt.Errorf("failed to parse request URL for query param authentication: %w", parseError) + } + + queryParameters := existingURL.Query() + + queryParameters.Set("key", authenticationConfig.AuthenticationValue) + + existingURL.RawQuery = queryParameters.Encode() + request.URL = existingURL + case "": + return nil + default: + return fmt.Errorf("unsupported authentication type: %s", authenticationConfig.AuthenticationType) + } + + return nil +} diff --git a/services/worker/internal/fetcher/errors.go b/services/worker/internal/fetcher/errors.go new file mode 100644 index 0000000..abf5553 --- /dev/null +++ b/services/worker/internal/fetcher/errors.go @@ -0,0 +1,145 @@ +package fetcher + +import ( + "errors" + "fmt" + "net" + "net/url" + "strings" +) + +type FetchError struct { + StatusCode int + UserMessage string + Retryable bool + UnderlyingError error +} + +func (fetchError *FetchError) Error() string { + if fetchError.UnderlyingError != nil { + return fmt.Sprintf("%s: %v", fetchError.UserMessage, fetchError.UnderlyingError) + } + + return fetchError.UserMessage +} + +func (fetchError *FetchError) Unwrap() error { + return fetchError.UnderlyingError +} + +func ClassifyError(originalError error, statusCode int) *FetchError { + if statusCode >= 400 { + return classifyHTTPStatusCode(statusCode, originalError) + } + + return classifyNetworkError(originalError) +} + +func classifyHTTPStatusCode(statusCode int, originalError error) *FetchError { + switch statusCode { + case 401: + return &FetchError{ + StatusCode: statusCode, + UserMessage: "authentication required - check feed credentials", + Retryable: false, + UnderlyingError: originalError, + } + case 403: + return &FetchError{ + StatusCode: statusCode, + UserMessage: "access forbidden - feed may require different credentials", + Retryable: false, + UnderlyingError: originalError, + } + case 404: + return &FetchError{ + StatusCode: statusCode, + UserMessage: "feed not found - URL may be incorrect or feed removed", + Retryable: false, + UnderlyingError: originalError, + } + case 410: + return &FetchError{ + StatusCode: statusCode, + UserMessage: "feed permanently removed", + Retryable: false, + UnderlyingError: originalError, + } + case 429: + return &FetchError{ + StatusCode: statusCode, + UserMessage: "rate limited by feed server - will retry later", + Retryable: true, + UnderlyingError: originalError, + } + case 500, 502, 503, 504: + return &FetchError{ + StatusCode: statusCode, + UserMessage: fmt.Sprintf("feed server error (HTTP %d) - will retry later", statusCode), + Retryable: true, + UnderlyingError: originalError, + } + default: + return &FetchError{ + StatusCode: statusCode, + UserMessage: fmt.Sprintf("unexpected HTTP status %d", statusCode), + Retryable: statusCode >= 500, + UnderlyingError: originalError, + } + } +} + +func classifyNetworkError(originalError error) *FetchError { + if originalError == nil { + return &FetchError{ + UserMessage: "unknown fetch error", + Retryable: true, + } + } + + var dnsError *net.DNSError + + if errors.As(originalError, &dnsError) { + return &FetchError{ + UserMessage: "DNS resolution failed - check feed URL", + Retryable: !dnsError.IsNotFound, + UnderlyingError: originalError, + } + } + + var urlError *url.Error + + if errors.As(originalError, &urlError) { + if urlError.Timeout() { + return &FetchError{ + UserMessage: "connection timed out - feed server may be slow", + Retryable: true, + UnderlyingError: originalError, + } + } + } + + var netOpError *net.OpError + + if errors.As(originalError, &netOpError) { + return &FetchError{ + UserMessage: "network connection error - will retry later", + Retryable: true, + UnderlyingError: originalError, + } + } + + if strings.Contains(originalError.Error(), "certificate") || strings.Contains(originalError.Error(), "tls") { + return &FetchError{ + UserMessage: "TLS/certificate error - feed server has invalid certificate", + Retryable: false, + UnderlyingError: originalError, + } + } + + return &FetchError{ + UserMessage: "unexpected network error", + Retryable: true, + UnderlyingError: originalError, + } +} diff --git a/services/worker/internal/fetcher/fetcher.go b/services/worker/internal/fetcher/fetcher.go new file mode 100644 index 0000000..019bd39 --- /dev/null +++ b/services/worker/internal/fetcher/fetcher.go @@ -0,0 +1,116 @@ +package fetcher + +import ( + "context" + "fmt" + "io" + "net/http" + "time" +) + +type FetchResult struct { + Body []byte + StatusCode int + EntityTag string + LastModifiedHeader string + NotModified bool +} + +type Fetcher struct { + httpClient *http.Client +} + +func NewFetcher(fetchTimeout time.Duration) *Fetcher { + return &Fetcher{ + httpClient: &http.Client{ + Timeout: fetchTimeout, + CheckRedirect: func(request *http.Request, previousRequests []*http.Request) error { + if len(previousRequests) >= 5 { + return fmt.Errorf("too many redirects (exceeded 5)") + } + + redirectValidationError := ValidateRedirectTarget(request.URL.String()) + if redirectValidationError != nil { + return fmt.Errorf("blocked redirect to reserved address: %w", redirectValidationError) + } + + return nil + }, + }, + } +} + +func (feedFetcher *Fetcher) Fetch( + requestContext context.Context, + feedURL string, + previousEntityTag string, + previousLastModified string, + authenticationConfig AuthenticationConfiguration, +) (*FetchResult, error) { + urlValidationError := ValidateFeedURL(feedURL) + if urlValidationError != nil { + return nil, fmt.Errorf("blocked request to disallowed URL: %w", urlValidationError) + } + + request, requestCreationError := http.NewRequestWithContext(requestContext, http.MethodGet, feedURL, nil) + + if requestCreationError != nil { + return nil, fmt.Errorf("failed to create HTTP request: %w", requestCreationError) + } + + request.Header.Set("User-Agent", "asa.news Feed Worker/1.0") + request.Header.Set("Accept", "application/rss+xml, application/atom+xml, application/xml, text/xml, */*") + + if previousEntityTag != "" { + request.Header.Set("If-None-Match", previousEntityTag) + } + + if previousLastModified != "" { + request.Header.Set("If-Modified-Since", previousLastModified) + } + + authenticationError := ApplyAuthentication(request, authenticationConfig) + + if authenticationError != nil { + return nil, fmt.Errorf("failed to apply authentication: %w", authenticationError) + } + + response, requestError := feedFetcher.httpClient.Do(request) + + if requestError != nil { + classifiedError := ClassifyError(requestError, 0) + + return nil, classifiedError + } + + defer response.Body.Close() + + if response.StatusCode == http.StatusNotModified { + return &FetchResult{ + StatusCode: response.StatusCode, + EntityTag: response.Header.Get("ETag"), + LastModifiedHeader: response.Header.Get("Last-Modified"), + NotModified: true, + }, nil + } + + if response.StatusCode >= 400 { + classifiedError := ClassifyError(nil, response.StatusCode) + + return nil, classifiedError + } + + responseBody, readError := io.ReadAll(io.LimitReader(response.Body, 10*1024*1024)) + + if readError != nil { + return nil, fmt.Errorf("failed to read response body: %w", readError) + } + + return &FetchResult{ + Body: responseBody, + StatusCode: response.StatusCode, + EntityTag: response.Header.Get("ETag"), + LastModifiedHeader: response.Header.Get("Last-Modified"), + NotModified: false, + }, nil +} diff --git a/services/worker/internal/fetcher/ssrf_protection.go b/services/worker/internal/fetcher/ssrf_protection.go new file mode 100644 index 0000000..1887e78 --- /dev/null +++ b/services/worker/internal/fetcher/ssrf_protection.go @@ -0,0 +1,77 @@ +package fetcher + +import ( + "context" + "fmt" + "net" + "net/url" + "strings" + "time" +) + +var reservedNetworks = []net.IPNet{ + {IP: net.IPv4(10, 0, 0, 0), Mask: net.CIDRMask(8, 32)}, + {IP: net.IPv4(172, 16, 0, 0), Mask: net.CIDRMask(12, 32)}, + {IP: net.IPv4(192, 168, 0, 0), Mask: net.CIDRMask(16, 32)}, + {IP: net.IPv4(127, 0, 0, 0), Mask: net.CIDRMask(8, 32)}, + {IP: net.IPv4(169, 254, 0, 0), Mask: net.CIDRMask(16, 32)}, + {IP: net.IPv4(0, 0, 0, 0), Mask: net.CIDRMask(8, 32)}, + {IP: net.ParseIP("::1"), Mask: net.CIDRMask(128, 128)}, + {IP: net.ParseIP("fc00::"), Mask: net.CIDRMask(7, 128)}, + {IP: net.ParseIP("fe80::"), Mask: net.CIDRMask(10, 128)}, +} + +func isReservedAddress(ipAddress net.IP) bool { + for _, network := range reservedNetworks { + if network.Contains(ipAddress) { + return true + } + } + + return false +} + +func ValidateFeedURL(feedURL string) error { + parsedURL, parseError := url.Parse(feedURL) + if parseError != nil { + return fmt.Errorf("invalid URL: %w", parseError) + } + + scheme := strings.ToLower(parsedURL.Scheme) + if scheme != "http" && scheme != "https" { + return fmt.Errorf("unsupported scheme %q: only http and https are allowed", parsedURL.Scheme) + } + + hostname := parsedURL.Hostname() + if hostname == "" { + return fmt.Errorf("URL has no hostname") + } + + if parsedIP := net.ParseIP(hostname); parsedIP != nil { + if isReservedAddress(parsedIP) { + return fmt.Errorf("feed URL resolves to a reserved IP address") + } + + return nil + } + + resolverContext, cancelResolver := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelResolver() + + resolvedAddresses, lookupError := net.DefaultResolver.LookupIPAddr(resolverContext, hostname) + if lookupError != nil { + return fmt.Errorf("failed to resolve hostname %q: %w", hostname, lookupError) + } + + for _, resolvedAddress := range resolvedAddresses { + if isReservedAddress(resolvedAddress.IP) { + return fmt.Errorf("feed URL resolves to a reserved IP address") + } + } + + return nil +} + +func ValidateRedirectTarget(redirectURL string) error { + return ValidateFeedURL(redirectURL) +} diff --git a/services/worker/internal/health/health.go b/services/worker/internal/health/health.go new file mode 100644 index 0000000..0a29c92 --- /dev/null +++ b/services/worker/internal/health/health.go @@ -0,0 +1,117 @@ +package health + +import ( + "context" + "encoding/json" + "fmt" + "github.com/jackc/pgx/v5/pgxpool" + "log/slog" + "net/http" + "time" +) + +type HealthStatus struct { + Status string `json:"status"` + Timestamp string `json:"timestamp"` + Database string `json:"database"` +} + +type HealthServer struct { + httpServer *http.Server + databaseConnectionPool *pgxpool.Pool + logger *slog.Logger +} + +func NewHealthServer( + healthPort int, + databaseConnectionPool *pgxpool.Pool, + logger *slog.Logger, +) *HealthServer { + healthServer := &HealthServer{ + databaseConnectionPool: databaseConnectionPool, + logger: logger, + } + serveMux := http.NewServeMux() + + serveMux.HandleFunc("/health", healthServer.handleHealthCheck) + serveMux.HandleFunc("/ready", healthServer.handleReadinessCheck) + + healthServer.httpServer = &http.Server{ + Addr: fmt.Sprintf(":%d", healthPort), + Handler: serveMux, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + IdleTimeout: 30 * time.Second, + } + + return healthServer +} + +func (healthServer *HealthServer) Start() { + go func() { + healthServer.logger.Info("health server starting", "address", healthServer.httpServer.Addr) + + listenError := healthServer.httpServer.ListenAndServe() + + if listenError != nil && listenError != http.ErrServerClosed { + healthServer.logger.Error("health server failed", "error", listenError) + } + }() +} + +func (healthServer *HealthServer) Stop(shutdownContext context.Context) error { + return healthServer.httpServer.Shutdown(shutdownContext) +} + +func (healthServer *HealthServer) handleHealthCheck(responseWriter http.ResponseWriter, request *http.Request) { + healthStatus := HealthStatus{ + Status: "healthy", + Timestamp: time.Now().UTC().Format(time.RFC3339), + Database: "unknown", + } + pingContext, cancelPing := context.WithTimeout(request.Context(), 2*time.Second) + + defer cancelPing() + + pingError := healthServer.databaseConnectionPool.Ping(pingContext) + + if pingError != nil { + healthStatus.Status = "unhealthy" + healthStatus.Database = "unreachable" + + respondWithJSON(responseWriter, http.StatusServiceUnavailable, healthStatus) + + return + } + + healthStatus.Database = "connected" + + respondWithJSON(responseWriter, http.StatusOK, healthStatus) +} + +func (healthServer *HealthServer) handleReadinessCheck(responseWriter http.ResponseWriter, request *http.Request) { + pingContext, cancelPing := context.WithTimeout(request.Context(), 2*time.Second) + + defer cancelPing() + + pingError := healthServer.databaseConnectionPool.Ping(pingContext) + + if pingError != nil { + responseWriter.WriteHeader(http.StatusServiceUnavailable) + + return + } + + responseWriter.WriteHeader(http.StatusOK) +} + +func respondWithJSON(responseWriter http.ResponseWriter, statusCode int, payload interface{}) { + responseWriter.Header().Set("Content-Type", "application/json") + responseWriter.WriteHeader(statusCode) + + encodingError := json.NewEncoder(responseWriter).Encode(payload) + + if encodingError != nil { + return + } +} diff --git a/services/worker/internal/model/feed.go b/services/worker/internal/model/feed.go new file mode 100644 index 0000000..611c820 --- /dev/null +++ b/services/worker/internal/model/feed.go @@ -0,0 +1,41 @@ +package model + +import ( + "time" +) + +type Feed struct { + Identifier string + URL string + SiteURL *string + Title *string + FeedType *string + Visibility string + EntityTag *string + LastModified *string + LastFetchedAt *time.Time + LastFetchError *string + ConsecutiveFailures int + NextFetchAt time.Time + FetchIntervalSeconds int + CreatedAt time.Time + UpdatedAt time.Time +} + +type FeedEntry struct { + FeedIdentifier string + OwnerIdentifier *string + GUID string + URL *string + Title *string + Author *string + Summary *string + ContentHTML *string + ContentText *string + ImageURL *string + PublishedAt *time.Time + WordCount *int + EnclosureURL *string + EnclosureType *string + EnclosureLength *int64 +} diff --git a/services/worker/internal/model/queue.go b/services/worker/internal/model/queue.go new file mode 100644 index 0000000..a5aaa7c --- /dev/null +++ b/services/worker/internal/model/queue.go @@ -0,0 +1,5 @@ +package model + +type RefreshRequest struct { + FeedIdentifier string `json:"feed_id"` +} diff --git a/services/worker/internal/parser/parser.go b/services/worker/internal/parser/parser.go new file mode 100644 index 0000000..1fb2f76 --- /dev/null +++ b/services/worker/internal/parser/parser.go @@ -0,0 +1,234 @@ +package parser + +import ( + "crypto/sha256" + "fmt" + "github.com/Fuwn/asa-news/internal/model" + "github.com/mmcdole/gofeed" + "strconv" + "strings" + "time" + "unicode/utf8" +) + +type Parser struct { + gofeedParser *gofeed.Parser +} + +func NewParser() *Parser { + return &Parser{ + gofeedParser: gofeed.NewParser(), + } +} + +type ParseResult struct { + Entries []model.FeedEntry + FeedTitle string + SiteURL string + AudioEnclosureRatio float64 +} + +func (feedParser *Parser) Parse(feedIdentifier string, ownerIdentifier *string, rawFeedContent []byte) (*ParseResult, error) { + parsedFeed, parseError := feedParser.gofeedParser.ParseString(string(rawFeedContent)) + + if parseError != nil { + return nil, fmt.Errorf("failed to parse feed content: %w", parseError) + } + + feedEntries := make([]model.FeedEntry, 0, len(parsedFeed.Items)) + + audioEnclosureCount := 0 + + for _, feedItem := range parsedFeed.Items { + normalizedEntry := normalizeFeedItem(feedIdentifier, ownerIdentifier, feedItem) + if normalizedEntry.EnclosureURL != nil { + audioEnclosureCount++ + } + feedEntries = append(feedEntries, normalizedEntry) + } + + audioEnclosureRatio := 0.0 + if len(feedEntries) > 0 { + audioEnclosureRatio = float64(audioEnclosureCount) / float64(len(feedEntries)) + } + + return &ParseResult{ + Entries: feedEntries, + FeedTitle: strings.TrimSpace(parsedFeed.Title), + SiteURL: strings.TrimSpace(parsedFeed.Link), + AudioEnclosureRatio: audioEnclosureRatio, + }, nil +} + +func normalizeFeedItem(feedIdentifier string, ownerIdentifier *string, feedItem *gofeed.Item) model.FeedEntry { + globallyUniqueIdentifier := resolveGloballyUniqueIdentifier(feedItem) + entryURL := stringPointerOrNil(resolveEntryURL(feedItem)) + entryTitle := stringPointerOrNil(strings.TrimSpace(feedItem.Title)) + entrySummary := stringPointerOrNil(strings.TrimSpace(feedItem.Description)) + entryContentHTML := stringPointerOrNil(resolveContentHTML(feedItem)) + entryContentText := resolveContentText(feedItem) + authorName := stringPointerOrNil(resolveAuthorName(feedItem)) + publishedAt := resolvePublishedDate(feedItem) + entryImageURL := stringPointerOrNil(resolveImageURL(feedItem)) + wordCount := countWords(entryContentText) + enclosureURL, enclosureType, enclosureLength := resolveAudioEnclosure(feedItem) + + return model.FeedEntry{ + FeedIdentifier: feedIdentifier, + OwnerIdentifier: ownerIdentifier, + GUID: globallyUniqueIdentifier, + URL: entryURL, + Title: entryTitle, + Author: authorName, + Summary: entrySummary, + ContentHTML: entryContentHTML, + ContentText: stringPointerOrNil(entryContentText), + ImageURL: entryImageURL, + PublishedAt: publishedAt, + WordCount: wordCount, + EnclosureURL: enclosureURL, + EnclosureType: enclosureType, + EnclosureLength: enclosureLength, + } +} + +func stringPointerOrNil(value string) *string { + if value == "" { + return nil + } + + return &value +} + +func resolveGloballyUniqueIdentifier(feedItem *gofeed.Item) string { + if feedItem.GUID != "" { + return feedItem.GUID + } + + if feedItem.Link != "" { + return feedItem.Link + } + + hashInput := feedItem.Title + feedItem.Description + hashBytes := sha256.Sum256([]byte(hashInput)) + + return fmt.Sprintf("sha256:%x", hashBytes) +} + +func resolveEntryURL(feedItem *gofeed.Item) string { + if feedItem.Link != "" { + return feedItem.Link + } + + if feedItem.GUID != "" && strings.HasPrefix(feedItem.GUID, "http") { + return feedItem.GUID + } + + return "" +} + +func resolveContentHTML(feedItem *gofeed.Item) string { + if feedItem.Content != "" { + return feedItem.Content + } + + return feedItem.Description +} + +func resolveContentText(feedItem *gofeed.Item) string { + contentSource := feedItem.Content + + if contentSource == "" { + contentSource = feedItem.Description + } + + return stripHTMLTags(contentSource) +} + +func resolveAuthorName(feedItem *gofeed.Item) string { + if feedItem.Author != nil && feedItem.Author.Name != "" { + return feedItem.Author.Name + } + + if len(feedItem.Authors) > 0 && feedItem.Authors[0].Name != "" { + return feedItem.Authors[0].Name + } + + return "" +} + +func resolvePublishedDate(feedItem *gofeed.Item) *time.Time { + if feedItem.PublishedParsed != nil { + return feedItem.PublishedParsed + } + + if feedItem.UpdatedParsed != nil { + return feedItem.UpdatedParsed + } + + return nil +} + +func resolveAudioEnclosure(feedItem *gofeed.Item) (*string, *string, *int64) { + if feedItem.Enclosures == nil { + return nil, nil, nil + } + + for _, enclosure := range feedItem.Enclosures { + if strings.HasPrefix(enclosure.Type, "audio/") && enclosure.URL != "" { + enclosureURL := enclosure.URL + enclosureType := enclosure.Type + + var enclosureLength *int64 + if enclosure.Length != "" { + if parsedLength, parseError := strconv.ParseInt(enclosure.Length, 10, 64); parseError == nil { + enclosureLength = &parsedLength + } + } + + return &enclosureURL, &enclosureType, enclosureLength + } + } + + return nil, nil, nil +} + +func resolveImageURL(feedItem *gofeed.Item) string { + if feedItem.Image != nil && feedItem.Image.URL != "" { + return feedItem.Image.URL + } + + return "" +} + +func countWords(plainText string) *int { + if plainText == "" { + return nil + } + + count := len(strings.Fields(plainText)) + + return &count +} + +func stripHTMLTags(htmlContent string) string { + var resultBuilder strings.Builder + + insideTag := false + + for characterIndex := 0; characterIndex < len(htmlContent); { + currentRune, runeSize := utf8.DecodeRuneInString(htmlContent[characterIndex:]) + + if currentRune == '<' { + insideTag = true + } else if currentRune == '>' { + insideTag = false + } else if !insideTag { + resultBuilder.WriteRune(currentRune) + } + + characterIndex += runeSize + } + + return strings.TrimSpace(resultBuilder.String()) +} diff --git a/services/worker/internal/pool/pool.go b/services/worker/internal/pool/pool.go new file mode 100644 index 0000000..7df03e2 --- /dev/null +++ b/services/worker/internal/pool/pool.go @@ -0,0 +1,60 @@ +package pool + +import ( + "context" + "log/slog" + "sync" +) + +type WorkFunction func(workContext context.Context) + +type WorkerPool struct { + concurrencyLimit int + semaphoreChannel chan struct{} + waitGroup sync.WaitGroup + logger *slog.Logger +} + +func NewWorkerPool(concurrencyLimit int, logger *slog.Logger) *WorkerPool { + return &WorkerPool{ + concurrencyLimit: concurrencyLimit, + semaphoreChannel: make(chan struct{}, concurrencyLimit), + logger: logger, + } +} + +func (workerPool *WorkerPool) Submit(workContext context.Context, workFunction WorkFunction) bool { + select { + case workerPool.semaphoreChannel <- struct{}{}: + workerPool.waitGroup.Add(1) + + go func() { + defer workerPool.waitGroup.Done() + defer func() { <-workerPool.semaphoreChannel }() + defer func() { + recoveredPanic := recover() + + if recoveredPanic != nil { + workerPool.logger.Error( + "worker panic recovered", + "panic_value", recoveredPanic, + ) + } + }() + + workFunction(workContext) + }() + + return true + case <-workContext.Done(): + return false + } +} + +func (workerPool *WorkerPool) Wait() { + workerPool.waitGroup.Wait() +} + +func (workerPool *WorkerPool) ActiveWorkerCount() int { + return len(workerPool.semaphoreChannel) +} diff --git a/services/worker/internal/scheduler/refresh.go b/services/worker/internal/scheduler/refresh.go new file mode 100644 index 0000000..8271fa0 --- /dev/null +++ b/services/worker/internal/scheduler/refresh.go @@ -0,0 +1,196 @@ +package scheduler + +import ( + "context" + "github.com/Fuwn/asa-news/internal/fetcher" + "github.com/Fuwn/asa-news/internal/model" + "github.com/Fuwn/asa-news/internal/parser" + "github.com/Fuwn/asa-news/internal/webhook" + "github.com/Fuwn/asa-news/internal/writer" + "log/slog" +) + +func ProcessRefreshRequest( + refreshContext context.Context, + feed model.Feed, + feedFetcher *fetcher.Fetcher, + feedParser *parser.Parser, + feedWriter *writer.Writer, + webhookDispatcher *webhook.Dispatcher, + logger *slog.Logger, +) { + logger.Info( + "processing feed refresh", + "feed_identifier", feed.Identifier, + "feed_url", feed.URL, + ) + + var ownerIdentifier *string + + if feed.Visibility == "authenticated" { + logger.Warn( + "authenticated feed refresh not yet implemented", + "feed_identifier", feed.Identifier, + ) + + return + } + + authenticationConfig := fetcher.AuthenticationConfiguration{} + + entityTag := "" + + if feed.EntityTag != nil { + entityTag = *feed.EntityTag + } + + lastModified := "" + + if feed.LastModified != nil { + lastModified = *feed.LastModified + } + + fetchResult, fetchError := feedFetcher.Fetch( + refreshContext, + feed.URL, + entityTag, + lastModified, + authenticationConfig, + ) + + if fetchError != nil { + logger.Warn( + "feed fetch failed", + "feed_identifier", feed.Identifier, + "error", fetchError, + ) + + recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, fetchError.Error()) + + if recordError != nil { + logger.Error( + "failed to record feed error", + "feed_identifier", feed.Identifier, + "error", recordError, + ) + } + + return + } + + if fetchResult.NotModified { + logger.Info( + "feed not modified", + "feed_identifier", feed.Identifier, + ) + + metadataError := feedWriter.UpdateFeedMetadata( + refreshContext, + feed.Identifier, + fetchResult.EntityTag, + fetchResult.LastModifiedHeader, + "", + "", + ) + + if metadataError != nil { + logger.Error( + "failed to update feed metadata after not-modified", + "feed_identifier", feed.Identifier, + "error", metadataError, + ) + } + + return + } + + parseResult, parseError := feedParser.Parse(feed.Identifier, ownerIdentifier, fetchResult.Body) + + if parseError != nil { + logger.Warn( + "feed parse failed", + "feed_identifier", feed.Identifier, + "error", parseError, + ) + + recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, parseError.Error()) + + if recordError != nil { + logger.Error( + "failed to record parse error", + "feed_identifier", feed.Identifier, + "error", recordError, + ) + } + + return + } + + rowsAffected, writeError := feedWriter.WriteEntries(refreshContext, parseResult.Entries) + + if writeError != nil { + logger.Error( + "failed to write feed entries", + "feed_identifier", feed.Identifier, + "error", writeError, + ) + + recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, writeError.Error()) + + if recordError != nil { + logger.Error( + "failed to record write error", + "feed_identifier", feed.Identifier, + "error", recordError, + ) + } + + return + } + + if rowsAffected > 0 && webhookDispatcher != nil { + webhookDispatcher.DispatchForFeed(refreshContext, feed.Identifier, parseResult.Entries) + } + + metadataError := feedWriter.UpdateFeedMetadata( + refreshContext, + feed.Identifier, + fetchResult.EntityTag, + fetchResult.LastModifiedHeader, + parseResult.FeedTitle, + parseResult.SiteURL, + ) + + if metadataError != nil { + logger.Error( + "failed to update feed metadata", + "feed_identifier", feed.Identifier, + "error", metadataError, + ) + } + + if parseResult.AudioEnclosureRatio > 0.5 { + if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "podcast"); feedTypeError != nil { + logger.Error( + "failed to update feed type to podcast", + "feed_identifier", feed.Identifier, + "error", feedTypeError, + ) + } + } else if feed.FeedType != nil && *feed.FeedType == "podcast" { + if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "rss"); feedTypeError != nil { + logger.Error( + "failed to clear podcast feed type", + "feed_identifier", feed.Identifier, + "error", feedTypeError, + ) + } + } + + logger.Info( + "feed refresh completed", + "feed_identifier", feed.Identifier, + "entries_parsed", len(parseResult.Entries), + "rows_affected", rowsAffected, + ) +} diff --git a/services/worker/internal/scheduler/scheduler.go b/services/worker/internal/scheduler/scheduler.go new file mode 100644 index 0000000..646e263 --- /dev/null +++ b/services/worker/internal/scheduler/scheduler.go @@ -0,0 +1,283 @@ +package scheduler + +import ( + "context" + "encoding/json" + "fmt" + "github.com/Fuwn/asa-news/internal/fetcher" + "github.com/Fuwn/asa-news/internal/model" + "github.com/Fuwn/asa-news/internal/parser" + "github.com/Fuwn/asa-news/internal/pool" + "github.com/Fuwn/asa-news/internal/webhook" + "github.com/Fuwn/asa-news/internal/writer" + pgmq "github.com/craigpastro/pgmq-go" + "github.com/jackc/pgx/v5/pgxpool" + "log/slog" + "time" +) + +type Scheduler struct { + databaseConnectionPool *pgxpool.Pool + feedFetcher *fetcher.Fetcher + feedParser *parser.Parser + feedWriter *writer.Writer + webhookDispatcher *webhook.Dispatcher + workerPool *pool.WorkerPool + pollInterval time.Duration + queuePollInterval time.Duration + batchSize int + logger *slog.Logger +} + +func NewScheduler( + databaseConnectionPool *pgxpool.Pool, + feedFetcher *fetcher.Fetcher, + feedParser *parser.Parser, + feedWriter *writer.Writer, + webhookDispatcher *webhook.Dispatcher, + workerPool *pool.WorkerPool, + pollInterval time.Duration, + queuePollInterval time.Duration, + batchSize int, + logger *slog.Logger, +) *Scheduler { + return &Scheduler{ + databaseConnectionPool: databaseConnectionPool, + feedFetcher: feedFetcher, + feedParser: feedParser, + feedWriter: feedWriter, + webhookDispatcher: webhookDispatcher, + workerPool: workerPool, + pollInterval: pollInterval, + queuePollInterval: queuePollInterval, + batchSize: batchSize, + logger: logger, + } +} + +func (feedScheduler *Scheduler) Run(schedulerContext context.Context) { + pollTicker := time.NewTicker(feedScheduler.pollInterval) + + defer pollTicker.Stop() + + queueTicker := time.NewTicker(feedScheduler.queuePollInterval) + + defer queueTicker.Stop() + + feedScheduler.logger.Info("scheduler started", + "poll_interval", feedScheduler.pollInterval, + "queue_poll_interval", feedScheduler.queuePollInterval, + "batch_size", feedScheduler.batchSize, + ) + feedScheduler.executePollCycle(schedulerContext) + feedScheduler.executeQueueCycle(schedulerContext) + + for { + select { + case <-schedulerContext.Done(): + feedScheduler.logger.Info("scheduler shutting down") + + return + case <-pollTicker.C: + feedScheduler.executePollCycle(schedulerContext) + case <-queueTicker.C: + feedScheduler.executeQueueCycle(schedulerContext) + } + } +} + +func (feedScheduler *Scheduler) executePollCycle(cycleContext context.Context) { + claimedFeeds, claimError := feedScheduler.claimDueFeeds(cycleContext) + + if claimError != nil { + feedScheduler.logger.Error("failed to claim due feeds", "error", claimError) + + return + } + + if len(claimedFeeds) == 0 { + return + } + + feedScheduler.logger.Info("claimed feeds for processing", "count", len(claimedFeeds)) + + for _, claimedFeed := range claimedFeeds { + capturedFeed := claimedFeed + + feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { + ProcessRefreshRequest( + workContext, + capturedFeed, + feedScheduler.feedFetcher, + feedScheduler.feedParser, + feedScheduler.feedWriter, + feedScheduler.webhookDispatcher, + feedScheduler.logger, + ) + }) + } +} + +func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context) { + queueMessage, readError := pgmq.Read(cycleContext, feedScheduler.databaseConnectionPool, "feed_refresh", 30) + + if readError != nil { + return + } + + if queueMessage == nil { + return + } + + var refreshRequest model.RefreshRequest + + unmarshalError := json.Unmarshal(queueMessage.Message, &refreshRequest) + + if unmarshalError != nil { + feedScheduler.logger.Error("failed to unmarshal refresh request", "error", unmarshalError) + + return + } + + feed, lookupError := feedScheduler.lookupFeed(cycleContext, refreshRequest.FeedIdentifier) + + if lookupError != nil { + feedScheduler.logger.Error( + "failed to look up feed for queue request", + "feed_identifier", refreshRequest.FeedIdentifier, + "error", lookupError, + ) + + return + } + + capturedMessageIdentifier := queueMessage.MsgID + + feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { + ProcessRefreshRequest( + workContext, + feed, + feedScheduler.feedFetcher, + feedScheduler.feedParser, + feedScheduler.feedWriter, + feedScheduler.webhookDispatcher, + feedScheduler.logger, + ) + + _, archiveError := pgmq.Archive(workContext, feedScheduler.databaseConnectionPool, "feed_refresh", capturedMessageIdentifier) + + if archiveError != nil { + feedScheduler.logger.Error( + "failed to archive queue message", + "message_id", capturedMessageIdentifier, + "error", archiveError, + ) + } + }) +} + +func (feedScheduler *Scheduler) claimDueFeeds(claimContext context.Context) ([]model.Feed, error) { + claimQuery := ` + UPDATE feeds + SET last_fetched_at = NOW() + WHERE id IN ( + SELECT id + FROM feeds + WHERE next_fetch_at <= NOW() + AND subscriber_count > 0 + ORDER BY next_fetch_at ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + RETURNING + id, url, site_url, title, feed_type, + visibility, etag, last_modified, + last_fetched_at, last_fetch_error, + consecutive_failures, next_fetch_at, + fetch_interval_seconds, + created_at, updated_at + ` + rows, queryError := feedScheduler.databaseConnectionPool.Query(claimContext, claimQuery, feedScheduler.batchSize) + + if queryError != nil { + return nil, fmt.Errorf("failed to query due feeds: %w", queryError) + } + + defer rows.Close() + + claimedFeeds := make([]model.Feed, 0) + + for rows.Next() { + var feed model.Feed + + scanError := rows.Scan( + &feed.Identifier, + &feed.URL, + &feed.SiteURL, + &feed.Title, + &feed.FeedType, + &feed.Visibility, + &feed.EntityTag, + &feed.LastModified, + &feed.LastFetchedAt, + &feed.LastFetchError, + &feed.ConsecutiveFailures, + &feed.NextFetchAt, + &feed.FetchIntervalSeconds, + &feed.CreatedAt, + &feed.UpdatedAt, + ) + + if scanError != nil { + return nil, fmt.Errorf("failed to scan feed row: %w", scanError) + } + + claimedFeeds = append(claimedFeeds, feed) + } + + if rows.Err() != nil { + return nil, fmt.Errorf("error iterating feed rows: %w", rows.Err()) + } + + return claimedFeeds, nil +} + +func (feedScheduler *Scheduler) lookupFeed(lookupContext context.Context, feedIdentifier string) (model.Feed, error) { + lookupQuery := ` + SELECT + id, url, site_url, title, feed_type, + visibility, etag, last_modified, + last_fetched_at, last_fetch_error, + consecutive_failures, next_fetch_at, + fetch_interval_seconds, + created_at, updated_at + FROM feeds + WHERE id = $1 + ` + + var feed model.Feed + + scanError := feedScheduler.databaseConnectionPool.QueryRow(lookupContext, lookupQuery, feedIdentifier).Scan( + &feed.Identifier, + &feed.URL, + &feed.SiteURL, + &feed.Title, + &feed.FeedType, + &feed.Visibility, + &feed.EntityTag, + &feed.LastModified, + &feed.LastFetchedAt, + &feed.LastFetchError, + &feed.ConsecutiveFailures, + &feed.NextFetchAt, + &feed.FetchIntervalSeconds, + &feed.CreatedAt, + &feed.UpdatedAt, + ) + + if scanError != nil { + return model.Feed{}, fmt.Errorf("failed to look up feed %s: %w", feedIdentifier, scanError) + } + + return feed, nil +} diff --git a/services/worker/internal/webhook/webhook.go b/services/worker/internal/webhook/webhook.go new file mode 100644 index 0000000..c812820 --- /dev/null +++ b/services/worker/internal/webhook/webhook.go @@ -0,0 +1,333 @@ +package webhook + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "github.com/Fuwn/asa-news/internal/fetcher" + "github.com/Fuwn/asa-news/internal/model" + "github.com/jackc/pgx/v5/pgxpool" + "log/slog" + "net/http" + "time" +) + +type Dispatcher struct { + databaseConnectionPool *pgxpool.Pool + httpClient *http.Client + logger *slog.Logger +} + +type webhookSubscriber struct { + UserIdentifier string + WebhookURL string + WebhookSecret *string +} + +type EntryPayload struct { + EntryIdentifier string `json:"entryIdentifier"` + FeedIdentifier string `json:"feedIdentifier"` + GUID string `json:"guid"` + URL *string `json:"url"` + Title *string `json:"title"` + Author *string `json:"author"` + Summary *string `json:"summary"` + PublishedAt *string `json:"publishedAt"` + EnclosureURL *string `json:"enclosureUrl"` + EnclosureType *string `json:"enclosureType"` +} + +type WebhookPayload struct { + Event string `json:"event"` + Timestamp string `json:"timestamp"` + Entries []EntryPayload `json:"entries"` +} + +func NewDispatcher( + databaseConnectionPool *pgxpool.Pool, + logger *slog.Logger, +) *Dispatcher { + return &Dispatcher{ + databaseConnectionPool: databaseConnectionPool, + httpClient: &http.Client{ + Timeout: 10 * time.Second, + }, + logger: logger, + } +} + +func (webhookDispatcher *Dispatcher) DispatchForFeed( + dispatchContext context.Context, + feedIdentifier string, + entries []model.FeedEntry, +) { + subscribers, queryError := webhookDispatcher.findWebhookSubscribers(dispatchContext, feedIdentifier) + + if queryError != nil { + webhookDispatcher.logger.Error( + "failed to query webhook subscribers", + "feed_identifier", feedIdentifier, + "error", queryError, + ) + + return + } + + if len(subscribers) == 0 { + return + } + + entryPayloads := make([]EntryPayload, 0, len(entries)) + + for _, entry := range entries { + var publishedAtString *string + + if entry.PublishedAt != nil { + formattedTime := entry.PublishedAt.Format(time.RFC3339) + publishedAtString = &formattedTime + } + + entryPayloads = append(entryPayloads, EntryPayload{ + EntryIdentifier: entry.FeedIdentifier, + FeedIdentifier: entry.FeedIdentifier, + GUID: entry.GUID, + URL: entry.URL, + Title: entry.Title, + Author: entry.Author, + Summary: entry.Summary, + PublishedAt: publishedAtString, + EnclosureURL: entry.EnclosureURL, + EnclosureType: entry.EnclosureType, + }) + } + + payload := WebhookPayload{ + Event: "entries.created", + Timestamp: time.Now().UTC().Format(time.RFC3339), + Entries: entryPayloads, + } + + for _, subscriber := range subscribers { + webhookDispatcher.deliverWebhook(dispatchContext, subscriber, payload) + } +} + +func (webhookDispatcher *Dispatcher) findWebhookSubscribers( + queryContext context.Context, + feedIdentifier string, +) ([]webhookSubscriber, error) { + subscriberQuery := ` + SELECT up.id, up.webhook_url, up.webhook_secret + FROM subscriptions s + JOIN user_profiles up ON up.id = s.user_id + WHERE s.feed_id = $1 + AND up.tier = 'developer' + AND up.webhook_enabled = true + AND up.webhook_url IS NOT NULL + ` + + rows, queryError := webhookDispatcher.databaseConnectionPool.Query( + queryContext, + subscriberQuery, + feedIdentifier, + ) + + if queryError != nil { + return nil, fmt.Errorf("failed to query webhook subscribers: %w", queryError) + } + + defer rows.Close() + + subscribers := make([]webhookSubscriber, 0) + + for rows.Next() { + var subscriber webhookSubscriber + + scanError := rows.Scan( + &subscriber.UserIdentifier, + &subscriber.WebhookURL, + &subscriber.WebhookSecret, + ) + + if scanError != nil { + return nil, fmt.Errorf("failed to scan subscriber row: %w", scanError) + } + + subscribers = append(subscribers, subscriber) + } + + return subscribers, rows.Err() +} + +func (webhookDispatcher *Dispatcher) deliverWebhook( + deliveryContext context.Context, + subscriber webhookSubscriber, + payload WebhookPayload, +) { + urlValidationError := fetcher.ValidateFeedURL(subscriber.WebhookURL) + + if urlValidationError != nil { + webhookDispatcher.logger.Warn( + "webhook URL failed SSRF validation", + "user_identifier", subscriber.UserIdentifier, + "webhook_url", subscriber.WebhookURL, + "error", urlValidationError, + ) + + return + } + + payloadBytes, marshalError := json.Marshal(payload) + + if marshalError != nil { + webhookDispatcher.logger.Error( + "failed to marshal webhook payload", + "user_identifier", subscriber.UserIdentifier, + "error", marshalError, + ) + + return + } + + retryDelays := []time.Duration{0, 1 * time.Second, 4 * time.Second, 16 * time.Second} + var lastError error + + for attemptIndex, delay := range retryDelays { + if delay > 0 { + select { + case <-time.After(delay): + case <-deliveryContext.Done(): + return + } + } + + deliveryError := webhookDispatcher.sendWebhookRequest( + deliveryContext, + subscriber, + payloadBytes, + ) + + if deliveryError == nil { + if attemptIndex > 0 { + webhookDispatcher.logger.Info( + "webhook delivered after retry", + "user_identifier", subscriber.UserIdentifier, + "attempt", attemptIndex+1, + ) + } + + webhookDispatcher.resetConsecutiveFailures(deliveryContext, subscriber.UserIdentifier) + + return + } + + lastError = deliveryError + + webhookDispatcher.logger.Warn( + "webhook delivery attempt failed", + "user_identifier", subscriber.UserIdentifier, + "attempt", attemptIndex+1, + "error", deliveryError, + ) + } + + webhookDispatcher.logger.Error( + "webhook delivery failed after all retries", + "user_identifier", subscriber.UserIdentifier, + "error", lastError, + ) + + webhookDispatcher.incrementConsecutiveFailures(deliveryContext, subscriber.UserIdentifier) +} + +func (webhookDispatcher *Dispatcher) sendWebhookRequest( + requestContext context.Context, + subscriber webhookSubscriber, + payloadBytes []byte, +) error { + request, requestCreationError := http.NewRequestWithContext( + requestContext, + http.MethodPost, + subscriber.WebhookURL, + bytes.NewReader(payloadBytes), + ) + + if requestCreationError != nil { + return fmt.Errorf("failed to create webhook request: %w", requestCreationError) + } + + request.Header.Set("Content-Type", "application/json") + request.Header.Set("User-Agent", "asa.news Webhook/1.0") + + if subscriber.WebhookSecret != nil && *subscriber.WebhookSecret != "" { + mac := hmac.New(sha256.New, []byte(*subscriber.WebhookSecret)) + mac.Write(payloadBytes) + signature := hex.EncodeToString(mac.Sum(nil)) + request.Header.Set("X-Asa-Signature-256", "sha256="+signature) + } + + response, requestError := webhookDispatcher.httpClient.Do(request) + + if requestError != nil { + return fmt.Errorf("webhook request failed: %w", requestError) + } + + defer response.Body.Close() + + if response.StatusCode >= 200 && response.StatusCode < 300 { + return nil + } + + return fmt.Errorf("webhook returned status %d", response.StatusCode) +} + +func (webhookDispatcher *Dispatcher) resetConsecutiveFailures( + updateContext context.Context, + userIdentifier string, +) { + _, updateError := webhookDispatcher.databaseConnectionPool.Exec( + updateContext, + "UPDATE user_profiles SET webhook_consecutive_failures = 0 WHERE id = $1", + userIdentifier, + ) + + if updateError != nil { + webhookDispatcher.logger.Error( + "failed to reset webhook consecutive failures", + "user_identifier", userIdentifier, + "error", updateError, + ) + } +} + +func (webhookDispatcher *Dispatcher) incrementConsecutiveFailures( + updateContext context.Context, + userIdentifier string, +) { + maximumConsecutiveFailures := 10 + + _, updateError := webhookDispatcher.databaseConnectionPool.Exec( + updateContext, + `UPDATE user_profiles + SET webhook_consecutive_failures = webhook_consecutive_failures + 1, + webhook_enabled = CASE + WHEN webhook_consecutive_failures + 1 >= $2 THEN false + ELSE webhook_enabled + END + WHERE id = $1`, + userIdentifier, + maximumConsecutiveFailures, + ) + + if updateError != nil { + webhookDispatcher.logger.Error( + "failed to increment webhook consecutive failures", + "user_identifier", userIdentifier, + "error", updateError, + ) + } +} diff --git a/services/worker/internal/writer/writer.go b/services/worker/internal/writer/writer.go new file mode 100644 index 0000000..de81bfa --- /dev/null +++ b/services/worker/internal/writer/writer.go @@ -0,0 +1,222 @@ +package writer + +import ( + "context" + "fmt" + "github.com/Fuwn/asa-news/internal/model" + "github.com/jackc/pgx/v5/pgxpool" + "strings" + "time" +) + +type Writer struct { + databaseConnectionPool *pgxpool.Pool +} + +func NewWriter(databaseConnectionPool *pgxpool.Pool) *Writer { + return &Writer{ + databaseConnectionPool: databaseConnectionPool, + } +} + +func (feedWriter *Writer) WriteEntries(writeContext context.Context, feedEntries []model.FeedEntry) (int64, error) { + if len(feedEntries) == 0 { + return 0, nil + } + + valueParameterPlaceholders := make([]string, 0, len(feedEntries)) + queryArguments := make([]interface{}, 0, len(feedEntries)*15) + + for entryIndex, entry := range feedEntries { + parameterOffset := entryIndex * 15 + valueParameterPlaceholders = append( + valueParameterPlaceholders, + fmt.Sprintf( + "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", + parameterOffset+1, parameterOffset+2, parameterOffset+3, + parameterOffset+4, parameterOffset+5, parameterOffset+6, + parameterOffset+7, parameterOffset+8, parameterOffset+9, + parameterOffset+10, parameterOffset+11, parameterOffset+12, + parameterOffset+13, parameterOffset+14, parameterOffset+15, + ), + ) + queryArguments = append( + queryArguments, + entry.FeedIdentifier, + entry.OwnerIdentifier, + entry.GUID, + entry.URL, + entry.Title, + entry.Author, + entry.Summary, + entry.ContentHTML, + entry.ContentText, + entry.ImageURL, + entry.PublishedAt, + entry.WordCount, + entry.EnclosureURL, + entry.EnclosureType, + entry.EnclosureLength, + ) + } + + isPublicEntry := feedEntries[0].OwnerIdentifier == nil + + var conflictClause string + + if isPublicEntry { + conflictClause = "ON CONFLICT (feed_id, guid) WHERE owner_id IS NULL" + } else { + conflictClause = "ON CONFLICT (feed_id, owner_id, guid) WHERE owner_id IS NOT NULL" + } + + upsertQuery := fmt.Sprintf(` + INSERT INTO entries ( + feed_id, owner_id, guid, url, title, author, + summary, content_html, content_text, image_url, + published_at, word_count, + enclosure_url, enclosure_type, enclosure_length + ) + VALUES %s + %s + DO UPDATE SET + url = EXCLUDED.url, + title = EXCLUDED.title, + author = EXCLUDED.author, + summary = CASE + WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id) + THEN entries.summary + ELSE EXCLUDED.summary + END, + content_html = CASE + WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id) + THEN entries.content_html + ELSE EXCLUDED.content_html + END, + content_text = CASE + WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id) + THEN entries.content_text + ELSE EXCLUDED.content_text + END, + image_url = EXCLUDED.image_url, + published_at = EXCLUDED.published_at, + word_count = EXCLUDED.word_count, + enclosure_url = EXCLUDED.enclosure_url, + enclosure_type = EXCLUDED.enclosure_type, + enclosure_length = EXCLUDED.enclosure_length + WHERE + entries.title IS DISTINCT FROM EXCLUDED.title + OR entries.content_html IS DISTINCT FROM EXCLUDED.content_html + OR entries.enclosure_url IS DISTINCT FROM EXCLUDED.enclosure_url + `, strings.Join(valueParameterPlaceholders, ", "), conflictClause) + commandTag, executeError := feedWriter.databaseConnectionPool.Exec(writeContext, upsertQuery, queryArguments...) + + if executeError != nil { + return 0, fmt.Errorf("failed to upsert feed entries: %w", executeError) + } + + return commandTag.RowsAffected(), nil +} + +func (feedWriter *Writer) UpdateFeedMetadata( + updateContext context.Context, + feedIdentifier string, + entityTag string, + lastModified string, + feedTitle string, + siteURL string, +) error { + currentTime := time.Now().UTC() + + var titleParameter *string + if feedTitle != "" { + titleParameter = &feedTitle + } + + var siteURLParameter *string + if siteURL != "" { + siteURLParameter = &siteURL + } + + updateQuery := ` + UPDATE feeds + SET + last_fetched_at = $1::timestamptz, + etag = $2, + last_modified = $3, + consecutive_failures = 0, + last_fetch_error = NULL, + last_fetch_error_at = NULL, + next_fetch_at = $1::timestamptz + (fetch_interval_seconds * INTERVAL '1 second'), + title = COALESCE($5, title), + site_url = COALESCE($6, site_url) + WHERE id = $4 + ` + _, executeError := feedWriter.databaseConnectionPool.Exec( + updateContext, + updateQuery, + currentTime, + entityTag, + lastModified, + feedIdentifier, + titleParameter, + siteURLParameter, + ) + + if executeError != nil { + return fmt.Errorf("failed to update feed metadata: %w", executeError) + } + + return nil +} + +func (feedWriter *Writer) UpdateFeedType( + updateContext context.Context, + feedIdentifier string, + feedType string, +) error { + updateQuery := `UPDATE feeds SET feed_type = $1 WHERE id = $2` + _, executeError := feedWriter.databaseConnectionPool.Exec( + updateContext, + updateQuery, + feedType, + feedIdentifier, + ) + + if executeError != nil { + return fmt.Errorf("failed to update feed type: %w", executeError) + } + + return nil +} + +func (feedWriter *Writer) RecordFeedError( + updateContext context.Context, + feedIdentifier string, + errorMessage string, +) error { + currentTime := time.Now().UTC() + updateQuery := ` + UPDATE feeds + SET + last_fetched_at = $1::timestamptz, + last_fetch_error = $2, + last_fetch_error_at = $1::timestamptz, + consecutive_failures = consecutive_failures + 1, + next_fetch_at = $1::timestamptz + (LEAST(POWER(2, consecutive_failures) * 60, 21600) * INTERVAL '1 second') + WHERE id = $3 + ` + _, executeError := feedWriter.databaseConnectionPool.Exec( + updateContext, + updateQuery, + currentTime, + errorMessage, + feedIdentifier, + ) + + if executeError != nil { + return fmt.Errorf("failed to record feed error: %w", executeError) + } + + return nil +} |