summaryrefslogtreecommitdiff
path: root/services/worker/internal/writer/writer.go
diff options
context:
space:
mode:
Diffstat (limited to 'services/worker/internal/writer/writer.go')
-rw-r--r--services/worker/internal/writer/writer.go222
1 files changed, 222 insertions, 0 deletions
diff --git a/services/worker/internal/writer/writer.go b/services/worker/internal/writer/writer.go
new file mode 100644
index 0000000..de81bfa
--- /dev/null
+++ b/services/worker/internal/writer/writer.go
@@ -0,0 +1,222 @@
+package writer
+
+import (
+ "context"
+ "fmt"
+ "github.com/Fuwn/asa-news/internal/model"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "strings"
+ "time"
+)
+
+type Writer struct {
+ databaseConnectionPool *pgxpool.Pool
+}
+
+func NewWriter(databaseConnectionPool *pgxpool.Pool) *Writer {
+ return &Writer{
+ databaseConnectionPool: databaseConnectionPool,
+ }
+}
+
+func (feedWriter *Writer) WriteEntries(writeContext context.Context, feedEntries []model.FeedEntry) (int64, error) {
+ if len(feedEntries) == 0 {
+ return 0, nil
+ }
+
+ valueParameterPlaceholders := make([]string, 0, len(feedEntries))
+ queryArguments := make([]interface{}, 0, len(feedEntries)*15)
+
+ for entryIndex, entry := range feedEntries {
+ parameterOffset := entryIndex * 15
+ valueParameterPlaceholders = append(
+ valueParameterPlaceholders,
+ fmt.Sprintf(
+ "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)",
+ parameterOffset+1, parameterOffset+2, parameterOffset+3,
+ parameterOffset+4, parameterOffset+5, parameterOffset+6,
+ parameterOffset+7, parameterOffset+8, parameterOffset+9,
+ parameterOffset+10, parameterOffset+11, parameterOffset+12,
+ parameterOffset+13, parameterOffset+14, parameterOffset+15,
+ ),
+ )
+ queryArguments = append(
+ queryArguments,
+ entry.FeedIdentifier,
+ entry.OwnerIdentifier,
+ entry.GUID,
+ entry.URL,
+ entry.Title,
+ entry.Author,
+ entry.Summary,
+ entry.ContentHTML,
+ entry.ContentText,
+ entry.ImageURL,
+ entry.PublishedAt,
+ entry.WordCount,
+ entry.EnclosureURL,
+ entry.EnclosureType,
+ entry.EnclosureLength,
+ )
+ }
+
+ isPublicEntry := feedEntries[0].OwnerIdentifier == nil
+
+ var conflictClause string
+
+ if isPublicEntry {
+ conflictClause = "ON CONFLICT (feed_id, guid) WHERE owner_id IS NULL"
+ } else {
+ conflictClause = "ON CONFLICT (feed_id, owner_id, guid) WHERE owner_id IS NOT NULL"
+ }
+
+ upsertQuery := fmt.Sprintf(`
+ INSERT INTO entries (
+ feed_id, owner_id, guid, url, title, author,
+ summary, content_html, content_text, image_url,
+ published_at, word_count,
+ enclosure_url, enclosure_type, enclosure_length
+ )
+ VALUES %s
+ %s
+ DO UPDATE SET
+ url = EXCLUDED.url,
+ title = EXCLUDED.title,
+ author = EXCLUDED.author,
+ summary = CASE
+ WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id)
+ THEN entries.summary
+ ELSE EXCLUDED.summary
+ END,
+ content_html = CASE
+ WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id)
+ THEN entries.content_html
+ ELSE EXCLUDED.content_html
+ END,
+ content_text = CASE
+ WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id)
+ THEN entries.content_text
+ ELSE EXCLUDED.content_text
+ END,
+ image_url = EXCLUDED.image_url,
+ published_at = EXCLUDED.published_at,
+ word_count = EXCLUDED.word_count,
+ enclosure_url = EXCLUDED.enclosure_url,
+ enclosure_type = EXCLUDED.enclosure_type,
+ enclosure_length = EXCLUDED.enclosure_length
+ WHERE
+ entries.title IS DISTINCT FROM EXCLUDED.title
+ OR entries.content_html IS DISTINCT FROM EXCLUDED.content_html
+ OR entries.enclosure_url IS DISTINCT FROM EXCLUDED.enclosure_url
+ `, strings.Join(valueParameterPlaceholders, ", "), conflictClause)
+ commandTag, executeError := feedWriter.databaseConnectionPool.Exec(writeContext, upsertQuery, queryArguments...)
+
+ if executeError != nil {
+ return 0, fmt.Errorf("failed to upsert feed entries: %w", executeError)
+ }
+
+ return commandTag.RowsAffected(), nil
+}
+
+func (feedWriter *Writer) UpdateFeedMetadata(
+ updateContext context.Context,
+ feedIdentifier string,
+ entityTag string,
+ lastModified string,
+ feedTitle string,
+ siteURL string,
+) error {
+ currentTime := time.Now().UTC()
+
+ var titleParameter *string
+ if feedTitle != "" {
+ titleParameter = &feedTitle
+ }
+
+ var siteURLParameter *string
+ if siteURL != "" {
+ siteURLParameter = &siteURL
+ }
+
+ updateQuery := `
+ UPDATE feeds
+ SET
+ last_fetched_at = $1::timestamptz,
+ etag = $2,
+ last_modified = $3,
+ consecutive_failures = 0,
+ last_fetch_error = NULL,
+ last_fetch_error_at = NULL,
+ next_fetch_at = $1::timestamptz + (fetch_interval_seconds * INTERVAL '1 second'),
+ title = COALESCE($5, title),
+ site_url = COALESCE($6, site_url)
+ WHERE id = $4
+ `
+ _, executeError := feedWriter.databaseConnectionPool.Exec(
+ updateContext,
+ updateQuery,
+ currentTime,
+ entityTag,
+ lastModified,
+ feedIdentifier,
+ titleParameter,
+ siteURLParameter,
+ )
+
+ if executeError != nil {
+ return fmt.Errorf("failed to update feed metadata: %w", executeError)
+ }
+
+ return nil
+}
+
+func (feedWriter *Writer) UpdateFeedType(
+ updateContext context.Context,
+ feedIdentifier string,
+ feedType string,
+) error {
+ updateQuery := `UPDATE feeds SET feed_type = $1 WHERE id = $2`
+ _, executeError := feedWriter.databaseConnectionPool.Exec(
+ updateContext,
+ updateQuery,
+ feedType,
+ feedIdentifier,
+ )
+
+ if executeError != nil {
+ return fmt.Errorf("failed to update feed type: %w", executeError)
+ }
+
+ return nil
+}
+
+func (feedWriter *Writer) RecordFeedError(
+ updateContext context.Context,
+ feedIdentifier string,
+ errorMessage string,
+) error {
+ currentTime := time.Now().UTC()
+ updateQuery := `
+ UPDATE feeds
+ SET
+ last_fetched_at = $1::timestamptz,
+ last_fetch_error = $2,
+ last_fetch_error_at = $1::timestamptz,
+ consecutive_failures = consecutive_failures + 1,
+ next_fetch_at = $1::timestamptz + (LEAST(POWER(2, consecutive_failures) * 60, 21600) * INTERVAL '1 second')
+ WHERE id = $3
+ `
+ _, executeError := feedWriter.databaseConnectionPool.Exec(
+ updateContext,
+ updateQuery,
+ currentTime,
+ errorMessage,
+ feedIdentifier,
+ )
+
+ if executeError != nil {
+ return fmt.Errorf("failed to record feed error: %w", executeError)
+ }
+
+ return nil
+}