diff options
Diffstat (limited to 'services/worker/internal/writer/writer.go')
| -rw-r--r-- | services/worker/internal/writer/writer.go | 222 |
1 files changed, 222 insertions, 0 deletions
diff --git a/services/worker/internal/writer/writer.go b/services/worker/internal/writer/writer.go new file mode 100644 index 0000000..de81bfa --- /dev/null +++ b/services/worker/internal/writer/writer.go @@ -0,0 +1,222 @@ +package writer + +import ( + "context" + "fmt" + "github.com/Fuwn/asa-news/internal/model" + "github.com/jackc/pgx/v5/pgxpool" + "strings" + "time" +) + +type Writer struct { + databaseConnectionPool *pgxpool.Pool +} + +func NewWriter(databaseConnectionPool *pgxpool.Pool) *Writer { + return &Writer{ + databaseConnectionPool: databaseConnectionPool, + } +} + +func (feedWriter *Writer) WriteEntries(writeContext context.Context, feedEntries []model.FeedEntry) (int64, error) { + if len(feedEntries) == 0 { + return 0, nil + } + + valueParameterPlaceholders := make([]string, 0, len(feedEntries)) + queryArguments := make([]interface{}, 0, len(feedEntries)*15) + + for entryIndex, entry := range feedEntries { + parameterOffset := entryIndex * 15 + valueParameterPlaceholders = append( + valueParameterPlaceholders, + fmt.Sprintf( + "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", + parameterOffset+1, parameterOffset+2, parameterOffset+3, + parameterOffset+4, parameterOffset+5, parameterOffset+6, + parameterOffset+7, parameterOffset+8, parameterOffset+9, + parameterOffset+10, parameterOffset+11, parameterOffset+12, + parameterOffset+13, parameterOffset+14, parameterOffset+15, + ), + ) + queryArguments = append( + queryArguments, + entry.FeedIdentifier, + entry.OwnerIdentifier, + entry.GUID, + entry.URL, + entry.Title, + entry.Author, + entry.Summary, + entry.ContentHTML, + entry.ContentText, + entry.ImageURL, + entry.PublishedAt, + entry.WordCount, + entry.EnclosureURL, + entry.EnclosureType, + entry.EnclosureLength, + ) + } + + isPublicEntry := feedEntries[0].OwnerIdentifier == nil + + var conflictClause string + + if isPublicEntry { + conflictClause = "ON CONFLICT (feed_id, guid) WHERE owner_id IS NULL" + } else { + conflictClause = "ON CONFLICT (feed_id, owner_id, guid) WHERE owner_id IS NOT NULL" + } + + upsertQuery := fmt.Sprintf(` + INSERT INTO entries ( + feed_id, owner_id, guid, url, title, author, + summary, content_html, content_text, image_url, + published_at, word_count, + enclosure_url, enclosure_type, enclosure_length + ) + VALUES %s + %s + DO UPDATE SET + url = EXCLUDED.url, + title = EXCLUDED.title, + author = EXCLUDED.author, + summary = CASE + WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id) + THEN entries.summary + ELSE EXCLUDED.summary + END, + content_html = CASE + WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id) + THEN entries.content_html + ELSE EXCLUDED.content_html + END, + content_text = CASE + WHEN EXISTS (SELECT 1 FROM user_highlights WHERE entry_id = entries.id) + THEN entries.content_text + ELSE EXCLUDED.content_text + END, + image_url = EXCLUDED.image_url, + published_at = EXCLUDED.published_at, + word_count = EXCLUDED.word_count, + enclosure_url = EXCLUDED.enclosure_url, + enclosure_type = EXCLUDED.enclosure_type, + enclosure_length = EXCLUDED.enclosure_length + WHERE + entries.title IS DISTINCT FROM EXCLUDED.title + OR entries.content_html IS DISTINCT FROM EXCLUDED.content_html + OR entries.enclosure_url IS DISTINCT FROM EXCLUDED.enclosure_url + `, strings.Join(valueParameterPlaceholders, ", "), conflictClause) + commandTag, executeError := feedWriter.databaseConnectionPool.Exec(writeContext, upsertQuery, queryArguments...) + + if executeError != nil { + return 0, fmt.Errorf("failed to upsert feed entries: %w", executeError) + } + + return commandTag.RowsAffected(), nil +} + +func (feedWriter *Writer) UpdateFeedMetadata( + updateContext context.Context, + feedIdentifier string, + entityTag string, + lastModified string, + feedTitle string, + siteURL string, +) error { + currentTime := time.Now().UTC() + + var titleParameter *string + if feedTitle != "" { + titleParameter = &feedTitle + } + + var siteURLParameter *string + if siteURL != "" { + siteURLParameter = &siteURL + } + + updateQuery := ` + UPDATE feeds + SET + last_fetched_at = $1::timestamptz, + etag = $2, + last_modified = $3, + consecutive_failures = 0, + last_fetch_error = NULL, + last_fetch_error_at = NULL, + next_fetch_at = $1::timestamptz + (fetch_interval_seconds * INTERVAL '1 second'), + title = COALESCE($5, title), + site_url = COALESCE($6, site_url) + WHERE id = $4 + ` + _, executeError := feedWriter.databaseConnectionPool.Exec( + updateContext, + updateQuery, + currentTime, + entityTag, + lastModified, + feedIdentifier, + titleParameter, + siteURLParameter, + ) + + if executeError != nil { + return fmt.Errorf("failed to update feed metadata: %w", executeError) + } + + return nil +} + +func (feedWriter *Writer) UpdateFeedType( + updateContext context.Context, + feedIdentifier string, + feedType string, +) error { + updateQuery := `UPDATE feeds SET feed_type = $1 WHERE id = $2` + _, executeError := feedWriter.databaseConnectionPool.Exec( + updateContext, + updateQuery, + feedType, + feedIdentifier, + ) + + if executeError != nil { + return fmt.Errorf("failed to update feed type: %w", executeError) + } + + return nil +} + +func (feedWriter *Writer) RecordFeedError( + updateContext context.Context, + feedIdentifier string, + errorMessage string, +) error { + currentTime := time.Now().UTC() + updateQuery := ` + UPDATE feeds + SET + last_fetched_at = $1::timestamptz, + last_fetch_error = $2, + last_fetch_error_at = $1::timestamptz, + consecutive_failures = consecutive_failures + 1, + next_fetch_at = $1::timestamptz + (LEAST(POWER(2, consecutive_failures) * 60, 21600) * INTERVAL '1 second') + WHERE id = $3 + ` + _, executeError := feedWriter.databaseConnectionPool.Exec( + updateContext, + updateQuery, + currentTime, + errorMessage, + feedIdentifier, + ) + + if executeError != nil { + return fmt.Errorf("failed to record feed error: %w", executeError) + } + + return nil +} |