summary refs log tree commit diff
path: root/services
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-07 01:42:57 -0800
committerFuwn <[email protected]>2026-02-07 01:42:57 -0800
commit5c5b1993edd890a80870ee05607ac5f088191d4e (patch)
treea721b76bcd49ba10826c53efc87302c7a689512f /services
downloadasa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.tar.xz
asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.zip
feat: asa.news RSS reader with developer tier, REST API, and webhooks
Full-stack RSS reader SaaS: Supabase + Next.js + Go worker. Includes three subscription tiers (free/pro/developer), API key auth, read-only REST API, webhook push notifications, Stripe billing with proration, and PWA support.
Diffstat (limited to 'services')
-rw-r--r--services/worker/Dockerfile18
-rw-r--r--services/worker/Taskfile.yaml57
-rw-r--r--services/worker/cmd/worker/main.go143
-rw-r--r--services/worker/go.mod81
-rw-r--r--services/worker/go.sum226
-rw-r--r--services/worker/internal/configuration/configuration.go110
-rw-r--r--services/worker/internal/database/database.go39
-rw-r--r--services/worker/internal/fetcher/authentication.go43
-rw-r--r--services/worker/internal/fetcher/errors.go145
-rw-r--r--services/worker/internal/fetcher/fetcher.go116
-rw-r--r--services/worker/internal/fetcher/ssrf_protection.go77
-rw-r--r--services/worker/internal/health/health.go117
-rw-r--r--services/worker/internal/model/feed.go41
-rw-r--r--services/worker/internal/model/queue.go5
-rw-r--r--services/worker/internal/parser/parser.go234
-rw-r--r--services/worker/internal/pool/pool.go60
-rw-r--r--services/worker/internal/scheduler/refresh.go196
-rw-r--r--services/worker/internal/scheduler/scheduler.go283
-rw-r--r--services/worker/internal/webhook/webhook.go333
-rw-r--r--services/worker/internal/writer/writer.go222
20 files changed, 2546 insertions, 0 deletions
diff --git a/services/worker/Dockerfile b/services/worker/Dockerfile
new file mode 100644
index 0000000..59678ff
--- /dev/null
+++ b/services/worker/Dockerfile
@@ -0,0 +1,18 @@
# Build stage: compile a fully static worker binary.
# go.mod declares `go 1.24.0` with `toolchain go1.25.6`, so the builder image
# must ship at least Go 1.24 — golang:1.23 would fail `go build` with a
# "go.mod requires go >= 1.24.0" error. Pin the image to the toolchain line.
FROM golang:1.25-bookworm AS builder

WORKDIR /build

# Copy module manifests first so `go mod download` is cached independently
# of source-code changes.
COPY go.mod go.sum ./
RUN go mod download

COPY . .

# CGO disabled + static linking so the binary runs on distroless/static;
# -s -w strips symbol/debug tables to shrink the image.
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /build/worker ./cmd/worker

# Runtime stage: minimal distroless base, no shell, non-root user.
FROM gcr.io/distroless/static:nonroot

COPY --from=builder /build/worker /worker

USER nonroot:nonroot

ENTRYPOINT ["/worker"]
diff --git a/services/worker/Taskfile.yaml b/services/worker/Taskfile.yaml
new file mode 100644
index 0000000..ea32aab
--- /dev/null
+++ b/services/worker/Taskfile.yaml
@@ -0,0 +1,57 @@
# Task (taskfile.dev) definitions for the feed worker service.
version: "3"

vars:
  # Name of the compiled binary produced by `task build`.
  BINARY: worker

tasks:
  # Running bare `task` builds the binary.
  default:
    desc: Build the application
    cmds:
      - task: build

  build:
    desc: Build the binary with optimisations
    cmds:
      # -s -w strips symbol and DWARF tables for a smaller binary.
      - go build -ldflags="-s -w" -o {{.BINARY}} ./cmd/worker
    # sources/generates let Task skip the build when nothing changed.
    sources:
      - ./**/*.go
    generates:
      - ./{{.BINARY}}

  run:
    desc: Build and run the application
    deps: [build]
    cmds:
      - ./{{.BINARY}}

  clean:
    desc: Remove build artifacts
    cmds:
      - rm -f {{.BINARY}}
      - go clean

  test:
    desc: Run tests
    cmds:
      - go test ./...

  fmt:
    desc: Format code
    cmds:
      # NOTE(review): `iku` appears to be a custom/third-party formatter with
      # `go fmt` as the fallback when it is unavailable — TODO confirm.
      - iku -w . || go fmt ./...

  lint:
    desc: Run linter
    cmds:
      - golangci-lint run

  dev:
    desc: Build and run in development mode
    cmds:
      # `go run` skips the optimised-build step for a fast edit/run loop.
      - go run ./cmd/worker

  tidy:
    desc: Tidy and verify module dependencies
    cmds:
      - go mod tidy
      - go mod verify
