summaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-08 01:35:41 -0800
committerFuwn <[email protected]>2026-02-08 01:35:41 -0800
commitb8a9b40f786554e5a49511ce1b2dd2ea7f3db94c (patch)
tree23fcdad5c10a7a269286459b4b666120d92f2af7 /services
parentfix: update worker Dockerfile to Go 1.24 to match go.mod requirement (diff)
downloadasa.news-b8a9b40f786554e5a49511ce1b2dd2ea7f3db94c.tar.xz
asa.news-b8a9b40f786554e5a49511ce1b2dd2ea7f3db94c.zip
feat: implement authenticated feed support across worker and web app
Wire up the full authenticated feeds pipeline: - Worker resolves credentials from Supabase Vault for authenticated feeds - Worker sets owner_id on entries for per-user dedup - query_param auth now parses name=value format - Add-feed dialog shows auth type + credential fields for pro/developer - Subscribe mutation passes credentials to RPC - Sidebar and settings show [auth] indicator for authenticated feeds
Diffstat (limited to 'services')
-rw-r--r--services/worker/internal/fetcher/authentication.go9
-rw-r--r--services/worker/internal/model/feed.go7
-rw-r--r--services/worker/internal/scheduler/refresh.go23
-rw-r--r--services/worker/internal/scheduler/scheduler.go58
4 files changed, 87 insertions, 10 deletions
diff --git a/services/worker/internal/fetcher/authentication.go b/services/worker/internal/fetcher/authentication.go
index ba10196..9ee4539 100644
--- a/services/worker/internal/fetcher/authentication.go
+++ b/services/worker/internal/fetcher/authentication.go
@@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"net/url"
+ "strings"
)
type AuthenticationConfiguration struct {
@@ -21,6 +22,12 @@ func ApplyAuthentication(request *http.Request, authenticationConfig Authenticat
request.Header.Set("Authorization", "Basic "+encodedCredentials)
case "query_param":
+ parts := strings.SplitN(authenticationConfig.AuthenticationValue, "=", 2)
+
+ if len(parts) != 2 {
+ return fmt.Errorf("query_param credential must be in format 'param_name=value', got: %s", authenticationConfig.AuthenticationValue)
+ }
+
existingURL, parseError := url.Parse(request.URL.String())
if parseError != nil {
@@ -29,7 +36,7 @@ func ApplyAuthentication(request *http.Request, authenticationConfig Authenticat
queryParameters := existingURL.Query()
- queryParameters.Set("key", authenticationConfig.AuthenticationValue)
+ queryParameters.Set(parts[0], parts[1])
existingURL.RawQuery = queryParameters.Encode()
request.URL = existingURL
diff --git a/services/worker/internal/model/feed.go b/services/worker/internal/model/feed.go
index 611c820..a903af0 100644
--- a/services/worker/internal/model/feed.go
+++ b/services/worker/internal/model/feed.go
@@ -18,8 +18,11 @@ type Feed struct {
ConsecutiveFailures int
NextFetchAt time.Time
FetchIntervalSeconds int
- CreatedAt time.Time
- UpdatedAt time.Time
+ CreatedAt time.Time
+ UpdatedAt time.Time
+ SubscriberUserIdentifier *string
+ AuthenticationType *string
+ AuthenticationValue *string
}
type FeedEntry struct {
diff --git a/services/worker/internal/scheduler/refresh.go b/services/worker/internal/scheduler/refresh.go
index 229a20f..d1c5d26 100644
--- a/services/worker/internal/scheduler/refresh.go
+++ b/services/worker/internal/scheduler/refresh.go
@@ -27,16 +27,25 @@ func ProcessRefreshRequest(
var ownerIdentifier *string
+ authenticationConfig := fetcher.AuthenticationConfiguration{}
+
if feed.Visibility == "authenticated" {
- logger.Warn(
- "authenticated feed refresh not yet implemented",
- "feed_identifier", feed.Identifier,
- )
+ if feed.AuthenticationType == nil || feed.AuthenticationValue == nil {
+ logger.Warn(
+ "authenticated feed missing credentials, skipping",
+ "feed_identifier", feed.Identifier,
+ )
- return
- }
+ return
+ }
- authenticationConfig := fetcher.AuthenticationConfiguration{}
+ authenticationConfig = fetcher.AuthenticationConfiguration{
+ AuthenticationType: *feed.AuthenticationType,
+ AuthenticationValue: *feed.AuthenticationValue,
+ }
+
+ ownerIdentifier = feed.SubscriberUserIdentifier
+ }
entityTag := ""
if feed.EntityTag != nil {
diff --git a/services/worker/internal/scheduler/scheduler.go b/services/worker/internal/scheduler/scheduler.go
index 19023e1..f6ebacd 100644
--- a/services/worker/internal/scheduler/scheduler.go
+++ b/services/worker/internal/scheduler/scheduler.go
@@ -170,6 +170,8 @@ func (feedScheduler *Scheduler) executeQueueCycle(cycleContext context.Context)
return
}
+ feedScheduler.enrichAuthenticatedFeed(cycleContext, &feed)
+
capturedMessageIdentifier := queueMessage.MsgID
submitted := feedScheduler.workerPool.Submit(cycleContext, func(workContext context.Context) {
@@ -267,6 +269,10 @@ func (feedScheduler *Scheduler) claimDueFeeds(claimContext context.Context) ([]m
return nil, fmt.Errorf("error iterating feed rows: %w", rows.Err())
}
+ for feedIndex := range claimedFeeds {
+ feedScheduler.enrichAuthenticatedFeed(claimContext, &claimedFeeds[feedIndex])
+ }
+
return claimedFeeds, nil
}
@@ -310,6 +316,58 @@ func (feedScheduler *Scheduler) lookupFeed(lookupContext context.Context, feedId
return feed, nil
}
+func (feedScheduler *Scheduler) resolveAuthenticationCredentials(
+ lookupContext context.Context,
+ feedIdentifier string,
+) (subscriberUserIdentifier *string, authenticationType *string, authenticationValue *string, resolveError error) {
+ credentialQuery := `
+ SELECT
+ s.user_id,
+ (ds.decrypted_secret::jsonb ->> 'authenticationType'),
+ (ds.decrypted_secret::jsonb ->> 'authenticationValue')
+ FROM subscriptions s
+ JOIN vault.decrypted_secrets ds ON ds.id = s.vault_secret_id
+ WHERE s.feed_id = $1
+ LIMIT 1
+ `
+
+ scanError := feedScheduler.databaseConnectionPool.QueryRow(
+ lookupContext, credentialQuery, feedIdentifier,
+ ).Scan(&subscriberUserIdentifier, &authenticationType, &authenticationValue)
+
+ if scanError != nil {
+ return nil, nil, nil, fmt.Errorf("failed to resolve credentials for feed %s: %w", feedIdentifier, scanError)
+ }
+
+ return subscriberUserIdentifier, authenticationType, authenticationValue, nil
+}
+
+func (feedScheduler *Scheduler) enrichAuthenticatedFeed(
+ enrichContext context.Context,
+ feed *model.Feed,
+) {
+ if feed.Visibility != "authenticated" {
+ return
+ }
+
+ subscriberUserIdentifier, authenticationType, authenticationValue, resolveError :=
+ feedScheduler.resolveAuthenticationCredentials(enrichContext, feed.Identifier)
+
+ if resolveError != nil {
+ feedScheduler.logger.Error(
+ "failed to resolve authentication credentials",
+ "feed_identifier", feed.Identifier,
+ "error", resolveError,
+ )
+
+ return
+ }
+
+ feed.SubscriberUserIdentifier = subscriberUserIdentifier
+ feed.AuthenticationType = authenticationType
+ feed.AuthenticationValue = authenticationValue
+}
+
func (feedScheduler *Scheduler) archivePoisonedMessage(archiveContext context.Context, messageIdentifier int64) {
_, archiveError := pgmq.Archive(archiveContext, feedScheduler.databaseConnectionPool, "feed_refresh", messageIdentifier)