summaryrefslogtreecommitdiff
path: root/services/worker/internal
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-07 01:42:57 -0800
committerFuwn <[email protected]>2026-02-07 01:42:57 -0800
commit5c5b1993edd890a80870ee05607ac5f088191d4e (patch)
treea721b76bcd49ba10826c53efc87302c7a689512f /services/worker/internal
downloadasa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.tar.xz
asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.zip
feat: asa.news RSS reader with developer tier, REST API, and webhooks
Full-stack RSS reader SaaS: Supabase + Next.js + Go worker. Includes three subscription tiers (free/pro/developer), API key auth, read-only REST API, webhook push notifications, Stripe billing with proration, and PWA support.
Diffstat (limited to 'services/worker/internal')
-rw-r--r--services/worker/internal/configuration/configuration.go110
-rw-r--r--services/worker/internal/database/database.go39
-rw-r--r--services/worker/internal/fetcher/authentication.go43
-rw-r--r--services/worker/internal/fetcher/errors.go145
-rw-r--r--services/worker/internal/fetcher/fetcher.go116
-rw-r--r--services/worker/internal/fetcher/ssrf_protection.go77
-rw-r--r--services/worker/internal/health/health.go117
-rw-r--r--services/worker/internal/model/feed.go41
-rw-r--r--services/worker/internal/model/queue.go5
-rw-r--r--services/worker/internal/parser/parser.go234
-rw-r--r--services/worker/internal/pool/pool.go60
-rw-r--r--services/worker/internal/scheduler/refresh.go196
-rw-r--r--services/worker/internal/scheduler/scheduler.go283
-rw-r--r--services/worker/internal/webhook/webhook.go333
-rw-r--r--services/worker/internal/writer/writer.go222
15 files changed, 2021 insertions, 0 deletions
diff --git a/services/worker/internal/configuration/configuration.go b/services/worker/internal/configuration/configuration.go
new file mode 100644
index 0000000..84a5995
--- /dev/null
+++ b/services/worker/internal/configuration/configuration.go
@@ -0,0 +1,110 @@
+package configuration
+
+import (
+ "fmt"
+ "os"
+ "strconv"
+ "time"
+)
+
+type Configuration struct {
+ DatabaseURL string
+ WorkerConcurrency int
+ PollInterval time.Duration
+ FetchTimeout time.Duration
+ QueuePollInterval time.Duration
+ BatchSize int
+ HealthPort int
+ EncryptionKey string
+ LogLevel string
+ LogJSON bool
+}
+
+func Load() (Configuration, error) {
+ databaseURL := os.Getenv("DATABASE_URL")
+
+ if databaseURL == "" {
+ return Configuration{}, fmt.Errorf("DATABASE_URL is required")
+ }
+
+ workerConcurrency := getEnvironmentInteger("WORKER_CONCURRENCY", 10)
+ pollInterval := getEnvironmentDuration("POLL_INTERVAL", 30*time.Second)
+ fetchTimeout := getEnvironmentDuration("FETCH_TIMEOUT", 30*time.Second)
+ queuePollInterval := getEnvironmentDuration("QUEUE_POLL_INTERVAL", 5*time.Second)
+ batchSize := getEnvironmentInteger("BATCH_SIZE", 50)
+ healthPort := getEnvironmentInteger("HEALTH_PORT", 8080)
+ encryptionKey := os.Getenv("ENCRYPTION_KEY")
+ logLevel := getEnvironmentString("LOG_LEVEL", "info")
+ logJSON := getEnvironmentBoolean("LOG_JSON", false)
+
+ return Configuration{
+ DatabaseURL: databaseURL,
+ WorkerConcurrency: workerConcurrency,
+ PollInterval: pollInterval,
+ FetchTimeout: fetchTimeout,
+ QueuePollInterval: queuePollInterval,
+ BatchSize: batchSize,
+ HealthPort: healthPort,
+ EncryptionKey: encryptionKey,
+ LogLevel: logLevel,
+ LogJSON: logJSON,
+ }, nil
+}
+
+func getEnvironmentString(key string, defaultValue string) string {
+ value := os.Getenv(key)
+
+ if value == "" {
+ return defaultValue
+ }
+
+ return value
+}
+
+func getEnvironmentInteger(key string, defaultValue int) int {
+ raw := os.Getenv(key)
+
+ if raw == "" {
+ return defaultValue
+ }
+
+ parsed, parseError := strconv.Atoi(raw)
+
+ if parseError != nil {
+ return defaultValue
+ }
+
+ return parsed
+}
+
+func getEnvironmentDuration(key string, defaultValue time.Duration) time.Duration {
+ raw := os.Getenv(key)
+
+ if raw == "" {
+ return defaultValue
+ }
+
+ parsed, parseError := time.ParseDuration(raw)
+
+ if parseError != nil {
+ return defaultValue
+ }
+
+ return parsed
+}
+
+func getEnvironmentBoolean(key string, defaultValue bool) bool {
+ raw := os.Getenv(key)
+
+ if raw == "" {
+ return defaultValue
+ }
+
+ parsed, parseError := strconv.ParseBool(raw)
+
+ if parseError != nil {
+ return defaultValue
+ }
+
+ return parsed
+}
diff --git a/services/worker/internal/database/database.go b/services/worker/internal/database/database.go
new file mode 100644
index 0000000..2b2900f
--- /dev/null
+++ b/services/worker/internal/database/database.go
@@ -0,0 +1,39 @@
+package database
+
+import (
+ "context"
+ "fmt"
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "time"
+)
+
+func CreateConnectionPool(parentContext context.Context, databaseURL string) (*pgxpool.Pool, error) {
+ poolConfiguration, parseError := pgxpool.ParseConfig(databaseURL)
+
+ if parseError != nil {
+ return nil, fmt.Errorf("failed to parse database URL: %w", parseError)
+ }
+
+ poolConfiguration.MaxConns = 25
+ poolConfiguration.MinConns = 5
+ poolConfiguration.MaxConnLifetime = 30 * time.Minute
+ poolConfiguration.MaxConnIdleTime = 5 * time.Minute
+ poolConfiguration.HealthCheckPeriod = 30 * time.Second
+ poolConfiguration.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeExec
+ connectionPool, connectionError := pgxpool.NewWithConfig(parentContext, poolConfiguration)
+
+ if connectionError != nil {
+ return nil, fmt.Errorf("failed to create connection pool: %w", connectionError)
+ }
+
+ pingError := connectionPool.Ping(parentContext)
+
+ if pingError != nil {
+ connectionPool.Close()
+
+ return nil, fmt.Errorf("failed to ping database: %w", pingError)
+ }
+
+ return connectionPool, nil
+}
diff --git a/services/worker/internal/fetcher/authentication.go b/services/worker/internal/fetcher/authentication.go
new file mode 100644
index 0000000..ba10196
--- /dev/null
+++ b/services/worker/internal/fetcher/authentication.go
@@ -0,0 +1,43 @@
+package fetcher
+
+import (
+ "encoding/base64"
+ "fmt"
+ "net/http"
+ "net/url"
+)
+
+type AuthenticationConfiguration struct {
+ AuthenticationType string
+ AuthenticationValue string
+}
+
+func ApplyAuthentication(request *http.Request, authenticationConfig AuthenticationConfiguration) error {
+ switch authenticationConfig.AuthenticationType {
+ case "bearer":
+ request.Header.Set("Authorization", "Bearer "+authenticationConfig.AuthenticationValue)
+ case "basic":
+ encodedCredentials := base64.StdEncoding.EncodeToString([]byte(authenticationConfig.AuthenticationValue))
+
+ request.Header.Set("Authorization", "Basic "+encodedCredentials)
+ case "query_param":
+ existingURL, parseError := url.Parse(request.URL.String())
+
+ if parseError != nil {
+ return fmt.Errorf("failed to parse request URL for query param authentication: %w", parseError)
+ }
+
+ queryParameters := existingURL.Query()
+
+ queryParameters.Set("key", authenticationConfig.AuthenticationValue)
+
+ existingURL.RawQuery = queryParameters.Encode()
+ request.URL = existingURL
+ case "":
+ return nil
+ default:
+ return fmt.Errorf("unsupported authentication type: %s", authenticationConfig.AuthenticationType)
+ }
+
+ return nil
+}
diff --git a/services/worker/internal/fetcher/errors.go b/services/worker/internal/fetcher/errors.go
new file mode 100644
index 0000000..abf5553
--- /dev/null
+++ b/services/worker/internal/fetcher/errors.go
@@ -0,0 +1,145 @@
+package fetcher
+
+import (
+ "errors"
+ "fmt"
+ "net"
+ "net/url"
+ "strings"
+)
+
+type FetchError struct {
+ StatusCode int
+ UserMessage string
+ Retryable bool
+ UnderlyingError error
+}
+
+func (fetchError *FetchError) Error() string {
+ if fetchError.UnderlyingError != nil {
+ return fmt.Sprintf("%s: %v", fetchError.UserMessage, fetchError.UnderlyingError)
+ }
+
+ return fetchError.UserMessage
+}
+
+func (fetchError *FetchError) Unwrap() error {
+ return fetchError.UnderlyingError
+}
+
+func ClassifyError(originalError error, statusCode int) *FetchError {
+ if statusCode >= 400 {
+ return classifyHTTPStatusCode(statusCode, originalError)
+ }
+
+ return classifyNetworkError(originalError)
+}
+
+func classifyHTTPStatusCode(statusCode int, originalError error) *FetchError {
+ switch statusCode {
+ case 401:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: "authentication required - check feed credentials",
+ Retryable: false,
+ UnderlyingError: originalError,
+ }
+ case 403:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: "access forbidden - feed may require different credentials",
+ Retryable: false,
+ UnderlyingError: originalError,
+ }
+ case 404:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: "feed not found - URL may be incorrect or feed removed",
+ Retryable: false,
+ UnderlyingError: originalError,
+ }
+ case 410:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: "feed permanently removed",
+ Retryable: false,
+ UnderlyingError: originalError,
+ }
+ case 429:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: "rate limited by feed server - will retry later",
+ Retryable: true,
+ UnderlyingError: originalError,
+ }
+ case 500, 502, 503, 504:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: fmt.Sprintf("feed server error (HTTP %d) - will retry later", statusCode),
+ Retryable: true,
+ UnderlyingError: originalError,
+ }
+ default:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: fmt.Sprintf("unexpected HTTP status %d", statusCode),
+ Retryable: statusCode >= 500,
+ UnderlyingError: originalError,
+ }
+ }
+}
+
+func classifyNetworkError(originalError error) *FetchError {
+ if originalError == nil {
+ return &FetchError{
+ UserMessage: "unknown fetch error",
+ Retryable: true,
+ }
+ }
+
+ var dnsError *net.DNSError
+
+ if errors.As(originalError, &dnsError) {
+ return &FetchError{
+ UserMessage: "DNS resolution failed - check feed URL",
+ Retryable: !dnsError.IsNotFound,
+ UnderlyingError: originalError,
+ }
+ }
+
+ var urlError *url.Error
+
+ if errors.As(originalError, &urlError) {
+ if urlError.Timeout() {
+ return &FetchError{
+ UserMessage: "connection timed out - feed server may be slow",
+ Retryable: true,
+ UnderlyingError: originalError,
+ }
+ }
+ }
+
+ var netOpError *net.OpError
+
+ if errors.As(originalError, &netOpError) {
+ return &FetchError{
+ UserMessage: "network connection error - will retry later",
+ Retryable: true,
+ UnderlyingError: originalError,
+ }
+ }
+
+ if strings.Contains(originalError.Error(), "certificate") || strings.Contains(originalError.Error(), "tls") {
+ return &FetchError{
+ UserMessage: "TLS/certificate error - feed server has invalid certificate",
+ Retryable: false,
+ UnderlyingError: originalError,
+ }
+ }
+
+ return &FetchError{
+ UserMessage: "unexpected network error",
+ Retryable: true,
+ UnderlyingError: originalError,
+ }
+}
diff --git a/services/worker/internal/fetcher/fetcher.go b/services/worker/internal/fetcher/fetcher.go
new file mode 100644
index 0000000..019bd39
--- /dev/null
+++ b/services/worker/internal/fetcher/fetcher.go
@@ -0,0 +1,116 @@
+package fetcher
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+)
+
// FetchResult carries the outcome of a single feed fetch, including the
// caching validators (ETag / Last-Modified) needed for future conditional
// requests.
type FetchResult struct {
	Body               []byte // raw response body (empty when NotModified)
	StatusCode         int
	EntityTag          string // ETag response header, if any
	LastModifiedHeader string // Last-Modified response header, if any
	NotModified        bool   // true when the server answered HTTP 304
}