diff --git a/services/worker/cmd/worker/main.go b/services/worker/cmd/worker/main.go
new file mode 100644
index 0000000..9732ec6
--- /dev/null
+++ b/services/worker/cmd/worker/main.go
@@ -0,0 +1,143 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "github.com/Fuwn/asa-news/internal/configuration"
+ "github.com/Fuwn/asa-news/internal/database"
+ "github.com/Fuwn/asa-news/internal/fetcher"
+ "github.com/Fuwn/asa-news/internal/health"
+ "github.com/Fuwn/asa-news/internal/parser"
+ "github.com/Fuwn/asa-news/internal/pool"
+ "github.com/Fuwn/asa-news/internal/scheduler"
+ "github.com/Fuwn/asa-news/internal/webhook"
+ "github.com/Fuwn/asa-news/internal/writer"
+ "log/slog"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+)
+
+func main() {
+ exitCode := run()
+
+ os.Exit(exitCode)
+}
+
// run wires together configuration, logging, the database pool, and the
// feed scheduler, then blocks until a SIGINT/SIGTERM arrives. It returns
// the process exit code (0 on clean shutdown, 1 on startup failure);
// main passes that to os.Exit, so all cleanup must complete here.
func run() int {
	applicationConfiguration, configurationError := configuration.Load()

	if configurationError != nil {
		// The logger is built from configuration, so at this point we can
		// only report to stderr directly.
		fmt.Fprintf(os.Stderr, "failed to load configuration: %v\n", configurationError)

		return 1
	}

	logger := createLogger(applicationConfiguration)

	logger.Info("starting feed worker service")

	// applicationContext is cancelled automatically on SIGINT/SIGTERM and
	// drives graceful shutdown of the scheduler below.
	applicationContext, cancelApplication := signal.NotifyContext(
		context.Background(),
		syscall.SIGINT,
		syscall.SIGTERM,
	)

	defer cancelApplication()

	databaseConnectionPool, databaseError := database.CreateConnectionPool(
		applicationContext,
		applicationConfiguration.DatabaseURL,
	)

	if databaseError != nil {
		logger.Error("failed to create database connection pool", "error", databaseError)

		return 1
	}

	defer databaseConnectionPool.Close()

	// Assemble the pipeline components shared by the scheduler:
	// fetch -> parse -> write, plus webhook delivery and a bounded
	// worker pool that limits concurrent feed refreshes.
	feedFetcher := fetcher.NewFetcher(applicationConfiguration.FetchTimeout)
	feedParser := parser.NewParser()
	feedWriter := writer.NewWriter(databaseConnectionPool)
	webhookDispatcher := webhook.NewDispatcher(databaseConnectionPool, logger)
	workerPool := pool.NewWorkerPool(applicationConfiguration.WorkerConcurrency, logger)
	healthServer := health.NewHealthServer(
		applicationConfiguration.HealthPort,
		databaseConnectionPool,
		logger,
	)

	// Health endpoint comes up before the scheduler so orchestrators can
	// probe the process as soon as it starts working.
	healthServer.Start()

	feedScheduler := scheduler.NewScheduler(
		databaseConnectionPool,
		feedFetcher,
		feedParser,
		feedWriter,
		webhookDispatcher,
		workerPool,
		applicationConfiguration.PollInterval,
		applicationConfiguration.QueuePollInterval,
		applicationConfiguration.BatchSize,
		logger,
	)

	logger.Info("feed worker service started",
		"worker_concurrency", applicationConfiguration.WorkerConcurrency,
		"poll_interval", applicationConfiguration.PollInterval,
		"health_port", applicationConfiguration.HealthPort,
	)
	// Blocks until applicationContext is cancelled by a signal.
	feedScheduler.Run(applicationContext)
	logger.Info("shutting down feed worker service")
	logger.Info("waiting for in-flight work to complete")
	// Let already-dispatched feed refreshes finish before tearing down.
	workerPool.Wait()

	// Give the health server a bounded window to drain its connections.
	shutdownContext, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)

	defer cancelShutdown()

	healthShutdownError := healthServer.Stop(shutdownContext)

	if healthShutdownError != nil {
		// Shutdown errors are logged but do not change the exit code —
		// the service still stopped.
		logger.Error("health server shutdown error", "error", healthShutdownError)
	}

	logger.Info("feed worker service stopped")

	return 0
}
+
+func createLogger(applicationConfiguration configuration.Configuration) *slog.Logger {
+ logLevel := resolveLogLevel(applicationConfiguration.LogLevel)
+ handlerOptions := &slog.HandlerOptions{
+ Level: logLevel,
+ }
+
+ var logHandler slog.Handler
+
+ if applicationConfiguration.LogJSON {
+ logHandler = slog.NewJSONHandler(os.Stdout, handlerOptions)
+ } else {
+ logHandler = slog.NewTextHandler(os.Stdout, handlerOptions)
+ }
+
+ return slog.New(logHandler)
+}
+
// resolveLogLevel translates a textual log level ("debug", "info",
// "warn", "error") into the matching slog.Level. Any other value,
// including the empty string, falls back to info.
func resolveLogLevel(levelString string) slog.Level {
	known := map[string]slog.Level{
		"debug": slog.LevelDebug,
		"info":  slog.LevelInfo,
		"warn":  slog.LevelWarn,
		"error": slog.LevelError,
	}

	if level, ok := known[levelString]; ok {
		return level
	}

	return slog.LevelInfo
}
diff --git a/services/worker/go.mod b/services/worker/go.mod
new file mode 100644
index 0000000..2588959
--- /dev/null
+++ b/services/worker/go.mod
@@ -0,0 +1,81 @@
+module github.com/Fuwn/asa-news
+
+go 1.24.0
+
+toolchain go1.25.6
+
+require (
+ github.com/craigpastro/pgmq-go v0.6.0
+ github.com/jackc/pgx/v5 v5.7.2
+ github.com/mmcdole/gofeed v1.3.0
+)
+
+require (
+ dario.cat/mergo v1.0.1 // indirect
+ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
+ github.com/Microsoft/go-winio v0.6.2 // indirect
+ github.com/PuerkitoBio/goquery v1.8.0 // indirect
+ github.com/andybalholm/cascadia v1.3.1 // indirect
+ github.com/avast/retry-go/v4 v4.6.0 // indirect
+ github.com/cenkalti/backoff/v4 v4.3.0 // indirect
+ github.com/cespare/xxhash/v2 v2.3.0 // indirect
+ github.com/containerd/log v0.1.0 // indirect
+ github.com/containerd/platforms v0.2.1 // indirect
+ github.com/cpuguy83/dockercfg v0.3.2 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/distribution/reference v0.6.0 // indirect
+ github.com/docker/docker v27.3.1+incompatible // indirect
+ github.com/docker/go-connections v0.5.0 // indirect
+ github.com/docker/go-units v0.5.0 // indirect
+ github.com/felixge/httpsnoop v1.0.4 // indirect
+ github.com/go-logr/logr v1.4.3 // indirect
+ github.com/go-logr/stdr v1.2.2 // indirect
+ github.com/go-ole/go-ole v1.3.0 // indirect
+ github.com/gogo/protobuf v1.3.2 // indirect
+ github.com/google/uuid v1.6.0 // indirect
+ github.com/jackc/pgpassfile v1.0.0 // indirect
+ github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
+ github.com/jackc/puddle/v2 v2.2.2 // indirect
+ github.com/json-iterator/go v1.1.12 // indirect
+ github.com/klauspost/compress v1.17.11 // indirect
+ github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683 // indirect
+ github.com/magiconair/properties v1.8.7 // indirect
+ github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23 // indirect
+ github.com/moby/docker-image-spec v1.3.1 // indirect
+ github.com/moby/patternmatcher v0.6.0 // indirect
+ github.com/moby/sys/sequential v0.6.0 // indirect
+ github.com/moby/sys/user v0.3.0 // indirect
+ github.com/moby/sys/userns v0.1.0 // indirect
+ github.com/moby/term v0.5.0 // indirect
+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+ github.com/modern-go/reflect2 v1.0.2 // indirect
+ github.com/morikuni/aec v1.0.0 // indirect
+ github.com/opencontainers/go-digest v1.0.0 // indirect
+ github.com/opencontainers/image-spec v1.1.0 // indirect
+ github.com/pkg/errors v0.9.1 // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
+ github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
+ github.com/shirou/gopsutil/v3 v3.24.5 // indirect
+ github.com/shoenig/go-m1cpu v0.1.6 // indirect
+ github.com/sirupsen/logrus v1.9.3 // indirect
+ github.com/stretchr/testify v1.11.1 // indirect
+ github.com/testcontainers/testcontainers-go v0.35.0 // indirect
+ github.com/tklauser/go-sysconf v0.3.14 // indirect
+ github.com/tklauser/numcpus v0.9.0 // indirect
+ github.com/yusufpapurcu/wmi v1.2.4 // indirect
+ go.opentelemetry.io/auto/sdk v1.2.1 // indirect
+ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
+ go.opentelemetry.io/otel v1.40.0 // indirect
+ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
+ go.opentelemetry.io/otel/metric v1.40.0 // indirect
+ go.opentelemetry.io/otel/sdk v1.40.0 // indirect
+ go.opentelemetry.io/otel/trace v1.40.0 // indirect
+ go.opentelemetry.io/proto/otlp v1.9.0 // indirect
+ golang.org/x/crypto v0.41.0 // indirect
+ golang.org/x/net v0.43.0 // indirect
+ golang.org/x/sync v0.16.0 // indirect
+ golang.org/x/sys v0.40.0 // indirect
+ golang.org/x/text v0.28.0 // indirect
+ google.golang.org/protobuf v1.36.11 // indirect
+ gopkg.in/yaml.v3 v3.0.1 // indirect
+)
diff --git a/services/worker/go.sum b/services/worker/go.sum
new file mode 100644
index 0000000..8a61ca7
--- /dev/null
+++ b/services/worker/go.sum
@@ -0,0 +1,226 @@
+dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
+dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
+github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
+github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
+github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
+github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
+github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
+github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
+github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U=
+github.com/PuerkitoBio/goquery v1.8.0/go.mod h1:ypIiRMtY7COPGk+I/YbZLbxsxn9g5ejnI2HSMtkjZvI=
+github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
+github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
+github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
+github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
+github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
+github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
+github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
+github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
+github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A=
+github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw=
+github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA=
+github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
+github.com/craigpastro/pgmq-go v0.6.0 h1:6hrQngiiB6SkpXJNR88dX4bqrZBfCLiuWPQzKoy50RI=
+github.com/craigpastro/pgmq-go v0.6.0/go.mod h1:cDDC2UWrRXP9l0e1dKlKnW3hHftNDobMlCgbw+V56og=
+github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
+github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
+github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
+github.com/docker/docker v27.3.1+incompatible h1:KttF0XoteNTicmUtBO0L2tP+J7FGRFTjaEF4k6WdhfI=
+github.com/docker/docker v27.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
+github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
+github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
+github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
+github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
+github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
+github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
+github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
+github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
+github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU=
+github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs=
+github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
+github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
+github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
+github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
+github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
+github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
+github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
+github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
+github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
+github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683 h1:7UMa6KCCMjZEMDtTVdcGu0B1GmmC7QJKiCCjyTAWQy0=
+github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k=
+github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
+github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
+github.com/mmcdole/gofeed v1.3.0 h1:5yn+HeqlcvjMeAI4gu6T+crm7d0anY85+M+v6fIFNG4=
+github.com/mmcdole/gofeed v1.3.0/go.mod h1:9TGv2LcJhdXePDzxiuMnukhV2/zb6VtnZt1mS+SjkLE=
+github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23 h1:Zr92CAlFhy2gL+V1F+EyIuzbQNbSgP4xhTODZtrXUtk=
+github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23/go.mod h1:v+25+lT2ViuQ7mVxcncQ8ch1URund48oH+jhjiwEgS8=
+github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
+github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
+github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk=
+github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc=
+github.com/moby/sys/sequential v0.6.0 h1:qrx7XFUd/5DxtqcoH1h438hF5TmOvzC/lspjy7zgvCU=
+github.com/moby/sys/sequential v0.6.0/go.mod h1:uyv8EUTrca5PnDsdMGXhZe6CCe8U/UiTWd+lL+7b/Ko=
+github.com/moby/sys/user v0.3.0 h1:9ni5DlcW5an3SvRSx4MouotOygvzaXbaSrc/wGDFWPo=
+github.com/moby/sys/user v0.3.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs=
+github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g=
+github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28=
+github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
+github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
+github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
+github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
+github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
+github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
+github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU=
+github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
+github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
+github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
+github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI=
+github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk=
+github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
+github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
+github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
+github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
+github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
+github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
+github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
+github.com/testcontainers/testcontainers-go v0.35.0 h1:uADsZpTKFAtp8SLK+hMwSaa+X+JiERHtd4sQAFmXeMo=
+github.com/testcontainers/testcontainers-go v0.35.0/go.mod h1:oEVBj5zrfJTrgjwONs1SsRbnBtH9OKl+IGl3UMcr2B4=
+github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU=
+github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY=
+github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPDo=
+github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
+github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
+go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
+go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM=
+go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
+go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU=
+go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g=
+go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc=
+go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8=
+go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE=
+go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw=
+go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
+go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
+go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
+go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
+go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
+golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
+golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
+golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
+golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4=
+golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
+golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
+golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
+golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY=
+google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc=
+google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI=
+google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
+google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
+google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
+gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
diff --git a/services/worker/internal/configuration/configuration.go b/services/worker/internal/configuration/configuration.go
new file mode 100644
index 0000000..84a5995
--- /dev/null
+++ b/services/worker/internal/configuration/configuration.go
@@ -0,0 +1,110 @@
+package configuration
+
+import (
+ "fmt"
+ "os"
+ "strconv"
+ "time"
+)
+
// Configuration holds every runtime setting for the worker, loaded from
// environment variables by Load.
type Configuration struct {
	DatabaseURL       string        // Postgres connection string (required)
	WorkerConcurrency int           // maximum concurrent feed refreshes
	PollInterval      time.Duration // cadence of the due-feed scan
	FetchTimeout      time.Duration // per-request HTTP timeout for feed fetches
	QueuePollInterval time.Duration // cadence of refresh-queue reads
	BatchSize         int           // feeds handled per scheduler cycle
	HealthPort        int           // TCP port for the health/readiness HTTP server
	EncryptionKey     string        // optional; empty when unset — TODO confirm consumer
	LogLevel          string        // e.g. "info"
	LogJSON           bool          // emit JSON-structured logs when true
}
+
+func Load() (Configuration, error) {
+ databaseURL := os.Getenv("DATABASE_URL")
+
+ if databaseURL == "" {
+ return Configuration{}, fmt.Errorf("DATABASE_URL is required")
+ }
+
+ workerConcurrency := getEnvironmentInteger("WORKER_CONCURRENCY", 10)
+ pollInterval := getEnvironmentDuration("POLL_INTERVAL", 30*time.Second)
+ fetchTimeout := getEnvironmentDuration("FETCH_TIMEOUT", 30*time.Second)
+ queuePollInterval := getEnvironmentDuration("QUEUE_POLL_INTERVAL", 5*time.Second)
+ batchSize := getEnvironmentInteger("BATCH_SIZE", 50)
+ healthPort := getEnvironmentInteger("HEALTH_PORT", 8080)
+ encryptionKey := os.Getenv("ENCRYPTION_KEY")
+ logLevel := getEnvironmentString("LOG_LEVEL", "info")
+ logJSON := getEnvironmentBoolean("LOG_JSON", false)
+
+ return Configuration{
+ DatabaseURL: databaseURL,
+ WorkerConcurrency: workerConcurrency,
+ PollInterval: pollInterval,
+ FetchTimeout: fetchTimeout,
+ QueuePollInterval: queuePollInterval,
+ BatchSize: batchSize,
+ HealthPort: healthPort,
+ EncryptionKey: encryptionKey,
+ LogLevel: logLevel,
+ LogJSON: logJSON,
+ }, nil
+}
+
// getEnvironmentString returns the value of the environment variable
// named key, or defaultValue when the variable is unset or empty.
func getEnvironmentString(key string, defaultValue string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}

	return defaultValue
}
+
// getEnvironmentInteger parses the named environment variable as an int,
// returning defaultValue when it is unset, empty, or not a valid integer.
func getEnvironmentInteger(key string, defaultValue int) int {
	raw := os.Getenv(key)
	if raw == "" {
		return defaultValue
	}

	if parsed, parseError := strconv.Atoi(raw); parseError == nil {
		return parsed
	}

	return defaultValue
}
+
// getEnvironmentDuration parses the named environment variable as a
// time.Duration (e.g. "30s", "5m"), returning defaultValue when it is
// unset, empty, or unparseable.
func getEnvironmentDuration(key string, defaultValue time.Duration) time.Duration {
	raw := os.Getenv(key)
	if raw == "" {
		return defaultValue
	}

	if parsed, parseError := time.ParseDuration(raw); parseError == nil {
		return parsed
	}

	return defaultValue
}
+
// getEnvironmentBoolean parses the named environment variable with
// strconv.ParseBool ("1", "t", "true", "0", "false", ...), returning
// defaultValue when it is unset, empty, or unparseable.
func getEnvironmentBoolean(key string, defaultValue bool) bool {
	raw := os.Getenv(key)
	if raw == "" {
		return defaultValue
	}

	if parsed, parseError := strconv.ParseBool(raw); parseError == nil {
		return parsed
	}

	return defaultValue
}
diff --git a/services/worker/internal/database/database.go b/services/worker/internal/database/database.go
new file mode 100644
index 0000000..2b2900f
--- /dev/null
+++ b/services/worker/internal/database/database.go
@@ -0,0 +1,39 @@
+package database
+
+import (
+ "context"
+ "fmt"
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "time"
+)
+
+func CreateConnectionPool(parentContext context.Context, databaseURL string) (*pgxpool.Pool, error) {
+ poolConfiguration, parseError := pgxpool.ParseConfig(databaseURL)
+
+ if parseError != nil {
+ return nil, fmt.Errorf("failed to parse database URL: %w", parseError)
+ }
+
+ poolConfiguration.MaxConns = 25
+ poolConfiguration.MinConns = 5
+ poolConfiguration.MaxConnLifetime = 30 * time.Minute
+ poolConfiguration.MaxConnIdleTime = 5 * time.Minute
+ poolConfiguration.HealthCheckPeriod = 30 * time.Second
+ poolConfiguration.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeExec
+ connectionPool, connectionError := pgxpool.NewWithConfig(parentContext, poolConfiguration)
+
+ if connectionError != nil {
+ return nil, fmt.Errorf("failed to create connection pool: %w", connectionError)
+ }
+
+ pingError := connectionPool.Ping(parentContext)
+
+ if pingError != nil {
+ connectionPool.Close()
+
+ return nil, fmt.Errorf("failed to ping database: %w", pingError)
+ }
+
+ return connectionPool, nil
+}
diff --git a/services/worker/internal/fetcher/authentication.go b/services/worker/internal/fetcher/authentication.go
new file mode 100644
index 0000000..ba10196
--- /dev/null
+++ b/services/worker/internal/fetcher/authentication.go
@@ -0,0 +1,43 @@
+package fetcher
+
+import (
+ "encoding/base64"
+ "fmt"
+ "net/http"
+ "net/url"
+)
+
// AuthenticationConfiguration describes how a feed request should be
// authenticated: AuthenticationType selects the mechanism ("bearer",
// "basic", "query_param", or "" for none) and AuthenticationValue carries
// the credential material for that mechanism.
type AuthenticationConfiguration struct {
	AuthenticationType  string
	AuthenticationValue string
}

