summaryrefslogtreecommitdiff
path: root/services/worker
diff options
context:
space:
mode:
author    Fuwn <[email protected]>    2026-02-07 05:45:41 -0800
committer Fuwn <[email protected]>    2026-02-07 05:45:41 -0800
commit    6368c74432ced80e0ac6ad2c5fe9c2495d1bc6ae (patch)
tree      c8b583a21bd489170b8f664e8c028fbbd9d95d49 /services/worker
parent    fix: resolve 6 pre-ship audit bugs (diff)
download  asa.news-6368c74432ced80e0ac6ad2c5fe9c2495d1bc6ae.tar.xz
          asa.news-6368c74432ced80e0ac6ad2c5fe9c2495d1bc6ae.zip
feat: resolve 7 pre-ship QoL items
- Space/Shift+Space: page down/up in detail panel (80% scroll)
- Content font: sans-serif/serif/monospace selector in appearance settings, applied to article content in detail panel
- Accessibility: entry-list-item uses button instead of div, folder toggles have aria-expanded, shortcut keys have aria-labels
- Share notes: replaced window.prompt with proper modal dialog matching existing UI patterns
- Worker .env.example: template with all 10 environment variables
- Worker poisoned messages: archive unprocessable queue messages instead of leaving them stuck forever
- Worker pool Submit: check return value, reschedule dropped feeds 30s into the future, log warnings for rejected submissions
Diffstat (limited to 'services/worker')
-rw-r--r--  services/worker/.env.example                     | 29
-rw-r--r--  services/worker/internal/scheduler/scheduler.go  | 78
2 files changed, 102 insertions, 5 deletions
diff --git a/services/worker/.env.example b/services/worker/.env.example
new file mode 100644
index 0000000..4a4a706
--- /dev/null
+++ b/services/worker/.env.example
@@ -0,0 +1,29 @@
+# Database connection string (required)
+DATABASE_URL=postgresql://user:password@localhost:5432/asa_news
+
+# Number of concurrent workers processing feed updates
+WORKER_CONCURRENCY=10
+
+# Interval for polling the job queue (duration string, e.g., 30s, 5m)
+POLL_INTERVAL=30s
+
+# Timeout for individual feed fetch operations
+FETCH_TIMEOUT=30s
+
+# Interval for checking the task queue
+QUEUE_POLL_INTERVAL=5s
+
+# Number of entries to process per batch
+BATCH_SIZE=50
+
+# Port for health check HTTP server
+HEALTH_PORT=8080
+
+# Encryption key for sensitive vault credentials (optional)
+ENCRYPTION_KEY=
+
+# Log level: debug, info, warn, error
+LOG_LEVEL=info
+
+# Enable JSON structured logging output
+LOG_JSON=false
diff --git a/services/worker/internal/scheduler/scheduler.go b/services/worker/internal/scheduler/scheduler.go
index 646e263..19023e1 100644
--- a/services/worker/internal/scheduler/scheduler.go
+++ b/services/worker/internal/scheduler/scheduler.go
@@ -104,7 +104,7 @@ func (feedScheduler *Scheduler) executePollCycle(cycleContext context.Context) {
for _, claimedFeed := range claimedFeeds {
capturedFeed := claimedFeed
- feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
+ submitted := feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
ProcessRefreshRequest(
workContext,
capturedFeed,
@@ -115,6 +115,16 @@ func (feedScheduler *Scheduler) executePollCycle(cycleContext context.Context) {
feedScheduler.logger,
)
})
+
+ if !submitted {
+ feedScheduler.logger.Warn(
+ "failed to submit feed refresh to worker pool, rescheduling",
+ "feed_identifier", capturedFeed.Identifier,
+ "feed_url", capturedFeed.URL,
+ )
+
+ feedScheduler.rescheduleDroppedFeed(cycleContext, capturedFeed)
+ }
}
}
@@ -134,7 +144,13 @@ func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context)
unmarshalError := json.Unmarshal(queueMessage.Message, &refreshRequest)
if unmarshalError != nil {
- feedScheduler.logger.Error("failed to unmarshal refresh request", "error", unmarshalError)
+ feedScheduler.logger.Error(
+ "failed to unmarshal refresh request, archiving poisoned message",
+ "message_identifier", queueMessage.MsgID,
+ "error", unmarshalError,
+ )
+
+ feedScheduler.archivePoisonedMessage(cycleContext, queueMessage.MsgID)
return
}
@@ -143,17 +159,20 @@ func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context)
if lookupError != nil {
feedScheduler.logger.Error(
- "failed to look up feed for queue request",
+ "failed to look up feed for queue request, archiving unresolvable message",
+ "message_identifier", queueMessage.MsgID,
"feed_identifier", refreshRequest.FeedIdentifier,
"error", lookupError,
)
+ feedScheduler.archivePoisonedMessage(cycleContext, queueMessage.MsgID)
+
return
}
capturedMessageIdentifier := queueMessage.MsgID
- feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
+ submitted := feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
ProcessRefreshRequest(
workContext,
feed,
@@ -169,11 +188,20 @@ func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context)
if archiveError != nil {
feedScheduler.logger.Error(
"failed to archive queue message",
- "message_id", capturedMessageIdentifier,
+ "message_identifier", capturedMessageIdentifier,
"error", archiveError,
)
}
})
+
+ if !submitted {
+ feedScheduler.logger.Warn(
+ "failed to submit queue refresh to worker pool, message will retry after visibility timeout",
+ "message_identifier", capturedMessageIdentifier,
+ "feed_identifier", feed.Identifier,
+ "feed_url", feed.URL,
+ )
+ }
}
func (feedScheduler *Scheduler) claimDueFeeds(claimContext context.Context) ([]model.Feed, error) {
@@ -281,3 +309,43 @@ func (feedScheduler *Scheduler) lookupFeed(lookupContext context.Context, feedId
return feed, nil
}
+
+func (feedScheduler *Scheduler) archivePoisonedMessage(archiveContext context.Context, messageIdentifier int64) {
+ _, archiveError := pgmq.Archive(archiveContext, feedScheduler.databaseConnectionPool, "feed_refresh", messageIdentifier)
+
+ if archiveError != nil {
+ feedScheduler.logger.Error(
+ "failed to archive poisoned queue message",
+ "message_identifier", messageIdentifier,
+ "error", archiveError,
+ )
+ }
+}
+
+const droppedFeedRescheduleDelay = 30 * time.Second
+
+func (feedScheduler *Scheduler) rescheduleDroppedFeed(rescheduleContext context.Context, droppedFeed model.Feed) {
+ rescheduleQuery := `
+ UPDATE feeds
+ SET next_fetch_at = $1
+ WHERE id = $2
+ `
+
+ rescheduleTime := time.Now().UTC().Add(droppedFeedRescheduleDelay)
+
+ _, executeError := feedScheduler.databaseConnectionPool.Exec(
+ rescheduleContext,
+ rescheduleQuery,
+ rescheduleTime,
+ droppedFeed.Identifier,
+ )
+
+ if executeError != nil {
+ feedScheduler.logger.Error(
+ "failed to reschedule dropped feed",
+ "feed_identifier", droppedFeed.Identifier,
+ "feed_url", droppedFeed.URL,
+ "error", executeError,
+ )
+ }
+}