// Fetcher downloads feeds through a shared, timeout-bounded HTTP client.
type Fetcher struct {
	httpClient *http.Client
}
+
+func NewFetcher(fetchTimeout time.Duration) *Fetcher {
+ return &Fetcher{
+ httpClient: &http.Client{
+ Timeout: fetchTimeout,
+ CheckRedirect: func(request *http.Request, previousRequests []*http.Request) error {
+ if len(previousRequests) >= 5 {
+ return fmt.Errorf("too many redirects (exceeded 5)")
+ }
+
+ redirectValidationError := ValidateRedirectTarget(request.URL.String())
+ if redirectValidationError != nil {
+ return fmt.Errorf("blocked redirect to reserved address: %w", redirectValidationError)
+ }
+
+ return nil
+ },
+ },
+ }
+}
+
+func (feedFetcher *Fetcher) Fetch(
+ requestContext context.Context,
+ feedURL string,
+ previousEntityTag string,
+ previousLastModified string,
+ authenticationConfig AuthenticationConfiguration,
+) (*FetchResult, error) {
+ urlValidationError := ValidateFeedURL(feedURL)
+ if urlValidationError != nil {
+ return nil, fmt.Errorf("blocked request to disallowed URL: %w", urlValidationError)
+ }
+
+ request, requestCreationError := http.NewRequestWithContext(requestContext, http.MethodGet, feedURL, nil)
+
+ if requestCreationError != nil {
+ return nil, fmt.Errorf("failed to create HTTP request: %w", requestCreationError)
+ }
+
+ request.Header.Set("User-Agent", "asa.news Feed Worker/1.0")
+ request.Header.Set("Accept", "application/rss+xml, application/atom+xml, application/xml, text/xml, */*")
+
+ if previousEntityTag != "" {
+ request.Header.Set("If-None-Match", previousEntityTag)
+ }
+
+ if previousLastModified != "" {
+ request.Header.Set("If-Modified-Since", previousLastModified)
+ }
+
+ authenticationError := ApplyAuthentication(request, authenticationConfig)
+
+ if authenticationError != nil {
+ return nil, fmt.Errorf("failed to apply authentication: %w", authenticationError)
+ }
+
+ response, requestError := feedFetcher.httpClient.Do(request)
+
+ if requestError != nil {
+ classifiedError := ClassifyError(requestError, 0)
+
+ return nil, classifiedError
+ }
+
+ defer response.Body.Close()
+
+ if response.StatusCode == http.StatusNotModified {
+ return &FetchResult{
+ StatusCode: response.StatusCode,
+ EntityTag: response.Header.Get("ETag"),
+ LastModifiedHeader: response.Header.Get("Last-Modified"),
+ NotModified: true,
+ }, nil
+ }
+
+ if response.StatusCode >= 400 {
+ classifiedError := ClassifyError(nil, response.StatusCode)
+
+ return nil, classifiedError
+ }
+
+ responseBody, readError := io.ReadAll(io.LimitReader(response.Body, 10*1024*1024))
+
+ if readError != nil {
+ return nil, fmt.Errorf("failed to read response body: %w", readError)
+ }
+
+ return &FetchResult{
+ Body: responseBody,
+ StatusCode: response.StatusCode,
+ EntityTag: response.Header.Get("ETag"),
+ LastModifiedHeader: response.Header.Get("Last-Modified"),
+ NotModified: false,
+ }, nil
+}
diff --git a/services/worker/internal/fetcher/ssrf_protection.go b/services/worker/internal/fetcher/ssrf_protection.go
new file mode 100644
index 0000000..1887e78
--- /dev/null
+++ b/services/worker/internal/fetcher/ssrf_protection.go
@@ -0,0 +1,77 @@
+package fetcher
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "net/url"
+ "strings"
+ "time"
+)
+
+var reservedNetworks = []net.IPNet{
+ {IP: net.IPv4(10, 0, 0, 0), Mask: net.CIDRMask(8, 32)},
+ {IP: net.IPv4(172, 16, 0, 0), Mask: net.CIDRMask(12, 32)},
+ {IP: net.IPv4(192, 168, 0, 0), Mask: net.CIDRMask(16, 32)},
+ {IP: net.IPv4(127, 0, 0, 0), Mask: net.CIDRMask(8, 32)},
+ {IP: net.IPv4(169, 254, 0, 0), Mask: net.CIDRMask(16, 32)},
+ {IP: net.IPv4(0, 0, 0, 0), Mask: net.CIDRMask(8, 32)},
+ {IP: net.ParseIP("::1"), Mask: net.CIDRMask(128, 128)},
+ {IP: net.ParseIP("fc00::"), Mask: net.CIDRMask(7, 128)},
+ {IP: net.ParseIP("fe80::"), Mask: net.CIDRMask(10, 128)},
+}
+
+func isReservedAddress(ipAddress net.IP) bool {
+ for _, network := range reservedNetworks {
+ if network.Contains(ipAddress) {
+ return true
+ }
+ }
+
+ return false
+}
+
+func ValidateFeedURL(feedURL string) error {
+ parsedURL, parseError := url.Parse(feedURL)
+ if parseError != nil {
+ return fmt.Errorf("invalid URL: %w", parseError)
+ }
+
+ scheme := strings.ToLower(parsedURL.Scheme)
+ if scheme != "http" && scheme != "https" {
+ return fmt.Errorf("unsupported scheme %q: only http and https are allowed", parsedURL.Scheme)
+ }
+
+ hostname := parsedURL.Hostname()
+ if hostname == "" {
+ return fmt.Errorf("URL has no hostname")
+ }
+
+ if parsedIP := net.ParseIP(hostname); parsedIP != nil {
+ if isReservedAddress(parsedIP) {
+ return fmt.Errorf("feed URL resolves to a reserved IP address")
+ }
+
+ return nil
+ }
+
+ resolverContext, cancelResolver := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancelResolver()
+
+ resolvedAddresses, lookupError := net.DefaultResolver.LookupIPAddr(resolverContext, hostname)
+ if lookupError != nil {
+ return fmt.Errorf("failed to resolve hostname %q: %w", hostname, lookupError)
+ }
+
+ for _, resolvedAddress := range resolvedAddresses {
+ if isReservedAddress(resolvedAddress.IP) {
+ return fmt.Errorf("feed URL resolves to a reserved IP address")
+ }
+ }
+
+ return nil
+}
+
+func ValidateRedirectTarget(redirectURL string) error {
+ return ValidateFeedURL(redirectURL)
+}
diff --git a/services/worker/internal/health/health.go b/services/worker/internal/health/health.go
new file mode 100644
index 0000000..0a29c92
--- /dev/null
+++ b/services/worker/internal/health/health.go
@@ -0,0 +1,117 @@
+package health
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "log/slog"
+ "net/http"
+ "time"
+)
+
+type HealthStatus struct {
+ Status string `json:"status"`
+ Timestamp string `json:"timestamp"`
+ Database string `json:"database"`
+}
+
+type HealthServer struct {
+ httpServer *http.Server
+ databaseConnectionPool *pgxpool.Pool
+ logger *slog.Logger
+}
+
+func NewHealthServer(
+ healthPort int,
+ databaseConnectionPool *pgxpool.Pool,
+ logger *slog.Logger,
+) *HealthServer {
+ healthServer := &HealthServer{
+ databaseConnectionPool: databaseConnectionPool,
+ logger: logger,
+ }
+ serveMux := http.NewServeMux()
+
+ serveMux.HandleFunc("/health", healthServer.handleHealthCheck)
+ serveMux.HandleFunc("/ready", healthServer.handleReadinessCheck)
+
+ healthServer.httpServer = &http.Server{
+ Addr: fmt.Sprintf(":%d", healthPort),
+ Handler: serveMux,
+ ReadTimeout: 5 * time.Second,
+ WriteTimeout: 5 * time.Second,
+ IdleTimeout: 30 * time.Second,
+ }
+
+ return healthServer
+}
+
+func (healthServer *HealthServer) Start() {
+ go func() {
+ healthServer.logger.Info("health server starting", "address", healthServer.httpServer.Addr)
+
+ listenError := healthServer.httpServer.ListenAndServe()
+
+ if listenError != nil && listenError != http.ErrServerClosed {
+ healthServer.logger.Error("health server failed", "error", listenError)
+ }
+ }()
+}
+
+func (healthServer *HealthServer) Stop(shutdownContext context.Context) error {
+ return healthServer.httpServer.Shutdown(shutdownContext)
+}
+
+func (healthServer *HealthServer) handleHealthCheck(responseWriter http.ResponseWriter, request *http.Request) {
+ healthStatus := HealthStatus{
+ Status: "healthy",
+ Timestamp: time.Now().UTC().Format(time.RFC3339),
+ Database: "unknown",
+ }
+ pingContext, cancelPing := context.WithTimeout(request.Context(), 2*time.Second)
+
+ defer cancelPing()
+
+ pingError := healthServer.databaseConnectionPool.Ping(pingContext)
+
+ if pingError != nil {
+ healthStatus.Status = "unhealthy"
+ healthStatus.Database = "unreachable"
+
+ respondWithJSON(responseWriter, http.StatusServiceUnavailable, healthStatus)
+
+ return
+ }
+
+ healthStatus.Database = "connected"
+
+ respondWithJSON(responseWriter, http.StatusOK, healthStatus)
+}
+
+func (healthServer *HealthServer) handleReadinessCheck(responseWriter http.ResponseWriter, request *http.Request) {
+ pingContext, cancelPing := context.WithTimeout(request.Context(), 2*time.Second)
+
+ defer cancelPing()
+
+ pingError := healthServer.databaseConnectionPool.Ping(pingContext)
+
+ if pingError != nil {
+ responseWriter.WriteHeader(http.StatusServiceUnavailable)
+
+ return
+ }
+
+ responseWriter.WriteHeader(http.StatusOK)
+}
+
+func respondWithJSON(responseWriter http.ResponseWriter, statusCode int, payload interface{}) {
+ responseWriter.Header().Set("Content-Type", "application/json")
+ responseWriter.WriteHeader(statusCode)
+
+ encodingError := json.NewEncoder(responseWriter).Encode(payload)
+
+ if encodingError != nil {
+ return
+ }
+}
diff --git a/services/worker/internal/model/feed.go b/services/worker/internal/model/feed.go
new file mode 100644
index 0000000..611c820
--- /dev/null
+++ b/services/worker/internal/model/feed.go
@@ -0,0 +1,41 @@
+package model
+
+import (
+ "time"
+)
+
// Feed is the worker's view of a stored feed record, tracking fetch
// scheduling state and HTTP caching validators alongside feed metadata.
// Pointer fields are nil when the value is unknown or absent.
type Feed struct {
	Identifier           string
	URL                  string
	SiteURL              *string // site homepage, when known
	Title                *string
	FeedType             *string // e.g. "rss" or "podcast"
	Visibility           string  // "authenticated" feeds take a separate refresh path
	EntityTag            *string // ETag from the last successful fetch
	LastModified         *string // Last-Modified from the last successful fetch
	LastFetchedAt        *time.Time
	LastFetchError       *string
	ConsecutiveFailures  int
	NextFetchAt          time.Time
	FetchIntervalSeconds int
	CreatedAt            time.Time
	UpdatedAt            time.Time
}