// ApplyAuthentication mutates request so it carries the credentials from
// authenticationConfig. An empty type is a no-op; unknown types yield an
// error.
func ApplyAuthentication(request *http.Request, authenticationConfig AuthenticationConfiguration) error {
	credential := authenticationConfig.AuthenticationValue

	switch authenticationConfig.AuthenticationType {
	case "":
		// No authentication configured.
		return nil
	case "bearer":
		request.Header.Set("Authorization", "Bearer "+credential)

		return nil
	case "basic":
		// NOTE(review): the whole value is base64-encoded as-is, so it is
		// presumably already "user:password" — confirm with callers.
		request.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(credential)))

		return nil
	case "query_param":
		rewrittenURL, parseError := url.Parse(request.URL.String())
		if parseError != nil {
			return fmt.Errorf("failed to parse request URL for query param authentication: %w", parseError)
		}

		queryValues := rewrittenURL.Query()
		queryValues.Set("key", credential)
		rewrittenURL.RawQuery = queryValues.Encode()
		request.URL = rewrittenURL

		return nil
	default:
		return fmt.Errorf("unsupported authentication type: %s", authenticationConfig.AuthenticationType)
	}
}
diff --git a/services/worker/internal/fetcher/errors.go b/services/worker/internal/fetcher/errors.go
new file mode 100644
index 0000000..abf5553
--- /dev/null
+++ b/services/worker/internal/fetcher/errors.go
@@ -0,0 +1,145 @@
+package fetcher
+
+import (
+ "errors"
+ "fmt"
+ "net"
+ "net/url"
+ "strings"
+)
+
// FetchError is a classified feed-fetch failure. StatusCode is the HTTP
// status that triggered it (0 for transport-level failures), UserMessage
// is a human-readable explanation, and Retryable reports whether the
// fetch is worth attempting again later.
type FetchError struct {
	StatusCode      int
	UserMessage     string
	Retryable       bool
	UnderlyingError error
}

// Error renders the user message, appending the underlying cause when one
// is recorded.
func (fetchError *FetchError) Error() string {
	if fetchError.UnderlyingError == nil {
		return fetchError.UserMessage
	}

	return fmt.Sprintf("%s: %v", fetchError.UserMessage, fetchError.UnderlyingError)
}

// Unwrap exposes the wrapped cause to errors.Is and errors.As.
func (fetchError *FetchError) Unwrap() error {
	return fetchError.UnderlyingError
}
+
+func ClassifyError(originalError error, statusCode int) *FetchError {
+ if statusCode >= 400 {
+ return classifyHTTPStatusCode(statusCode, originalError)
+ }
+
+ return classifyNetworkError(originalError)
+}
+
+func classifyHTTPStatusCode(statusCode int, originalError error) *FetchError {
+ switch statusCode {
+ case 401:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: "authentication required - check feed credentials",
+ Retryable: false,
+ UnderlyingError: originalError,
+ }
+ case 403:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: "access forbidden - feed may require different credentials",
+ Retryable: false,
+ UnderlyingError: originalError,
+ }
+ case 404:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: "feed not found - URL may be incorrect or feed removed",
+ Retryable: false,
+ UnderlyingError: originalError,
+ }
+ case 410:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: "feed permanently removed",
+ Retryable: false,
+ UnderlyingError: originalError,
+ }
+ case 429:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: "rate limited by feed server - will retry later",
+ Retryable: true,
+ UnderlyingError: originalError,
+ }
+ case 500, 502, 503, 504:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: fmt.Sprintf("feed server error (HTTP %d) - will retry later", statusCode),
+ Retryable: true,
+ UnderlyingError: originalError,
+ }
+ default:
+ return &FetchError{
+ StatusCode: statusCode,
+ UserMessage: fmt.Sprintf("unexpected HTTP status %d", statusCode),
+ Retryable: statusCode >= 500,
+ UnderlyingError: originalError,
+ }
+ }
+}
+
+func classifyNetworkError(originalError error) *FetchError {
+ if originalError == nil {
+ return &FetchError{
+ UserMessage: "unknown fetch error",
+ Retryable: true,
+ }
+ }
+
+ var dnsError *net.DNSError
+
+ if errors.As(originalError, &dnsError) {
+ return &FetchError{
+ UserMessage: "DNS resolution failed - check feed URL",
+ Retryable: !dnsError.IsNotFound,
+ UnderlyingError: originalError,
+ }
+ }
+
+ var urlError *url.Error
+
+ if errors.As(originalError, &urlError) {
+ if urlError.Timeout() {
+ return &FetchError{
+ UserMessage: "connection timed out - feed server may be slow",
+ Retryable: true,
+ UnderlyingError: originalError,
+ }
+ }
+ }
+
+ var netOpError *net.OpError
+
+ if errors.As(originalError, &netOpError) {
+ return &FetchError{
+ UserMessage: "network connection error - will retry later",
+ Retryable: true,
+ UnderlyingError: originalError,
+ }
+ }
+
+ if strings.Contains(originalError.Error(), "certificate") || strings.Contains(originalError.Error(), "tls") {
+ return &FetchError{
+ UserMessage: "TLS/certificate error - feed server has invalid certificate",
+ Retryable: false,
+ UnderlyingError: originalError,
+ }
+ }
+
+ return &FetchError{
+ UserMessage: "unexpected network error",
+ Retryable: true,
+ UnderlyingError: originalError,
+ }
+}
diff --git a/services/worker/internal/fetcher/fetcher.go b/services/worker/internal/fetcher/fetcher.go
new file mode 100644
index 0000000..019bd39
--- /dev/null
+++ b/services/worker/internal/fetcher/fetcher.go
@@ -0,0 +1,116 @@
+package fetcher
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+)
+
// FetchResult captures the outcome of a single conditional GET against a
// feed URL.
type FetchResult struct {
	Body               []byte // response payload (empty when NotModified)
	StatusCode         int
	EntityTag          string // ETag response header, for the next conditional request
	LastModifiedHeader string // Last-Modified response header, for the next conditional request
	NotModified        bool   // true when the server answered 304 Not Modified
}
+
+type Fetcher struct {
+ httpClient *http.Client
+}
+
+func NewFetcher(fetchTimeout time.Duration) *Fetcher {
+ return &Fetcher{
+ httpClient: &http.Client{
+ Timeout: fetchTimeout,
+ CheckRedirect: func(request *http.Request, previousRequests []*http.Request) error {
+ if len(previousRequests) >= 5 {
+ return fmt.Errorf("too many redirects (exceeded 5)")
+ }
+
+ redirectValidationError := ValidateRedirectTarget(request.URL.String())
+ if redirectValidationError != nil {
+ return fmt.Errorf("blocked redirect to reserved address: %w", redirectValidationError)
+ }
+
+ return nil
+ },
+ },
+ }
+}
+
+func (feedFetcher *Fetcher) Fetch(
+ requestContext context.Context,
+ feedURL string,
+ previousEntityTag string,
+ previousLastModified string,
+ authenticationConfig AuthenticationConfiguration,
+) (*FetchResult, error) {
+ urlValidationError := ValidateFeedURL(feedURL)
+ if urlValidationError != nil {
+ return nil, fmt.Errorf("blocked request to disallowed URL: %w", urlValidationError)
+ }
+
+ request, requestCreationError := http.NewRequestWithContext(requestContext, http.MethodGet, feedURL, nil)
+
+ if requestCreationError != nil {
+ return nil, fmt.Errorf("failed to create HTTP request: %w", requestCreationError)
+ }
+
+ request.Header.Set("User-Agent", "asa.news Feed Worker/1.0")
+ request.Header.Set("Accept", "application/rss+xml, application/atom+xml, application/xml, text/xml, */*")
+
+ if previousEntityTag != "" {
+ request.Header.Set("If-None-Match", previousEntityTag)
+ }
+
+ if previousLastModified != "" {
+ request.Header.Set("If-Modified-Since", previousLastModified)
+ }
+
+ authenticationError := ApplyAuthentication(request, authenticationConfig)
+
+ if authenticationError != nil {
+ return nil, fmt.Errorf("failed to apply authentication: %w", authenticationError)
+ }
+
+ response, requestError := feedFetcher.httpClient.Do(request)
+
+ if requestError != nil {
+ classifiedError := ClassifyError(requestError, 0)
+
+ return nil, classifiedError
+ }
+
+ defer response.Body.Close()
+
+ if response.StatusCode == http.StatusNotModified {
+ return &FetchResult{
+ StatusCode: response.StatusCode,
+ EntityTag: response.Header.Get("ETag"),
+ LastModifiedHeader: response.Header.Get("Last-Modified"),
+ NotModified: true,
+ }, nil
+ }
+
+ if response.StatusCode >= 400 {
+ classifiedError := ClassifyError(nil, response.StatusCode)
+
+ return nil, classifiedError
+ }
+
+ responseBody, readError := io.ReadAll(io.LimitReader(response.Body, 10*1024*1024))
+
+ if readError != nil {
+ return nil, fmt.Errorf("failed to read response body: %w", readError)
+ }
+
+ return &FetchResult{
+ Body: responseBody,
+ StatusCode: response.StatusCode,
+ EntityTag: response.Header.Get("ETag"),
+ LastModifiedHeader: response.Header.Get("Last-Modified"),
+ NotModified: false,
+ }, nil
+}
diff --git a/services/worker/internal/fetcher/ssrf_protection.go b/services/worker/internal/fetcher/ssrf_protection.go
new file mode 100644
index 0000000..1887e78
--- /dev/null
+++ b/services/worker/internal/fetcher/ssrf_protection.go
@@ -0,0 +1,77 @@
+package fetcher
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "net/url"
+ "strings"
+ "time"
+)
+
// reservedNetworks lists address ranges a feed fetch must never reach:
// RFC 1918 private space, loopback, link-local, the "this network" block,
// carrier-grade NAT (RFC 6598), multicast, class E/reserved space, and
// their IPv6 analogues. net.IPNet.Contains normalizes IPv4-mapped IPv6
// addresses to 4-byte form, so ::ffff:10.0.0.1 is caught as well.
var reservedNetworks = []net.IPNet{
	{IP: net.IPv4(10, 0, 0, 0), Mask: net.CIDRMask(8, 32)},     // RFC 1918
	{IP: net.IPv4(172, 16, 0, 0), Mask: net.CIDRMask(12, 32)},  // RFC 1918
	{IP: net.IPv4(192, 168, 0, 0), Mask: net.CIDRMask(16, 32)}, // RFC 1918
	{IP: net.IPv4(127, 0, 0, 0), Mask: net.CIDRMask(8, 32)},    // loopback
	{IP: net.IPv4(169, 254, 0, 0), Mask: net.CIDRMask(16, 32)}, // link-local
	{IP: net.IPv4(0, 0, 0, 0), Mask: net.CIDRMask(8, 32)},      // "this network"
	{IP: net.IPv4(100, 64, 0, 0), Mask: net.CIDRMask(10, 32)},  // CGNAT (RFC 6598)
	{IP: net.IPv4(224, 0, 0, 0), Mask: net.CIDRMask(4, 32)},    // multicast
	{IP: net.IPv4(240, 0, 0, 0), Mask: net.CIDRMask(4, 32)},    // class E + broadcast
	{IP: net.ParseIP("::1"), Mask: net.CIDRMask(128, 128)},     // IPv6 loopback
	{IP: net.ParseIP("::"), Mask: net.CIDRMask(128, 128)},      // IPv6 unspecified
	{IP: net.ParseIP("fc00::"), Mask: net.CIDRMask(7, 128)},    // IPv6 unique-local
	{IP: net.ParseIP("fe80::"), Mask: net.CIDRMask(10, 128)},   // IPv6 link-local
	{IP: net.ParseIP("ff00::"), Mask: net.CIDRMask(8, 128)},    // IPv6 multicast
}

