diff options
| author | Fuwn <[email protected]> | 2026-02-08 01:35:41 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-08 01:35:41 -0800 |
| commit | b8a9b40f786554e5a49511ce1b2dd2ea7f3db94c (patch) | |
| tree | 23fcdad5c10a7a269286459b4b666120d92f2af7 /services | |
| parent | fix: update worker Dockerfile to Go 1.24 to match go.mod requirement (diff) | |
| download | asa.news-b8a9b40f786554e5a49511ce1b2dd2ea7f3db94c.tar.xz asa.news-b8a9b40f786554e5a49511ce1b2dd2ea7f3db94c.zip | |
feat: implement authenticated feed support across worker and web app
Wire up the full authenticated feeds pipeline:
- Worker resolves credentials from Supabase Vault for authenticated feeds
- Worker sets owner_id on entries for per-user dedup
- query_param auth now parses name=value format
- Add-feed dialog shows auth type + credential fields for pro/developer
- Subscribe mutation passes credentials to RPC
- Sidebar and settings show [auth] indicator for authenticated feeds
Diffstat (limited to 'services')
| -rw-r--r-- | services/worker/internal/fetcher/authentication.go | 9 | ||||
| -rw-r--r-- | services/worker/internal/model/feed.go | 7 | ||||
| -rw-r--r-- | services/worker/internal/scheduler/refresh.go | 23 | ||||
| -rw-r--r-- | services/worker/internal/scheduler/scheduler.go | 58 |
4 files changed, 87 insertions, 10 deletions
diff --git a/services/worker/internal/fetcher/authentication.go b/services/worker/internal/fetcher/authentication.go index ba10196..9ee4539 100644 --- a/services/worker/internal/fetcher/authentication.go +++ b/services/worker/internal/fetcher/authentication.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "net/url" + "strings" ) type AuthenticationConfiguration struct { @@ -21,6 +22,12 @@ func ApplyAuthentication(request *http.Request, authenticationConfig Authenticat request.Header.Set("Authorization", "Basic "+encodedCredentials) case "query_param": + parts := strings.SplitN(authenticationConfig.AuthenticationValue, "=", 2) + + if len(parts) != 2 { + return fmt.Errorf("query_param credential must be in format 'param_name=value', got: %s", authenticationConfig.AuthenticationValue) + } + existingURL, parseError := url.Parse(request.URL.String()) if parseError != nil { @@ -29,7 +36,7 @@ func ApplyAuthentication(request *http.Request, authenticationConfig Authenticat queryParameters := existingURL.Query() - queryParameters.Set("key", authenticationConfig.AuthenticationValue) + queryParameters.Set(parts[0], parts[1]) existingURL.RawQuery = queryParameters.Encode() request.URL = existingURL diff --git a/services/worker/internal/model/feed.go b/services/worker/internal/model/feed.go index 611c820..a903af0 100644 --- a/services/worker/internal/model/feed.go +++ b/services/worker/internal/model/feed.go @@ -18,8 +18,11 @@ type Feed struct { ConsecutiveFailures int NextFetchAt time.Time FetchIntervalSeconds int - CreatedAt time.Time - UpdatedAt time.Time + CreatedAt time.Time + UpdatedAt time.Time + SubscriberUserIdentifier *string + AuthenticationType *string + AuthenticationValue *string } type FeedEntry struct { diff --git a/services/worker/internal/scheduler/refresh.go b/services/worker/internal/scheduler/refresh.go index 229a20f..d1c5d26 100644 --- a/services/worker/internal/scheduler/refresh.go +++ b/services/worker/internal/scheduler/refresh.go @@ -27,16 +27,25 @@ func ProcessRefreshRequest( var ownerIdentifier *string + authenticationConfig := fetcher.AuthenticationConfiguration{} + if feed.Visibility == "authenticated" { - logger.Warn( - "authenticated feed refresh not yet implemented", - "feed_identifier", feed.Identifier, - ) + if feed.AuthenticationType == nil || feed.AuthenticationValue == nil { + logger.Warn( + "authenticated feed missing credentials, skipping", + "feed_identifier", feed.Identifier, + ) - return - } + return + } - authenticationConfig := fetcher.AuthenticationConfiguration{} + authenticationConfig = fetcher.AuthenticationConfiguration{ + AuthenticationType: *feed.AuthenticationType, + AuthenticationValue: *feed.AuthenticationValue, + } + + ownerIdentifier = feed.SubscriberUserIdentifier + } entityTag := "" if feed.EntityTag != nil { diff --git a/services/worker/internal/scheduler/scheduler.go b/services/worker/internal/scheduler/scheduler.go index 19023e1..f6ebacd 100644 --- a/services/worker/internal/scheduler/scheduler.go +++ b/services/worker/internal/scheduler/scheduler.go @@ -170,6 +170,8 @@ func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context) return } + feedScheduler.enrichAuthenticatedFeed(cycleContext, &feed) + capturedMessageIdentifier := queueMessage.MsgID submitted := feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) { @@ -267,6 +269,10 @@ func (feedScheduler *Scheduler) claimDueFeeds(claimContext context.Context) ([]m return nil, fmt.Errorf("error iterating feed rows: %w", rows.Err()) } + for feedIndex := range claimedFeeds { + feedScheduler.enrichAuthenticatedFeed(claimContext, &claimedFeeds[feedIndex]) + } + return claimedFeeds, nil } @@ -310,6 +316,58 @@ func (feedScheduler *Scheduler) lookupFeed(lookupContext context.Context, feedId return feed, nil } +func (feedScheduler *Scheduler) resolveAuthenticationCredentials( + lookupContext context.Context, + feedIdentifier string, +) (subscriberUserIdentifier *string, authenticationType *string, authenticationValue *string, resolveError error) { + credentialQuery := ` + SELECT + s.user_id, + (ds.decrypted_secret::jsonb ->> 'authenticationType'), + (ds.decrypted_secret::jsonb ->> 'authenticationValue') + FROM subscriptions s + JOIN vault.decrypted_secrets ds ON ds.id = s.vault_secret_id + WHERE s.feed_id = $1 + LIMIT 1 + ` + + scanError := feedScheduler.databaseConnectionPool.QueryRow( + lookupContext, credentialQuery, feedIdentifier, + ).Scan(&subscriberUserIdentifier, &authenticationType, &authenticationValue) + + if scanError != nil { + return nil, nil, nil, fmt.Errorf("failed to resolve credentials for feed %s: %w", feedIdentifier, scanError) + } + + return subscriberUserIdentifier, authenticationType, authenticationValue, nil +} + +func (feedScheduler *Scheduler) enrichAuthenticatedFeed( + enrichContext context.Context, + feed *model.Feed, +) { + if feed.Visibility != "authenticated" { + return + } + + subscriberUserIdentifier, authenticationType, authenticationValue, resolveError := + feedScheduler.resolveAuthenticationCredentials(enrichContext, feed.Identifier) + + if resolveError != nil { + feedScheduler.logger.Error( + "failed to resolve authentication credentials", + "feed_identifier", feed.Identifier, + "error", resolveError, + ) + + return + } + + feed.SubscriberUserIdentifier = subscriberUserIdentifier + feed.AuthenticationType = authenticationType + feed.AuthenticationValue = authenticationValue +} + func (feedScheduler *Scheduler) archivePoisonedMessage(archiveContext context.Context, messageIdentifier int64) { _, archiveError := pgmq.Archive(archiveContext, feedScheduler.databaseConnectionPool, "feed_refresh", messageIdentifier) |