// FeedEntry is a single normalized item parsed out of a feed, ready to be
// written to storage. Pointer fields are nil when the source omits the value.
type FeedEntry struct {
	FeedIdentifier  string
	OwnerIdentifier *string // set only for user-scoped feeds
	GUID            string  // stable identity; falls back to link or a content hash
	URL             *string
	Title           *string
	Author          *string
	Summary         *string
	ContentHTML     *string
	ContentText     *string // plain-text rendering of the content
	ImageURL        *string
	PublishedAt     *time.Time
	WordCount       *int
	EnclosureURL    *string // audio enclosure, when the item carries one
	EnclosureType   *string
	EnclosureLength *int64
}
diff --git a/services/worker/internal/model/queue.go b/services/worker/internal/model/queue.go
new file mode 100644
index 0000000..a5aaa7c
--- /dev/null
+++ b/services/worker/internal/model/queue.go
@@ -0,0 +1,5 @@
+package model
+
+type RefreshRequest struct {
+ FeedIdentifier string `json:"feed_id"`
+}
diff --git a/services/worker/internal/parser/parser.go b/services/worker/internal/parser/parser.go
new file mode 100644
index 0000000..1fb2f76
--- /dev/null
+++ b/services/worker/internal/parser/parser.go
@@ -0,0 +1,234 @@
+package parser
+
+import (
+ "crypto/sha256"
+ "fmt"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/mmcdole/gofeed"
+ "strconv"
+ "strings"
+ "time"
+ "unicode/utf8"
+)
+
+type Parser struct {
+ gofeedParser *gofeed.Parser
+}
+
+func NewParser() *Parser {
+ return &Parser{
+ gofeedParser: gofeed.NewParser(),
+ }
+}
+
+type ParseResult struct {
+ Entries []model.FeedEntry
+ FeedTitle string
+ SiteURL string
+ AudioEnclosureRatio float64
+}
+
+func (feedParser *Parser) Parse(feedIdentifier string, ownerIdentifier *string, rawFeedContent []byte) (*ParseResult, error) {
+ parsedFeed, parseError := feedParser.gofeedParser.ParseString(string(rawFeedContent))
+
+ if parseError != nil {
+ return nil, fmt.Errorf("failed to parse feed content: %w", parseError)
+ }
+
+ feedEntries := make([]model.FeedEntry, 0, len(parsedFeed.Items))
+
+ audioEnclosureCount := 0
+
+ for _, feedItem := range parsedFeed.Items {
+ normalizedEntry := normalizeFeedItem(feedIdentifier, ownerIdentifier, feedItem)
+ if normalizedEntry.EnclosureURL != nil {
+ audioEnclosureCount++
+ }
+ feedEntries = append(feedEntries, normalizedEntry)
+ }
+
+ audioEnclosureRatio := 0.0
+ if len(feedEntries) > 0 {
+ audioEnclosureRatio = float64(audioEnclosureCount) / float64(len(feedEntries))
+ }
+
+ return &ParseResult{
+ Entries: feedEntries,
+ FeedTitle: strings.TrimSpace(parsedFeed.Title),
+ SiteURL: strings.TrimSpace(parsedFeed.Link),
+ AudioEnclosureRatio: audioEnclosureRatio,
+ }, nil
+}
+
+func normalizeFeedItem(feedIdentifier string, ownerIdentifier *string, feedItem *gofeed.Item) model.FeedEntry {
+ globallyUniqueIdentifier := resolveGloballyUniqueIdentifier(feedItem)
+ entryURL := stringPointerOrNil(resolveEntryURL(feedItem))
+ entryTitle := stringPointerOrNil(strings.TrimSpace(feedItem.Title))
+ entrySummary := stringPointerOrNil(strings.TrimSpace(feedItem.Description))
+ entryContentHTML := stringPointerOrNil(resolveContentHTML(feedItem))
+ entryContentText := resolveContentText(feedItem)
+ authorName := stringPointerOrNil(resolveAuthorName(feedItem))
+ publishedAt := resolvePublishedDate(feedItem)
+ entryImageURL := stringPointerOrNil(resolveImageURL(feedItem))
+ wordCount := countWords(entryContentText)
+ enclosureURL, enclosureType, enclosureLength := resolveAudioEnclosure(feedItem)
+
+ return model.FeedEntry{
+ FeedIdentifier: feedIdentifier,
+ OwnerIdentifier: ownerIdentifier,
+ GUID: globallyUniqueIdentifier,
+ URL: entryURL,
+ Title: entryTitle,
+ Author: authorName,
+ Summary: entrySummary,
+ ContentHTML: entryContentHTML,
+ ContentText: stringPointerOrNil(entryContentText),
+ ImageURL: entryImageURL,
+ PublishedAt: publishedAt,
+ WordCount: wordCount,
+ EnclosureURL: enclosureURL,
+ EnclosureType: enclosureType,
+ EnclosureLength: enclosureLength,
+ }
+}
+
// stringPointerOrNil maps the empty string to nil and any other value to a
// pointer, so absent feed fields become nullable entries.
func stringPointerOrNil(value string) *string {
	if value != "" {
		return &value
	}
	return nil
}
+
+func resolveGloballyUniqueIdentifier(feedItem *gofeed.Item) string {
+ if feedItem.GUID != "" {
+ return feedItem.GUID
+ }
+
+ if feedItem.Link != "" {
+ return feedItem.Link
+ }
+
+ hashInput := feedItem.Title + feedItem.Description
+ hashBytes := sha256.Sum256([]byte(hashInput))
+
+ return fmt.Sprintf("sha256:%x", hashBytes)
+}
+
+func resolveEntryURL(feedItem *gofeed.Item) string {
+ if feedItem.Link != "" {
+ return feedItem.Link
+ }
+
+ if feedItem.GUID != "" && strings.HasPrefix(feedItem.GUID, "http") {
+ return feedItem.GUID
+ }
+
+ return ""
+}
+
+func resolveContentHTML(feedItem *gofeed.Item) string {
+ if feedItem.Content != "" {
+ return feedItem.Content
+ }
+
+ return feedItem.Description
+}
+
+func resolveContentText(feedItem *gofeed.Item) string {
+ contentSource := feedItem.Content
+
+ if contentSource == "" {
+ contentSource = feedItem.Description
+ }
+
+ return stripHTMLTags(contentSource)
+}
+
+func resolveAuthorName(feedItem *gofeed.Item) string {
+ if feedItem.Author != nil && feedItem.Author.Name != "" {
+ return feedItem.Author.Name
+ }
+
+ if len(feedItem.Authors) > 0 && feedItem.Authors[0].Name != "" {
+ return feedItem.Authors[0].Name
+ }
+
+ return ""
+}
+
+func resolvePublishedDate(feedItem *gofeed.Item) *time.Time {
+ if feedItem.PublishedParsed != nil {
+ return feedItem.PublishedParsed
+ }
+
+ if feedItem.UpdatedParsed != nil {
+ return feedItem.UpdatedParsed
+ }
+
+ return nil
+}
+
+func resolveAudioEnclosure(feedItem *gofeed.Item) (*string, *string, *int64) {
+ if feedItem.Enclosures == nil {
+ return nil, nil, nil
+ }
+
+ for _, enclosure := range feedItem.Enclosures {
+ if strings.HasPrefix(enclosure.Type, "audio/") && enclosure.URL != "" {
+ enclosureURL := enclosure.URL
+ enclosureType := enclosure.Type
+
+ var enclosureLength *int64
+ if enclosure.Length != "" {
+ if parsedLength, parseError := strconv.ParseInt(enclosure.Length, 10, 64); parseError == nil {
+ enclosureLength = &parsedLength
+ }
+ }
+
+ return &enclosureURL, &enclosureType, enclosureLength
+ }
+ }
+
+ return nil, nil, nil
+}
+
+func resolveImageURL(feedItem *gofeed.Item) string {
+ if feedItem.Image != nil && feedItem.Image.URL != "" {
+ return feedItem.Image.URL
+ }
+
+ return ""
+}
+
// countWords returns the whitespace-separated word count of plainText, or
// nil for empty text so absent content maps to a nullable value.
func countWords(plainText string) *int {
	if plainText == "" {
		return nil
	}

	wordTotal := len(strings.Fields(plainText))
	return &wordTotal
}
+
// stripHTMLTags removes everything between '<' and '>' from htmlContent and
// trims surrounding whitespace. It is a deliberately simple scanner: it does
// not decode HTML entities and does not handle '>' inside attribute values.
func stripHTMLTags(htmlContent string) string {
	var plainText strings.Builder

	withinTag := false
	for _, currentRune := range htmlContent {
		switch {
		case currentRune == '<':
			withinTag = true
		case currentRune == '>':
			withinTag = false
		case !withinTag:
			plainText.WriteRune(currentRune)
		}
	}

	return strings.TrimSpace(plainText.String())
}
diff --git a/services/worker/internal/pool/pool.go b/services/worker/internal/pool/pool.go
new file mode 100644
index 0000000..7df03e2
--- /dev/null
+++ b/services/worker/internal/pool/pool.go
@@ -0,0 +1,60 @@
+package pool
+
+import (
+ "context"
+ "log/slog"
+ "sync"
+)
+
+type WorkFunction func(workContext context.Context)
+
+type WorkerPool struct {
+ concurrencyLimit int
+ semaphoreChannel chan struct{}
+ waitGroup sync.WaitGroup
+ logger *slog.Logger
+}
+
+func NewWorkerPool(concurrencyLimit int, logger *slog.Logger) *WorkerPool {
+ return &WorkerPool{
+ concurrencyLimit: concurrencyLimit,
+ semaphoreChannel: make(chan struct{}, concurrencyLimit),
+ logger: logger,
+ }
+}
+
+func (workerPool *WorkerPool) Submit(workContext context.Context, workFunction WorkFunction) bool {
+ select {
+ case workerPool.semaphoreChannel <- struct{}{}:
+ workerPool.waitGroup.Add(1)
+
+ go func() {
+ defer workerPool.waitGroup.Done()
+ defer func() { <-workerPool.semaphoreChannel }()
+ defer func() {
+ recoveredPanic := recover()
+
+ if recoveredPanic != nil {
+ workerPool.logger.Error(
+ "worker panic recovered",
+ "panic_value", recoveredPanic,
+ )
+ }
+ }()
+
+ workFunction(workContext)
+ }()
+
+ return true
+ case <-workContext.Done():
+ return false
+ }
+}
+
+func (workerPool *WorkerPool) Wait() {
+ workerPool.waitGroup.Wait()
+}
+
+func (workerPool *WorkerPool) ActiveWorkerCount() int {
+ return len(workerPool.semaphoreChannel)
+}
diff --git a/services/worker/internal/scheduler/refresh.go b/services/worker/internal/scheduler/refresh.go
new file mode 100644
index 0000000..8271fa0
--- /dev/null
+++ b/services/worker/internal/scheduler/refresh.go
@@ -0,0 +1,196 @@
+package scheduler
+
+import (
+ "context"
+ "github.com/Fuwn/asa-news/internal/fetcher"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/Fuwn/asa-news/internal/parser"
+ "github.com/Fuwn/asa-news/internal/webhook"
+ "github.com/Fuwn/asa-news/internal/writer"
+ "log/slog"
+)
+
+func ProcessRefreshRequest(
+ refreshContext context.Context,
+ feed model.Feed,
+ feedFetcher *fetcher.Fetcher,
+ feedParser *parser.Parser,
+ feedWriter *writer.Writer,
+ webhookDispatcher *webhook.Dispatcher,
+ logger *slog.Logger,
+) {
+ logger.Info(
+ "processing feed refresh",
+ "feed_identifier", feed.Identifier,
+ "feed_url", feed.URL,
+ )
+
+ var ownerIdentifier *string
+
+ if feed.Visibility == "authenticated" {
+ logger.Warn(
+ "authenticated feed refresh not yet implemented",
+ "feed_identifier", feed.Identifier,
+ )
+
+ return
+ }
+
+ authenticationConfig := fetcher.AuthenticationConfiguration{}
+
+ entityTag := ""
+
+ if feed.EntityTag != nil {
+ entityTag = *feed.EntityTag
+ }
+
+ lastModified := ""
+
+ if feed.LastModified != nil {
+ lastModified = *feed.LastModified
+ }
+
+ fetchResult, fetchError := feedFetcher.Fetch(
+ refreshContext,
+ feed.URL,
+ entityTag,
+ lastModified,
+ authenticationConfig,
+ )
+
+ if fetchError != nil {
+ logger.Warn(
+ "feed fetch failed",
+ "feed_identifier", feed.Identifier,
+ "error", fetchError,
+ )
+
+ recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, fetchError.Error())
+
+ if recordError != nil {
+ logger.Error(
+ "failed to record feed error",
+ "feed_identifier", feed.Identifier,
+ "error", recordError,
+ )
+ }
+
+ return
+ }
+
+ if fetchResult.NotModified {
+ logger.Info(
+ "feed not modified",
+ "feed_identifier", feed.Identifier,
+ )
+
+ metadataError := feedWriter.UpdateFeedMetadata(
+ refreshContext,
+ feed.Identifier,
+ fetchResult.EntityTag,
+ fetchResult.LastModifiedHeader,
+ "",
+ "",
+ )
+
+ if metadataError != nil {
+ logger.Error(
+ "failed to update feed metadata after not-modified",
+ "feed_identifier", feed.Identifier,
+ "error", metadataError,
+ )
+ }
+
+ return
+ }
+
+ parseResult, parseError := feedParser.Parse(feed.Identifier, ownerIdentifier, fetchResult.Body)
+
+ if parseError != nil {
+ logger.Warn(
+ "feed parse failed",
+ "feed_identifier", feed.Identifier,
+ "error", parseError,
+ )
+
+ recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, parseError.Error())
+
+ if recordError != nil {
+ logger.Error(
+ "failed to record parse error",
+ "feed_identifier", feed.Identifier,
+ "error", recordError,
+ )
+ }
+
+ return
+ }
+
+ rowsAffected, writeError := feedWriter.WriteEntries(refreshContext, parseResult.Entries)
+
+ if writeError != nil {
+ logger.Error(
+ "failed to write feed entries",
+ "feed_identifier", feed.Identifier,
+ "error", writeError,
+ )
+
+ recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, writeError.Error())
+
+ if recordError != nil {
+ logger.Error(
+ "failed to record write error",
+ "feed_identifier", feed.Identifier,
+ "error", recordError,
+ )
+ }
+
+ return
+ }
+
+ if rowsAffected > 0 && webhookDispatcher != nil {
+ webhookDispatcher.DispatchForFeed(refreshContext, feed.Identifier, parseResult.Entries)
+ }
+
+ metadataError := feedWriter.UpdateFeedMetadata(
+ refreshContext,
+ feed.Identifier,
+ fetchResult.EntityTag,
+ fetchResult.LastModifiedHeader,
+ parseResult.FeedTitle,
+ parseResult.SiteURL,
+ )
+
+ if metadataError != nil {
+ logger.Error(
+ "failed to update feed metadata",
+ "feed_identifier", feed.Identifier,
+ "error", metadataError,
+ )
+ }
+
+ if parseResult.AudioEnclosureRatio > 0.5 {
+ if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "podcast"); feedTypeError != nil {
+ logger.Error(
+ "failed to update feed type to podcast",
+ "feed_identifier", feed.Identifier,
+ "error", feedTypeError,
+ )
+ }
+ } else if feed.FeedType != nil && *feed.FeedType == "podcast" {
+ if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "rss"); feedTypeError != nil {
+ logger.Error(
+ "failed to clear podcast feed type",
+ "feed_identifier", feed.Identifier,
+ "error", feedTypeError,
+ )
+ }
+ }
+
+ logger.Info(
+ "feed refresh completed",
+ "feed_identifier", feed.Identifier,
+ "entries_parsed", len(parseResult.Entries),
+ "rows_affected", rowsAffected,
+ )
+}
diff --git a/services/worker/internal/scheduler/scheduler.go b/services/worker/internal/scheduler/scheduler.go
new file mode 100644
index 0000000..646e263
--- /dev/null
+++ b/services/worker/internal/scheduler/scheduler.go
@@ -0,0 +1,283 @@
+package scheduler
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/Fuwn/asa-news/internal/fetcher"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/Fuwn/asa-news/internal/parser"
+ "github.com/Fuwn/asa-news/internal/pool"
+ "github.com/Fuwn/asa-news/internal/webhook"
+ "github.com/Fuwn/asa-news/internal/writer"
+ pgmq "github.com/craigpastro/pgmq-go"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "log/slog"
+ "time"
+)
+
// Scheduler drives the two refresh paths of the worker: a periodic poll that
// claims feeds whose next_fetch_at has elapsed, and a pgmq-backed queue of
// on-demand refresh requests. All claimed work is executed on the shared
// worker pool.
type Scheduler struct {
	databaseConnectionPool *pgxpool.Pool       // shared pool for claims, lookups, and queue I/O
	feedFetcher            *fetcher.Fetcher    // HTTP fetcher passed through to ProcessRefreshRequest
	feedParser             *parser.Parser      // feed parser passed through to ProcessRefreshRequest
	feedWriter             *writer.Writer      // persists entries, metadata, and fetch errors
	webhookDispatcher      *webhook.Dispatcher // pushes new-entry notifications; may be nil (checked downstream)
	workerPool             *pool.WorkerPool    // bounds concurrency of refresh jobs
	pollInterval           time.Duration       // cadence of the due-feed poll cycle
	queuePollInterval      time.Duration       // cadence of the pgmq queue poll
	batchSize              int                 // maximum feeds claimed per poll cycle
	logger                 *slog.Logger
}
+
+func NewScheduler(
+ databaseConnectionPool *pgxpool.Pool,
+ feedFetcher *fetcher.Fetcher,
+ feedParser *parser.Parser,
+ feedWriter *writer.Writer,
+ webhookDispatcher *webhook.Dispatcher,
+ workerPool *pool.WorkerPool,
+ pollInterval time.Duration,
+ queuePollInterval time.Duration,
+ batchSize int,
+ logger *slog.Logger,
+) *Scheduler {
+ return &Scheduler{
+ databaseConnectionPool: databaseConnectionPool,
+ feedFetcher: feedFetcher,
+ feedParser: feedParser,
+ feedWriter: feedWriter,
+ webhookDispatcher: webhookDispatcher,
+ workerPool: workerPool,
+ pollInterval: pollInterval,
+ queuePollInterval: queuePollInterval,
+ batchSize: batchSize,
+ logger: logger,
+ }
+}
+
+func (feedScheduler *Scheduler) Run(schedulerContext context.Context) {
+ pollTicker := time.NewTicker(feedScheduler.pollInterval)
+
+ defer pollTicker.Stop()
+
+ queueTicker := time.NewTicker(feedScheduler.queuePollInterval)
+
+ defer queueTicker.Stop()
+
+ feedScheduler.logger.Info("scheduler started",
+ "poll_interval", feedScheduler.pollInterval,
+ "queue_poll_interval", feedScheduler.queuePollInterval,
+ "batch_size", feedScheduler.batchSize,
+ )
+ feedScheduler.executePollCycle(schedulerContext)
+ feedScheduler.executeQueueCycle(schedulerContext)
+
+ for {
+ select {
+ case <-schedulerContext.Done():
+ feedScheduler.logger.Info("scheduler shutting down")
+
+ return
+ case <-pollTicker.C:
+ feedScheduler.executePollCycle(schedulerContext)
+ case <-queueTicker.C:
+ feedScheduler.executeQueueCycle(schedulerContext)
+ }
+ }
+}
+
+func (feedScheduler *Scheduler) executePollCycle(cycleContext context.Context) {
+ claimedFeeds, claimError := feedScheduler.claimDueFeeds(cycleContext)
+
+ if claimError != nil {
+ feedScheduler.logger.Error("failed to claim due feeds", "error", claimError)
+
+ return
+ }
+
+ if len(claimedFeeds) == 0 {
+ return
+ }
+
+ feedScheduler.logger.Info("claimed feeds for processing", "count", len(claimedFeeds))
+
+ for _, claimedFeed := range claimedFeeds {
+ capturedFeed := claimedFeed
+
+ feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
+ ProcessRefreshRequest(
+ workContext,
+ capturedFeed,
+ feedScheduler.feedFetcher,
+ feedScheduler.feedParser,
+ feedScheduler.feedWriter,
+ feedScheduler.webhookDispatcher,
+ feedScheduler.logger,
+ )
+ })
+ }
+}
+
+func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context) {
+ queueMessage, readError := pgmq.Read(cycleContext, feedScheduler.databaseConnectionPool, "feed_refresh", 30)
+
+ if readError != nil {
+ return
+ }
+
+ if queueMessage == nil {
+ return
+ }
+
+ var refreshRequest model.RefreshRequest
+
+ unmarshalError := json.Unmarshal(queueMessage.Message, &refreshRequest)
+
+ if unmarshalError != nil {
+ feedScheduler.logger.Error("failed to unmarshal refresh request", "error", unmarshalError)
+
+ return
+ }
+
+ feed, lookupError := feedScheduler.lookupFeed(cycleContext, refreshRequest.FeedIdentifier)
+
+ if lookupError != nil {
+ feedScheduler.logger.Error(
+ "failed to look up feed for queue request",
+ "feed_identifier", refreshRequest.FeedIdentifier,
+ "error", lookupError,
+ )
+
+ return
+ }
+
+ capturedMessageIdentifier := queueMessage.MsgID
+
+ feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
+ ProcessRefreshRequest(
+ workContext,
+ feed,
+ feedScheduler.feedFetcher,
+ feedScheduler.feedParser,
+ feedScheduler.feedWriter,
+ feedScheduler.webhookDispatcher,
+ feedScheduler.logger,
+ )
+
+ _, archiveError := pgmq.Archive(workContext, feedScheduler.databaseConnectionPool, "feed_refresh", capturedMessageIdentifier)
+
+ if archiveError != nil {
+ feedScheduler.logger.Error(
+ "failed to archive queue message",
+ "message_id", capturedMessageIdentifier,
+ "error", archiveError,
+ )
+ }
+ })
+}
+
+func (feedScheduler *Scheduler) claimDueFeeds(claimContext context.Context) ([]model.Feed, error) {
+ claimQuery := `
+ UPDATE feeds
+ SET last_fetched_at = NOW()
+ WHERE id IN (
+ SELECT id
+ FROM feeds
+ WHERE next_fetch_at <= NOW()
+ AND subscriber_count > 0
+ ORDER BY next_fetch_at ASC
+ LIMIT $1
+ FOR UPDATE SKIP LOCKED
+ )
+ RETURNING
+ id, url, site_url, title, feed_type,
+ visibility, etag, last_modified,
+ last_fetched_at, last_fetch_error,
+ consecutive_failures, next_fetch_at,
+ fetch_interval_seconds,
+ created_at, updated_at
+ `
+ rows, queryError := feedScheduler.databaseConnectionPool.Query(claimContext, claimQuery, feedScheduler.batchSize)
+
+ if queryError != nil {
+ return nil, fmt.Errorf("failed to query due feeds: %w", queryError)
+ }
+
+ defer rows.Close()
+
+ claimedFeeds := make([]model.Feed, 0)
+
+ for rows.Next() {
+ var feed model.Feed
+
+ scanError := rows.Scan(
+ &feed.Identifier,
+ &feed.URL,
+ &feed.SiteURL,
+ &feed.Title,
+ &feed.FeedType,
+ &feed.Visibility,
+ &feed.EntityTag,
+ &feed.LastModified,
+ &feed.LastFetchedAt,
+ &feed.LastFetchError,
+ &feed.ConsecutiveFailures,
+ &feed.NextFetchAt,
+ &feed.FetchIntervalSeconds,
+ &feed.CreatedAt,
+ &feed.UpdatedAt,
+ )
+
+ if scanError != nil {
+ return nil, fmt.Errorf("failed to scan feed row: %w", scanError)
+ }
+
+ claimedFeeds = append(claimedFeeds, feed)
+ }
+
+ if rows.Err() != nil {
+ return nil, fmt.Errorf("error iterating feed rows: %w", rows.Err())
+ }
+
+ return claimedFeeds, nil
+}
+
+func (feedScheduler *Scheduler) lookupFeed(lookupContext context.Context, feedIdentifier string) (model.Feed, error) {
+ lookupQuery := `
+ SELECT
+ id, url, site_url, title, feed_type,
+ visibility, etag, last_modified,
+ last_fetched_at, last_fetch_error,
+ consecutive_failures, next_fetch_at,
+ fetch_interval_seconds,
+ created_at, updated_at
+ FROM feeds
+ WHERE id = $1
+ `
+
+ var feed model.Feed
+
+ scanError := feedScheduler.databaseConnectionPool.QueryRow(lookupContext, lookupQuery, feedIdentifier).Scan(
+ &feed.Identifier,
+ &feed.URL,
+ &feed.SiteURL,
+ &feed.Title,
+ &feed.FeedType,
+ &feed.Visibility,
+ &feed.EntityTag,
+ &feed.LastModified,
+ &feed.LastFetchedAt,
+ &feed.LastFetchError,
+ &feed.ConsecutiveFailures,
+ &feed.NextFetchAt,
+ &feed.FetchIntervalSeconds,
+ &feed.CreatedAt,
+ &feed.UpdatedAt,
+ )
+
+ if scanError != nil {
+ return model.Feed{}, fmt.Errorf("failed to look up feed %s: %w", feedIdentifier, scanError)
+ }
+
+ return feed, nil
+}
diff --git a/services/worker/internal/webhook/webhook.go b/services/worker/internal/webhook/webhook.go
new file mode 100644
index 0000000..c812820
--- /dev/null
+++ b/services/worker/internal/webhook/webhook.go
@@ -0,0 +1,333 @@
package webhook