// isReservedAddress reports whether ipAddress falls inside any of the
// reserved networks above.
func isReservedAddress(ipAddress net.IP) bool {
	for _, network := range reservedNetworks {
		if network.Contains(ipAddress) {
			return true
		}
	}

	return false
}

// ValidateFeedURL rejects feed URLs that could be abused for SSRF:
// non-http(s) schemes, URLs without a hostname, IP literals in reserved
// ranges, and hostnames any of whose resolved addresses are reserved.
//
// NOTE(review): DNS is resolved here, separately from the actual fetch,
// so a rebinding resolver could return a different address at connect
// time — confirm whether a dial-time re-check is also needed.
func ValidateFeedURL(feedURL string) error {
	parsedURL, parseError := url.Parse(feedURL)
	if parseError != nil {
		return fmt.Errorf("invalid URL: %w", parseError)
	}

	scheme := strings.ToLower(parsedURL.Scheme)
	if scheme != "http" && scheme != "https" {
		return fmt.Errorf("unsupported scheme %q: only http and https are allowed", parsedURL.Scheme)
	}

	hostname := parsedURL.Hostname()
	if hostname == "" {
		return fmt.Errorf("URL has no hostname")
	}

	// IP literals are checked directly, with no DNS round trip.
	if parsedIP := net.ParseIP(hostname); parsedIP != nil {
		if isReservedAddress(parsedIP) {
			return fmt.Errorf("feed URL resolves to a reserved IP address")
		}

		return nil
	}

	resolverContext, cancelResolver := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelResolver()

	resolvedAddresses, lookupError := net.DefaultResolver.LookupIPAddr(resolverContext, hostname)
	if lookupError != nil {
		return fmt.Errorf("failed to resolve hostname %q: %w", hostname, lookupError)
	}

	// A single reserved A/AAAA record is enough to reject the URL.
	for _, resolvedAddress := range resolvedAddresses {
		if isReservedAddress(resolvedAddress.IP) {
			return fmt.Errorf("feed URL resolves to a reserved IP address")
		}
	}

	return nil
}

// ValidateRedirectTarget applies the same SSRF rules to a redirect
// Location as to the original feed URL.
func ValidateRedirectTarget(redirectURL string) error {
	return ValidateFeedURL(redirectURL)
}
diff --git a/services/worker/internal/health/health.go b/services/worker/internal/health/health.go
new file mode 100644
index 0000000..0a29c92
--- /dev/null
+++ b/services/worker/internal/health/health.go
@@ -0,0 +1,117 @@
+package health
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "log/slog"
+ "net/http"
+ "time"
+)
+
// HealthStatus is the JSON payload returned by the /health endpoint.
type HealthStatus struct {
	Status    string `json:"status"`    // "healthy" or "unhealthy"
	Timestamp string `json:"timestamp"` // RFC 3339 UTC time of the check
	Database  string `json:"database"`  // "connected", "unreachable", or "unknown"
}

// HealthServer serves liveness (/health) and readiness (/ready) probes,
// both backed by a short database ping.
type HealthServer struct {
	httpServer             *http.Server
	databaseConnectionPool *pgxpool.Pool
	logger                 *slog.Logger
}
+
+func NewHealthServer(
+ healthPort int,
+ databaseConnectionPool *pgxpool.Pool,
+ logger *slog.Logger,
+) *HealthServer {
+ healthServer := &HealthServer{
+ databaseConnectionPool: databaseConnectionPool,
+ logger: logger,
+ }
+ serveMux := http.NewServeMux()
+
+ serveMux.HandleFunc("/health", healthServer.handleHealthCheck)
+ serveMux.HandleFunc("/ready", healthServer.handleReadinessCheck)
+
+ healthServer.httpServer = &http.Server{
+ Addr: fmt.Sprintf(":%d", healthPort),
+ Handler: serveMux,
+ ReadTimeout: 5 * time.Second,
+ WriteTimeout: 5 * time.Second,
+ IdleTimeout: 30 * time.Second,
+ }
+
+ return healthServer
+}
+
+func (healthServer *HealthServer) Start() {
+ go func() {
+ healthServer.logger.Info("health server starting", "address", healthServer.httpServer.Addr)
+
+ listenError := healthServer.httpServer.ListenAndServe()
+
+ if listenError != nil && listenError != http.ErrServerClosed {
+ healthServer.logger.Error("health server failed", "error", listenError)
+ }
+ }()
+}
+
// Stop gracefully shuts down the health HTTP server, waiting for in-flight
// requests until shutdownContext expires.
func (healthServer *HealthServer) Stop(shutdownContext context.Context) error {
	return healthServer.httpServer.Shutdown(shutdownContext)
}
+
+func (healthServer *HealthServer) handleHealthCheck(responseWriter http.ResponseWriter, request *http.Request) {
+ healthStatus := HealthStatus{
+ Status: "healthy",
+ Timestamp: time.Now().UTC().Format(time.RFC3339),
+ Database: "unknown",
+ }
+ pingContext, cancelPing := context.WithTimeout(request.Context(), 2*time.Second)
+
+ defer cancelPing()
+
+ pingError := healthServer.databaseConnectionPool.Ping(pingContext)
+
+ if pingError != nil {
+ healthStatus.Status = "unhealthy"
+ healthStatus.Database = "unreachable"
+
+ respondWithJSON(responseWriter, http.StatusServiceUnavailable, healthStatus)
+
+ return
+ }
+
+ healthStatus.Database = "connected"
+
+ respondWithJSON(responseWriter, http.StatusOK, healthStatus)
+}
+
+func (healthServer *HealthServer) handleReadinessCheck(responseWriter http.ResponseWriter, request *http.Request) {
+ pingContext, cancelPing := context.WithTimeout(request.Context(), 2*time.Second)
+
+ defer cancelPing()
+
+ pingError := healthServer.databaseConnectionPool.Ping(pingContext)
+
+ if pingError != nil {
+ responseWriter.WriteHeader(http.StatusServiceUnavailable)
+
+ return
+ }
+
+ responseWriter.WriteHeader(http.StatusOK)
+}
+
// respondWithJSON writes payload as a JSON response with the given status
// code. An encoding failure cannot be reported to the client because the
// status line has already been sent, so it is deliberately ignored
// (previously it was captured into an empty, dead branch).
func respondWithJSON(responseWriter http.ResponseWriter, statusCode int, payload interface{}) {
	responseWriter.Header().Set("Content-Type", "application/json")
	responseWriter.WriteHeader(statusCode)

	// Headers are already flushed; nothing useful remains to do on error.
	_ = json.NewEncoder(responseWriter).Encode(payload)
}
diff --git a/services/worker/internal/model/feed.go b/services/worker/internal/model/feed.go
new file mode 100644
index 0000000..611c820
--- /dev/null
+++ b/services/worker/internal/model/feed.go
@@ -0,0 +1,41 @@
+package model
+
+import (
+ "time"
+)
+
// Feed is one subscribable RSS/Atom source together with the
// fetch-scheduling state the worker maintains for it. Pointer fields are
// nil until a value is known.
type Feed struct {
	Identifier           string
	URL                  string     // feed document URL to fetch
	SiteURL              *string    // feed-declared site link
	Title                *string    // feed-declared title
	FeedType             *string    // e.g. "rss" or "podcast" (see scheduler's podcast heuristic)
	Visibility           string     // e.g. "authenticated" for per-user feeds — TODO confirm full value set
	EntityTag            *string    // ETag from the last fetch, sent as If-None-Match
	LastModified         *string    // Last-Modified from the last fetch, sent as If-Modified-Since
	LastFetchedAt        *time.Time
	LastFetchError       *string    // most recent fetch/parse/write error message
	ConsecutiveFailures  int
	NextFetchAt          time.Time  // when the feed is next due for a refresh
	FetchIntervalSeconds int
	CreatedAt            time.Time
	UpdatedAt            time.Time
}
+
// FeedEntry is one normalized item parsed out of a feed document; nil
// pointer fields mean the source item did not provide that value.
type FeedEntry struct {
	FeedIdentifier  string  // owning Feed.Identifier
	OwnerIdentifier *string // presumably set only for per-user feeds; currently always nil — TODO confirm
	GUID            string  // stable per-item identifier (GUID, link, or content-hash fallback)
	URL             *string
	Title           *string
	Author          *string
	Summary         *string
	ContentHTML     *string
	ContentText     *string // HTML-stripped plain text
	ImageURL        *string
	PublishedAt     *time.Time
	WordCount       *int    // word count of ContentText
	EnclosureURL    *string // first audio/* enclosure, if any
	EnclosureType   *string // MIME type of that enclosure
	EnclosureLength *int64  // declared byte length of that enclosure
}
diff --git a/services/worker/internal/model/queue.go b/services/worker/internal/model/queue.go
new file mode 100644
index 0000000..a5aaa7c
--- /dev/null
+++ b/services/worker/internal/model/queue.go
@@ -0,0 +1,5 @@
+package model
+
// RefreshRequest is the queue message asking the worker to refresh one
// feed, identified by its feed_id.
type RefreshRequest struct {
	FeedIdentifier string `json:"feed_id"`
}
diff --git a/services/worker/internal/parser/parser.go b/services/worker/internal/parser/parser.go
new file mode 100644
index 0000000..1fb2f76
--- /dev/null
+++ b/services/worker/internal/parser/parser.go
@@ -0,0 +1,234 @@
+package parser
+
+import (
+ "crypto/sha256"
+ "fmt"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/mmcdole/gofeed"
+ "strconv"
+ "strings"
+ "time"
+ "unicode/utf8"
+)
+
+type Parser struct {
+ gofeedParser *gofeed.Parser
+}
+
+func NewParser() *Parser {
+ return &Parser{
+ gofeedParser: gofeed.NewParser(),
+ }
+}
+
// ParseResult is the outcome of parsing one feed document.
type ParseResult struct {
	Entries             []model.FeedEntry // normalized items, in document order
	FeedTitle           string            // trimmed feed-level title
	SiteURL             string            // trimmed feed-level link
	AudioEnclosureRatio float64           // fraction of entries with an audio enclosure (0 when there are no entries)
}
+
+func (feedParser *Parser) Parse(feedIdentifier string, ownerIdentifier *string, rawFeedContent []byte) (*ParseResult, error) {
+ parsedFeed, parseError := feedParser.gofeedParser.ParseString(string(rawFeedContent))
+
+ if parseError != nil {
+ return nil, fmt.Errorf("failed to parse feed content: %w", parseError)
+ }
+
+ feedEntries := make([]model.FeedEntry, 0, len(parsedFeed.Items))
+
+ audioEnclosureCount := 0
+
+ for _, feedItem := range parsedFeed.Items {
+ normalizedEntry := normalizeFeedItem(feedIdentifier, ownerIdentifier, feedItem)
+ if normalizedEntry.EnclosureURL != nil {
+ audioEnclosureCount++
+ }
+ feedEntries = append(feedEntries, normalizedEntry)
+ }
+
+ audioEnclosureRatio := 0.0
+ if len(feedEntries) > 0 {
+ audioEnclosureRatio = float64(audioEnclosureCount) / float64(len(feedEntries))
+ }
+
+ return &ParseResult{
+ Entries: feedEntries,
+ FeedTitle: strings.TrimSpace(parsedFeed.Title),
+ SiteURL: strings.TrimSpace(parsedFeed.Link),
+ AudioEnclosureRatio: audioEnclosureRatio,
+ }, nil
+}
+
+func normalizeFeedItem(feedIdentifier string, ownerIdentifier *string, feedItem *gofeed.Item) model.FeedEntry {
+ globallyUniqueIdentifier := resolveGloballyUniqueIdentifier(feedItem)
+ entryURL := stringPointerOrNil(resolveEntryURL(feedItem))
+ entryTitle := stringPointerOrNil(strings.TrimSpace(feedItem.Title))
+ entrySummary := stringPointerOrNil(strings.TrimSpace(feedItem.Description))
+ entryContentHTML := stringPointerOrNil(resolveContentHTML(feedItem))
+ entryContentText := resolveContentText(feedItem)
+ authorName := stringPointerOrNil(resolveAuthorName(feedItem))
+ publishedAt := resolvePublishedDate(feedItem)
+ entryImageURL := stringPointerOrNil(resolveImageURL(feedItem))
+ wordCount := countWords(entryContentText)
+ enclosureURL, enclosureType, enclosureLength := resolveAudioEnclosure(feedItem)
+
+ return model.FeedEntry{
+ FeedIdentifier: feedIdentifier,
+ OwnerIdentifier: ownerIdentifier,
+ GUID: globallyUniqueIdentifier,
+ URL: entryURL,
+ Title: entryTitle,
+ Author: authorName,
+ Summary: entrySummary,
+ ContentHTML: entryContentHTML,
+ ContentText: stringPointerOrNil(entryContentText),
+ ImageURL: entryImageURL,
+ PublishedAt: publishedAt,
+ WordCount: wordCount,
+ EnclosureURL: enclosureURL,
+ EnclosureType: enclosureType,
+ EnclosureLength: enclosureLength,
+ }
+}
+
// stringPointerOrNil maps the empty string to nil and any other value to
// a pointer, matching nullable text columns.
func stringPointerOrNil(value string) *string {
	if value != "" {
		return &value
	}

	return nil
}
+
+func resolveGloballyUniqueIdentifier(feedItem *gofeed.Item) string {
+ if feedItem.GUID != "" {
+ return feedItem.GUID
+ }
+
+ if feedItem.Link != "" {
+ return feedItem.Link
+ }
+
+ hashInput := feedItem.Title + feedItem.Description
+ hashBytes := sha256.Sum256([]byte(hashInput))
+
+ return fmt.Sprintf("sha256:%x", hashBytes)
+}
+
+func resolveEntryURL(feedItem *gofeed.Item) string {
+ if feedItem.Link != "" {
+ return feedItem.Link
+ }
+
+ if feedItem.GUID != "" && strings.HasPrefix(feedItem.GUID, "http") {
+ return feedItem.GUID
+ }
+
+ return ""
+}
+
+func resolveContentHTML(feedItem *gofeed.Item) string {
+ if feedItem.Content != "" {
+ return feedItem.Content
+ }
+
+ return feedItem.Description
+}
+
+func resolveContentText(feedItem *gofeed.Item) string {
+ contentSource := feedItem.Content
+
+ if contentSource == "" {
+ contentSource = feedItem.Description
+ }
+
+ return stripHTMLTags(contentSource)
+}
+
+func resolveAuthorName(feedItem *gofeed.Item) string {
+ if feedItem.Author != nil && feedItem.Author.Name != "" {
+ return feedItem.Author.Name
+ }
+
+ if len(feedItem.Authors) > 0 && feedItem.Authors[0].Name != "" {
+ return feedItem.Authors[0].Name
+ }
+
+ return ""
+}
+
+func resolvePublishedDate(feedItem *gofeed.Item) *time.Time {
+ if feedItem.PublishedParsed != nil {
+ return feedItem.PublishedParsed
+ }
+
+ if feedItem.UpdatedParsed != nil {
+ return feedItem.UpdatedParsed
+ }
+
+ return nil
+}
+
+func resolveAudioEnclosure(feedItem *gofeed.Item) (*string, *string, *int64) {
+ if feedItem.Enclosures == nil {
+ return nil, nil, nil
+ }
+
+ for _, enclosure := range feedItem.Enclosures {
+ if strings.HasPrefix(enclosure.Type, "audio/") && enclosure.URL != "" {
+ enclosureURL := enclosure.URL
+ enclosureType := enclosure.Type
+
+ var enclosureLength *int64
+ if enclosure.Length != "" {
+ if parsedLength, parseError := strconv.ParseInt(enclosure.Length, 10, 64); parseError == nil {
+ enclosureLength = &parsedLength
+ }
+ }
+
+ return &enclosureURL, &enclosureType, enclosureLength
+ }
+ }
+
+ return nil, nil, nil
+}
+
+func resolveImageURL(feedItem *gofeed.Item) string {
+ if feedItem.Image != nil && feedItem.Image.URL != "" {
+ return feedItem.Image.URL
+ }
+
+ return ""
+}
+
// countWords returns the whitespace-delimited word count of plainText, or
// nil for empty input (matching a nullable column).
func countWords(plainText string) *int {
	if len(plainText) == 0 {
		return nil
	}

	wordCount := len(strings.Fields(plainText))

	return &wordCount
}
+
// stripHTMLTags removes everything between '<' and '>' from htmlContent
// and trims surrounding whitespace. It is a single-pass scanner, not a
// real HTML parser: entities are left as-is and a stray '>' in plain text
// is dropped.
func stripHTMLTags(htmlContent string) string {
	var plainText strings.Builder

	withinTag := false

	for _, currentRune := range htmlContent {
		switch {
		case currentRune == '<':
			withinTag = true
		case currentRune == '>':
			withinTag = false
		case !withinTag:
			plainText.WriteRune(currentRune)
		}
	}

	return strings.TrimSpace(plainText.String())
}
diff --git a/services/worker/internal/pool/pool.go b/services/worker/internal/pool/pool.go
new file mode 100644
index 0000000..7df03e2
--- /dev/null
+++ b/services/worker/internal/pool/pool.go
@@ -0,0 +1,60 @@
+package pool
+
+import (
+ "context"
+ "log/slog"
+ "sync"
+)
+
// WorkFunction is one unit of work executed by the pool.
type WorkFunction func(workContext context.Context)

