author    Fuwn <[email protected]>  2026-02-07 01:42:57 -0800
committer Fuwn <[email protected]>  2026-02-07 01:42:57 -0800
commit    5c5b1993edd890a80870ee05607ac5f088191d4e (patch)
tree      a721b76bcd49ba10826c53efc87302c7a689512f /services/worker/internal/scheduler
feat: asa.news RSS reader with developer tier, REST API, and webhooks
Full-stack RSS reader SaaS: Supabase + Next.js + Go worker. Includes three subscription tiers (free/pro/developer), API-key auth, a read-only REST API, webhook push notifications, Stripe billing with proration, and PWA support.
Diffstat (limited to 'services/worker/internal/scheduler')
-rw-r--r--  services/worker/internal/scheduler/refresh.go     196
-rw-r--r--  services/worker/internal/scheduler/scheduler.go    283
2 files changed, 479 insertions(+), 0 deletions(-)
diff --git a/services/worker/internal/scheduler/refresh.go b/services/worker/internal/scheduler/refresh.go
new file mode 100644
index 0000000..8271fa0
--- /dev/null
+++ b/services/worker/internal/scheduler/refresh.go
@@ -0,0 +1,196 @@
+package scheduler
+
+import (
+ "context"
+ "github.com/Fuwn/asa-news/internal/fetcher"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/Fuwn/asa-news/internal/parser"
+ "github.com/Fuwn/asa-news/internal/webhook"
+ "github.com/Fuwn/asa-news/internal/writer"
+ "log/slog"
+)
+
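+// ProcessRefreshRequest fetches, parses, and persists a single feed refresh.
+// Failures are recorded on the feed row via RecordFeedError rather than
+// returned, so one broken feed never aborts a whole worker cycle.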
+func ProcessRefreshRequest(
+ refreshContext context.Context,
+ feed model.Feed,
+ feedFetcher *fetcher.Fetcher,
+ feedParser *parser.Parser,
+ feedWriter *writer.Writer,
+ webhookDispatcher *webhook.Dispatcher,
+ logger *slog.Logger,
+) {
+ logger.Info(
+ "processing feed refresh",
+ "feed_identifier", feed.Identifier,
+ "feed_url", feed.URL,
+ )
+
+ var ownerIdentifier *string
+
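+ // Authenticated feeds need per-owner credentials that are not wired up yet,
+ // so they are skipped rather than fetched anonymously.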
+ if feed.Visibility == "authenticated" {
+ logger.Warn(
+ "authenticated feed refresh not yet implemented",
+ "feed_identifier", feed.Identifier,
+ )
+
+ return
+ }
+
+ authenticationConfig := fetcher.AuthenticationConfiguration{}
+
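+ // Replay the stored ETag and Last-Modified validators so the origin server
+ // can short-circuit the fetch with 304 Not Modified.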
+ entityTag := ""
+
+ if feed.EntityTag != nil {
+ entityTag = *feed.EntityTag
+ }
+
+ lastModified := ""
+
+ if feed.LastModified != nil {
+ lastModified = *feed.LastModified
+ }
+
+ fetchResult, fetchError := feedFetcher.Fetch(
+ refreshContext,
+ feed.URL,
+ entityTag,
+ lastModified,
+ authenticationConfig,
+ )
+
+ if fetchError != nil {
+ logger.Warn(
+ "feed fetch failed",
+ "feed_identifier", feed.Identifier,
+ "error", fetchError,
+ )
+
+ recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, fetchError.Error())
+
+ if recordError != nil {
+ logger.Error(
+ "failed to record feed error",
+ "feed_identifier", feed.Identifier,
+ "error", recordError,
+ )
+ }
+
+ return
+ }
+
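+ // A 304 means the body is unchanged: refresh the cache validators and skip
+ // parsing and writing entirely.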
+ if fetchResult.NotModified {
+ logger.Info(
+ "feed not modified",
+ "feed_identifier", feed.Identifier,
+ )
+
+ metadataError := feedWriter.UpdateFeedMetadata(
+ refreshContext,
+ feed.Identifier,
+ fetchResult.EntityTag,
+ fetchResult.LastModifiedHeader,
+ "",
+ "",
+ )
+
+ if metadataError != nil {
+ logger.Error(
+ "failed to update feed metadata after not-modified",
+ "feed_identifier", feed.Identifier,
+ "error", metadataError,
+ )
+ }
+
+ return
+ }
+
+ parseResult, parseError := feedParser.Parse(feed.Identifier, ownerIdentifier, fetchResult.Body)
+
+ if parseError != nil {
+ logger.Warn(
+ "feed parse failed",
+ "feed_identifier", feed.Identifier,
+ "error", parseError,
+ )
+
+ recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, parseError.Error())
+
+ if recordError != nil {
+ logger.Error(
+ "failed to record parse error",
+ "feed_identifier", feed.Identifier,
+ "error", recordError,
+ )
+ }
+
+ return
+ }
+
+ rowsAffected, writeError := feedWriter.WriteEntries(refreshContext, parseResult.Entries)
+
+ if writeError != nil {
+ logger.Error(
+ "failed to write feed entries",
+ "feed_identifier", feed.Identifier,
+ "error", writeError,
+ )
+
+ recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, writeError.Error())
+
+ if recordError != nil {
+ logger.Error(
+ "failed to record write error",
+ "feed_identifier", feed.Identifier,
+ "error", recordError,
+ )
+ }
+
+ return
+ }
+
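+ // Notify webhook subscribers only when the write actually changed rows.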
+ if rowsAffected > 0 && webhookDispatcher != nil {
+ webhookDispatcher.DispatchForFeed(refreshContext, feed.Identifier, parseResult.Entries)
+ }
+
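+ // Persist fresh cache validators plus any title and site URL discovered
+ // during parsing.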
+ metadataError := feedWriter.UpdateFeedMetadata(
+ refreshContext,
+ feed.Identifier,
+ fetchResult.EntityTag,
+ fetchResult.LastModifiedHeader,
+ parseResult.FeedTitle,
+ parseResult.SiteURL,
+ )
+
+ if metadataError != nil {
+ logger.Error(
+ "failed to update feed metadata",
+ "feed_identifier", feed.Identifier,
+ "error", metadataError,
+ )
+ }
+
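+ // Reclassify by enclosure mix: a majority-audio feed is marked as a podcast,
+ // and a podcast feed that stops shipping audio reverts to plain rss.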
+ if parseResult.AudioEnclosureRatio > 0.5 {
+ if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "podcast"); feedTypeError != nil {
+ logger.Error(
+ "failed to update feed type to podcast",
+ "feed_identifier", feed.Identifier,
+ "error", feedTypeError,
+ )
+ }
+ } else if feed.FeedType != nil && *feed.FeedType == "podcast" {
+ if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "rss"); feedTypeError != nil {
+ logger.Error(
+ "failed to clear podcast feed type",
+ "feed_identifier", feed.Identifier,
+ "error", feedTypeError,
+ )
+ }
+ }
+
+ logger.Info(
+ "feed refresh completed",
+ "feed_identifier", feed.Identifier,
+ "entries_parsed", len(parseResult.Entries),
+ "rows_affected", rowsAffected,
+ )
+}
diff --git a/services/worker/internal/scheduler/scheduler.go b/services/worker/internal/scheduler/scheduler.go
new file mode 100644
index 0000000..646e263
--- /dev/null
+++ b/services/worker/internal/scheduler/scheduler.go
@@ -0,0 +1,283 @@
+package scheduler
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/Fuwn/asa-news/internal/fetcher"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/Fuwn/asa-news/internal/parser"
+ "github.com/Fuwn/asa-news/internal/pool"
+ "github.com/Fuwn/asa-news/internal/webhook"
+ "github.com/Fuwn/asa-news/internal/writer"
+ pgmq "github.com/craigpastro/pgmq-go"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "log/slog"
+ "time"
+)
+
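+// Scheduler drives two refresh loops over a shared worker pool: a poll loop
+// that claims feeds whose next_fetch_at has passed, and a queue loop that
+// services on-demand requests from the pgmq "feed_refresh" queue.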
+type Scheduler struct {
+ databaseConnectionPool *pgxpool.Pool
+ feedFetcher *fetcher.Fetcher
+ feedParser *parser.Parser
+ feedWriter *writer.Writer
+ webhookDispatcher *webhook.Dispatcher
+ workerPool *pool.WorkerPool
+ pollInterval time.Duration
+ queuePollInterval time.Duration
+ batchSize int
+ logger *slog.Logger
+}
+
+func NewScheduler(
+ databaseConnectionPool *pgxpool.Pool,
+ feedFetcher *fetcher.Fetcher,
+ feedParser *parser.Parser,
+ feedWriter *writer.Writer,
+ webhookDispatcher *webhook.Dispatcher,
+ workerPool *pool.WorkerPool,
+ pollInterval time.Duration,
+ queuePollInterval time.Duration,
+ batchSize int,
+ logger *slog.Logger,
+) *Scheduler {
+ return &Scheduler{
+ databaseConnectionPool: databaseConnectionPool,
+ feedFetcher: feedFetcher,
+ feedParser: feedParser,
+ feedWriter: feedWriter,
+ webhookDispatcher: webhookDispatcher,
+ workerPool: workerPool,
+ pollInterval: pollInterval,
+ queuePollInterval: queuePollInterval,
+ batchSize: batchSize,
+ logger: logger,
+ }
+}
+
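+// Run blocks until schedulerContext is cancelled, multiplexing both loops
+// over a single select.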
+func (feedScheduler *Scheduler) Run(schedulerContext context.Context) {
+ pollTicker := time.NewTicker(feedScheduler.pollInterval)
+
+ defer pollTicker.Stop()
+
+ queueTicker := time.NewTicker(feedScheduler.queuePollInterval)
+
+ defer queueTicker.Stop()
+
+ feedScheduler.logger.Info("scheduler started",
+ "poll_interval", feedScheduler.pollInterval,
+ "queue_poll_interval", feedScheduler.queuePollInterval,
+ "batch_size", feedScheduler.batchSize,
+ )
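+ // Kick off one cycle of each loop immediately instead of waiting out the
+ // first tick.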
+ feedScheduler.executePollCycle(schedulerContext)
+ feedScheduler.executeQueueCycle(schedulerContext)
+
+ for {
+ select {
+ case <-schedulerContext.Done():
+ feedScheduler.logger.Info("scheduler shutting down")
+
+ return
+ case <-pollTicker.C:
+ feedScheduler.executePollCycle(schedulerContext)
+ case <-queueTicker.C:
+ feedScheduler.executeQueueCycle(schedulerContext)
+ }
+ }
+}
+
+func (feedScheduler *Scheduler) executePollCycle(cycleContext context.Context) {
+ claimedFeeds, claimError := feedScheduler.claimDueFeeds(cycleContext)
+
+ if claimError != nil {
+ feedScheduler.logger.Error("failed to claim due feeds", "error", claimError)
+
+ return
+ }
+
+ if len(claimedFeeds) == 0 {
+ return
+ }
+
+ feedScheduler.logger.Info("claimed feeds for processing", "count", len(claimedFeeds))
+
+ for _, claimedFeed := range claimedFeeds {
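+ // Copy the loop variable before capture; prior to Go 1.22, range variables
+ // are reused across iterations.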
+ capturedFeed := claimedFeed
+
+ feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
+ ProcessRefreshRequest(
+ workContext,
+ capturedFeed,
+ feedScheduler.feedFetcher,
+ feedScheduler.feedParser,
+ feedScheduler.feedWriter,
+ feedScheduler.webhookDispatcher,
+ feedScheduler.logger,
+ )
+ })
+ }
+}
+
+func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context) {
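+ // Pull at most one message per tick with a 30-second visibility timeout; an
+ // empty queue and a transient read error both just wait for the next tick.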
+ queueMessage, readError := pgmq.Read(cycleContext, feedScheduler.databaseConnectionPool, "feed_refresh", 30)
+
+ if readError != nil {
+ return
+ }
+
+ if queueMessage == nil {
+ return
+ }
+
+ var refreshRequest model.RefreshRequest
+
+ unmarshalError := json.Unmarshal(queueMessage.Message, &refreshRequest)
+
+ if unmarshalError != nil {
+ feedScheduler.logger.Error("failed to unmarshal refresh request", "error", unmarshalError)
+
+ return
+ }
+
+ feed, lookupError := feedScheduler.lookupFeed(cycleContext, refreshRequest.FeedIdentifier)
+
+ if lookupError != nil {
+ feedScheduler.logger.Error(
+ "failed to look up feed for queue request",
+ "feed_identifier", refreshRequest.FeedIdentifier,
+ "error", lookupError,
+ )
+
+ return
+ }
+
+ capturedMessageIdentifier := queueMessage.MsgID
+
+ feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
+ ProcessRefreshRequest(
+ workContext,
+ feed,
+ feedScheduler.feedFetcher,
+ feedScheduler.feedParser,
+ feedScheduler.feedWriter,
+ feedScheduler.webhookDispatcher,
+ feedScheduler.logger,
+ )
+
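+ // Archive only after processing finishes: if the worker dies mid-refresh,
+ // the visibility timeout expires and the message becomes readable again.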
+ _, archiveError := pgmq.Archive(workContext, feedScheduler.databaseConnectionPool, "feed_refresh", capturedMessageIdentifier)
+
+ if archiveError != nil {
+ feedScheduler.logger.Error(
+ "failed to archive queue message",
+ "message_id", capturedMessageIdentifier,
+ "error", archiveError,
+ )
+ }
+ })
+}
+
+func (feedScheduler *Scheduler) claimDueFeeds(claimContext context.Context) ([]model.Feed, error) {
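+ // Claim up to batchSize due feeds in a single statement; FOR UPDATE SKIP
+ // LOCKED keeps concurrent workers from grabbing the same rows.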
+ claimQuery := `
+ UPDATE feeds
+ SET last_fetched_at = NOW()
+ WHERE id IN (
+ SELECT id
+ FROM feeds
+ WHERE next_fetch_at <= NOW()
+ AND subscriber_count > 0
+ ORDER BY next_fetch_at ASC
+ LIMIT $1
+ FOR UPDATE SKIP LOCKED
+ )
+ RETURNING
+ id, url, site_url, title, feed_type,
+ visibility, etag, last_modified,
+ last_fetched_at, last_fetch_error,
+ consecutive_failures, next_fetch_at,
+ fetch_interval_seconds,
+ created_at, updated_at
+ `
+ rows, queryError := feedScheduler.databaseConnectionPool.Query(claimContext, claimQuery, feedScheduler.batchSize)
+
+ if queryError != nil {
+ return nil, fmt.Errorf("failed to query due feeds: %w", queryError)
+ }
+
+ defer rows.Close()
+
+ claimedFeeds := make([]model.Feed, 0)
+
+ for rows.Next() {
+ var feed model.Feed
+
+ scanError := rows.Scan(
+ &feed.Identifier,
+ &feed.URL,
+ &feed.SiteURL,
+ &feed.Title,
+ &feed.FeedType,
+ &feed.Visibility,
+ &feed.EntityTag,
+ &feed.LastModified,
+ &feed.LastFetchedAt,
+ &feed.LastFetchError,
+ &feed.ConsecutiveFailures,
+ &feed.NextFetchAt,
+ &feed.FetchIntervalSeconds,
+ &feed.CreatedAt,
+ &feed.UpdatedAt,
+ )
+
+ if scanError != nil {
+ return nil, fmt.Errorf("failed to scan feed row: %w", scanError)
+ }
+
+ claimedFeeds = append(claimedFeeds, feed)
+ }
+
+ if rows.Err() != nil {
+ return nil, fmt.Errorf("error iterating feed rows: %w", rows.Err())
+ }
+
+ return claimedFeeds, nil
+}
+
+func (feedScheduler *Scheduler) lookupFeed(lookupContext context.Context, feedIdentifier string) (model.Feed, error) {
+ lookupQuery := `
+ SELECT
+ id, url, site_url, title, feed_type,
+ visibility, etag, last_modified,
+ last_fetched_at, last_fetch_error,
+ consecutive_failures, next_fetch_at,
+ fetch_interval_seconds,
+ created_at, updated_at
+ FROM feeds
+ WHERE id = $1
+ `
+
+ var feed model.Feed
+
+ scanError := feedScheduler.databaseConnectionPool.QueryRow(lookupContext, lookupQuery, feedIdentifier).Scan(
+ &feed.Identifier,
+ &feed.URL,
+ &feed.SiteURL,
+ &feed.Title,
+ &feed.FeedType,
+ &feed.Visibility,
+ &feed.EntityTag,
+ &feed.LastModified,
+ &feed.LastFetchedAt,
+ &feed.LastFetchError,
+ &feed.ConsecutiveFailures,
+ &feed.NextFetchAt,
+ &feed.FetchIntervalSeconds,
+ &feed.CreatedAt,
+ &feed.UpdatedAt,
+ )
+
+ if scanError != nil {
+ return model.Feed{}, fmt.Errorf("failed to look up feed %s: %w", feedIdentifier, scanError)
+ }
+
+ return feed, nil
+}