diff options
| author | Fuwn <[email protected]> | 2026-02-07 01:42:57 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-07 01:42:57 -0800 |
| commit | 5c5b1993edd890a80870ee05607ac5f088191d4e (patch) | |
| tree | a721b76bcd49ba10826c53efc87302c7a689512f /services/worker/internal/scheduler | |
| download | asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.tar.xz asa.news-5c5b1993edd890a80870ee05607ac5f088191d4e.zip | |
feat: asa.news RSS reader with developer tier, REST API, and webhooks
Full-stack RSS reader SaaS: Supabase + Next.js + Go worker.
Includes three subscription tiers (free/pro/developer), API key auth,
read-only REST API, webhook push notifications, Stripe billing with
proration, and PWA support.
Diffstat (limited to 'services/worker/internal/scheduler')
| -rw-r--r-- | services/worker/internal/scheduler/refresh.go | 196 | ||||
| -rw-r--r-- | services/worker/internal/scheduler/scheduler.go | 283 |
2 files changed, 479 insertions, 0 deletions
diff --git a/services/worker/internal/scheduler/refresh.go b/services/worker/internal/scheduler/refresh.go new file mode 100644 index 0000000..8271fa0 --- /dev/null +++ b/services/worker/internal/scheduler/refresh.go @@ -0,0 +1,196 @@ +package scheduler + +import ( + "context" + "github.com/Fuwn/asa-news/internal/fetcher" + "github.com/Fuwn/asa-news/internal/model" + "github.com/Fuwn/asa-news/internal/parser" + "github.com/Fuwn/asa-news/internal/webhook" + "github.com/Fuwn/asa-news/internal/writer" + "log/slog" +) + +func ProcessRefreshRequest( + refreshContext context.Context, + feed model.Feed, + feedFetcher *fetcher.Fetcher, + feedParser *parser.Parser, + feedWriter *writer.Writer, + webhookDispatcher *webhook.Dispatcher, + logger *slog.Logger, +) { + logger.Info( + "processing feed refresh", + "feed_identifier", feed.Identifier, + "feed_url", feed.URL, + ) + + var ownerIdentifier *string + + if feed.Visibility == "authenticated" { + logger.Warn( + "authenticated feed refresh not yet implemented", + "feed_identifier", feed.Identifier, + ) + + return + } + + authenticationConfig := fetcher.AuthenticationConfiguration{} + + entityTag := "" + + if feed.EntityTag != nil { + entityTag = *feed.EntityTag + } + + lastModified := "" + + if feed.LastModified != nil { + lastModified = *feed.LastModified + } + + fetchResult, fetchError := feedFetcher.Fetch( + refreshContext, + feed.URL, + entityTag, + lastModified, + authenticationConfig, + ) + + if fetchError != nil { + logger.Warn( + "feed fetch failed", + "feed_identifier", feed.Identifier, + "error", fetchError, + ) + + recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, fetchError.Error()) + + if recordError != nil { + logger.Error( + "failed to record feed error", + "feed_identifier", feed.Identifier, + "error", recordError, + ) + } + + return + } + + if fetchResult.NotModified { + logger.Info( + "feed not modified", + "feed_identifier", feed.Identifier, + ) + + metadataError := feedWriter.UpdateFeedMetadata( + refreshContext, + feed.Identifier, + fetchResult.EntityTag, + fetchResult.LastModifiedHeader, + "", + "", + ) + + if metadataError != nil { + logger.Error( + "failed to update feed metadata after not-modified", + "feed_identifier", feed.Identifier, + "error", metadataError, + ) + } + + return + } + + parseResult, parseError := feedParser.Parse(feed.Identifier, ownerIdentifier, fetchResult.Body) + + if parseError != nil { + logger.Warn( + "feed parse failed", + "feed_identifier", feed.Identifier, + "error", parseError, + ) + + recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, parseError.Error()) + + if recordError != nil { + logger.Error( + "failed to record parse error", + "feed_identifier", feed.Identifier, + "error", recordError, + ) + } + + return + } + + rowsAffected, writeError := feedWriter.WriteEntries(refreshContext, parseResult.Entries) + + if writeError != nil { + logger.Error( + "failed to write feed entries", + "feed_identifier", feed.Identifier, + "error", writeError, + ) + + recordError := feedWriter.RecordFeedError(refreshContext, feed.Identifier, writeError.Error()) + + if recordError != nil { + logger.Error( + "failed to record write error", + "feed_identifier", feed.Identifier, + "error", recordError, + ) + } + + return + } + + if rowsAffected > 0 && webhookDispatcher != nil { + webhookDispatcher.DispatchForFeed(refreshContext, feed.Identifier, parseResult.Entries) + } + + metadataError := feedWriter.UpdateFeedMetadata( + refreshContext, + feed.Identifier, + fetchResult.EntityTag, + fetchResult.LastModifiedHeader, + parseResult.FeedTitle, + parseResult.SiteURL, + ) + + if metadataError != nil { + logger.Error( + "failed to update feed metadata", + "feed_identifier", feed.Identifier, + "error", metadataError, + ) + } + + if parseResult.AudioEnclosureRatio > 0.5 { + if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "podcast"); feedTypeError != nil { + logger.Error( + "failed to update feed type to podcast", + "feed_identifier", feed.Identifier, + "error", feedTypeError, + ) + } + } else if feed.FeedType != nil && *feed.FeedType == "podcast" { + if feedTypeError := feedWriter.UpdateFeedType(refreshContext, feed.Identifier, "rss"); feedTypeError != nil { + logger.Error( + "failed to clear podcast feed type", + "feed_identifier", feed.Identifier, + "error", feedTypeError, + ) + } + } + + logger.Info( + "feed refresh completed", + "feed_identifier", feed.Identifier, + "entries_parsed", len(parseResult.Entries), + "rows_affected", rowsAffected, + ) +} diff --git a/services/worker/internal/scheduler/scheduler.go b/services/worker/internal/scheduler/scheduler.go new file mode 100644 index 0000000..646e263 --- /dev/null +++ b/services/worker/internal/scheduler/scheduler.go @@ -0,0 +1,283 @@ +package scheduler + +import ( + "context" + "encoding/json" + "fmt" + "github.com/Fuwn/asa-news/internal/fetcher" + "github.com/Fuwn/asa-news/internal/model" + "github.com/Fuwn/asa-news/internal/parser" + "github.com/Fuwn/asa-news/internal/pool" + "github.com/Fuwn/asa-news/internal/webhook" + "github.com/Fuwn/asa-news/internal/writer" + pgmq "github.com/craigpastro/pgmq-go" + "github.com/jackc/pgx/v5/pgxpool" + "log/slog" + "time" +) + +type Scheduler struct { + databaseConnectionPool *pgxpool.Pool + feedFetcher *fetcher.Fetcher + feedParser *parser.Parser + feedWriter *writer.Writer + webhookDispatcher *webhook.Dispatcher + workerPool *pool.WorkerPool + pollInterval time.Duration + queuePollInterval time.Duration + batchSize int + logger *slog.Logger +} + +func NewScheduler( + databaseConnectionPool *pgxpool.Pool, + feedFetcher *fetcher.Fetcher, + feedParser *parser.Parser, + feedWriter *writer.Writer, + webhookDispatcher *webhook.Dispatcher, + workerPool *pool.WorkerPool, + pollInterval time.Duration, + queuePollInterval time.Duration, + batchSize int, + logger *slog.Logger, +) *Scheduler { + return &Scheduler{ + databaseConnectionPool: databaseConnectionPool, + feedFetcher: feedFetcher, + feedParser: feedParser, + feedWriter: feedWriter, + webhookDispatcher: webhookDispatcher, + workerPool: workerPool, + pollInterval: pollInterval, + queuePollInterval: queuePollInterval, + batchSize: batchSize, + logger: logger, + } +} + +func (feedScheduler *Scheduler) Run(schedulerContext context.Context) { + pollTicker := time.NewTicker(feedScheduler.pollInterval) + + defer pollTicker.Stop() + + queueTicker := time.NewTicker(feedScheduler.queuePollInterval) + + defer queueTicker.Stop() + + feedScheduler.logger.Info("scheduler started", + "poll_interval", feedScheduler.pollInterval, + "queue_poll_interval", feedScheduler.queuePollInterval, + "batch_size", feedScheduler.batchSize, + ) + feedScheduler.executePollCycle(schedulerContext) + feedScheduler.executeQueueCycle(schedulerContext) + + for { + select { + case <-schedulerContext.Done(): + feedScheduler.logger.Info("scheduler shutting down") + + return + case <-pollTicker.C: + feedScheduler.executePollCycle(schedulerContext) + case <-queueTicker.C: + feedScheduler.executeQueueCycle(schedulerContext) + } + } +} + +func (feedScheduler *Scheduler) executePollCycle(cycleContext context.Context) { + claimedFeeds, claimError := feedScheduler.claimDueFeeds(cycleContext) + + if claimError != nil { + feedScheduler.logger.Error("failed to claim due feeds", "error", claimError) + + return + } + + if len(claimedFeeds) == 0 { + return + } + + feedScheduler.logger.Info("claimed feeds for processing", "count", len(claimedFeeds)) + + for _, claimedFeed := range claimedFeeds { + capturedFeed := claimedFeed + + feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { + ProcessRefreshRequest( + workContext, + capturedFeed, + feedScheduler.feedFetcher, + feedScheduler.feedParser, + feedScheduler.feedWriter, + feedScheduler.webhookDispatcher, + feedScheduler.logger, + ) + }) + } +} + +func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context) { + queueMessage, readError := pgmq.Read(cycleContext, feedScheduler.databaseConnectionPool, "feed_refresh", 30) + + if readError != nil { + return + } + + if queueMessage == nil { + return + } + + var refreshRequest model.RefreshRequest + + unmarshalError := json.Unmarshal(queueMessage.Message, &refreshRequest) + + if unmarshalError != nil { + feedScheduler.logger.Error("failed to unmarshal refresh request", "error", unmarshalError) + + return + } + + feed, lookupError := feedScheduler.lookupFeed(cycleContext, refreshRequest.FeedIdentifier) + + if lookupError != nil { + feedScheduler.logger.Error( + "failed to look up feed for queue request", + "feed_identifier", refreshRequest.FeedIdentifier, + "error", lookupError, + ) + + return + } + + capturedMessageIdentifier := queueMessage.MsgID + + feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { + ProcessRefreshRequest( + workContext, + feed, + feedScheduler.feedFetcher, + feedScheduler.feedParser, + feedScheduler.feedWriter, + feedScheduler.webhookDispatcher, + feedScheduler.logger, + ) + + _, archiveError := pgmq.Archive(workContext, feedScheduler.databaseConnectionPool, "feed_refresh", capturedMessageIdentifier) + + if archiveError != nil { + feedScheduler.logger.Error( + "failed to archive queue message", + "message_id", capturedMessageIdentifier, + "error", archiveError, + ) + } + }) +} + +func (feedScheduler *Scheduler) claimDueFeeds(claimContext context.Context) ([]model.Feed, error) { + claimQuery := ` + UPDATE feeds + SET last_fetched_at = NOW() + WHERE id IN ( + SELECT id + FROM feeds + WHERE next_fetch_at <= NOW() + AND subscriber_count > 0 + ORDER BY next_fetch_at ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + RETURNING + id, url, site_url, title, feed_type, + visibility, etag, last_modified, + last_fetched_at, last_fetch_error, + consecutive_failures, next_fetch_at, + fetch_interval_seconds, + created_at, updated_at + ` + rows, queryError := feedScheduler.databaseConnectionPool.Query(claimContext, claimQuery, feedScheduler.batchSize) + + if queryError != nil { + return nil, fmt.Errorf("failed to query due feeds: %w", queryError) + } + + defer rows.Close() + + claimedFeeds := make([]model.Feed, 0) + + for rows.Next() { + var feed model.Feed + + scanError := rows.Scan( + &feed.Identifier, + &feed.URL, + &feed.SiteURL, + &feed.Title, + &feed.FeedType, + &feed.Visibility, + &feed.EntityTag, + &feed.LastModified, + &feed.LastFetchedAt, + &feed.LastFetchError, + &feed.ConsecutiveFailures, + &feed.NextFetchAt, + &feed.FetchIntervalSeconds, + &feed.CreatedAt, + &feed.UpdatedAt, + ) + + if scanError != nil { + return nil, fmt.Errorf("failed to scan feed row: %w", scanError) + } + + claimedFeeds = append(claimedFeeds, feed) + } + + if rows.Err() != nil { + return nil, fmt.Errorf("error iterating feed rows: %w", rows.Err()) + } + + return claimedFeeds, nil +} + +func (feedScheduler *Scheduler) lookupFeed(lookupContext context.Context, feedIdentifier string) (model.Feed, error) { + lookupQuery := ` + SELECT + id, url, site_url, title, feed_type, + visibility, etag, last_modified, + last_fetched_at, last_fetch_error, + consecutive_failures, next_fetch_at, + fetch_interval_seconds, + created_at, updated_at + FROM feeds + WHERE id = $1 + ` + + var feed model.Feed + + scanError := feedScheduler.databaseConnectionPool.QueryRow(lookupContext, lookupQuery, feedIdentifier).Scan( + &feed.Identifier, + &feed.URL, + &feed.SiteURL, + &feed.Title, + &feed.FeedType, + &feed.Visibility, + &feed.EntityTag, + &feed.LastModified, + &feed.LastFetchedAt, + &feed.LastFetchError, + &feed.ConsecutiveFailures, + &feed.NextFetchAt, + &feed.FetchIntervalSeconds, + &feed.CreatedAt, + &feed.UpdatedAt, + ) + + if scanError != nil { + return model.Feed{}, fmt.Errorf("failed to look up feed %s: %w", feedIdentifier, scanError) + } + + return feed, nil +} |