import (
	"bytes"
	"context"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"time"

	"github.com/Fuwn/asa-news/internal/fetcher"
	"github.com/Fuwn/asa-news/internal/model"
	"github.com/jackc/pgx/v5/pgxpool"
)
+
// Dispatcher delivers "entries.created" webhook notifications to
// developer-tier subscribers of a feed, HMAC-signing payloads when a
// subscriber has a secret configured.
type Dispatcher struct {
	databaseConnectionPool *pgxpool.Pool
	httpClient             *http.Client // shared client; configured with a 10-second timeout in NewDispatcher
	logger                 *slog.Logger
}
+
// webhookSubscriber is one row of the developer-tier subscriber query: the
// destination URL plus an optional HMAC signing secret.
type webhookSubscriber struct {
	UserIdentifier string
	WebhookURL     string
	WebhookSecret  *string // nil or empty means requests are sent unsigned
}
+
// EntryPayload is the JSON wire representation of a single feed entry inside
// a webhook delivery. Optional fields are pointers so absent values
// serialize as JSON null.
type EntryPayload struct {
	EntryIdentifier string  `json:"entryIdentifier"`
	FeedIdentifier  string  `json:"feedIdentifier"`
	GUID            string  `json:"guid"`
	URL             *string `json:"url"`
	Title           *string `json:"title"`
	Author          *string `json:"author"`
	Summary         *string `json:"summary"`
	PublishedAt     *string `json:"publishedAt"` // RFC 3339 timestamp
	EnclosureURL    *string `json:"enclosureUrl"`
	EnclosureType   *string `json:"enclosureType"`
}
+
// WebhookPayload is the top-level JSON body POSTed to subscriber webhook
// URLs: an event name, an RFC 3339 UTC timestamp, and the batch of entries.
type WebhookPayload struct {
	Event     string         `json:"event"` // currently always "entries.created"
	Timestamp string         `json:"timestamp"`
	Entries   []EntryPayload `json:"entries"`
}
+
+func NewDispatcher(
+ databaseConnectionPool *pgxpool.Pool,
+ logger *slog.Logger,
+) *Dispatcher {
+ return &Dispatcher{
+ databaseConnectionPool: databaseConnectionPool,
+ httpClient: &http.Client{
+ Timeout: 10 * time.Second,
+ },
+ logger: logger,
+ }
+}
+
+func (webhookDispatcher *Dispatcher) DispatchForFeed(
+ dispatchContext context.Context,
+ feedIdentifier string,
+ entries []model.FeedEntry,
+) {
+ subscribers, queryError := webhookDispatcher.findWebhookSubscribers(dispatchContext, feedIdentifier)
+
+ if queryError != nil {
+ webhookDispatcher.logger.Error(
+ "failed to query webhook subscribers",
+ "feed_identifier", feedIdentifier,
+ "error", queryError,
+ )
+
+ return
+ }
+
+ if len(subscribers) == 0 {
+ return
+ }
+
+ entryPayloads := make([]EntryPayload, 0, len(entries))
+
+ for _, entry := range entries {
+ var publishedAtString *string
+
+ if entry.PublishedAt != nil {
+ formattedTime := entry.PublishedAt.Format(time.RFC3339)
+ publishedAtString = &formattedTime
+ }
+
+ entryPayloads = append(entryPayloads, EntryPayload{
+ EntryIdentifier: entry.FeedIdentifier,
+ FeedIdentifier: entry.FeedIdentifier,
+ GUID: entry.GUID,
+ URL: entry.URL,
+ Title: entry.Title,
+ Author: entry.Author,
+ Summary: entry.Summary,
+ PublishedAt: publishedAtString,
+ EnclosureURL: entry.EnclosureURL,
+ EnclosureType: entry.EnclosureType,
+ })
+ }
+
+ payload := WebhookPayload{
+ Event: "entries.created",
+ Timestamp: time.Now().UTC().Format(time.RFC3339),
+ Entries: entryPayloads,
+ }
+
+ for _, subscriber := range subscribers {
+ webhookDispatcher.deliverWebhook(dispatchContext, subscriber, payload)
+ }
+}
+
+func (webhookDispatcher *Dispatcher) findWebhookSubscribers(
+ queryContext context.Context,
+ feedIdentifier string,
+) ([]webhookSubscriber, error) {
+ subscriberQuery := `
+ SELECT up.id, up.webhook_url, up.webhook_secret
+ FROM subscriptions s
+ JOIN user_profiles up ON up.id = s.user_id
+ WHERE s.feed_id = $1
+ AND up.tier = 'developer'
+ AND up.webhook_enabled = true
+ AND up.webhook_url IS NOT NULL
+ `
+
+ rows, queryError := webhookDispatcher.databaseConnectionPool.Query(
+ queryContext,
+ subscriberQuery,
+ feedIdentifier,
+ )
+
+ if queryError != nil {
+ return nil, fmt.Errorf("failed to query webhook subscribers: %w", queryError)
+ }
+
+ defer rows.Close()
+
+ subscribers := make([]webhookSubscriber, 0)
+
+ for rows.Next() {
+ var subscriber webhookSubscriber
+
+ scanError := rows.Scan(
+ &subscriber.UserIdentifier,
+ &subscriber.WebhookURL,
+ &subscriber.WebhookSecret,
+ )
+
+ if scanError != nil {
+ return nil, fmt.Errorf("failed to scan subscriber row: %w", scanError)
+ }
+
+ subscribers = append(subscribers, subscriber)
+ }
+
+ return subscribers, rows.Err()
+}
+
+func (webhookDispatcher *Dispatcher) deliverWebhook(
+ deliveryContext context.Context,
+ subscriber webhookSubscriber,
+ payload WebhookPayload,
+) {
+ urlValidationError := fetcher.ValidateFeedURL(subscriber.WebhookURL)
+
+ if urlValidationError != nil {
+ webhookDispatcher.logger.Warn(
+ "webhook URL failed SSRF validation",
+ "user_identifier", subscriber.UserIdentifier,
+ "webhook_url", subscriber.WebhookURL,
+ "error", urlValidationError,
+ )
+
+ return
+ }
+
+ payloadBytes, marshalError := json.Marshal(payload)
+
+ if marshalError != nil {
+ webhookDispatcher.logger.Error(
+ "failed to marshal webhook payload",
+ "user_identifier", subscriber.UserIdentifier,
+ "error", marshalError,
+ )
+
+ return
+ }
+
+ retryDelays := []time.Duration{0, 1 * time.Second, 4 * time.Second, 16 * time.Second}
+ var lastError error
+
+ for attemptIndex, delay := range retryDelays {
+ if delay > 0 {
+ select {
+ case <-time.After(delay):
+ case <-deliveryContext.Done():
+ return
+ }
+ }
+
+ deliveryError := webhookDispatcher.sendWebhookRequest(
+ deliveryContext,
+ subscriber,
+ payloadBytes,
+ )
+
+ if deliveryError == nil {
+ if attemptIndex > 0 {
+ webhookDispatcher.logger.Info(
+ "webhook delivered after retry",
+ "user_identifier", subscriber.UserIdentifier,
+ "attempt", attemptIndex+1,
+ )
+ }
+
+ webhookDispatcher.resetConsecutiveFailures(deliveryContext, subscriber.UserIdentifier)
+
+ return
+ }
+
+ lastError = deliveryError
+
+ webhookDispatcher.logger.Warn(
+ "webhook delivery attempt failed",
+ "user_identifier", subscriber.UserIdentifier,
+ "attempt", attemptIndex+1,
+ "error", deliveryError,
+ )
+ }
+
+ webhookDispatcher.logger.Error(
+ "webhook delivery failed after all retries",
+ "user_identifier", subscriber.UserIdentifier,
+ "error", lastError,
+ )
+
+ webhookDispatcher.incrementConsecutiveFailures(deliveryContext, subscriber.UserIdentifier)
+}
+
+func (webhookDispatcher *Dispatcher) sendWebhookRequest(
+ requestContext context.Context,
+ subscriber webhookSubscriber,
+ payloadBytes []byte,
+) error {
+ request, requestCreationError := http.NewRequestWithContext(
+ requestContext,
+ http.MethodPost,
+ subscriber.WebhookURL,
+ bytes.NewReader(payloadBytes),
+ )
+
+ if requestCreationError != nil {
+ return fmt.Errorf("failed to create webhook request: %w", requestCreationError)
+ }
+
+ request.Header.Set("Content-Type", "application/json")
+ request.Header.Set("User-Agent", "asa.news Webhook/1.0")
+
+ if subscriber.WebhookSecret != nil && *subscriber.WebhookSecret != "" {
+ mac := hmac.New(sha256.New, []byte(*subscriber.WebhookSecret))
+ mac.Write(payloadBytes)
+ signature := hex.EncodeToString(mac.Sum(nil))
+ request.Header.Set("X-Asa-Signature-256", "sha256="+signature)
+ }
+
+ response, requestError := webhookDispatcher.httpClient.Do(request)
+
+ if requestError != nil {
+ return fmt.Errorf("webhook request failed: %w", requestError)
+ }
+
+ defer response.Body.Close()
+
+ if response.StatusCode >= 200 && response.StatusCode < 300 {
+ return nil
+ }
+
+ return fmt.Errorf("webhook returned status %d", response.StatusCode)
+}
+
+func (webhookDispatcher *Dispatcher) resetConsecutiveFailures(
+ updateContext context.Context,
+ userIdentifier string,
+) {
+ _, updateError := webhookDispatcher.databaseConnectionPool.Exec(
+ updateContext,
+ "UPDATE user_profiles SET webhook_consecutive_failures = 0 WHERE id = $1",
+ userIdentifier,
+ )
+
+ if updateError != nil {
+ webhookDispatcher.logger.Error(
+ "failed to reset webhook consecutive failures",
+ "user_identifier", userIdentifier,
+ "error", updateError,
+ )
+ }
+}
+
+func (webhookDispatcher *Dispatcher) incrementConsecutiveFailures(
+ updateContext context.Context,
+ userIdentifier string,
+) {
+ maximumConsecutiveFailures := 10
+
+ _, updateError := webhookDispatcher.databaseConnectionPool.Exec(
+ updateContext,
+ `UPDATE user_profiles
+ SET webhook_consecutive_failures = webhook_consecutive_failures + 1,
+ webhook_enabled = CASE
+ WHEN webhook_consecutive_failures + 1 >= $2 THEN false
+ ELSE webhook_enabled
+ END
+ WHERE id = $1`,
+ userIdentifier,
+ maximumConsecutiveFailures,
+ )
+
+ if updateError != nil {
+ webhookDispatcher.logger.Error(
+ "failed to increment webhook consecutive failures",
+ "user_identifier", userIdentifier,
+ "error", updateError,
+ )
+ }
+}
diff --git a/services/worker/internal/writer/writer.go b/services/worker/internal/writer/writer.go
new file mode 100644
index 0000000..de81bfa
--- /dev/null
+++ b/services/worker/internal/writer/writer.go
@@ -0,0 +1,222 @@
+package writer
+
+import (
+ "context"
+ "fmt"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "strings"
+ "time"
+)
+
// Writer persists parsed feed data: entry upserts, feed metadata updates on
// success, and fetch-error bookkeeping on failure.
type Writer struct {
	databaseConnectionPool *pgxpool.Pool
}

