diff options
| author | Fuwn <[email protected]> | 2026-02-07 05:45:41 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-07 05:45:41 -0800 |
| commit | 6368c74432ced80e0ac6ad2c5fe9c2495d1bc6ae (patch) | |
| tree | c8b583a21bd489170b8f664e8c028fbbd9d95d49 /services/worker | |
| parent | fix: resolve 6 pre-ship audit bugs (diff) | |
| download | asa.news-6368c74432ced80e0ac6ad2c5fe9c2495d1bc6ae.tar.xz asa.news-6368c74432ced80e0ac6ad2c5fe9c2495d1bc6ae.zip | |
feat: resolve 7 pre-ship QoL items
- Space/Shift+Space: page down/up in detail panel (80% scroll)
- Content font: sans-serif/serif/monospace selector in appearance
settings, applied to article content in detail panel
- Accessibility: entry-list-item uses button instead of div, folder
toggles have aria-expanded, shortcut keys have aria-labels
- Share notes: replaced window.prompt with proper modal dialog
matching existing UI patterns
- Worker .env.example: template with all 10 environment variables
- Worker poisoned messages: archive unprocessable queue messages
instead of leaving them stuck forever
- Worker pool Submit: check return value, reschedule dropped feeds
30s into the future, log warnings for rejected submissions
Diffstat (limited to 'services/worker')
| -rw-r--r-- | services/worker/.env.example | 29 | ||||
| -rw-r--r-- | services/worker/internal/scheduler/scheduler.go | 78 |
2 files changed, 102 insertions, 5 deletions
diff --git a/services/worker/.env.example b/services/worker/.env.example new file mode 100644 index 0000000..4a4a706 --- /dev/null +++ b/services/worker/.env.example @@ -0,0 +1,29 @@ +# Database connection string (required) +DATABASE_URL=postgresql://user:password@localhost:5432/asa_news + +# Number of concurrent workers processing feed updates +WORKER_CONCURRENCY=10 + +# Interval for polling the job queue (duration string, e.g., 30s, 5m) +POLL_INTERVAL=30s + +# Timeout for individual feed fetch operations +FETCH_TIMEOUT=30s + +# Interval for checking the task queue +QUEUE_POLL_INTERVAL=5s + +# Number of entries to process per batch +BATCH_SIZE=50 + +# Port for health check HTTP server +HEALTH_PORT=8080 + +# Encryption key for sensitive vault credentials (optional) +ENCRYPTION_KEY= + +# Log level: debug, info, warn, error +LOG_LEVEL=info + +# Enable JSON structured logging output +LOG_JSON=false diff --git a/services/worker/internal/scheduler/scheduler.go b/services/worker/internal/scheduler/scheduler.go index 646e263..19023e1 100644 --- a/services/worker/internal/scheduler/scheduler.go +++ b/services/worker/internal/scheduler/scheduler.go @@ -104,7 +104,7 @@ func (feedScheduler *Scheduler) executePollCycle(cycleContext context.Context) { for _, claimedFeed := range claimedFeeds { capturedFeed := claimedFeed - feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { + submitted := feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { ProcessRefreshRequest( workContext, capturedFeed, @@ -115,6 +115,16 @@ func (feedScheduler *Scheduler) executePollCycle(cycleContext context.Context) { feedScheduler.logger, ) }) + + if !submitted { + feedScheduler.logger.Warn( + "failed to submit feed refresh to worker pool, rescheduling", + "feed_identifier", capturedFeed.Identifier, + "feed_url", capturedFeed.URL, + ) + + feedScheduler.rescheduleDroppedFeed(cycleContext, capturedFeed) + } } } @@ -134,7 +144,13 @@ func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context) unmarshalError := json.Unmarshal(queueMessage.Message, &refreshRequest) if unmarshalError != nil { - feedScheduler.logger.Error("failed to unmarshal refresh request", "error", unmarshalError) + feedScheduler.logger.Error( + "failed to unmarshal refresh request, archiving poisoned message", + "message_identifier", queueMessage.MsgID, + "error", unmarshalError, + ) + + feedScheduler.archivePoisonedMessage(cycleContext, queueMessage.MsgID) return } @@ -143,17 +159,20 @@ func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context) if lookupError != nil { feedScheduler.logger.Error( - "failed to look up feed for queue request", + "failed to look up feed for queue request, archiving unresolvable message", + "message_identifier", queueMessage.MsgID, "feed_identifier", refreshRequest.FeedIdentifier, "error", lookupError, ) + feedScheduler.archivePoisonedMessage(cycleContext, queueMessage.MsgID) + return } capturedMessageIdentifier := queueMessage.MsgID - feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { + submitted := feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { ProcessRefreshRequest( workContext, feed, @@ -169,11 +188,20 @@ func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context) if archiveError != nil { feedScheduler.logger.Error( "failed to archive queue message", - "message_id", capturedMessageIdentifier, + "message_identifier", capturedMessageIdentifier, "error", archiveError, ) } }) + + if !submitted { + feedScheduler.logger.Warn( + "failed to submit queue refresh to worker pool, message will retry after visibility timeout", + "message_identifier", capturedMessageIdentifier, + "feed_identifier", feed.Identifier, + "feed_url", feed.URL, + ) + } } func (feedScheduler *Scheduler) claimDueFeeds(claimContext context.Context) ([]model.Feed, error) { @@ -281,3 +309,43 @@ func (feedScheduler *Scheduler) lookupFeed(lookupContext context.Context, feedId return feed, nil } + +func (feedScheduler *Scheduler) archivePoisonedMessage(archiveContext context.Context, messageIdentifier int64) { + _, archiveError := pgmq.Archive(archiveContext, feedScheduler.databaseConnectionPool, "feed_refresh", messageIdentifier) + + if archiveError != nil { + feedScheduler.logger.Error( + "failed to archive poisoned queue message", + "message_identifier", messageIdentifier, + "error", archiveError, + ) + } +} + +const droppedFeedRescheduleDelay = 30 * time.Second + +func (feedScheduler *Scheduler) rescheduleDroppedFeed(rescheduleContext context.Context, droppedFeed model.Feed) { + rescheduleQuery := ` + UPDATE feeds + SET next_fetch_at = $1 + WHERE id = $2 + ` + + rescheduleTime := time.Now().UTC().Add(droppedFeedRescheduleDelay) + + _, executeError := feedScheduler.databaseConnectionPool.Exec( + rescheduleContext, + rescheduleQuery, + rescheduleTime, + droppedFeed.Identifier, + ) + + if executeError != nil { + feedScheduler.logger.Error( + "failed to reschedule dropped feed", + "feed_identifier", droppedFeed.Identifier, + "feed_url", droppedFeed.URL, + "error", executeError, + ) + } +} |