// WorkerPool bounds the number of concurrently running work functions
// with a semaphore channel and can wait for all accepted work to finish.
type WorkerPool struct {
	concurrencyLimit int
	semaphoreChannel chan struct{}
	waitGroup        sync.WaitGroup
	logger           *slog.Logger
}

// NewWorkerPool builds a pool that runs at most concurrencyLimit work
// functions at once, logging recovered panics through logger.
func NewWorkerPool(concurrencyLimit int, logger *slog.Logger) *WorkerPool {
	workerPool := &WorkerPool{
		concurrencyLimit: concurrencyLimit,
		logger:           logger,
	}
	workerPool.semaphoreChannel = make(chan struct{}, concurrencyLimit)

	return workerPool
}
+
+func (workerPool *WorkerPool) Submit(workContext context.Context, workFunction WorkFunction) bool {
+ select {
+ case workerPool.semaphoreChannel <- struct{}{}:
+ workerPool.waitGroup.Add(1)
+
+ go func() {
+ defer workerPool.waitGroup.Done()
+ defer func() { <-workerPool.semaphoreChannel }()
+ defer func() {
+ recoveredPanic := recover()
+
+ if recoveredPanic != nil {
+ workerPool.logger.Error(
+ "worker panic recovered",
+ "panic_value", recoveredPanic,
+ )
+ }
+ }()
+
+ workFunction(workContext)
+ }()
+
+ return true
+ case <-workContext.Done():
+ return false
+ }
+}
+
// Wait blocks until every work function accepted by Submit has returned.
func (workerPool *WorkerPool) Wait() {
	workerPool.waitGroup.Wait()
}
+
// ActiveWorkerCount reports how many work functions currently hold a
// semaphore slot. It is an instantaneous snapshot and may be stale as
// soon as it returns.
func (workerPool *WorkerPool) ActiveWorkerCount() int {
	return len(workerPool.semaphoreChannel)
}
diff --git a/services/worker/internal/scheduler/refresh.go b/services/worker/internal/scheduler/refresh.go
new file mode 100644
index 0000000..8271fa0
--- /dev/null
+++ b/services/worker/internal/scheduler/refresh.go
@@ -0,0 +1,196 @@
+package scheduler
+
+import (
+ "context"
+ "github.com/Fuwn/asa-news/internal/fetcher"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/Fuwn/asa-news/internal/parser"
+ "github.com/Fuwn/asa-news/internal/webhook"
+ "github.com/Fuwn/asa-news/internal/writer"
+ "log/slog"
+)
+
// ProcessRefreshRequest runs one complete refresh of feed: conditional
// fetch, parse, entry write, webhook dispatch, and metadata/feed-type
// bookkeeping. It returns nothing; every failure is logged and recorded
// against the feed via feedWriter, so the scheduler can treat each
// refresh as best-effort.
func ProcessRefreshRequest(
	refreshContext context.Context,
	feed model.Feed,
	feedFetcher *fetcher.Fetcher,
	feedParser *parser.Parser,
	feedWriter *writer.Writer,
	webhookDispatcher *webhook.Dispatcher,
	logger *slog.Logger,
) {
	logger.Info(
		"processing feed refresh",
		"feed_identifier", feed.Identifier,
		"feed_url", feed.URL,
	)

	// Never assigned in this function: authenticated (per-user) feeds
	// bail out just below, so entries are always written with a nil owner.
	var ownerIdentifier *string

	if feed.Visibility == "authenticated" {
		logger.Warn(
			"authenticated feed refresh not yet implemented",
			"feed_identifier", feed.Identifier,
		)

		return
	}

	// Public feeds carry no credentials; the zero value means "no auth".
	authenticationConfig := fetcher.AuthenticationConfiguration{}

	// Dereference the optional HTTP validators so the fetcher can issue a
	// conditional GET ("" means "not known yet").
	entityTag := ""

	if feed.EntityTag != nil {
		entityTag = *feed.EntityTag
	}

	lastModified := ""

	if feed.LastModified != nil {
		lastModified = *feed.LastModified
	}

	fetchResult, fetchError := feedFetcher.Fetch(
		refreshContext,
		feed.URL,
		entityTag,
		lastModified,
		authenticationConfig,
	)

	if fetchError != nil {
		logger.Warn(
			"feed fetch failed",
			"feed_identifier", feed.Identifier,
			"error", fetchError,
		)

		recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, fetchError.Error())

		if recordError != nil {
			logger.Error(
				"failed to record feed error",
				"feed_identifier", feed.Identifier,
				"error", recordError,
			)
		}

		return
	}

	if fetchResult.NotModified {
		logger.Info(
			"feed not modified",
			"feed_identifier", feed.Identifier,
		)

		// Refresh the validators even on 304 so future conditional GETs
		// stay accurate. Title/site are "" because nothing was parsed —
		// NOTE(review): assumes UpdateFeedMetadata treats empty strings
		// as "leave unchanged"; confirm in the writer.
		metadataError := feedWriter.UpdateFeedMetadata(
			refreshContext,
			feed.Identifier,
			fetchResult.EntityTag,
			fetchResult.LastModifiedHeader,
			"",
			"",
		)

		if metadataError != nil {
			logger.Error(
				"failed to update feed metadata after not-modified",
				"feed_identifier", feed.Identifier,
				"error", metadataError,
			)
		}

		return
	}

	parseResult, parseError := feedParser.Parse(feed.Identifier, ownerIdentifier, fetchResult.Body)

	if parseError != nil {
		logger.Warn(
			"feed parse failed",
			"feed_identifier", feed.Identifier,
			"error", parseError,
		)

		recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, parseError.Error())

		if recordError != nil {
			logger.Error(
				"failed to record parse error",
				"feed_identifier", feed.Identifier,
				"error", recordError,
			)
		}

		return
	}

	rowsAffected, writeError := feedWriter.WriteEntries(refreshContext, parseResult.Entries)

	if writeError != nil {
		logger.Error(
			"failed to write feed entries",
			"feed_identifier", feed.Identifier,
			"error", writeError,
		)

		recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, writeError.Error())

		if recordError != nil {
			logger.Error(
				"failed to record write error",
				"feed_identifier", feed.Identifier,
				"error", recordError,
			)
		}

		return
	}

	// Webhooks fire only when the write changed rows, so re-fetches of
	// unchanged content do not re-notify subscribers.
	if rowsAffected > 0 && webhookDispatcher != nil {
		webhookDispatcher.DispatchForFeed(refreshContext, feed.Identifier, parseResult.Entries)
	}

	metadataError := feedWriter.UpdateFeedMetadata(
		refreshContext,
		feed.Identifier,
		fetchResult.EntityTag,
		fetchResult.LastModifiedHeader,
		parseResult.FeedTitle,
		parseResult.SiteURL,
	)

	if metadataError != nil {
		logger.Error(
			"failed to update feed metadata",
			"feed_identifier", feed.Identifier,
			"error", metadataError,
		)
	}

	// Podcast heuristic: a mostly-audio feed is flagged "podcast"; a feed
	// previously flagged that drops below the threshold reverts to "rss".
	if parseResult.AudioEnclosureRatio > 0.5 {
		if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "podcast"); feedTypeError != nil {
			logger.Error(
				"failed to update feed type to podcast",
				"feed_identifier", feed.Identifier,
				"error", feedTypeError,
			)
		}
	} else if feed.FeedType != nil && *feed.FeedType == "podcast" {
		if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "rss"); feedTypeError != nil {
			logger.Error(
				"failed to clear podcast feed type",
				"feed_identifier", feed.Identifier,
				"error", feedTypeError,
			)
		}
	}

	logger.Info(
		"feed refresh completed",
		"feed_identifier", feed.Identifier,
		"entries_parsed", len(parseResult.Entries),
		"rows_affected", rowsAffected,
	)
}
diff --git a/services/worker/internal/scheduler/scheduler.go b/services/worker/internal/scheduler/scheduler.go
new file mode 100644
index 0000000..646e263
--- /dev/null
+++ b/services/worker/internal/scheduler/scheduler.go
@@ -0,0 +1,283 @@
+package scheduler
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/Fuwn/asa-news/internal/fetcher"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/Fuwn/asa-news/internal/parser"
+ "github.com/Fuwn/asa-news/internal/pool"
+ "github.com/Fuwn/asa-news/internal/webhook"
+ "github.com/Fuwn/asa-news/internal/writer"
+ pgmq "github.com/craigpastro/pgmq-go"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "log/slog"
+ "time"
+)
+
+// Scheduler drives feed refreshes. It periodically claims feeds whose
+// next_fetch_at has passed and also services explicit refresh requests
+// pushed onto the pgmq "feed_refresh" queue, handing all work to a shared
+// worker pool.
+type Scheduler struct {
+ databaseConnectionPool *pgxpool.Pool
+ feedFetcher *fetcher.Fetcher
+ feedParser *parser.Parser
+ feedWriter *writer.Writer
+ webhookDispatcher *webhook.Dispatcher
+ workerPool *pool.WorkerPool
+ // pollInterval spaces out claims of due feeds; queuePollInterval spaces
+ // out reads of the on-demand refresh queue.
+ pollInterval time.Duration
+ queuePollInterval time.Duration
+ // batchSize caps how many due feeds are claimed per poll cycle.
+ batchSize int
+ logger *slog.Logger
+}
+
+// NewScheduler assembles a Scheduler from its collaborators and tuning
+// knobs; it performs no work until Run is called.
+func NewScheduler(
+	databaseConnectionPool *pgxpool.Pool,
+	feedFetcher *fetcher.Fetcher,
+	feedParser *parser.Parser,
+	feedWriter *writer.Writer,
+	webhookDispatcher *webhook.Dispatcher,
+	workerPool *pool.WorkerPool,
+	pollInterval time.Duration,
+	queuePollInterval time.Duration,
+	batchSize int,
+	logger *slog.Logger,
+) *Scheduler {
+	feedScheduler := new(Scheduler)
+
+	feedScheduler.databaseConnectionPool = databaseConnectionPool
+	feedScheduler.feedFetcher = feedFetcher
+	feedScheduler.feedParser = feedParser
+	feedScheduler.feedWriter = feedWriter
+	feedScheduler.webhookDispatcher = webhookDispatcher
+	feedScheduler.workerPool = workerPool
+	feedScheduler.pollInterval = pollInterval
+	feedScheduler.queuePollInterval = queuePollInterval
+	feedScheduler.batchSize = batchSize
+	feedScheduler.logger = logger
+
+	return feedScheduler
+}
+
+// Run drives the scheduler loop until schedulerContext is cancelled. It
+// fires an immediate pass of both cycles on startup, then services the
+// two tickers until shutdown.
+func (feedScheduler *Scheduler) Run(schedulerContext context.Context) {
+	dueFeedTicker := time.NewTicker(feedScheduler.pollInterval)
+	onDemandTicker := time.NewTicker(feedScheduler.queuePollInterval)
+
+	defer dueFeedTicker.Stop()
+	defer onDemandTicker.Stop()
+
+	feedScheduler.logger.Info("scheduler started",
+		"poll_interval", feedScheduler.pollInterval,
+		"queue_poll_interval", feedScheduler.queuePollInterval,
+		"batch_size", feedScheduler.batchSize,
+	)
+
+	// Kick off one pass of each cycle immediately rather than waiting a
+	// full tick.
+	feedScheduler.executePollCycle(schedulerContext)
+	feedScheduler.executeQueueCycle(schedulerContext)
+
+	for {
+		select {
+		case <-dueFeedTicker.C:
+			feedScheduler.executePollCycle(schedulerContext)
+		case <-onDemandTicker.C:
+			feedScheduler.executeQueueCycle(schedulerContext)
+		case <-schedulerContext.Done():
+			feedScheduler.logger.Info("scheduler shutting down")
+
+			return
+		}
+	}
+}
+
+// executePollCycle claims one batch of due feeds and submits each one to
+// the worker pool for refresh.
+func (feedScheduler *Scheduler) executePollCycle(cycleContext context.Context) {
+	dueFeeds, claimError := feedScheduler.claimDueFeeds(cycleContext)
+
+	if claimError != nil {
+		feedScheduler.logger.Error("failed to claim due feeds", "error", claimError)
+
+		return
+	}
+
+	if len(dueFeeds) == 0 {
+		return
+	}
+
+	feedScheduler.logger.Info("claimed feeds for processing", "count", len(dueFeeds))
+
+	for feedIndex := range dueFeeds {
+		// Copy the element so the closure below does not alias the slice
+		// storage shared across iterations.
+		feedToRefresh := dueFeeds[feedIndex]
+
+		feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
+			ProcessRefreshRequest(
+				workContext,
+				feedToRefresh,
+				feedScheduler.feedFetcher,
+				feedScheduler.feedParser,
+				feedScheduler.feedWriter,
+				feedScheduler.webhookDispatcher,
+				feedScheduler.logger,
+			)
+		})
+	}
+}
+
+// executeQueueCycle services at most one on-demand refresh request from
+// the pgmq "feed_refresh" queue per tick. A read message stays invisible
+// for 30 seconds while it is processed and is archived once the refresh
+// finishes; if the worker dies mid-refresh the message reappears and is
+// retried.
+func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context) {
+	queueMessage, readError := pgmq.Read(cycleContext, feedScheduler.databaseConnectionPool, "feed_refresh", 30)
+
+	// pgmq reports an empty queue through the error/nil-message path, so a
+	// failed read is treated as "nothing to do" rather than logged.
+	if readError != nil || queueMessage == nil {
+		return
+	}
+
+	var refreshRequest model.RefreshRequest
+
+	if unmarshalError := json.Unmarshal(queueMessage.Message, &refreshRequest); unmarshalError != nil {
+		feedScheduler.logger.Error("failed to unmarshal refresh request", "error", unmarshalError)
+
+		// A malformed message can never succeed; archive it immediately so
+		// it does not reappear on every visibility timeout as a poison
+		// message.
+		feedScheduler.archiveQueueMessage(cycleContext, queueMessage.MsgID)
+
+		return
+	}
+
+	feed, lookupError := feedScheduler.lookupFeed(cycleContext, refreshRequest.FeedIdentifier)
+
+	if lookupError != nil {
+		feedScheduler.logger.Error(
+			"failed to look up feed for queue request",
+			"feed_identifier", refreshRequest.FeedIdentifier,
+			"error", lookupError,
+		)
+
+		// Leave the message in the queue: a transient database error will
+		// succeed on a later visibility-timeout retry.
+		return
+	}
+
+	capturedMessageIdentifier := queueMessage.MsgID
+
+	feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
+		ProcessRefreshRequest(
+			workContext,
+			feed,
+			feedScheduler.feedFetcher,
+			feedScheduler.feedParser,
+			feedScheduler.feedWriter,
+			feedScheduler.webhookDispatcher,
+			feedScheduler.logger,
+		)
+
+		feedScheduler.archiveQueueMessage(workContext, capturedMessageIdentifier)
+	})
+}
+
+// archiveQueueMessage moves a pgmq message to the archive table, logging
+// (but otherwise ignoring) any failure.
+func (feedScheduler *Scheduler) archiveQueueMessage(archiveContext context.Context, messageIdentifier int64) {
+	_, archiveError := pgmq.Archive(archiveContext, feedScheduler.databaseConnectionPool, "feed_refresh", messageIdentifier)
+
+	if archiveError != nil {
+		feedScheduler.logger.Error(
+			"failed to archive queue message",
+			"message_id", messageIdentifier,
+			"error", archiveError,
+		)
+	}
+}
+
+// claimDueFeeds atomically claims up to batchSize feeds whose
+// next_fetch_at has passed and that still have subscribers. FOR UPDATE
+// SKIP LOCKED lets multiple worker instances poll concurrently without
+// claiming the same rows.
+func (feedScheduler *Scheduler) claimDueFeeds(claimContext context.Context) ([]model.Feed, error) {
+	// Pushing next_fetch_at forward acts as a lease: without it a claimed
+	// feed still looks "due" on the next poll cycle and could be claimed a
+	// second time while the first refresh is in flight. A successful
+	// refresh recomputes next_fetch_at (UpdateFeedMetadata) and a failed
+	// one applies backoff (RecordFeedError), so the lease only matters if
+	// the worker dies mid-refresh — the feed then retries after its
+	// normal interval.
+	claimQuery := `
+		UPDATE feeds
+		SET
+			last_fetched_at = NOW(),
+			next_fetch_at = NOW() + (fetch_interval_seconds * INTERVAL '1 second')
+		WHERE id IN (
+			SELECT id
+			FROM feeds
+			WHERE next_fetch_at <= NOW()
+			AND subscriber_count > 0
+			ORDER BY next_fetch_at ASC
+			LIMIT $1
+			FOR UPDATE SKIP LOCKED
+		)
+		RETURNING
+			id, url, site_url, title, feed_type,
+			visibility, etag, last_modified,
+			last_fetched_at, last_fetch_error,
+			consecutive_failures, next_fetch_at,
+			fetch_interval_seconds,
+			created_at, updated_at
+	`
+
+	rows, queryError := feedScheduler.databaseConnectionPool.Query(claimContext, claimQuery, feedScheduler.batchSize)
+
+	if queryError != nil {
+		return nil, fmt.Errorf("failed to query due feeds: %w", queryError)
+	}
+
+	defer rows.Close()
+
+	claimedFeeds := make([]model.Feed, 0)
+
+	for rows.Next() {
+		var feed model.Feed
+
+		scanError := rows.Scan(
+			&feed.Identifier,
+			&feed.URL,
+			&feed.SiteURL,
+			&feed.Title,
+			&feed.FeedType,
+			&feed.Visibility,
+			&feed.EntityTag,
+			&feed.LastModified,
+			&feed.LastFetchedAt,
+			&feed.LastFetchError,
+			&feed.ConsecutiveFailures,
+			&feed.NextFetchAt,
+			&feed.FetchIntervalSeconds,
+			&feed.CreatedAt,
+			&feed.UpdatedAt,
+		)
+
+		if scanError != nil {
+			return nil, fmt.Errorf("failed to scan feed row: %w", scanError)
+		}
+
+		claimedFeeds = append(claimedFeeds, feed)
+	}
+
+	if rowsError := rows.Err(); rowsError != nil {
+		return nil, fmt.Errorf("error iterating feed rows: %w", rowsError)
+	}
+
+	return claimedFeeds, nil
+}
+
+// lookupFeed loads a single feed row by identifier for an on-demand
+// refresh request.
+func (feedScheduler *Scheduler) lookupFeed(lookupContext context.Context, feedIdentifier string) (model.Feed, error) {
+	var feed model.Feed
+
+	lookupQuery := `
+		SELECT
+			id, url, site_url, title, feed_type,
+			visibility, etag, last_modified,
+			last_fetched_at, last_fetch_error,
+			consecutive_failures, next_fetch_at,
+			fetch_interval_seconds,
+			created_at, updated_at
+		FROM feeds
+		WHERE id = $1
+	`
+
+	feedRow := feedScheduler.databaseConnectionPool.QueryRow(lookupContext, lookupQuery, feedIdentifier)
+
+	if scanError := feedRow.Scan(
+		&feed.Identifier,
+		&feed.URL,
+		&feed.SiteURL,
+		&feed.Title,
+		&feed.FeedType,
+		&feed.Visibility,
+		&feed.EntityTag,
+		&feed.LastModified,
+		&feed.LastFetchedAt,
+		&feed.LastFetchError,
+		&feed.ConsecutiveFailures,
+		&feed.NextFetchAt,
+		&feed.FetchIntervalSeconds,
+		&feed.CreatedAt,
+		&feed.UpdatedAt,
+	); scanError != nil {
+		return model.Feed{}, fmt.Errorf("failed to look up feed %s: %w", feedIdentifier, scanError)
+	}
+
+	return feed, nil
+}
diff --git a/services/worker/internal/webhook/webhook.go b/services/worker/internal/webhook/webhook.go
new file mode 100644
index 0000000..c812820
--- /dev/null
+++ b/services/worker/internal/webhook/webhook.go
@@ -0,0 +1,333 @@
+package webhook
+
+import (
+	"bytes"
+	"context"
+	"crypto/hmac"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log/slog"
+	"net/http"
+	"time"
+
+	"github.com/Fuwn/asa-news/internal/fetcher"
+	"github.com/Fuwn/asa-news/internal/model"
+	"github.com/jackc/pgx/v5/pgxpool"
+)
+
+// Dispatcher delivers webhook notifications for newly written feed
+// entries using a shared, timeout-bounded HTTP client.
+type Dispatcher struct {
+ databaseConnectionPool *pgxpool.Pool
+ httpClient *http.Client
+ logger *slog.Logger
+}
+
+// webhookSubscriber is one user row eligible to receive webhook
+// deliveries for a feed.
+type webhookSubscriber struct {
+ UserIdentifier string
+ WebhookURL string
+ // WebhookSecret, when present and non-empty, is used to HMAC-sign the
+ // payload (see sendWebhookRequest).
+ WebhookSecret *string
+}
+
+// EntryPayload is the wire representation of a single feed entry inside a
+// webhook delivery. Pointer fields serialize as JSON null when absent.
+type EntryPayload struct {
+ EntryIdentifier string `json:"entryIdentifier"`
+ FeedIdentifier string `json:"feedIdentifier"`
+ GUID string `json:"guid"`
+ URL *string `json:"url"`
+ Title *string `json:"title"`
+ Author *string `json:"author"`
+ Summary *string `json:"summary"`
+ PublishedAt *string `json:"publishedAt"`
+ EnclosureURL *string `json:"enclosureUrl"`
+ EnclosureType *string `json:"enclosureType"`
+}
+
+// WebhookPayload is the JSON body POSTed to a subscriber's webhook URL.
+type WebhookPayload struct {
+ Event string `json:"event"`
+ Timestamp string `json:"timestamp"`
+ Entries []EntryPayload `json:"entries"`
+}
+
+// NewDispatcher builds a webhook Dispatcher whose deliveries share one
+// HTTP client with a 10-second overall timeout.
+func NewDispatcher(
+	databaseConnectionPool *pgxpool.Pool,
+	logger *slog.Logger,
+) *Dispatcher {
+	deliveryClient := &http.Client{Timeout: 10 * time.Second}
+
+	return &Dispatcher{
+		databaseConnectionPool: databaseConnectionPool,
+		httpClient:             deliveryClient,
+		logger:                 logger,
+	}
+}
+
+// DispatchForFeed pushes an "entries.created" event for the given feed to
+// every developer-tier subscriber with webhooks enabled. Failures are
+// logged; nothing propagates to the caller.
+func (webhookDispatcher *Dispatcher) DispatchForFeed(
+	dispatchContext context.Context,
+	feedIdentifier string,
+	entries []model.FeedEntry,
+) {
+	recipients, queryError := webhookDispatcher.findWebhookSubscribers(dispatchContext, feedIdentifier)
+
+	if queryError != nil {
+		webhookDispatcher.logger.Error(
+			"failed to query webhook subscribers",
+			"feed_identifier", feedIdentifier,
+			"error", queryError,
+		)
+
+		return
+	}
+
+	if len(recipients) == 0 {
+		return
+	}
+
+	payload := WebhookPayload{
+		Event:     "entries.created",
+		Timestamp: time.Now().UTC().Format(time.RFC3339),
+		Entries:   buildEntryPayloads(entries),
+	}
+
+	for _, recipient := range recipients {
+		webhookDispatcher.deliverWebhook(dispatchContext, recipient, payload)
+	}
+}
+
+// buildEntryPayloads converts parsed feed entries into their webhook wire
+// representation.
+func buildEntryPayloads(entries []model.FeedEntry) []EntryPayload {
+	entryPayloads := make([]EntryPayload, 0, len(entries))
+
+	for _, entry := range entries {
+		var publishedAtString *string
+
+		if entry.PublishedAt != nil {
+			formattedTime := entry.PublishedAt.Format(time.RFC3339)
+			publishedAtString = &formattedTime
+		}
+
+		entryPayloads = append(entryPayloads, EntryPayload{
+			// NOTE(review): EntryIdentifier mirrors FeedIdentifier here,
+			// which looks like a copy-paste slip — confirm whether this
+			// field should carry the entry's own identifier instead.
+			EntryIdentifier: entry.FeedIdentifier,
+			FeedIdentifier:  entry.FeedIdentifier,
+			GUID:            entry.GUID,
+			URL:             entry.URL,
+			Title:           entry.Title,
+			Author:          entry.Author,
+			Summary:         entry.Summary,
+			PublishedAt:     publishedAtString,
+			EnclosureURL:    entry.EnclosureURL,
+			EnclosureType:   entry.EnclosureType,
+		})
+	}
+
+	return entryPayloads
+}
+
+// findWebhookSubscribers returns every developer-tier user subscribed to
+// the feed who has webhooks enabled and a webhook URL configured.
+func (webhookDispatcher *Dispatcher) findWebhookSubscribers(
+	queryContext context.Context,
+	feedIdentifier string,
+) ([]webhookSubscriber, error) {
+	subscriberQuery := `
+		SELECT up.id, up.webhook_url, up.webhook_secret
+		FROM subscriptions s
+		JOIN user_profiles up ON up.id = s.user_id
+		WHERE s.feed_id = $1
+		AND up.tier = 'developer'
+		AND up.webhook_enabled = true
+		AND up.webhook_url IS NOT NULL
+	`
+
+	rows, queryError := webhookDispatcher.databaseConnectionPool.Query(queryContext, subscriberQuery, feedIdentifier)
+
+	if queryError != nil {
+		return nil, fmt.Errorf("failed to query webhook subscribers: %w", queryError)
+	}
+
+	defer rows.Close()
+
+	subscribers := make([]webhookSubscriber, 0)
+
+	for rows.Next() {
+		var subscriber webhookSubscriber
+
+		if scanError := rows.Scan(
+			&subscriber.UserIdentifier,
+			&subscriber.WebhookURL,
+			&subscriber.WebhookSecret,
+		); scanError != nil {
+			return nil, fmt.Errorf("failed to scan subscriber row: %w", scanError)
+		}
+
+		subscribers = append(subscribers, subscriber)
+	}
+
+	return subscribers, rows.Err()
+}
+
+// deliverWebhook validates, signs, and POSTs the payload to one
+// subscriber, retrying with exponential backoff. The subscriber's failure
+// counter is reset on success and incremented after the final failed
+// attempt.
+func (webhookDispatcher *Dispatcher) deliverWebhook(
+	deliveryContext context.Context,
+	subscriber webhookSubscriber,
+	payload WebhookPayload,
+) {
+	// Webhook URLs are user-supplied; reuse the fetcher's SSRF guard so a
+	// subscriber cannot aim deliveries at internal addresses.
+	if urlValidationError := fetcher.ValidateFeedURL(subscriber.WebhookURL); urlValidationError != nil {
+		webhookDispatcher.logger.Warn(
+			"webhook URL failed SSRF validation",
+			"user_identifier", subscriber.UserIdentifier,
+			"webhook_url", subscriber.WebhookURL,
+			"error", urlValidationError,
+		)
+
+		return
+	}
+
+	payloadBytes, marshalError := json.Marshal(payload)
+
+	if marshalError != nil {
+		webhookDispatcher.logger.Error(
+			"failed to marshal webhook payload",
+			"user_identifier", subscriber.UserIdentifier,
+			"error", marshalError,
+		)
+
+		return
+	}
+
+	// The first attempt is immediate; later attempts back off 1s/4s/16s.
+	backoffSchedule := []time.Duration{0, 1 * time.Second, 4 * time.Second, 16 * time.Second}
+
+	var lastError error
+
+	for attemptIndex, backoff := range backoffSchedule {
+		if backoff > 0 {
+			select {
+			case <-deliveryContext.Done():
+				return
+			case <-time.After(backoff):
+			}
+		}
+
+		lastError = webhookDispatcher.sendWebhookRequest(deliveryContext, subscriber, payloadBytes)
+
+		if lastError == nil {
+			if attemptIndex > 0 {
+				webhookDispatcher.logger.Info(
+					"webhook delivered after retry",
+					"user_identifier", subscriber.UserIdentifier,
+					"attempt", attemptIndex+1,
+				)
+			}
+
+			webhookDispatcher.resetConsecutiveFailures(deliveryContext, subscriber.UserIdentifier)
+
+			return
+		}
+
+		webhookDispatcher.logger.Warn(
+			"webhook delivery attempt failed",
+			"user_identifier", subscriber.UserIdentifier,
+			"attempt", attemptIndex+1,
+			"error", lastError,
+		)
+	}
+
+	webhookDispatcher.logger.Error(
+		"webhook delivery failed after all retries",
+		"user_identifier", subscriber.UserIdentifier,
+		"error", lastError,
+	)
+
+	webhookDispatcher.incrementConsecutiveFailures(deliveryContext, subscriber.UserIdentifier)
+}
+
+// sendWebhookRequest performs a single signed POST of payloadBytes to the
+// subscriber's webhook URL. Any response outside the 2xx range is
+// reported as an error so the caller can retry.
+func (webhookDispatcher *Dispatcher) sendWebhookRequest(
+	requestContext context.Context,
+	subscriber webhookSubscriber,
+	payloadBytes []byte,
+) error {
+	request, requestCreationError := http.NewRequestWithContext(
+		requestContext,
+		http.MethodPost,
+		subscriber.WebhookURL,
+		bytes.NewReader(payloadBytes),
+	)
+
+	if requestCreationError != nil {
+		return fmt.Errorf("failed to create webhook request: %w", requestCreationError)
+	}
+
+	request.Header.Set("Content-Type", "application/json")
+	request.Header.Set("User-Agent", "asa.news Webhook/1.0")
+
+	// Sign the exact payload bytes with HMAC-SHA256 (GitHub-style header)
+	// so subscribers can authenticate the delivery.
+	if subscriber.WebhookSecret != nil && *subscriber.WebhookSecret != "" {
+		mac := hmac.New(sha256.New, []byte(*subscriber.WebhookSecret))
+		mac.Write(payloadBytes)
+		request.Header.Set("X-Asa-Signature-256", "sha256="+hex.EncodeToString(mac.Sum(nil)))
+	}
+
+	response, requestError := webhookDispatcher.httpClient.Do(request)
+
+	if requestError != nil {
+		return fmt.Errorf("webhook request failed: %w", requestError)
+	}
+
+	defer response.Body.Close()
+
+	// Drain a bounded amount of the response body before closing so the
+	// transport can reuse the underlying connection for the next delivery;
+	// the cap guards against endpoints that stream unbounded data.
+	_, _ = io.Copy(io.Discard, io.LimitReader(response.Body, 64*1024))
+
+	if response.StatusCode >= 200 && response.StatusCode < 300 {
+		return nil
+	}
+
+	return fmt.Errorf("webhook returned status %d", response.StatusCode)
+}
+
+// resetConsecutiveFailures zeroes a user's webhook failure counter after
+// a successful delivery; update failures are logged only.
+func (webhookDispatcher *Dispatcher) resetConsecutiveFailures(
+	updateContext context.Context,
+	userIdentifier string,
+) {
+	resetQuery := "UPDATE user_profiles SET webhook_consecutive_failures = 0 WHERE id = $1"
+
+	if _, updateError := webhookDispatcher.databaseConnectionPool.Exec(updateContext, resetQuery, userIdentifier); updateError != nil {
+		webhookDispatcher.logger.Error(
+			"failed to reset webhook consecutive failures",
+			"user_identifier", userIdentifier,
+			"error", updateError,
+		)
+	}
+}
+
+// incrementConsecutiveFailures bumps a user's webhook failure counter and
+// automatically disables webhooks once the counter reaches the maximum.
+func (webhookDispatcher *Dispatcher) incrementConsecutiveFailures(
+	updateContext context.Context,
+	userIdentifier string,
+) {
+	maximumConsecutiveFailures := 10
+
+	incrementQuery := `UPDATE user_profiles
+	SET webhook_consecutive_failures = webhook_consecutive_failures + 1,
+	webhook_enabled = CASE
+	WHEN webhook_consecutive_failures + 1 >= $2 THEN false
+	ELSE webhook_enabled
+	END
+	WHERE id = $1`
+
+	if _, updateError := webhookDispatcher.databaseConnectionPool.Exec(
+		updateContext,
+		incrementQuery,
+		userIdentifier,
+		maximumConsecutiveFailures,
+	); updateError != nil {
+		webhookDispatcher.logger.Error(
+			"failed to increment webhook consecutive failures",
+			"user_identifier", userIdentifier,
+			"error", updateError,
+		)
+	}
+}
diff --git a/services/worker/internal/writer/writer.go b/services/worker/internal/writer/writer.go
new file mode 100644
index 0000000..de81bfa
--- /dev/null
+++ b/services/worker/internal/writer/writer.go
@@ -0,0 +1,222 @@
+package writer
+
+import (
+ "context"
+ "fmt"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "strings"
+ "time"
+)
+
+// Writer persists parsed feed entries and feed-level bookkeeping columns
+// through a shared pgx connection pool.
+type Writer struct {
+ databaseConnectionPool *pgxpool.Pool
+}
+
+// NewWriter returns a Writer backed by the given connection pool.
+func NewWriter(databaseConnectionPool *pgxpool.Pool) *Writer {
+	feedWriter := new(Writer)
+	feedWriter.databaseConnectionPool = databaseConnectionPool
+
+	return feedWriter
+}
+
+// Postgres caps a single statement at 65535 bind parameters; with 15
+// parameters per entry, batches are sized so one INSERT never exceeds
+// that limit.
+const entryParameterCount = 15
+
+const maximumEntriesPerBatch = 65535 / entryParameterCount
+
+// WriteEntries upserts parsed entries, preserving user-highlighted
+// content and only touching rows whose title, content, or enclosure
+// actually changed. It returns the total number of rows inserted or
+// updated. Oversized inputs are split into multiple statements so the
+// Postgres bind-parameter limit is never exceeded.
+func (feedWriter *Writer) WriteEntries(writeContext context.Context, feedEntries []model.FeedEntry) (int64, error) {
+	if len(feedEntries) == 0 {
+		return 0, nil
+	}
+
+	// The batch is assumed homogeneous — either all public (owner_id
+	// NULL) or all user-owned — and is keyed off the first entry, as
+	// before. The two cases target different partial unique indexes.
+	var conflictClause string
+
+	if feedEntries[0].OwnerIdentifier == nil {
+		conflictClause = "ON CONFLICT (feed_id, guid) WHERE owner_id IS NULL"
+	} else {
+		conflictClause = "ON CONFLICT (feed_id, owner_id, guid) WHERE owner_id IS NOT NULL"
+	}
+
+	var totalRowsAffected int64
+
+	for batchStart := 0; batchStart < len(feedEntries); batchStart += maximumEntriesPerBatch {
+		batchEnd := min(batchStart+maximumEntriesPerBatch, len(feedEntries))
+
+		rowsAffected, writeError := feedWriter.writeEntryBatch(writeContext, feedEntries[batchStart:batchEnd], conflictClause)
+
+		if writeError != nil {
+			return totalRowsAffected, writeError
+		}
+
+		totalRowsAffected += rowsAffected
+	}
+
+	return totalRowsAffected, nil
+}
+
+// writeEntryBatch upserts one bounded batch of entries in a single
+// multi-row INSERT ... ON CONFLICT statement and returns its affected-row
+// count.
+func (feedWriter *Writer) writeEntryBatch(writeContext context.Context, feedEntries []model.FeedEntry, conflictClause string) (int64, error) {
+	valueParameterPlaceholders := make([]string, 0, len(feedEntries))
+	queryArguments := make([]any, 0, len(feedEntries)*entryParameterCount)
+
+	for entryIndex, entry := range feedEntries {
+		parameterOffset := entryIndex * entryParameterCount
+		valueParameterPlaceholders = append(
+			valueParameterPlaceholders,
+			fmt.Sprintf(
+				"($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)",
+				parameterOffset+1, parameterOffset+2, parameterOffset+3,
+				parameterOffset+4, parameterOffset+5, parameterOffset+6,
+				parameterOffset+7, parameterOffset+8, parameterOffset+9,
+				parameterOffset+10, parameterOffset+11, parameterOffset+12,
+				parameterOffset+13, parameterOffset+14, parameterOffset+15,
+			),
+		)
+		queryArguments = append(
+			queryArguments,
+			entry.FeedIdentifier,
+			entry.OwnerIdentifier,
+			entry.GUID,
+			entry.URL,
+			entry.Title,
+			entry.Author,
+			entry.Summary,
+			entry.ContentHTML,
+			entry.ContentText,
+			entry.ImageURL,
+			entry.PublishedAt,
+			entry.WordCount,
+			entry.EnclosureURL,
+			entry.EnclosureType,
+			entry.EnclosureLength,
+		)
+	}
+
+	// Summary/content columns are frozen once a user has highlights on the
+	// entry so re-fetched content cannot invalidate highlight offsets. The
+	// final WHERE makes the upsert a no-op when nothing meaningful
+	// changed, keeping RowsAffected an honest "new or changed" count.
+	upsertQuery := fmt.Sprintf(`
+		INSERT INTO entries (
+			feed_id, owner_id, guid, url, title, author,
+			summary, content_html, content_text, image_url,
+			published_at, word_count,
+			enclosure_url, enclosure_type, enclosure_length
+		)
+		VALUES %s
+		%s
+		DO UPDATE SET
+			url = EXCLUDED.url,
+			title = EXCLUDED.title,
+			author = EXCLUDED.author,
+			summary = CASE
+			WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id)
+			THEN entries.summary
+			ELSE EXCLUDED.summary
+			END,
+			content_html = CASE
+			WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id)
+			THEN entries.content_html
+			ELSE EXCLUDED.content_html
+			END,
+			content_text = CASE
+			WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id)
+			THEN entries.content_text
+			ELSE EXCLUDED.content_text
+			END,
+			image_url = EXCLUDED.image_url,
+			published_at = EXCLUDED.published_at,
+			word_count = EXCLUDED.word_count,
+			enclosure_url = EXCLUDED.enclosure_url,
+			enclosure_type = EXCLUDED.enclosure_type,
+			enclosure_length = EXCLUDED.enclosure_length
+		WHERE
+			entries.title IS DISTINCT FROM EXCLUDED.title
+			OR entries.content_html IS DISTINCT FROM EXCLUDED.content_html
+			OR entries.enclosure_url IS DISTINCT FROM EXCLUDED.enclosure_url
+	`, strings.Join(valueParameterPlaceholders, ", "), conflictClause)
+
+	commandTag, executeError := feedWriter.databaseConnectionPool.Exec(writeContext, upsertQuery, queryArguments...)
+
+	if executeError != nil {
+		return 0, fmt.Errorf("failed to upsert feed entries: %w", executeError)
+	}
+
+	return commandTag.RowsAffected(), nil
+}
+
+func (feedWriter *Writer) UpdateFeedMetadata(
+ updateContext context.Context,
+ feedIdentifier string,
+ entityTag string,
+ lastModified string,
+ feedTitle string,
+ siteURL string,
+) error {
+ currentTime := time.Now().UTC()
+
+ var titleParameter *string
+ if feedTitle != "" {
+ titleParameter = &feedTitle
+ }
+
+ var siteURLParameter *string
+ if siteURL != "" {
+ siteURLParameter = &siteURL
+ }
+
+ updateQuery := `
+ UPDATE feeds
+ SET
+ last_fetched_at = $1::timestamptz,
+ etag = $2,
+ last_modified = $3,
+ consecutive_failures = 0,
+ last_fetch_error = NULL,
+ last_fetch_error_at = NULL,
+ next_fetch_at = $1::timestamptz + (fetch_interval_seconds * INTERVAL '1 second'),
+ title = COALESCE($5, title),
+ site_url = COALESCE($6, site_url)
+ WHERE id = $4
+ `
+ _, executeError := feedWriter.databaseConnectionPool.Exec(
+ updateContext,
+ updateQuery,
+ currentTime,
+ entityTag,
+ lastModified,
+ feedIdentifier,
+ titleParameter,
+ siteURLParameter,
+ )
+
+ if executeError != nil {
+ return fmt.Errorf("failed to update feed metadata: %w", executeError)
+ }
+
+ return nil
+}
+
+// UpdateFeedType stores the detected feed type (e.g. "rss" or "podcast")
+// for a feed.
+func (feedWriter *Writer) UpdateFeedType(
+	updateContext context.Context,
+	feedIdentifier string,
+	feedType string,
+) error {
+	if _, executeError := feedWriter.databaseConnectionPool.Exec(
+		updateContext,
+		`UPDATE feeds SET feed_type = $1 WHERE id = $2`,
+		feedType,
+		feedIdentifier,
+	); executeError != nil {
+		return fmt.Errorf("failed to update feed type: %w", executeError)
+	}
+
+	return nil
+}
+
+// RecordFeedError marks a failed fetch: it stores the error message,
+// bumps the failure counter, and reschedules the feed with exponential
+// backoff (2^failures minutes, capped at six hours).
+func (feedWriter *Writer) RecordFeedError(
+	updateContext context.Context,
+	feedIdentifier string,
+	errorMessage string,
+) error {
+	updateQuery := `
+		UPDATE feeds
+		SET
+			last_fetched_at = $1::timestamptz,
+			last_fetch_error = $2,
+			last_fetch_error_at = $1::timestamptz,
+			consecutive_failures = consecutive_failures + 1,
+			next_fetch_at = $1::timestamptz + (LEAST(POWER(2, consecutive_failures) * 60, 21600) * INTERVAL '1 second')
+		WHERE id = $3
+	`
+
+	if _, executeError := feedWriter.databaseConnectionPool.Exec(
+		updateContext,
+		updateQuery,
+		time.Now().UTC(),
+		errorMessage,
+		feedIdentifier,
+	); executeError != nil {
+		return fmt.Errorf("failed to record feed error: %w", executeError)
+	}
+
+	return nil
+}