// NewWriter constructs a Writer backed by the given connection pool.
func NewWriter(databaseConnectionPool *pgxpool.Pool) *Writer {
	return &Writer{
		databaseConnectionPool: databaseConnectionPool,
	}
}
+
+func (feedWriter *Writer) WriteEntries(writeContext context.Context, feedEntries []model.FeedEntry) (int64, error) {
+ if len(feedEntries) == 0 {
+ return 0, nil
+ }
+
+ valueParameterPlaceholders := make([]string, 0, len(feedEntries))
+ queryArguments := make([]interface{}, 0, len(feedEntries)*15)
+
+ for entryIndex, entry := range feedEntries {
+ parameterOffset := entryIndex * 15
+ valueParameterPlaceholders = append(
+ valueParameterPlaceholders,
+ fmt.Sprintf(
+ "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)",
+ parameterOffset+1, parameterOffset+2, parameterOffset+3,
+ parameterOffset+4, parameterOffset+5, parameterOffset+6,
+ parameterOffset+7, parameterOffset+8, parameterOffset+9,
+ parameterOffset+10, parameterOffset+11, parameterOffset+12,
+ parameterOffset+13, parameterOffset+14, parameterOffset+15,
+ ),
+ )
+ queryArguments = append(
+ queryArguments,
+ entry.FeedIdentifier,
+ entry.OwnerIdentifier,
+ entry.GUID,
+ entry.URL,
+ entry.Title,
+ entry.Author,
+ entry.Summary,
+ entry.ContentHTML,
+ entry.ContentText,
+ entry.ImageURL,
+ entry.PublishedAt,
+ entry.WordCount,
+ entry.EnclosureURL,
+ entry.EnclosureType,
+ entry.EnclosureLength,
+ )
+ }
+
+ isPublicEntry := feedEntries[0].OwnerIdentifier == nil
+
+ var conflictClause string
+
+ if isPublicEntry {
+ conflictClause = "ON CONFLICT (feed_id, guid) WHERE owner_id IS NULL"
+ } else {
+ conflictClause = "ON CONFLICT (feed_id, owner_id, guid) WHERE owner_id IS NOT NULL"
+ }
+
+ upsertQuery := fmt.Sprintf(`
+ INSERT INTO entries (
+ feed_id, owner_id, guid, url, title, author,
+ summary, content_html, content_text, image_url,
+ published_at, word_count,
+ enclosure_url, enclosure_type, enclosure_length
+ )
+ VALUES %s
+ %s
+ DO UPDATE SET
+ url = EXCLUDED.url,
+ title = EXCLUDED.title,
+ author = EXCLUDED.author,
+ summary = CASE
+ WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id)
+ THEN entries.summary
+ ELSE EXCLUDED.summary
+ END,
+ content_html = CASE
+ WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id)
+ THEN entries.content_html
+ ELSE EXCLUDED.content_html
+ END,
+ content_text = CASE
+ WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id)
+ THEN entries.content_text
+ ELSE EXCLUDED.content_text
+ END,
+ image_url = EXCLUDED.image_url,
+ published_at = EXCLUDED.published_at,
+ word_count = EXCLUDED.word_count,
+ enclosure_url = EXCLUDED.enclosure_url,
+ enclosure_type = EXCLUDED.enclosure_type,
+ enclosure_length = EXCLUDED.enclosure_length
+ WHERE
+ entries.title IS DISTINCT FROM EXCLUDED.title
+ OR entries.content_html IS DISTINCT FROM EXCLUDED.content_html
+ OR entries.enclosure_url IS DISTINCT FROM EXCLUDED.enclosure_url
+ `, strings.Join(valueParameterPlaceholders, ", "), conflictClause)
+ commandTag, executeError := feedWriter.databaseConnectionPool.Exec(writeContext, upsertQuery, queryArguments...)
+
+ if executeError != nil {
+ return 0, fmt.Errorf("failed to upsert feed entries: %w", executeError)
+ }
+
+ return commandTag.RowsAffected(), nil
+}
+
+func (feedWriter *Writer) UpdateFeedMetadata(
+ updateContext context.Context,
+ feedIdentifier string,
+ entityTag string,
+ lastModified string,
+ feedTitle string,
+ siteURL string,
+) error {
+ currentTime := time.Now().UTC()
+
+ var titleParameter *string
+ if feedTitle != "" {
+ titleParameter = &feedTitle
+ }
+
+ var siteURLParameter *string
+ if siteURL != "" {
+ siteURLParameter = &siteURL
+ }
+
+ updateQuery := `
+ UPDATE feeds
+ SET
+ last_fetched_at = $1::timestamptz,
+ etag = $2,
+ last_modified = $3,
+ consecutive_failures = 0,
+ last_fetch_error = NULL,
+ last_fetch_error_at = NULL,
+ next_fetch_at = $1::timestamptz + (fetch_interval_seconds * INTERVAL '1 second'),
+ title = COALESCE($5, title),
+ site_url = COALESCE($6, site_url)
+ WHERE id = $4
+ `
+ _, executeError := feedWriter.databaseConnectionPool.Exec(
+ updateContext,
+ updateQuery,
+ currentTime,
+ entityTag,
+ lastModified,
+ feedIdentifier,
+ titleParameter,
+ siteURLParameter,
+ )
+
+ if executeError != nil {
+ return fmt.Errorf("failed to update feed metadata: %w", executeError)
+ }
+
+ return nil
+}
+
+func (feedWriter *Writer) UpdateFeedType(
+ updateContext context.Context,
+ feedIdentifier string,
+ feedType string,
+) error {
+ updateQuery := `UPDATE feeds SET feed_type = $1 WHERE id = $2`
+ _, executeError := feedWriter.databaseConnectionPool.Exec(
+ updateContext,
+ updateQuery,
+ feedType,
+ feedIdentifier,
+ )
+
+ if executeError != nil {
+ return fmt.Errorf("failed to update feed type: %w", executeError)
+ }
+
+ return nil
+}
+
+func (feedWriter *Writer) RecordFeedError(
+ updateContext context.Context,
+ feedIdentifier string,
+ errorMessage string,
+) error {
+ currentTime := time.Now().UTC()
+ updateQuery := `
+ UPDATE feeds
+ SET
+ last_fetched_at = $1::timestamptz,
+ last_fetch_error = $2,
+ last_fetch_error_at = $1::timestamptz,
+ consecutive_failures = consecutive_failures + 1,
+ next_fetch_at = $1::timestamptz + (LEAST(POWER(2, consecutive_failures) * 60, 21600) * INTERVAL '1 second')
+ WHERE id = $3
+ `
+ _, executeError := feedWriter.databaseConnectionPool.Exec(
+ updateContext,
+ updateQuery,
+ currentTime,
+ errorMessage,
+ feedIdentifier,
+ )
+
+ if executeError != nil {
+ return fmt.Errorf("failed to record feed error: %w", executeError)
+ }
+
+ return nil
+}