diff options
| author | Fuwn <[email protected]> | 2026-02-07 01:42:57 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-07 01:42:57 -0800 |
| commit | 5c5b1993edd890a80870ee05607ac5f088191d4e (patch) | |
| tree | a721b76bcd49ba10826c53efc87302c7a689512f /services | |
| download | asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.tar.xz asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.zip | |
feat: asa.news RSS reader with developer tier, REST API, and webhooks
Full-stack RSS reader SaaS: Supabase + Next.js + Go worker.
Includes three subscription tiers (free/pro/developer), API key auth,
read-only REST API, webhook push notifications, Stripe billing with
proration, and PWA support.
Diffstat (limited to 'services')
20 files changed, 2546 insertions, 0 deletions
diff --git a/services/worker/Dockerfile b/services/worker/Dockerfile new file mode 100644 index 0000000..59678ff --- /dev/null +++ b/services/worker/Dockerfile @@ -0,0 +1,18 @@ +FROM golang:1.23-bookworm AS builder + +WORKDIR /build + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . + +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /build/worker ./cmd/worker + +FROM gcr.io/distroless/static:nonroot + +COPY --from=builder /build/worker /worker + +USER nonroot:nonroot + +ENTRYPOINT ["/worker"] diff --git a/services/worker/Taskfile.yaml b/services/worker/Taskfile.yaml new file mode 100644 index 0000000..ea32aab --- /dev/null +++ b/services/worker/Taskfile.yaml @@ -0,0 +1,57 @@ +version: "3" + +vars: + BINARY: worker + +tasks: + default: + desc: Build the application + cmds: + - task: build + + build: + desc: Build the binary with optimisations + cmds: + - go build -ldflags="-s -w" -o {{.BINARY}} ./cmd/worker + sources: + - ./**/*.go + generates: + - ./{{.BINARY}} + + run: + desc: Build and run the application + deps: [build] + cmds: + - ./{{.BINARY}} + + clean: + desc: Remove build artifacts + cmds: + - rm -f {{.BINARY}} + - go clean + + test: + desc: Run tests + cmds: + - go test ./... + + fmt: + desc: Format code + cmds: + - iku -w . || go fmt ./... + + lint: + desc: Run linter + cmds: + - golangci-lint run + + dev: + desc: Build and run in development mode + cmds: + - go run ./cmd/worker + + tidy: + desc: Tidy and verify module dependencies + cmds: + - go mod tidy + - go mod verify diff --git a/services/worker/cmd/worker/main.go b/services/worker/cmd/worker/main.go new file mode 100644 index 0000000..9732ec6 --- /dev/null +++ b/services/worker/cmd/worker/main.go @@ -0,0 +1,143 @@ +package main + +import ( + "context" + "fmt" + "github.com/Fuwn/asa-news/internal/configuration" + "github.com/Fuwn/asa-news/internal/database" + "github.com/Fuwn/asa-news/internal/fetcher" + "github.com/Fuwn/asa-news/internal/health" + "github.com/Fuwn/asa-news/internal/parser" + "github.com/Fuwn/asa-news/internal/pool" + "github.com/Fuwn/asa-news/internal/scheduler" + "github.com/Fuwn/asa-news/internal/webhook" + "github.com/Fuwn/asa-news/internal/writer" + "log/slog" + "os" + "os/signal" + "syscall" + "time" +) + +func main() { + exitCode := run() + + os.Exit(exitCode) +} + +func run() int { + applicationConfiguration, configurationError := configuration.Load() + + if configurationError != nil { + fmt.Fprintf(os.Stderr, "failed to load configuration: %v\n", configurationError) + + return 1 + } + + logger := createLogger(applicationConfiguration) + + logger.Info("starting feed worker service") + + applicationContext, cancelApplication := signal.NotifyContext( + context.Background(), + syscall.SIGINT, + syscall.SIGTERM, + ) + + defer cancelApplication() + + databaseConnectionPool, databaseError := database.CreateConnectionPool( + applicationContext, + applicationConfiguration.DatabaseURL, + ) + + if databaseError != nil { + logger.Error("failed to create database connection pool", "error", databaseError) + + return 1 + } + + defer databaseConnectionPool.Close() + + feedFetcher := fetcher.NewFetcher(applicationConfiguration.FetchTimeout) + feedParser := parser.NewParser() + feedWriter := writer.NewWriter(databaseConnectionPool) + webhookDispatcher := webhook.NewDispatcher(databaseConnectionPool, logger) + workerPool := pool.NewWorkerPool(applicationConfiguration.WorkerConcurrency, logger) + healthServer := health.NewHealthServer( + applicationConfiguration.HealthPort, + databaseConnectionPool, + logger, + ) + + healthServer.Start() + + feedScheduler := scheduler.NewScheduler( + databaseConnectionPool, + feedFetcher, + feedParser, + feedWriter, + webhookDispatcher, + workerPool, + applicationConfiguration.PollInterval, + applicationConfiguration.QueuePollInterval, + applicationConfiguration.BatchSize, + logger, + ) + + logger.Info("feed worker service started", + "worker_concurrency", applicationConfiguration.WorkerConcurrency, + "poll_interval", applicationConfiguration.PollInterval, + "health_port", applicationConfiguration.HealthPort, + ) + feedScheduler.Run(applicationContext) + logger.Info("shutting down feed worker service") + logger.Info("waiting for in-flight work to complete") + workerPool.Wait() + + shutdownContext, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second) + + defer cancelShutdown() + + healthShutdownError := healthServer.Stop(shutdownContext) + + if healthShutdownError != nil { + logger.Error("health server shutdown error", "error", healthShutdownError) + } + + logger.Info("feed worker service stopped") + + return 0 +} + +func createLogger(applicationConfiguration configuration.Configuration) *slog.Logger { + logLevel := resolveLogLevel(applicationConfiguration.LogLevel) + handlerOptions := &slog.HandlerOptions{ + Level: logLevel, + } + + var logHandler slog.Handler + + if applicationConfiguration.LogJSON { + logHandler = slog.NewJSONHandler(os.Stdout, handlerOptions) + } else { + logHandler = slog.NewTextHandler(os.Stdout, handlerOptions) + } + + return slog.New(logHandler) +} + +func resolveLogLevel(levelString string) slog.Level { + switch levelString { + case "debug": + return slog.LevelDebug + case "info": + return slog.LevelInfo + case "warn": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} diff --git a/services/worker/go.mod b/services/worker/go.mod new file mode 100644 index 0000000..2588959 --- /dev/null +++ b/services/worker/go.mod @@ -0,0 +1,81 @@ +module github.com/Fuwn/asa-news + +go 1.24.0 + +toolchain go1.25.6 + +require ( + github.com/craigpastro/pgmq-go v0.6.0 + github.com/jackc/pgx/v5 v5.7.2 + github.com/mmcdole/gofeed v1.3.0 +) + +require ( + dario.cat/mergo v1.0.1 // indirect + github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/PuerkitoBio/goquery v1.8.0 // indirect + github.com/andybalholm/cascadia v1.3.1 // indirect + github.com/avast/retry-go/v4 v4.6.0 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/containerd/platforms v0.2.1 // indirect + github.com/cpuguy83/dockercfg v0.3.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/distribution/reference v0.6.0 // indirect + github.com/docker/docker v27.3.1+incompatible // indirect + github.com/docker/go-connections v0.5.0 // indirect + github.com/docker/go-units v0.5.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-ole/go-ole v1.3.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683 // indirect + github.com/magiconair/properties v1.8.7 // indirect + github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/patternmatcher v0.6.0 // indirect + github.com/moby/sys/sequential v0.6.0 // indirect + github.com/moby/sys/user v0.3.0 // indirect + github.com/moby/sys/userns v0.1.0 // indirect + github.com/moby/term v0.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/morikuni/aec v1.0.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect + github.com/shirou/gopsutil/v3 v3.24.5 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/stretchr/testify v1.11.1 // indirect + github.com/testcontainers/testcontainers-go v0.35.0 // indirect + github.com/tklauser/go-sysconf v0.3.14 // indirect + github.com/tklauser/numcpus v0.9.0 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect + go.opentelemetry.io/otel v1.40.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect + go.opentelemetry.io/otel/metric v1.40.0 // indirect + go.opentelemetry.io/otel/sdk v1.40.0 // indirect + go.opentelemetry.io/otel/trace v1.40.0 // indirect + go.opentelemetry.io/proto/otlp v1.9.0 // indirect + golang.org/x/crypto v0.41.0 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sync v0.16.0 // indirect + golang.org/x/sys v0.40.0 // indirect + golang.org/x/text v0.28.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/services/worker/go.sum b/services/worker/go.sum new file mode 100644 index 0000000..8a61ca7 --- /dev/null +++ b/services/worker/go.sum @@ -0,0 +1,226 @@ +dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= +dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= +github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= +github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U= +github.com/PuerkitoBio/goquery v1.8.0/go.mod h1:ypIiRMtY7COPGk+I/YbZLbxsxn9g5ejnI2HSMtkjZvI= +github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c= +github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA= +github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= +github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= +github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw= +github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= +github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= +github.com/craigpastro/pgmq-go v0.6.0 h1:6hrQngiiB6SkpXJNR88dX4bqrZBfCLiuWPQzKoy50RI= +github.com/craigpastro/pgmq-go v0.6.0/go.mod h1:cDDC2UWrRXP9l0e1dKlKnW3hHftNDobMlCgbw+V56og= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/docker v27.3.1+incompatible h1:KttF0XoteNTicmUtBO0L2tP+J7FGRFTjaEF4k6WdhfI= +github.com/docker/docker v27.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= +github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= +github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= +github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683 h1:7UMa6KCCMjZEMDtTVdcGu0B1GmmC7QJKiCCjyTAWQy0= +github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mmcdole/gofeed v1.3.0 h1:5yn+HeqlcvjMeAI4gu6T+crm7d0anY85+M+v6fIFNG4= +github.com/mmcdole/gofeed v1.3.0/go.mod h1:9TGv2LcJhdXePDzxiuMnukhV2/zb6VtnZt1mS+SjkLE= +github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23 h1:Zr92CAlFhy2gL+V1F+EyIuzbQNbSgP4xhTODZtrXUtk= +github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23/go.mod h1:v+25+lT2ViuQ7mVxcncQ8ch1URund48oH+jhjiwEgS8= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= +github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= +github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= +github.com/moby/sys/sequential v0.6.0 h1:qrx7XFUd/5DxtqcoH1h438hF5TmOvzC/lspjy7zgvCU= +github.com/moby/sys/sequential v0.6.0/go.mod h1:uyv8EUTrca5PnDsdMGXhZe6CCe8U/UiTWd+lL+7b/Ko= +github.com/moby/sys/user v0.3.0 h1:9ni5DlcW5an3SvRSx4MouotOygvzaXbaSrc/wGDFWPo= +github.com/moby/sys/user v0.3.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs= +github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g= +github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28= +github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= +github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= +github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU= +github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= +github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/testcontainers/testcontainers-go v0.35.0 h1:uADsZpTKFAtp8SLK+hMwSaa+X+JiERHtd4sQAFmXeMo= +github.com/testcontainers/testcontainers-go v0.35.0/go.mod h1:oEVBj5zrfJTrgjwONs1SsRbnBtH9OKl+IGl3UMcr2B4= +github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU= +github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY= +github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPDo= +github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= +go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= +go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= +go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= +go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= +go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= +go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= +go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= +go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= +golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= +golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY= +google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc= +google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= +google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/services/worker/internal/configuration/configuration.go b/services/worker/internal/configuration/configuration.go new file mode 100644 index 0000000..84a5995 --- /dev/null +++ b/services/worker/internal/configuration/configuration.go @@ -0,0 +1,110 @@ +package configuration + +import ( + "fmt" + "os" + "strconv" + "time" +) + +type Configuration struct { + DatabaseURL string + WorkerConcurrency int + PollInterval time.Duration + FetchTimeout time.Duration + QueuePollInterval time.Duration + BatchSize int + HealthPort int + EncryptionKey string + LogLevel string + LogJSON bool +} + +func Load() (Configuration, error) { + databaseURL := os.Getenv("DATABASE_URL") + + if databaseURL == "" { + return Configuration{}, fmt.Errorf("DATABASE_URL is required") + } + + workerConcurrency := getEnvironmentInteger("WORKER_CONCURRENCY", 10) + pollInterval := getEnvironmentDuration("POLL_INTERVAL", 30*time.Second) + fetchTimeout := getEnvironmentDuration("FETCH_TIMEOUT", 30*time.Second) + queuePollInterval := getEnvironmentDuration("QUEUE_POLL_INTERVAL", 5*time.Second) + batchSize := getEnvironmentInteger("BATCH_SIZE", 50) + healthPort := getEnvironmentInteger("HEALTH_PORT", 8080) + encryptionKey := os.Getenv("ENCRYPTION_KEY") + logLevel := getEnvironmentString("LOG_LEVEL", "info") + logJSON := getEnvironmentBoolean("LOG_JSON", false) + + return Configuration{ + DatabaseURL: databaseURL, + WorkerConcurrency: workerConcurrency, + PollInterval: pollInterval, + FetchTimeout: fetchTimeout, + QueuePollInterval: queuePollInterval, + BatchSize: batchSize, + HealthPort: healthPort, + EncryptionKey: encryptionKey, + LogLevel: logLevel, + LogJSON: logJSON, + }, nil +} + +func getEnvironmentString(key string, defaultValue string) string { + value := os.Getenv(key) + + if value == "" { + return defaultValue + } + + return value +} + +func getEnvironmentInteger(key string, defaultValue int) int { + raw := os.Getenv(key) + + if raw == "" { + return defaultValue + } + + parsed, parseError := strconv.Atoi(raw) + + if parseError != nil { + return defaultValue + } + + return parsed +} + +func getEnvironmentDuration(key string, defaultValue time.Duration) time.Duration { + raw := os.Getenv(key) + + if raw == "" { + return defaultValue + } + + parsed, parseError := time.ParseDuration(raw) + + if parseError != nil { + return defaultValue + } + + return parsed +} + +func getEnvironmentBoolean(key string, defaultValue bool) bool { + raw := os.Getenv(key) + + if raw == "" { + return defaultValue + } + + parsed, parseError := strconv.ParseBool(raw) + + if parseError != nil { + return defaultValue + } + + return parsed +} diff --git a/services/worker/internal/database/database.go b/services/worker/internal/database/database.go new file mode 100644 index 0000000..2b2900f --- /dev/null +++ b/services/worker/internal/database/database.go @@ -0,0 +1,39 @@ +package database + +import ( + "context" + "fmt" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "time" +) + +func CreateConnectionPool(parentContext context.Context, databaseURL string) (*pgxpool.Pool, error) { + poolConfiguration, parseError := pgxpool.ParseConfig(databaseURL) + + if parseError != nil { + return nil, fmt.Errorf("failed to parse database URL: %w", parseError) + } + + poolConfiguration.MaxConns = 25 + poolConfiguration.MinConns = 5 + poolConfiguration.MaxConnLifetime = 30 * time.Minute + poolConfiguration.MaxConnIdleTime = 5 * time.Minute + poolConfiguration.HealthCheckPeriod = 30 * time.Second + poolConfiguration.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeExec + connectionPool, connectionError := pgxpool.NewWithConfig(parentContext, poolConfiguration) + + if connectionError != nil { + return nil, fmt.Errorf("failed to create connection pool: %w", connectionError) + } + + pingError := connectionPool.Ping(parentContext) + + if pingError != nil { + connectionPool.Close() + + return nil, fmt.Errorf("failed to ping database: %w", pingError) + } + + return connectionPool, nil +} diff --git a/services/worker/internal/fetcher/authentication.go b/services/worker/internal/fetcher/authentication.go new file mode 100644 index 0000000..ba10196 --- /dev/null +++ b/services/worker/internal/fetcher/authentication.go @@ -0,0 +1,43 @@ +package fetcher + +import ( + "encoding/base64" + "fmt" + "net/http" + "net/url" +) + +type AuthenticationConfiguration struct { + AuthenticationType string + AuthenticationValue string +} + +func ApplyAuthentication(request *http.Request, authenticationConfig AuthenticationConfiguration) error { + switch authenticationConfig.AuthenticationType { + case "bearer": + request.Header.Set("Authorization", "Bearer "+authenticationConfig.AuthenticationValue) + case "basic": + encodedCredentials := base64.StdEncoding.EncodeToString([]byte(authenticationConfig.AuthenticationValue)) + + request.Header.Set("Authorization", "Basic "+encodedCredentials) + case "query_param": + existingURL, parseError := url.Parse(request.URL.String()) + + if parseError != nil { + return fmt.Errorf("failed to parse request URL for query param authentication: %w", parseError) + } + + queryParameters := existingURL.Query() + + queryParameters.Set("key", authenticationConfig.AuthenticationValue) + + existingURL.RawQuery = queryParameters.Encode() + request.URL = existingURL + case "": + return nil + default: + return fmt.Errorf("unsupported authentication type: %s", authenticationConfig.AuthenticationType) + } + + return nil +} diff --git a/services/worker/internal/fetcher/errors.go b/services/worker/internal/fetcher/errors.go new file mode 100644 index 0000000..abf5553 --- /dev/null +++ b/services/worker/internal/fetcher/errors.go @@ -0,0 +1,145 @@ +package fetcher + +import ( + "errors" + "fmt" + "net" + "net/url" + "strings" +) + +type FetchError struct { + StatusCode int + UserMessage string + Retryable bool + UnderlyingError error +} + +func (fetchError *FetchError) Error() string { + if fetchError.UnderlyingError != nil { + return fmt.Sprintf("%s: %v", fetchError.UserMessage, fetchError.UnderlyingError) + } + + return fetchError.UserMessage +} + +func (fetchError *FetchError) Unwrap() error { + return fetchError.UnderlyingError +} + +func ClassifyError(originalError error, statusCode int) *FetchError { + if statusCode >= 400 { + return classifyHTTPStatusCode(statusCode, originalError) + } + + return classifyNetworkError(originalError) +} + +func classifyHTTPStatusCode(statusCode int, originalError error) *FetchError { + switch statusCode { + case 401: + return &FetchError{ + StatusCode: statusCode, + UserMessage: "authentication required - check feed credentials", + Retryable: false, + UnderlyingError: originalError, + } + case 403: + return &FetchError{ + StatusCode: statusCode, + UserMessage: "access forbidden - feed may require different credentials", + Retryable: false, + UnderlyingError: originalError, + } + case 404: + return &FetchError{ + StatusCode: statusCode, + UserMessage: "feed not found - URL may be incorrect or feed removed", + Retryable: false, + UnderlyingError: originalError, + } + case 410: + return &FetchError{ + StatusCode: statusCode, + UserMessage: "feed permanently removed", + Retryable: false, + UnderlyingError: originalError, + } + case 429: + return &FetchError{ + StatusCode: statusCode, + UserMessage: "rate limited by feed server - will retry later", + Retryable: true, + UnderlyingError: originalError, + } + case 500, 502, 503, 504: + return &FetchError{ + StatusCode: statusCode, + UserMessage: fmt.Sprintf("feed server error (HTTP %d) - will retry later", statusCode), + Retryable: true, + UnderlyingError: originalError, + } + default: + return &FetchError{ + StatusCode: statusCode, + UserMessage: fmt.Sprintf("unexpected HTTP status %d", statusCode), + Retryable: statusCode >= 500, + UnderlyingError: originalError, + } + } +} + +func classifyNetworkError(originalError error) *FetchError { + if originalError == nil { + return &FetchError{ + UserMessage: "unknown fetch error", + Retryable: true, + } + } + + var dnsError *net.DNSError + + if errors.As(originalError, &dnsError) { + return &FetchError{ + UserMessage: "DNS resolution failed - check feed URL", + Retryable: !dnsError.IsNotFound, + UnderlyingError: originalError, + } + } + + var urlError *url.Error + + if errors.As(originalError, &urlError) { + if urlError.Timeout() { + return &FetchError{ + UserMessage: "connection timed out - feed server may be slow", + Retryable: true, + UnderlyingError: originalError, + } + } + } + + var netOpError *net.OpError + + if errors.As(originalError, &netOpError) { + return &FetchError{ + UserMessage: "network connection error - will retry later", + Retryable: true, + UnderlyingError: originalError, + } + } + + if strings.Contains(originalError.Error(), "certificate") || strings.Contains(originalError.Error(), "tls") { + return &FetchError{ + UserMessage: "TLS/certificate error - feed server has invalid certificate", + Retryable: false, + UnderlyingError: originalError, + } + } + + return &FetchError{ + UserMessage: "unexpected network error", + Retryable: true, + UnderlyingError: originalError, + } +} diff --git a/services/worker/internal/fetcher/fetcher.go b/services/worker/internal/fetcher/fetcher.go new file mode 100644 index 0000000..019bd39 --- /dev/null +++ b/services/worker/internal/fetcher/fetcher.go @@ -0,0 +1,116 @@ +package fetcher + +import ( + "context" + "fmt" + "io" + "net/http" + "time" +) + +type FetchResult struct { + Body []byte + StatusCode int + EntityTag string + LastModifiedHeader string + NotModified bool +} + +type Fetcher struct { + httpClient *http.Client +} + +func NewFetcher(fetchTimeout time.Duration) *Fetcher { + return &Fetcher{ + httpClient: &http.Client{ + Timeout: fetchTimeout, + CheckRedirect: func(request *http.Request, previousRequests []*http.Request) error { + if len(previousRequests) >= 5 { + return fmt.Errorf("too many redirects (exceeded 5)") + } + + redirectValidationError := ValidateRedirectTarget(request.URL.String()) + if redirectValidationError != nil { + return fmt.Errorf("blocked redirect to reserved address: %w", redirectValidationError) + } + + return nil + }, + }, + } +} + +func (feedFetcher *Fetcher) Fetch( + requestContext context.Context, + feedURL string, + previousEntityTag string, + previousLastModified string, + authenticationConfig AuthenticationConfiguration, +) (*FetchResult, error) { + urlValidationError := ValidateFeedURL(feedURL) + if urlValidationError != nil { + return nil, fmt.Errorf("blocked request to disallowed URL: %w", urlValidationError) + } + + request, requestCreationError := http.NewRequestWithContext(requestContext, http.MethodGet, feedURL, nil) + + if requestCreationError != nil { + return nil, fmt.Errorf("failed to create HTTP request: %w", requestCreationError) + } + + request.Header.Set("User-Agent", "asa.news Feed Worker/1.0") + request.Header.Set("Accept", "application/rss+xml, application/atom+xml, application/xml, text/xml, */*") + + if previousEntityTag != "" { + request.Header.Set("If-None-Match", previousEntityTag) + } + + if previousLastModified != "" { + request.Header.Set("If-Modified-Since", previousLastModified) + } + + authenticationError := ApplyAuthentication(request, authenticationConfig) + + if authenticationError != nil { + return nil, fmt.Errorf("failed to apply authentication: %w", authenticationError) + } + + response, requestError := feedFetcher.httpClient.Do(request) + + if requestError != nil { + classifiedError := ClassifyError(requestError, 0) + + return nil, classifiedError + } + + defer response.Body.Close() + + if response.StatusCode == http.StatusNotModified { + return &FetchResult{ + StatusCode: response.StatusCode, + EntityTag: response.Header.Get("ETag"), + LastModifiedHeader: response.Header.Get("Last-Modified"), + NotModified: true, + }, nil + } + + if response.StatusCode >= 400 { + classifiedError := ClassifyError(nil, response.StatusCode) + + return nil, classifiedError + } + + responseBody, readError := io.ReadAll(io.LimitReader(response.Body, 10*1024*1024)) + + if readError != nil { + return nil, fmt.Errorf("failed to read response body: %w", readError) + } + + return &FetchResult{ + Body: responseBody, + StatusCode: response.StatusCode, + EntityTag: response.Header.Get("ETag"), + LastModifiedHeader: response.Header.Get("Last-Modified"), + NotModified: false, + }, nil +} diff --git a/services/worker/internal/fetcher/ssrf_protection.go b/services/worker/internal/fetcher/ssrf_protection.go new file mode 100644 index 0000000..1887e78 --- /dev/null +++ b/services/worker/internal/fetcher/ssrf_protection.go @@ -0,0 +1,77 @@ +package fetcher + +import ( + "context" + "fmt" + "net" + "net/url" + "strings" + "time" +) + +var reservedNetworks = []net.IPNet{ + {IP: net.IPv4(10, 0, 0, 0), Mask: net.CIDRMask(8, 32)}, + {IP: net.IPv4(172, 16, 0, 0), Mask: net.CIDRMask(12, 32)}, + {IP: net.IPv4(192, 168, 0, 0), Mask: net.CIDRMask(16, 32)}, + {IP: net.IPv4(127, 0, 0, 0), Mask: net.CIDRMask(8, 32)}, + {IP: net.IPv4(169, 254, 0, 0), Mask: net.CIDRMask(16, 32)}, + {IP: net.IPv4(0, 0, 0, 0), Mask: net.CIDRMask(8, 32)}, + {IP: net.ParseIP("::1"), Mask: net.CIDRMask(128, 128)}, + {IP: net.ParseIP("fc00::"), Mask: net.CIDRMask(7, 128)}, + {IP: net.ParseIP("fe80::"), Mask: net.CIDRMask(10, 128)}, +} + +func isReservedAddress(ipAddress net.IP) bool { + for _, network := range reservedNetworks { + if network.Contains(ipAddress) { + return true + } + } + + return false +} + +func ValidateFeedURL(feedURL string) error { + parsedURL, parseError := url.Parse(feedURL) + if parseError != nil { + return fmt.Errorf("invalid URL: %w", parseError) + } + + scheme := strings.ToLower(parsedURL.Scheme) + if scheme != "http" && scheme != "https" { + return fmt.Errorf("unsupported scheme %q: only http and https are allowed", parsedURL.Scheme) + } + + hostname := parsedURL.Hostname() + if hostname == "" { + return fmt.Errorf("URL has no hostname") + } + + if parsedIP := net.ParseIP(hostname); parsedIP != nil { + if isReservedAddress(parsedIP) { + return fmt.Errorf("feed URL resolves to a reserved IP address") + } + + return nil + } + + resolverContext, cancelResolver := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelResolver() + + resolvedAddresses, lookupError := net.DefaultResolver.LookupIPAddr(resolverContext, hostname) + if lookupError != nil { + return fmt.Errorf("failed to resolve hostname %q: %w", hostname, lookupError) + } + + for _, resolvedAddress := range resolvedAddresses { + if isReservedAddress(resolvedAddress.IP) { + return fmt.Errorf("feed URL resolves to a reserved IP address") + } + } + + return nil +} + +func ValidateRedirectTarget(redirectURL string) error { + return ValidateFeedURL(redirectURL) +} diff --git a/services/worker/internal/health/health.go b/services/worker/internal/health/health.go new file mode 100644 index 0000000..0a29c92 --- /dev/null +++ b/services/worker/internal/health/health.go @@ -0,0 +1,117 @@ +package health + +import ( + "context" + "encoding/json" + "fmt" + "github.com/jackc/pgx/v5/pgxpool" + "log/slog" + "net/http" + "time" +) + +type HealthStatus struct { + Status string `json:"status"` + Timestamp string `json:"timestamp"` + Database string `json:"database"` +} + +type HealthServer struct { + httpServer *http.Server + databaseConnectionPool *pgxpool.Pool + logger *slog.Logger +} + +func NewHealthServer( + healthPort int, + databaseConnectionPool *pgxpool.Pool, + logger *slog.Logger, +) *HealthServer { + healthServer := &HealthServer{ + databaseConnectionPool: databaseConnectionPool, + logger: logger, + } + serveMux := http.NewServeMux() + + serveMux.HandleFunc("/health", healthServer.handleHealthCheck) + serveMux.HandleFunc("/ready", healthServer.handleReadinessCheck) + + healthServer.httpServer = &http.Server{ + Addr: fmt.Sprintf(":%d", healthPort), + Handler: serveMux, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + IdleTimeout: 30 * time.Second, + } + + return healthServer +} + +func (healthServer *HealthServer) Start() { + go func() { + healthServer.logger.Info("health server starting", "address", healthServer.httpServer.Addr) + + listenError := healthServer.httpServer.ListenAndServe() + + if listenError != nil && listenError != http.ErrServerClosed { + healthServer.logger.Error("health server failed", "error", listenError) + } + }() +} + +func (healthServer *HealthServer) Stop(shutdownContext context.Context) error { + return healthServer.httpServer.Shutdown(shutdownContext) +} + +func (healthServer *HealthServer) handleHealthCheck(responseWriter http.ResponseWriter, request *http.Request) { + healthStatus := HealthStatus{ + Status: "healthy", + Timestamp: time.Now().UTC().Format(time.RFC3339), + Database: "unknown", + } + pingContext, cancelPing := context.WithTimeout(request.Context(), 2*time.Second) + + defer cancelPing() + + pingError := healthServer.databaseConnectionPool.Ping(pingContext) + + if pingError != nil { + healthStatus.Status = "unhealthy" + healthStatus.Database = "unreachable" + + respondWithJSON(responseWriter, http.StatusServiceUnavailable, healthStatus) + + return + } + + healthStatus.Database = "connected" + + respondWithJSON(responseWriter, http.StatusOK, healthStatus) +} + +func (healthServer *HealthServer) handleReadinessCheck(responseWriter http.ResponseWriter, request *http.Request) { + pingContext, cancelPing := context.WithTimeout(request.Context(), 2*time.Second) + + defer cancelPing() + + pingError := healthServer.databaseConnectionPool.Ping(pingContext) + + if pingError != nil { + responseWriter.WriteHeader(http.StatusServiceUnavailable) + + return + } + + responseWriter.WriteHeader(http.StatusOK) +} + +func respondWithJSON(responseWriter http.ResponseWriter, statusCode int, payload interface{}) { + responseWriter.Header().Set("Content-Type", "application/json") + responseWriter.WriteHeader(statusCode) + + encodingError := json.NewEncoder(responseWriter).Encode(payload) + + if encodingError != nil { + return + } +} diff --git a/services/worker/internal/model/feed.go b/services/worker/internal/model/feed.go new file mode 100644 index 0000000..611c820 --- /dev/null +++ b/services/worker/internal/model/feed.go @@ -0,0 +1,41 @@ +package model + +import ( + "time" +) + +type Feed struct { + Identifier string + URL string + SiteURL *string + Title *string + FeedType *string + Visibility string + EntityTag *string + LastModified *string + LastFetchedAt *time.Time + LastFetchError *string + ConsecutiveFailures int + NextFetchAt time.Time + FetchIntervalSeconds int + CreatedAt time.Time + UpdatedAt time.Time +} + +type FeedEntry struct { + FeedIdentifier string + OwnerIdentifier *string + GUID string + URL *string + Title *string + Author *string + Summary *string + ContentHTML *string + ContentText *string + ImageURL *string + PublishedAt *time.Time + WordCount *int + EnclosureURL *string + EnclosureType *string + EnclosureLength *int64 +} diff --git a/services/worker/internal/model/queue.go b/services/worker/internal/model/queue.go new file mode 100644 index 0000000..a5aaa7c --- /dev/null +++ b/services/worker/internal/model/queue.go @@ -0,0 +1,5 @@ +package model + +type RefreshRequest struct { + FeedIdentifier string `json:"feed_id"` +} diff --git a/services/worker/internal/parser/parser.go b/services/worker/internal/parser/parser.go new file mode 100644 index 0000000..1fb2f76 --- /dev/null +++ b/services/worker/internal/parser/parser.go @@ -0,0 +1,234 @@ +package parser + +import ( + "crypto/sha256" + "fmt" + "github.com/Fuwn/asa-news/internal/model" + "github.com/mmcdole/gofeed" + "strconv" + "strings" + "time" + "unicode/utf8" +) + +type Parser struct { + gofeedParser *gofeed.Parser +} + +func NewParser() *Parser { + return &Parser{ + gofeedParser: gofeed.NewParser(), + } +} + +type ParseResult struct { + Entries []model.FeedEntry + FeedTitle string + SiteURL string + AudioEnclosureRatio float64 +} + +func (feedParser *Parser) Parse(feedIdentifier string, ownerIdentifier *string, rawFeedContent []byte) (*ParseResult, error) { + parsedFeed, parseError := feedParser.gofeedParser.ParseString(string(rawFeedContent)) + + if parseError != nil { + return nil, fmt.Errorf("failed to parse feed content: %w", parseError) + } + + feedEntries := make([]model.FeedEntry, 0, len(parsedFeed.Items)) + + audioEnclosureCount := 0 + + for _, feedItem := range parsedFeed.Items { + normalizedEntry := normalizeFeedItem(feedIdentifier, ownerIdentifier, feedItem) + if normalizedEntry.EnclosureURL != nil { + audioEnclosureCount++ + } + feedEntries = append(feedEntries, normalizedEntry) + } + + audioEnclosureRatio := 0.0 + if len(feedEntries) > 0 { + audioEnclosureRatio = float64(audioEnclosureCount) / float64(len(feedEntries)) + } + + return &ParseResult{ + Entries: feedEntries, + FeedTitle: strings.TrimSpace(parsedFeed.Title), + SiteURL: strings.TrimSpace(parsedFeed.Link), + AudioEnclosureRatio: audioEnclosureRatio, + }, nil +} + +func normalizeFeedItem(feedIdentifier string, ownerIdentifier *string, feedItem *gofeed.Item) model.FeedEntry { + globallyUniqueIdentifier := resolveGloballyUniqueIdentifier(feedItem) + entryURL := stringPointerOrNil(resolveEntryURL(feedItem)) + entryTitle := stringPointerOrNil(strings.TrimSpace(feedItem.Title)) + entrySummary := stringPointerOrNil(strings.TrimSpace(feedItem.Description)) + entryContentHTML := stringPointerOrNil(resolveContentHTML(feedItem)) + entryContentText := resolveContentText(feedItem) + authorName := stringPointerOrNil(resolveAuthorName(feedItem)) + publishedAt := resolvePublishedDate(feedItem) + entryImageURL := stringPointerOrNil(resolveImageURL(feedItem)) + wordCount := countWords(entryContentText) + enclosureURL, enclosureType, enclosureLength := resolveAudioEnclosure(feedItem) + + return model.FeedEntry{ + FeedIdentifier: feedIdentifier, + OwnerIdentifier: ownerIdentifier, + GUID: globallyUniqueIdentifier, + URL: entryURL, + Title: entryTitle, + Author: authorName, + Summary: entrySummary, + ContentHTML: entryContentHTML, + ContentText: stringPointerOrNil(entryContentText), + ImageURL: entryImageURL, + PublishedAt: publishedAt, + WordCount: wordCount, + EnclosureURL: enclosureURL, + EnclosureType: enclosureType, + EnclosureLength: enclosureLength, + } +} + +func stringPointerOrNil(value string) *string { + if value == "" { + return nil + } + + return &value +} + +func resolveGloballyUniqueIdentifier(feedItem *gofeed.Item) string { + if feedItem.GUID != "" { + return feedItem.GUID + } + + if feedItem.Link != "" { + return feedItem.Link + } + + hashInput := feedItem.Title + feedItem.Description + hashBytes := sha256.Sum256([]byte(hashInput)) + + return fmt.Sprintf("sha256:%x", hashBytes) +} + +func resolveEntryURL(feedItem *gofeed.Item) string { + if feedItem.Link != "" { + return feedItem.Link + } + + if feedItem.GUID != "" && strings.HasPrefix(feedItem.GUID, "http") { + return feedItem.GUID + } + + return "" +} + +func resolveContentHTML(feedItem *gofeed.Item) string { + if feedItem.Content != "" { + return feedItem.Content + } + + return feedItem.Description +} + +func resolveContentText(feedItem *gofeed.Item) string { + contentSource := feedItem.Content + + if contentSource == "" { + contentSource = feedItem.Description + } + + return stripHTMLTags(contentSource) +} + +func resolveAuthorName(feedItem *gofeed.Item) string { + if feedItem.Author != nil && feedItem.Author.Name != "" { + return feedItem.Author.Name + } + + if len(feedItem.Authors) > 0 && feedItem.Authors[0].Name != "" { + return feedItem.Authors[0].Name + } + + return "" +} + +func resolvePublishedDate(feedItem *gofeed.Item) *time.Time { + if feedItem.PublishedParsed != nil { + return feedItem.PublishedParsed + } + + if feedItem.UpdatedParsed != nil { + return feedItem.UpdatedParsed + } + + return nil +} + +func resolveAudioEnclosure(feedItem *gofeed.Item) (*string, *string, *int64) { + if feedItem.Enclosures == nil { + return nil, nil, nil + } + + for _, enclosure := range feedItem.Enclosures { + if strings.HasPrefix(enclosure.Type, "audio/") && enclosure.URL != "" { + enclosureURL := enclosure.URL + enclosureType := enclosure.Type + + var enclosureLength *int64 + if enclosure.Length != "" { + if parsedLength, parseError := strconv.ParseInt(enclosure.Length, 10, 64); parseError == nil { + enclosureLength = &parsedLength + } + } + + return &enclosureURL, &enclosureType, enclosureLength + } + } + + return nil, nil, nil +} + +func resolveImageURL(feedItem *gofeed.Item) string { + if feedItem.Image != nil && feedItem.Image.URL != "" { + return feedItem.Image.URL + } + + return "" +} + +func countWords(plainText string) *int { + if plainText == "" { + return nil + } + + count := len(strings.Fields(plainText)) + + return &count +} + +func stripHTMLTags(htmlContent string) string { + var resultBuilder strings.Builder + + insideTag := false + + for characterIndex := 0; characterIndex < len(htmlContent); { + currentRune, runeSize := utf8.DecodeRuneInString(htmlContent[characterIndex:]) + + if currentRune == '<' { + insideTag = true + } else if currentRune == '>' { + insideTag = false + } else if !insideTag { + resultBuilder.WriteRune(currentRune) + } + + characterIndex += runeSize + } + + return strings.TrimSpace(resultBuilder.String()) +} diff --git a/services/worker/internal/pool/pool.go b/services/worker/internal/pool/pool.go new file mode 100644 index 0000000..7df03e2 --- /dev/null +++ b/services/worker/internal/pool/pool.go @@ -0,0 +1,60 @@ +package pool + +import ( + "context" + "log/slog" + "sync" +) + +type WorkFunction func(workContext context.Context) + +type WorkerPool struct { + concurrencyLimit int + semaphoreChannel chan struct{} + waitGroup sync.WaitGroup + logger *slog.Logger +} + +func NewWorkerPool(concurrencyLimit int, logger *slog.Logger) *WorkerPool { + return &WorkerPool{ + concurrencyLimit: concurrencyLimit, + semaphoreChannel: make(chan struct{}, concurrencyLimit), + logger: logger, + } +} + +func (workerPool *WorkerPool) Submit(workContext context.Context, workFunction WorkFunction) bool { + select { + case workerPool.semaphoreChannel <- struct{}{}: + workerPool.waitGroup.Add(1) + + go func() { + defer workerPool.waitGroup.Done() + defer func() { <-workerPool.semaphoreChannel }() + defer func() { + recoveredPanic := recover() + + if recoveredPanic != nil { + workerPool.logger.Error( + "worker panic recovered", + "panic_value", recoveredPanic, + ) + } + }() + + workFunction(workContext) + }() + + return true + case <-workContext.Done(): + return false + } +} + +func (workerPool *WorkerPool) Wait() { + workerPool.waitGroup.Wait() +} + +func (workerPool *WorkerPool) ActiveWorkerCount() int { + return len(workerPool.semaphoreChannel) +} diff --git a/services/worker/internal/scheduler/refresh.go b/services/worker/internal/scheduler/refresh.go new file mode 100644 index 0000000..8271fa0 --- /dev/null +++ b/services/worker/internal/scheduler/refresh.go @@ -0,0 +1,196 @@ +package scheduler + +import ( + "context" + "github.com/Fuwn/asa-news/internal/fetcher" + "github.com/Fuwn/asa-news/internal/model" + "github.com/Fuwn/asa-news/internal/parser" + "github.com/Fuwn/asa-news/internal/webhook" + "github.com/Fuwn/asa-news/internal/writer" + "log/slog" +) + +func ProcessRefreshRequest( + refreshContext context.Context, + feed model.Feed, + feedFetcher *fetcher.Fetcher, + feedParser *parser.Parser, + feedWriter *writer.Writer, + webhookDispatcher *webhook.Dispatcher, + logger *slog.Logger, +) { + logger.Info( + "processing feed refresh", + "feed_identifier", feed.Identifier, + "feed_url", feed.URL, + ) + + var ownerIdentifier *string + + if feed.Visibility == "authenticated" { + logger.Warn( + "authenticated feed refresh not yet implemented", + "feed_identifier", feed.Identifier, + ) + + return + } + + authenticationConfig := fetcher.AuthenticationConfiguration{} + + entityTag := "" + + if feed.EntityTag != nil { + entityTag = *feed.EntityTag + } + + lastModified := "" + + if feed.LastModified != nil { + lastModified = *feed.LastModified + } + + fetchResult, fetchError := feedFetcher.Fetch( + refreshContext, + feed.URL, + entityTag, + lastModified, + authenticationConfig, + ) + + if fetchError != nil { + logger.Warn( + "feed fetch failed", + "feed_identifier", feed.Identifier, + "error", fetchError, + ) + + recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, fetchError.Error()) + + if recordError != nil { + logger.Error( + "failed to record feed error", + "feed_identifier", feed.Identifier, + "error", recordError, + ) + } + + return + } + + if fetchResult.NotModified { + logger.Info( + "feed not modified", + "feed_identifier", feed.Identifier, + ) + + metadataError := feedWriter.UpdateFeedMetadata( + refreshContext, + feed.Identifier, + fetchResult.EntityTag, + fetchResult.LastModifiedHeader, + "", + "", + ) + + if metadataError != nil { + logger.Error( + "failed to update feed metadata after not-modified", + "feed_identifier", feed.Identifier, + "error", metadataError, + ) + } + + return + } + + parseResult, parseError := feedParser.Parse(feed.Identifier, ownerIdentifier, fetchResult.Body) + + if parseError != nil { + logger.Warn( + "feed parse failed", + "feed_identifier", feed.Identifier, + "error", parseError, + ) + + recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, parseError.Error()) + + if recordError != nil { + logger.Error( + "failed to record parse error", + "feed_identifier", feed.Identifier, + "error", recordError, + ) + } + + return + } + + rowsAffected, writeError := feedWriter.WriteEntries(refreshContext, parseResult.Entries) + + if writeError != nil { + logger.Error( + "failed to write feed entries", + "feed_identifier", feed.Identifier, + "error", writeError, + ) + + recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, writeError.Error()) + + if recordError != nil { + logger.Error( + "failed to record write error", + "feed_identifier", feed.Identifier, + "error", recordError, + ) + } + + return + } + + if rowsAffected > 0 && webhookDispatcher != nil { + webhookDispatcher.DispatchForFeed(refreshContext, feed.Identifier, parseResult.Entries) + } + + metadataError := feedWriter.UpdateFeedMetadata( + refreshContext, + feed.Identifier, + fetchResult.EntityTag, + fetchResult.LastModifiedHeader, + parseResult.FeedTitle, + parseResult.SiteURL, + ) + + if metadataError != nil { + logger.Error( + "failed to update feed metadata", + "feed_identifier", feed.Identifier, + "error", metadataError, + ) + } + + if parseResult.AudioEnclosureRatio > 0.5 { + if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "podcast"); feedTypeError != nil { + logger.Error( + "failed to update feed type to podcast", + "feed_identifier", feed.Identifier, + "error", feedTypeError, + ) + } + } else if feed.FeedType != nil && *feed.FeedType == "podcast" { + if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "rss"); feedTypeError != nil { + logger.Error( + "failed to clear podcast feed type", + "feed_identifier", feed.Identifier, + "error", feedTypeError, + ) + } + } + + logger.Info( + "feed refresh completed", + "feed_identifier", feed.Identifier, + "entries_parsed", len(parseResult.Entries), + "rows_affected", rowsAffected, + ) +} diff --git a/services/worker/internal/scheduler/scheduler.go b/services/worker/internal/scheduler/scheduler.go new file mode 100644 index 0000000..646e263 --- /dev/null +++ b/services/worker/internal/scheduler/scheduler.go @@ -0,0 +1,283 @@ +package scheduler + +import ( + "context" + "encoding/json" + "fmt" + "github.com/Fuwn/asa-news/internal/fetcher" + "github.com/Fuwn/asa-news/internal/model" + "github.com/Fuwn/asa-news/internal/parser" + "github.com/Fuwn/asa-news/internal/pool" + "github.com/Fuwn/asa-news/internal/webhook" + "github.com/Fuwn/asa-news/internal/writer" + pgmq "github.com/craigpastro/pgmq-go" + "github.com/jackc/pgx/v5/pgxpool" + "log/slog" + "time" +) + +type Scheduler struct { + databaseConnectionPool *pgxpool.Pool + feedFetcher *fetcher.Fetcher + feedParser *parser.Parser + feedWriter *writer.Writer + webhookDispatcher *webhook.Dispatcher + workerPool *pool.WorkerPool + pollInterval time.Duration + queuePollInterval time.Duration + batchSize int + logger *slog.Logger +} + +func NewScheduler( + databaseConnectionPool *pgxpool.Pool, + feedFetcher *fetcher.Fetcher, + feedParser *parser.Parser, + feedWriter *writer.Writer, + webhookDispatcher *webhook.Dispatcher, + workerPool *pool.WorkerPool, + pollInterval time.Duration, + queuePollInterval time.Duration, + batchSize int, + logger *slog.Logger, +) *Scheduler { + return &Scheduler{ + databaseConnectionPool: databaseConnectionPool, + feedFetcher: feedFetcher, + feedParser: feedParser, + feedWriter: feedWriter, + webhookDispatcher: webhookDispatcher, + workerPool: workerPool, + pollInterval: pollInterval, + queuePollInterval: queuePollInterval, + batchSize: batchSize, + logger: logger, + } +} + +func (feedScheduler *Scheduler) Run(schedulerContext context.Context) { + pollTicker := time.NewTicker(feedScheduler.pollInterval) + + defer pollTicker.Stop() + + queueTicker := time.NewTicker(feedScheduler.queuePollInterval) + + defer queueTicker.Stop() + + feedScheduler.logger.Info("scheduler started", + "poll_interval", feedScheduler.pollInterval, + "queue_poll_interval", feedScheduler.queuePollInterval, + "batch_size", feedScheduler.batchSize, + ) + feedScheduler.executePollCycle(schedulerContext) + feedScheduler.executeQueueCycle(schedulerContext) + + for { + select { + case <-schedulerContext.Done(): + feedScheduler.logger.Info("scheduler shutting down") + + return + case <-pollTicker.C: + feedScheduler.executePollCycle(schedulerContext) + case <-queueTicker.C: + feedScheduler.executeQueueCycle(schedulerContext) + } + } +} + +func (feedScheduler *Scheduler) executePollCycle(cycleContext context.Context) { + claimedFeeds, claimError := feedScheduler.claimDueFeeds(cycleContext) + + if claimError != nil { + feedScheduler.logger.Error("failed to claim due feeds", "error", claimError) + + return + } + + if len(claimedFeeds) == 0 { + return + } + + feedScheduler.logger.Info("claimed feeds for processing", "count", len(claimedFeeds)) + + for _, claimedFeed := range claimedFeeds { + capturedFeed := claimedFeed + + feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { + ProcessRefreshRequest( + workContext, + capturedFeed, + feedScheduler.feedFetcher, + feedScheduler.feedParser, + feedScheduler.feedWriter, + feedScheduler.webhookDispatcher, + feedScheduler.logger, + ) + }) + } +} + +func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context) { + queueMessage, readError := pgmq.Read(cycleContext, feedScheduler.databaseConnectionPool, "feed_refresh", 30) + + if readError != nil { + return + } + + if queueMessage == nil { + return + } + + var refreshRequest model.RefreshRequest + + unmarshalError := json.Unmarshal(queueMessage.Message, &refreshRequest) + + if unmarshalError != nil { + feedScheduler.logger.Error("failed to unmarshal refresh request", "error", unmarshalError) + + return + } + + feed, lookupError := feedScheduler.lookupFeed(cycleContext, refreshRequest.FeedIdentifier) + + if lookupError != nil { + feedScheduler.logger.Error( + "failed to look up feed for queue request", + "feed_identifier", refreshRequest.FeedIdentifier, + "error", lookupError, + ) + + return + } + + capturedMessageIdentifier := queueMessage.MsgID + + feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { + ProcessRefreshRequest( + workContext, + feed, + feedScheduler.feedFetcher, + feedScheduler.feedParser, + feedScheduler.feedWriter, + feedScheduler.webhookDispatcher, + feedScheduler.logger, + ) + + _, archiveError := pgmq.Archive(workContext, feedScheduler.databaseConnectionPool, "feed_refresh", capturedMessageIdentifier) + + if archiveError != nil { + feedScheduler.logger.Error( + "failed to archive queue message", + "message_id", capturedMessageIdentifier, + "error", archiveError, + ) + } + }) +} + +func (feedScheduler *Scheduler) claimDueFeeds(claimContext context.Context) ([]model.Feed, error) { + claimQuery := ` + UPDATE feeds + SET last_fetched_at = NOW() + WHERE id IN ( + SELECT id + FROM feeds + WHERE next_fetch_at <= NOW() + AND subscriber_count > 0 + ORDER BY next_fetch_at ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + RETURNING + id, url, site_url, title, feed_type, + visibility, etag, last_modified, + last_fetched_at, last_fetch_error, + consecutive_failures, next_fetch_at, + fetch_interval_seconds, + created_at, updated_at + ` + rows, queryError := feedScheduler.databaseConnectionPool.Query(claimContext, claimQuery, feedScheduler.batchSize) + + if queryError != nil { + return nil, fmt.Errorf("failed to query due feeds: %w", queryError) + } + + defer rows.Close() + + claimedFeeds := make([]model.Feed, 0) + + for rows.Next() { + var feed model.Feed + + scanError := rows.Scan( + &feed.Identifier, + &feed.URL, + &feed.SiteURL, + &feed.Title, + &feed.FeedType, + &feed.Visibility, + &feed.EntityTag, + &feed.LastModified, + &feed.LastFetchedAt, + &feed.LastFetchError, + &feed.ConsecutiveFailures, + &feed.NextFetchAt, + &feed.FetchIntervalSeconds, + &feed.CreatedAt, + &feed.UpdatedAt, + ) + + if scanError != nil { + return nil, fmt.Errorf("failed to scan feed row: %w", scanError) + } + + claimedFeeds = append(claimedFeeds, feed) + } + + if rows.Err() != nil { + return nil, fmt.Errorf("error iterating feed rows: %w", rows.Err()) + } + + return claimedFeeds, nil +} + +func (feedScheduler *Scheduler) lookupFeed(lookupContext context.Context, feedIdentifier string) (model.Feed, error) { + lookupQuery := ` + SELECT + id, url, site_url, title, feed_type, + visibility, etag, last_modified, + last_fetched_at, last_fetch_error, + consecutive_failures, next_fetch_at, + fetch_interval_seconds, + created_at, updated_at + FROM feeds + WHERE id = $1 + ` + + var feed model.Feed + + scanError := feedScheduler.databaseConnectionPool.QueryRow(lookupContext, lookupQuery, feedIdentifier).Scan( + &feed.Identifier, + &feed.URL, + &feed.SiteURL, + &feed.Title, + &feed.FeedType, + &feed.Visibility, + &feed.EntityTag, + &feed.LastModified, + &feed.LastFetchedAt, + &feed.LastFetchError, + &feed.ConsecutiveFailures, + &feed.NextFetchAt, + &feed.FetchIntervalSeconds, + &feed.CreatedAt, + &feed.UpdatedAt, + ) + + if scanError != nil { + return model.Feed{}, fmt.Errorf("failed to look up feed %s: %w", feedIdentifier, scanError) + } + + return feed, nil +} diff --git a/services/worker/internal/webhook/webhook.go b/services/worker/internal/webhook/webhook.go new file mode 100644 index 0000000..c812820 --- /dev/null +++ b/services/worker/internal/webhook/webhook.go @@ -0,0 +1,333 @@ +package webhook + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "github.com/Fuwn/asa-news/internal/fetcher" + "github.com/Fuwn/asa-news/internal/model" + "github.com/jackc/pgx/v5/pgxpool" + "log/slog" + "net/http" + "time" +) + +type Dispatcher struct { + databaseConnectionPool *pgxpool.Pool + httpClient *http.Client + logger *slog.Logger +} + +type webhookSubscriber struct { + UserIdentifier string + WebhookURL string + WebhookSecret *string +} + +type EntryPayload struct { + EntryIdentifier string `json:"entryIdentifier"` + FeedIdentifier string `json:"feedIdentifier"` + GUID string `json:"guid"` + URL *string `json:"url"` + Title *string `json:"title"` + Author *string `json:"author"` + Summary *string `json:"summary"` + PublishedAt *string `json:"publishedAt"` + EnclosureURL *string `json:"enclosureUrl"` + EnclosureType *string `json:"enclosureType"` +} + +type WebhookPayload struct { + Event string `json:"event"` + Timestamp string `json:"timestamp"` + Entries []EntryPayload `json:"entries"` +} + +func NewDispatcher( + databaseConnectionPool *pgxpool.Pool, + logger *slog.Logger, +) *Dispatcher { + return &Dispatcher{ + databaseConnectionPool: databaseConnectionPool, + httpClient: &http.Client{ + Timeout: 10 * time.Second, + }, + logger: logger, + } +} + +func (webhookDispatcher *Dispatcher) DispatchForFeed( + dispatchContext context.Context, + feedIdentifier string, + entries []model.FeedEntry, +) { + subscribers, queryError := webhookDispatcher.findWebhookSubscribers(dispatchContext, feedIdentifier) + + if queryError != nil { + webhookDispatcher.logger.Error( + "failed to query webhook subscribers", + "feed_identifier", feedIdentifier, + "error", queryError, + ) + + return + } + + if len(subscribers) == 0 { + return + } + + entryPayloads := make([]EntryPayload, 0, len(entries)) + + for _, entry := range entries { + var publishedAtString *string + + if entry.PublishedAt != nil { + formattedTime := entry.PublishedAt.Format(time.RFC3339) + publishedAtString = &formattedTime + } + + entryPayloads = append(entryPayloads, EntryPayload{ + EntryIdentifier: entry.FeedIdentifier, + FeedIdentifier: entry.FeedIdentifier, + GUID: entry.GUID, + URL: entry.URL, + Title: entry.Title, + Author: entry.Author, + Summary: entry.Summary, + PublishedAt: publishedAtString, + EnclosureURL: entry.EnclosureURL, + EnclosureType: entry.EnclosureType, + }) + } + + payload := WebhookPayload{ + Event: "entries.created", + Timestamp: time.Now().UTC().Format(time.RFC3339), + Entries: entryPayloads, + } + + for _, subscriber := range subscribers { + webhookDispatcher.deliverWebhook(dispatchContext, subscriber, payload) + } +} + +func (webhookDispatcher *Dispatcher) findWebhookSubscribers( + queryContext context.Context, + feedIdentifier string, +) ([]webhookSubscriber, error) { + subscriberQuery := ` + SELECT up.id, up.webhook_url, up.webhook_secret + FROM subscriptions s + JOIN user_profiles up ON up.id = s.user_id + WHERE s.feed_id = $1 + AND up.tier = 'developer' + AND up.webhook_enabled = true + AND up.webhook_url IS NOT NULL + ` + + rows, queryError := webhookDispatcher.databaseConnectionPool.Query( + queryContext, + subscriberQuery, + feedIdentifier, + ) + + if queryError != nil { + return nil, fmt.Errorf("failed to query webhook subscribers: %w", queryError) + } + + defer rows.Close() + + subscribers := make([]webhookSubscriber, 0) + + for rows.Next() { + var subscriber webhookSubscriber + + scanError := rows.Scan( + &subscriber.UserIdentifier, + &subscriber.WebhookURL, + &subscriber.WebhookSecret, + ) + + if scanError != nil { + return nil, fmt.Errorf("failed to scan subscriber row: %w", scanError) + } + + subscribers = append(subscribers, subscriber) + } + + return subscribers, rows.Err() +} + +func (webhookDispatcher *Dispatcher) deliverWebhook( + deliveryContext context.Context, + subscriber webhookSubscriber, + payload WebhookPayload, +) { + urlValidationError := fetcher.ValidateFeedURL(subscriber.WebhookURL) + + if urlValidationError != nil { + webhookDispatcher.logger.Warn( + "webhook URL failed SSRF validation", + "user_identifier", subscriber.UserIdentifier, + "webhook_url", subscriber.WebhookURL, + "error", urlValidationError, + ) + + return + } + + payloadBytes, marshalError := json.Marshal(payload) + + if marshalError != nil { + webhookDispatcher.logger.Error( + "failed to marshal webhook payload", + "user_identifier", subscriber.UserIdentifier, + "error", marshalError, + ) + + return + } + + retryDelays := []time.Duration{0, 1 * time.Second, 4 * time.Second, 16 * time.Second} + var lastError error + + for attemptIndex, delay := range retryDelays { + if delay > 0 { + select { + case <-time.After(delay): + case <-deliveryContext.Done(): + return + } + } + + deliveryError := webhookDispatcher.sendWebhookRequest( + deliveryContext, + subscriber, + payloadBytes, + ) + + if deliveryError == nil { + if attemptIndex > 0 { + webhookDispatcher.logger.Info( + "webhook delivered after retry", + "user_identifier", subscriber.UserIdentifier, + "attempt", attemptIndex+1, + ) + } + + webhookDispatcher.resetConsecutiveFailures(deliveryContext, subscriber.UserIdentifier) + + return + } + + lastError = deliveryError + + webhookDispatcher.logger.Warn( + "webhook delivery attempt failed", + "user_identifier", subscriber.UserIdentifier, + "attempt", attemptIndex+1, + "error", deliveryError, + ) + } + + webhookDispatcher.logger.Error( + "webhook delivery failed after all retries", + "user_identifier", subscriber.UserIdentifier, + "error", lastError, + ) + + webhookDispatcher.incrementConsecutiveFailures(deliveryContext, subscriber.UserIdentifier) +} + +func (webhookDispatcher *Dispatcher) sendWebhookRequest( + requestContext context.Context, + subscriber webhookSubscriber, + payloadBytes []byte, +) error { + request, requestCreationError := http.NewRequestWithContext( + requestContext, + http.MethodPost, + subscriber.WebhookURL, + bytes.NewReader(payloadBytes), + ) + + if requestCreationError != nil { + return fmt.Errorf("failed to create webhook request: %w", requestCreationError) + } + + request.Header.Set("Content-Type", "application/json") + request.Header.Set("User-Agent", "asa.news Webhook/1.0") + + if subscriber.WebhookSecret != nil && *subscriber.WebhookSecret != "" { + mac := hmac.New(sha256.New, []byte(*subscriber.WebhookSecret)) + mac.Write(payloadBytes) + signature := hex.EncodeToString(mac.Sum(nil)) + request.Header.Set("X-Asa-Signature-256", "sha256="+signature) + } + + response, requestError := webhookDispatcher.httpClient.Do(request) + + if requestError != nil { + return fmt.Errorf("webhook request failed: %w", requestError) + } + + defer response.Body.Close() + + if response.StatusCode >= 200 && response.StatusCode < 300 { + return nil + } + + return fmt.Errorf("webhook returned status %d", response.StatusCode) +} + +func (webhookDispatcher *Dispatcher) resetConsecutiveFailures( + updateContext context.Context, + userIdentifier string, +) { + _, updateError := webhookDispatcher.databaseConnectionPool.Exec( + updateContext, + "UPDATE user_profiles SET webhook_consecutive_failures = 0 WHERE id = $1", + userIdentifier, + ) + + if updateError != nil { + webhookDispatcher.logger.Error( + "failed to reset webhook consecutive failures", + "user_identifier", userIdentifier, + "error", updateError, + ) + } +} + +func (webhookDispatcher *Dispatcher) incrementConsecutiveFailures( + updateContext context.Context, + userIdentifier string, +) { + maximumConsecutiveFailures := 10 + + _, updateError := webhookDispatcher.databaseConnectionPool.Exec( + updateContext, + `UPDATE user_profiles + SET webhook_consecutive_failures = webhook_consecutive_failures + 1, + webhook_enabled = CASE + WHEN webhook_consecutive_failures + 1 >= $2 THEN false + ELSE webhook_enabled + END + WHERE id = $1`, + userIdentifier, + maximumConsecutiveFailures, + ) + + if updateError != nil { + webhookDispatcher.logger.Error( + "failed to increment webhook consecutive failures", + "user_identifier", userIdentifier, + "error", updateError, + ) + } +} diff --git a/services/worker/internal/writer/writer.go b/services/worker/internal/writer/writer.go new file mode 100644 index 0000000..de81bfa --- /dev/null +++ b/services/worker/internal/writer/writer.go @@ -0,0 +1,222 @@ +package writer + +import ( + "context" + "fmt" + "github.com/Fuwn/asa-news/internal/model" + "github.com/jackc/pgx/v5/pgxpool" + "strings" + "time" +) + +type Writer struct { + databaseConnectionPool *pgxpool.Pool +} + +func NewWriter(databaseConnectionPool *pgxpool.Pool) *Writer { + return &Writer{ + databaseConnectionPool: databaseConnectionPool, + } +} + +func (feedWriter *Writer) WriteEntries(writeContext context.Context, feedEntries []model.FeedEntry) (int64, error) { + if len(feedEntries) == 0 { + return 0, nil + } + + valueParameterPlaceholders := make([]string, 0, len(feedEntries)) + queryArguments := make([]interface{}, 0, len(feedEntries)*15) + + for entryIndex, entry := range feedEntries { + parameterOffset := entryIndex * 15 + valueParameterPlaceholders = append( + valueParameterPlaceholders, + fmt.Sprintf( + "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", + parameterOffset+1, parameterOffset+2, parameterOffset+3, + parameterOffset+4, parameterOffset+5, parameterOffset+6, + parameterOffset+7, parameterOffset+8, parameterOffset+9, + parameterOffset+10, parameterOffset+11, parameterOffset+12, + parameterOffset+13, parameterOffset+14, parameterOffset+15, + ), + ) + queryArguments = append( + queryArguments, + entry.FeedIdentifier, + entry.OwnerIdentifier, + entry.GUID, + entry.URL, + entry.Title, + entry.Author, + entry.Summary, + entry.ContentHTML, + entry.ContentText, + entry.ImageURL, + entry.PublishedAt, + entry.WordCount, + entry.EnclosureURL, + entry.EnclosureType, + entry.EnclosureLength, + ) + } + + isPublicEntry := feedEntries[0].OwnerIdentifier == nil + + var conflictClause string + + if isPublicEntry { + conflictClause = "ON CONFLICT (feed_id, guid) WHERE owner_id IS NULL" + } else { + conflictClause = "ON CONFLICT (feed_id, owner_id, guid) WHERE owner_id IS NOT NULL" + } + + upsertQuery := fmt.Sprintf(` + INSERT INTO entries ( + feed_id, owner_id, guid, url, title, author, + summary, content_html, content_text, image_url, + published_at, word_count, + enclosure_url, enclosure_type, enclosure_length + ) + VALUES %s + %s + DO UPDATE SET + url = EXCLUDED.url, + title = EXCLUDED.title, + author = EXCLUDED.author, + summary = CASE + WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id) + THEN entries.summary + ELSE EXCLUDED.summary + END, + content_html = CASE + WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id) + THEN entries.content_html + ELSE EXCLUDED.content_html + END, + content_text = CASE + WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id) + THEN entries.content_text + ELSE EXCLUDED.content_text + END, + image_url = EXCLUDED.image_url, + published_at = EXCLUDED.published_at, + word_count = EXCLUDED.word_count, + enclosure_url = EXCLUDED.enclosure_url, + enclosure_type = EXCLUDED.enclosure_type, + enclosure_length = EXCLUDED.enclosure_length + WHERE + entries.title IS DISTINCT FROM EXCLUDED.title + OR entries.content_html IS DISTINCT FROM EXCLUDED.content_html + OR entries.enclosure_url IS DISTINCT FROM EXCLUDED.enclosure_url + `, strings.Join(valueParameterPlaceholders, ", "), conflictClause) + commandTag, executeError := feedWriter.databaseConnectionPool.Exec(writeContext, upsertQuery, queryArguments...) + + if executeError != nil { + return 0, fmt.Errorf("failed to upsert feed entries: %w", executeError) + } + + return commandTag.RowsAffected(), nil +} + +func (feedWriter *Writer) UpdateFeedMetadata( + updateContext context.Context, + feedIdentifier string, + entityTag string, + lastModified string, + feedTitle string, + siteURL string, +) error { + currentTime := time.Now().UTC() + + var titleParameter *string + if feedTitle != "" { + titleParameter = &feedTitle + } + + var siteURLParameter *string + if siteURL != "" { + siteURLParameter = &siteURL + } + + updateQuery := ` + UPDATE feeds + SET + last_fetched_at = $1::timestamptz, + etag = $2, + last_modified = $3, + consecutive_failures = 0, + last_fetch_error = NULL, + last_fetch_error_at = NULL, + next_fetch_at = $1::timestamptz + (fetch_interval_seconds * INTERVAL '1 second'), + title = COALESCE($5, title), + site_url = COALESCE($6, site_url) + WHERE id = $4 + ` + _, executeError := feedWriter.databaseConnectionPool.Exec( + updateContext, + updateQuery, + currentTime, + entityTag, + lastModified, + feedIdentifier, + titleParameter, + siteURLParameter, + ) + + if executeError != nil { + return fmt.Errorf("failed to update feed metadata: %w", executeError) + } + + return nil +} + +func (feedWriter *Writer) UpdateFeedType( + updateContext context.Context, + feedIdentifier string, + feedType string, +) error { + updateQuery := `UPDATE feeds SET feed_type = $1 WHERE id = $2` + _, executeError := feedWriter.databaseConnectionPool.Exec( + updateContext, + updateQuery, + feedType, + feedIdentifier, + ) + + if executeError != nil { + return fmt.Errorf("failed to update feed type: %w", executeError) + } + + return nil +} + +func (feedWriter *Writer) RecordFeedError( + updateContext context.Context, + feedIdentifier string, + errorMessage string, +) error { + currentTime := time.Now().UTC() + updateQuery := ` + UPDATE feeds + SET + last_fetched_at = $1::timestamptz, + last_fetch_error = $2, + last_fetch_error_at = $1::timestamptz, + consecutive_failures = consecutive_failures + 1, + next_fetch_at = $1::timestamptz + (LEAST(POWER(2, consecutive_failures) * 60, 21600) * INTERVAL '1 second') + WHERE id = $3 + ` + _, executeError := feedWriter.databaseConnectionPool.Exec( + updateContext, + updateQuery, + currentTime, + errorMessage, + feedIdentifier, + ) + + if executeError != nil { + return fmt.Errorf("failed to record feed error: %w", executeError) + } + + return nil +} |