about summary refs log tree commit diff
path: root/internal/ingest
diff options
context:
space:
mode:
Diffstat (limited to 'internal/ingest')
-rw-r--r--internal/ingest/client.go83
-rw-r--r--internal/ingest/client_retry_test.go12
-rw-r--r--internal/ingest/client_test.go6
-rw-r--r--internal/ingest/service.go269
-rw-r--r--internal/ingest/service_integration_test.go52
-rw-r--r--internal/ingest/service_order_test.go8
6 files changed, 422 insertions(+), 8 deletions(-)
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index d25b73f..12d8bd6 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -17,7 +17,6 @@ import (
"strconv"
"strings"
"time"
-
"github.com/Fuwn/plutia/internal/types"
)
@@ -39,17 +38,21 @@ func NewClient(source string, opts ...ClientOptions) *Client {
BaseDelay: 250 * time.Millisecond,
MaxDelay: 10 * time.Second,
}
+
if len(opts) > 0 {
if opts[0].MaxAttempts > 0 {
cfg.MaxAttempts = opts[0].MaxAttempts
}
+
if opts[0].BaseDelay > 0 {
cfg.BaseDelay = opts[0].BaseDelay
}
+
if opts[0].MaxDelay > 0 {
cfg.MaxDelay = opts[0].MaxDelay
}
}
+
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
@@ -60,6 +63,7 @@ func NewClient(source string, opts ...ClientOptions) *Client {
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
+
return &Client{
source: strings.TrimRight(source, "/"),
opts: cfg,
@@ -80,45 +84,63 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
}
u, err := url.Parse(c.source)
+
if err != nil {
return nil, fmt.Errorf("parse plc source: %w", err)
}
+
u.Path = strings.TrimRight(u.Path, "/") + "/export"
q := u.Query()
+
q.Set("after", fmt.Sprintf("%d", after))
- u.RawQuery = q.Encode()
+ u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
+
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
}
+
maxAttempts := c.opts.MaxAttempts
+
if maxAttempts < 1 {
maxAttempts = 1
}
+
var lastErr error
+
for attempt := 1; attempt <= maxAttempts; attempt++ {
records, retryAfter, retryable, err := c.fetchExportOnce(req, limit)
+
if err == nil {
return records, nil
}
+
lastErr = err
+
if !retryable || attempt == maxAttempts || ctx.Err() != nil {
break
}
+
delay := retryAfter
+
if delay <= 0 {
delay = c.backoffDelay(attempt)
}
+
log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err)
+
timer := time.NewTimer(delay)
+
select {
case <-ctx.Done():
timer.Stop()
+
return nil, ctx.Err()
case <-timer.C:
}
}
+
return nil, lastErr
}
@@ -133,11 +155,15 @@ func (e httpStatusError) Error() string {
func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) {
reqClone := req.Clone(req.Context())
+
reqClone.Header.Set("Accept-Encoding", "gzip")
+
resp, err := c.http.Do(reqClone)
+
if err != nil {
return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err)
}
+
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
@@ -145,12 +171,16 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor
body := strings.TrimSpace(string(b))
retryDelay := parseRetryAfter(resp.Header.Get("Retry-After"))
err := httpStatusError{StatusCode: resp.StatusCode, Body: body}
+
return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err
}
+
records, err := decodeExportBody(resp.Body, limit)
+
if err != nil {
return nil, 0, false, err
}
+
return records, 0, false, nil
}
@@ -158,23 +188,31 @@ func (c *Client) backoffDelay(attempt int) time.Duration {
if attempt < 1 {
attempt = 1
}
+
delay := c.opts.BaseDelay
+
if delay <= 0 {
delay = 250 * time.Millisecond
}
+
maxDelay := c.opts.MaxDelay
+
if maxDelay <= 0 {
maxDelay = 10 * time.Second
}
+
for i := 1; i < attempt; i++ {
delay *= 2
+
if delay >= maxDelay {
return maxDelay
}
}
+
if delay > maxDelay {
return maxDelay
}
+
return delay
}
@@ -189,18 +227,23 @@ func shouldRetryStatus(status int) bool {
func parseRetryAfter(v string) time.Duration {
v = strings.TrimSpace(v)
+
if v == "" {
return 0
}
+
if secs, err := strconv.Atoi(v); err == nil && secs > 0 {
return time.Duration(secs) * time.Second
}
+
if ts, err := http.ParseTime(v); err == nil {
delay := time.Until(ts)
+
if delay > 0 {
return delay
}
}
+
return 0
}
@@ -208,63 +251,85 @@ func isTransientNetworkErr(err error) bool {
if err == nil {
return false
}
+
var netErr net.Error
+
if errors.As(err, &netErr) {
return true
}
+
return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
}
func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
br := bufio.NewReader(r)
first, err := peekFirstNonSpace(br)
+
if err != nil {
if err == io.EOF {
return nil, nil
}
+
return nil, err
}
if first == '[' {
b, err := io.ReadAll(br)
+
if err != nil {
return nil, fmt.Errorf("read export body: %w", err)
}
+
trimmed := bytes.TrimSpace(b)
+
var records []types.ExportRecord
+
if err := json.Unmarshal(trimmed, &records); err != nil {
return nil, fmt.Errorf("decode export json array: %w", err)
}
+
if limit > 0 && uint64(len(records)) > limit {
records = records[:limit]
}
+
return records, nil
}
+
out := make([]types.ExportRecord, 0, 1024)
+
for {
line, err := br.ReadBytes('\n')
isEOF := errors.Is(err, io.EOF)
+
if err != nil && !isEOF {
return nil, fmt.Errorf("read ndjson line: %w", err)
}
+
if limit > 0 && uint64(len(out)) >= limit {
return out, nil
}
+
trimmed := bytes.TrimSpace(line)
+
if len(trimmed) > 0 {
var rec types.ExportRecord
+
if err := json.Unmarshal(trimmed, &rec); err != nil {
if isEOF && isTrailingNDJSONPartial(err) {
return out, nil
}
+
return nil, fmt.Errorf("decode ndjson line: %w", err)
}
+
out = append(out, rec)
}
+
if isEOF {
break
}
}
+
return out, nil
}
@@ -272,19 +337,23 @@ func isTrailingNDJSONPartial(err error) bool {
if !errors.Is(err, io.ErrUnexpectedEOF) && !strings.Contains(err.Error(), "unexpected end of JSON input") {
return false
}
+
return true
}
func peekFirstNonSpace(br *bufio.Reader) (byte, error) {
for {
b, err := br.ReadByte()
+
if err != nil {
return 0, err
}
+
if !isSpace(b) {
if err := br.UnreadByte(); err != nil {
return 0, fmt.Errorf("unread byte: %w", err)
}
+
return b, nil
}
}
@@ -301,27 +370,37 @@ func isSpace(b byte) bool {
func (c *Client) fetchFromFile(after uint64, limit uint64) ([]types.ExportRecord, error) {
path := c.source
+
if strings.HasPrefix(path, "file://") {
path = strings.TrimPrefix(path, "file://")
}
+
path = filepath.Clean(path)
b, err := os.ReadFile(path)
+
if err != nil {
return nil, fmt.Errorf("read source file: %w", err)
}
+
recs, err := decodeExportBody(bytes.NewReader(b), 0)
+
if err != nil {
return nil, err
}
+
out := make([]types.ExportRecord, 0, len(recs))
+
for _, r := range recs {
if r.Seq <= after {
continue
}
+
out = append(out, r)
+
if limit > 0 && uint64(len(out)) >= limit {
break
}
}
+
return out, nil
}
diff --git a/internal/ingest/client_retry_test.go b/internal/ingest/client_retry_test.go
index a23fceb..b019ac6 100644
--- a/internal/ingest/client_retry_test.go
+++ b/internal/ingest/client_retry_test.go
@@ -12,15 +12,20 @@ import (
func TestFetchExportLimitedRetries429ThenSucceeds(t *testing.T) {
var attempts atomic.Int32
+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
n := attempts.Add(1)
+
if n <= 2 {
w.Header().Set("Retry-After", "0")
http.Error(w, "rate limited", http.StatusTooManyRequests)
+
return
}
+
_, _ = fmt.Fprintln(w, `{"seq":1,"did":"did:plc:alice","cid":"cid1","operation":{"x":1}}`)
}))
+
defer ts.Close()
client := NewClient(ts.URL, ClientOptions{
@@ -29,12 +34,15 @@ func TestFetchExportLimitedRetries429ThenSucceeds(t *testing.T) {
MaxDelay: 2 * time.Millisecond,
})
records, err := client.FetchExportLimited(context.Background(), 0, 0)
+
if err != nil {
t.Fatalf("fetch export: %v", err)
}
+
if len(records) != 1 {
t.Fatalf("record count mismatch: got %d want 1", len(records))
}
+
if got := attempts.Load(); got != 3 {
t.Fatalf("attempt count mismatch: got %d want 3", got)
}
@@ -42,10 +50,12 @@ func TestFetchExportLimitedRetries429ThenSucceeds(t *testing.T) {
func TestFetchExportLimitedDoesNotRetry400(t *testing.T) {
var attempts atomic.Int32
+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempts.Add(1)
http.Error(w, "bad request", http.StatusBadRequest)
}))
+
defer ts.Close()
client := NewClient(ts.URL, ClientOptions{
@@ -54,9 +64,11 @@ func TestFetchExportLimitedDoesNotRetry400(t *testing.T) {
MaxDelay: 2 * time.Millisecond,
})
_, err := client.FetchExportLimited(context.Background(), 0, 0)
+
if err == nil {
t.Fatalf("expected error for 400 response")
}
+
if got := attempts.Load(); got != 1 {
t.Fatalf("unexpected retries on 400: got attempts=%d want 1", got)
}
diff --git a/internal/ingest/client_test.go b/internal/ingest/client_test.go
index ff80225..50595ec 100644
--- a/internal/ingest/client_test.go
+++ b/internal/ingest/client_test.go
@@ -11,14 +11,16 @@ func TestDecodeExportBody_IgnoresTrailingPartialNDJSONLine(t *testing.T) {
`{"seq":2,"did":"did:plc:bob","cid":"cid2","operation":{"x":2}}`,
`{"seq":3,"did":"did:plc:carol","cid":"cid3","operation":{"x":3}`,
}, "\n")
-
records, err := decodeExportBody(strings.NewReader(body), 0)
+
if err != nil {
t.Fatalf("decode export body: %v", err)
}
+
if len(records) != 2 {
t.Fatalf("record count mismatch: got %d want 2", len(records))
}
+
if records[0].Seq != 1 || records[1].Seq != 2 {
t.Fatalf("unexpected sequences: got [%d %d], want [1 2]", records[0].Seq, records[1].Seq)
}
@@ -30,8 +32,8 @@ func TestDecodeExportBody_FailsOnMalformedNonTrailingNDJSONLine(t *testing.T) {
`{"seq":2,"did":"did:plc:bob","cid":"cid2","operation":{"x":2}`,
`{"seq":3,"did":"did:plc:carol","cid":"cid3","operation":{"x":3}}`,
}, "\n")
-
_, err := decodeExportBody(strings.NewReader(body), 0)
+
if err == nil {
t.Fatalf("expected malformed middle line to fail")
}
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index f6b2433..1779db9 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -15,7 +15,6 @@ import (
"sync"
"sync/atomic"
"time"
-
"github.com/Fuwn/plutia/internal/checkpoint"
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/state"
@@ -73,18 +72,23 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog
blockLog: blockLog,
startedAt: time.Now(),
}
+
if didCount, err := countStates(store); err == nil {
atomic.StoreUint64(&s.stats.DIDCount, didCount)
}
+
if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
}
+
if cfg.Mode == config.ModeMirror && blockLog != nil {
s.appender = newBlockAppender(blockLog, 1024)
+
if _, err := blockLog.ValidateAndRecover(store); err != nil {
s.MarkCorrupted(err)
}
}
+
return s
}
@@ -95,9 +99,11 @@ func (s *Service) SetMaxOps(max uint64) {
func (s *Service) Replay(ctx context.Context) error {
for {
changed, err := s.RunOnce(ctx)
+
if err != nil {
return err
}
+
if !changed {
return nil
}
@@ -106,11 +112,14 @@ func (s *Service) Replay(ctx context.Context) error {
func (s *Service) Poll(ctx context.Context) error {
ticker := time.NewTicker(s.cfg.PollInterval)
+
defer ticker.Stop()
+
for {
if _, err := s.RunOnce(ctx); err != nil {
atomic.AddUint64(&s.stats.Errors, 1)
}
+
select {
case <-ctx.Done():
return ctx.Err()
@@ -123,42 +132,59 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) {
if err := s.CorruptionError(); err != nil {
return false, err
}
+
if s.maxOps > 0 && s.runOps >= s.maxOps {
return false, nil
}
+
lastSeq, err := s.store.GetGlobalSeq()
+
if err != nil {
return false, fmt.Errorf("load global sequence: %w", err)
}
+
limit := uint64(0)
+
if s.maxOps > 0 {
remaining := s.maxOps - s.runOps
+
if remaining == 0 {
return false, nil
}
+
limit = remaining
}
+
records, err := s.client.FetchExportLimited(ctx, lastSeq, limit)
+
if err != nil {
return false, err
}
+
if len(records) == 0 {
atomic.StoreUint64(&s.stats.LagOps, 0)
+
return false, nil
}
+
latestSourceSeq := records[len(records)-1].Seq
committed, err := s.processRecords(ctx, records)
s.runOps += committed
+
if err != nil {
atomic.AddUint64(&s.stats.Errors, 1)
+
return committed > 0, err
}
+
lastCommitted := atomic.LoadUint64(&s.stats.LastSeq)
+
if latestSourceSeq > lastCommitted {
atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted)
} else {
atomic.StoreUint64(&s.stats.LagOps, 0)
}
+
return true, nil
}
@@ -177,50 +203,72 @@ type verifyResult struct {
func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
verified, err := s.verifyRecords(ctx, records)
+
if err != nil {
return 0, err
}
+
return s.commitVerified(ctx, verified)
}
func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) {
_ = ctx
workers := s.cfg.VerifyWorkers
+
if workers < 1 {
workers = 1
}
+
queues := make([]chan verifyTask, workers)
results := make(chan verifyResult, len(records))
+
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
queues[i] = make(chan verifyTask, 64)
+
wg.Add(1)
+
go func(queue <-chan verifyTask) {
defer wg.Done()
+
cache := map[string]*types.StateV1{}
+
for task := range queue {
op, err := types.ParseOperation(task.rec)
+
if err != nil {
results <- verifyResult{index: task.index, err: err}
+
continue
}
+
existing, err := s.loadExistingState(cache, op.DID)
+
if err != nil {
results <- verifyResult{index: task.index, err: err}
+
continue
}
+
if err := s.verifier.VerifyOperation(op, existing); err != nil {
atomic.AddUint64(&s.stats.VerifyFailures, 1)
+
results <- verifyResult{index: task.index, err: err}
+
continue
}
+
next, err := state.ComputeNextState(op, existing)
+
if err != nil {
results <- verifyResult{index: task.index, err: err}
+
continue
}
+
cache[op.DID] = cloneState(&next)
+
results <- verifyResult{index: task.index, op: op, state: next, newDID: existing == nil}
}
}(queues[i])
@@ -228,13 +276,17 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
for idx, rec := range records {
worker := didWorker(rec.DID, workers)
+
queues[worker] <- verifyTask{index: idx, rec: rec}
}
+
for _, q := range queues {
close(q)
}
+
wg.Wait()
close(results)
+
return collectVerifiedInOrder(len(records), results)
}
@@ -242,15 +294,21 @@ func (s *Service) loadExistingState(cache map[string]*types.StateV1, did string)
if existing, ok := cache[did]; ok {
return cloneState(existing), nil
}
+
stateVal, ok, err := s.store.GetState(did)
+
if err != nil {
return nil, err
}
+
if !ok {
cache[did] = nil
+
return nil, nil
}
+
cache[did] = cloneState(&stateVal)
+
return cloneState(&stateVal), nil
}
@@ -259,37 +317,48 @@ func collectVerifiedInOrder(total int, results <-chan verifyResult) ([]verifyRes
seen := make([]bool, total)
firstErr := error(nil)
received := 0
+
for r := range results {
received++
+
if r.index < 0 || r.index >= total {
if firstErr == nil {
firstErr = fmt.Errorf("verify worker returned out-of-range index %d", r.index)
}
+
continue
}
+
if seen[r.index] {
if firstErr == nil {
firstErr = fmt.Errorf("duplicate verify result for index %d", r.index)
}
+
continue
}
+
seen[r.index] = true
ordered[r.index] = r
+
if r.err != nil && firstErr == nil {
firstErr = r.err
}
}
+
if received != total && firstErr == nil {
firstErr = fmt.Errorf("verify result count mismatch: got %d want %d", received, total)
}
+
if firstErr != nil {
return nil, firstErr
}
+
for i := range seen {
if !seen[i] {
return nil, fmt.Errorf("missing verify result index %d", i)
}
}
+
return ordered, nil
}
@@ -297,8 +366,10 @@ func didWorker(did string, workers int) int {
if workers <= 1 {
return 0
}
+
hasher := fnv.New32a()
_, _ = hasher.Write([]byte(did))
+
return int(hasher.Sum32() % uint32(workers))
}
@@ -306,50 +377,64 @@ func cloneState(in *types.StateV1) *types.StateV1 {
if in == nil {
return nil
}
+
out := *in
out.DIDDocument = append([]byte(nil), in.DIDDocument...)
out.RotationKeys = append([]string(nil), in.RotationKeys...)
+
return &out
}
func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) {
_ = ctx
batchSize := s.cfg.CommitBatchSize
+
if batchSize < 1 {
batchSize = 1
}
+
pendingOps := make([]storage.OperationMutation, 0, batchSize)
pendingSeqs := make([]uint64, 0, batchSize)
pendingBlockHashes := map[uint64]string{}
pendingNewDIDs := map[string]struct{}{}
+
var committed uint64
commit := func() error {
if len(pendingOps) == 0 {
return nil
}
+
if s.cfg.Mode == config.ModeMirror {
flush, err := s.appender.Flush()
+
if err != nil {
return err
}
+
if flush != nil {
pendingBlockHashes[flush.BlockID] = flush.Hash
}
}
+
blockEntries := make([]storage.BlockHashEntry, 0, len(pendingBlockHashes))
+
for id, hash := range pendingBlockHashes {
blockEntries = append(blockEntries, storage.BlockHashEntry{BlockID: id, Hash: hash})
}
+
sort.Slice(blockEntries, func(i, j int) bool { return blockEntries[i].BlockID < blockEntries[j].BlockID })
if err := s.store.ApplyOperationsBatch(pendingOps, blockEntries); err != nil {
return err
}
+
lastSeq := pendingSeqs[len(pendingSeqs)-1]
+
atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
atomic.AddUint64(&s.stats.DIDCount, uint64(len(pendingNewDIDs)))
+
committed += uint64(len(pendingOps))
if s.checkpoints != nil {
@@ -364,59 +449,78 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
pendingOps = pendingOps[:0]
pendingSeqs = pendingSeqs[:0]
+
clear(pendingBlockHashes)
clear(pendingNewDIDs)
+
return nil
}
for _, v := range verified {
var ref *types.BlockRefV1
+
if s.cfg.Mode == config.ModeMirror {
if s.appender == nil {
return committed, fmt.Errorf("mirror mode requires block appender")
}
+
result, err := s.appender.Append(v.op)
+
if err != nil {
return committed, err
}
+
if result.Flush != nil {
pendingBlockHashes[result.Flush.BlockID] = result.Flush.Hash
}
+
result.Ref.Received = time.Now().UTC().Format(time.RFC3339)
ref = &result.Ref
}
+
pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
pendingSeqs = append(pendingSeqs, v.op.Sequence)
+
if v.newDID {
pendingNewDIDs[v.op.DID] = struct{}{}
}
+
if len(pendingOps) >= batchSize {
if err := commit(); err != nil {
return committed, err
}
}
}
+
if err := commit(); err != nil {
return committed, err
}
+
return committed, nil
}
func (s *Service) createCheckpoint(sequence uint64) error {
blocks, err := s.store.ListBlockHashes()
+
if err != nil {
return fmt.Errorf("list blocks for checkpoint: %w", err)
}
+
done := make(chan struct{})
usageCh := make(chan checkpointProcessUsage, 1)
+
go sampleCheckpointProcessUsage(done, usageCh)
cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(sequence, blocks)
+
close(done)
+
usage := <-usageCh
+
if err != nil {
return fmt.Errorf("build checkpoint: %w", err)
}
+
log.Printf(
"checkpoint_metrics seq=%d did_count=%d merkle_compute_ms=%d total_checkpoint_ms=%d cpu_pct=%.2f rss_peak_mb=%.2f completed_unix_ms=%d",
cp.Sequence,
@@ -428,41 +532,54 @@ func (s *Service) createCheckpoint(sequence uint64) error {
time.Now().UnixMilli(),
)
atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+
if s.metricsSink != nil {
s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
}
+
return nil
}
func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
if s.appender != nil {
flush, err := s.appender.Flush()
+
if err != nil {
return types.CheckpointV1{}, err
}
+
if flush != nil {
if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
return types.CheckpointV1{}, err
}
}
}
+
_ = ctx
seq, err := s.store.GetGlobalSeq()
+
if err != nil {
return types.CheckpointV1{}, err
}
+
blocks, err := s.store.ListBlockHashes()
+
if err != nil {
return types.CheckpointV1{}, err
}
+
cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(seq, blocks)
+
if err != nil {
return types.CheckpointV1{}, err
}
+
atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+
if s.metricsSink != nil {
s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
}
+
return cp, nil
}
@@ -474,31 +591,44 @@ type checkpointProcessUsage struct {
func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointProcessUsage) {
pid := os.Getpid()
ticker := time.NewTicker(25 * time.Millisecond)
+
defer ticker.Stop()
+
var cpuSum float64
var samples int
var rssPeak int64
+
sample := func() {
cpu, rss, err := processMetrics(pid)
+
if err != nil {
return
}
+
cpuSum += cpu
+
samples++
+
if rss > rssPeak {
rssPeak = rss
}
}
+
sample()
+
for {
select {
case <-done:
sample()
+
avg := 0.0
+
if samples > 0 {
avg = cpuSum / float64(samples)
}
+
out <- checkpointProcessUsage{CPUPercentAvg: avg, RSSPeakKB: rssPeak}
+
return
case <-ticker.C:
sample()
@@ -508,15 +638,20 @@ func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointPro
func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output()
+
if err != nil {
return 0, 0, err
}
+
fields := strings.Fields(string(out))
+
if len(fields) < 2 {
return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
}
+
cpuPct, _ = strconv.ParseFloat(fields[0], 64)
rssKB, _ = strconv.ParseInt(fields[1], 10, 64)
+
return cpuPct, rssKB, nil
}
@@ -524,51 +659,69 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
if err := s.CorruptionError(); err != nil {
return err
}
+
if s.cfg.Mode != config.ModeMirror {
_, ok, err := s.store.GetState(did)
+
if err != nil {
return err
}
+
if !ok {
return fmt.Errorf("did not found")
}
+
return nil
}
+
seqs, err := s.store.ListDIDSequences(did)
+
if err != nil {
return err
}
+
if len(seqs) == 0 {
return fmt.Errorf("no operations for did %s", did)
}
var previous *types.StateV1
+
for _, seq := range seqs {
if err := ctx.Err(); err != nil {
return err
}
+
ref, ok, err := s.store.GetOpSeqRef(seq)
+
if err != nil {
return err
}
+
if !ok {
return fmt.Errorf("missing op reference for seq %d", seq)
}
+
payload, err := s.blockLog.ReadRecord(ref)
+
if err != nil {
return err
}
+
rec := types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload}
op, err := types.ParseOperation(rec)
+
if err != nil {
return err
}
+
if err := s.verifier.VerifyOperation(op, previous); err != nil {
return fmt.Errorf("verify seq %d failed: %w", seq, err)
}
+
s := types.StateV1{DID: did, ChainTipHash: op.CID, LatestOpSeq: seq}
previous = &s
}
+
return nil
}
@@ -576,51 +729,69 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
if err := s.CorruptionError(); err != nil {
return "", nil, err
}
+
if s.cfg.Mode != config.ModeMirror {
return "", nil, errors.New("historical proof requires mirror mode")
}
+
seqs, err := s.store.ListDIDSequences(did)
+
if err != nil {
return "", nil, err
}
+
if len(seqs) == 0 {
return "", nil, fmt.Errorf("no operations for did %s", did)
}
+
filtered := make([]uint64, 0, len(seqs))
+
for _, seq := range seqs {
if seq <= sequence {
filtered = append(filtered, seq)
}
}
+
if len(filtered) == 0 {
return "", nil, fmt.Errorf("no operations for did %s at checkpoint %d", did, sequence)
}
+
sort.Slice(filtered, func(i, j int) bool { return filtered[i] < filtered[j] })
var previous *types.StateV1
var tip string
+
for _, seq := range filtered {
if err := ctx.Err(); err != nil {
return "", nil, err
}
+
ref, ok, err := s.store.GetOpSeqRef(seq)
+
if err != nil {
return "", nil, err
}
+
if !ok {
return "", nil, fmt.Errorf("missing op reference for seq %d", seq)
}
+
payload, err := s.blockLog.ReadRecord(ref)
+
if err != nil {
return "", nil, err
}
+
op, err := types.ParseOperation(types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload})
+
if err != nil {
return "", nil, err
}
+
if err := s.verifier.VerifyOperation(op, previous); err != nil {
return "", nil, fmt.Errorf("verify seq %d failed: %w", seq, err)
}
+
tip = op.CID
next := types.StateV1{
DID: did,
@@ -628,12 +799,15 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
LatestOpSeq: seq,
}
prior := []string(nil)
+
if previous != nil {
prior = append(prior, previous.RotationKeys...)
}
+
next.RotationKeys = extractRotationKeysFromPayload(op.Payload, prior)
previous = &next
}
+
return tip, filtered, nil
}
@@ -641,32 +815,44 @@ func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRec
if err := s.CorruptionError(); err != nil {
return nil, err
}
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
return nil, ErrHistoryNotStored
}
+
seqs, err := s.store.ListDIDSequences(did)
+
if err != nil {
return nil, err
}
+
if len(seqs) == 0 {
return nil, ErrDIDNotFound
}
+
out := make([]types.ExportRecord, 0, len(seqs))
+
for _, seq := range seqs {
if err := ctx.Err(); err != nil {
return nil, err
}
+
ref, ok, err := s.store.GetOpSeqRef(seq)
+
if err != nil {
return nil, err
}
+
if !ok {
return nil, fmt.Errorf("missing op reference for seq %d", seq)
}
+
payload, err := s.blockLog.ReadRecord(ref)
+
if err != nil {
return nil, err
}
+
out = append(out, types.ExportRecord{
Seq: seq,
DID: did,
@@ -676,6 +862,7 @@ func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRec
Operation: json.RawMessage(payload),
})
}
+
return out, nil
}
@@ -683,31 +870,43 @@ func (s *Service) LoadLatestDIDOperation(ctx context.Context, did string) (types
if err := s.CorruptionError(); err != nil {
return types.ExportRecord{}, err
}
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
return types.ExportRecord{}, ErrHistoryNotStored
}
+
seqs, err := s.store.ListDIDSequences(did)
+
if err != nil {
return types.ExportRecord{}, err
}
+
if len(seqs) == 0 {
return types.ExportRecord{}, ErrDIDNotFound
}
+
lastSeq := seqs[len(seqs)-1]
+
if err := ctx.Err(); err != nil {
return types.ExportRecord{}, err
}
+
ref, ok, err := s.store.GetOpSeqRef(lastSeq)
+
if err != nil {
return types.ExportRecord{}, err
}
+
if !ok {
return types.ExportRecord{}, fmt.Errorf("missing op reference for seq %d", lastSeq)
}
+
payload, err := s.blockLog.ReadRecord(ref)
+
if err != nil {
return types.ExportRecord{}, err
}
+
return types.ExportRecord{
Seq: lastSeq,
DID: did,
@@ -722,32 +921,44 @@ func (s *Service) LoadCurrentPLCData(ctx context.Context, did string) (map[strin
if err := s.CorruptionError(); err != nil {
return nil, err
}
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
state, ok, err := s.store.GetState(did)
+
if err != nil {
return nil, err
}
+
if !ok {
return nil, ErrDIDNotFound
}
+
var doc map[string]any
+
if err := json.Unmarshal(state.DIDDocument, &doc); err != nil {
return nil, err
}
+
return doc, nil
}
+
last, err := s.LoadLatestDIDOperation(ctx, did)
+
if err != nil {
return nil, err
}
+
var op map[string]any
+
if err := json.Unmarshal(last.Operation, &op); err != nil {
return nil, fmt.Errorf("decode latest operation: %w", err)
}
+
delete(op, "sig")
delete(op, "signature")
delete(op, "sigPayload")
delete(op, "signaturePayload")
+
return op, nil
}
@@ -755,16 +966,21 @@ func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int,
if err := s.CorruptionError(); err != nil {
return err
}
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
return nil
}
+
if limit <= 0 {
limit = 1000
}
+
const maxCount = 1000
+
if limit > maxCount {
limit = maxCount
}
+
afterSet := !after.IsZero()
emitted := 0
stop := errors.New("stop export iteration")
@@ -772,25 +988,33 @@ func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int,
if err := ctx.Err(); err != nil {
return err
}
+
if emitted >= limit {
return stop
}
+
if afterSet {
if strings.TrimSpace(ref.Received) == "" {
return nil
}
+
ts, err := time.Parse(time.RFC3339, ref.Received)
+
if err != nil {
return nil
}
+
if !ts.After(after) {
return nil
}
}
+
payload, err := s.blockLog.ReadRecord(ref)
+
if err != nil {
return err
}
+
rec := types.ExportRecord{
Seq: seq,
DID: ref.DID,
@@ -799,41 +1023,54 @@ func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int,
Nullified: detectNullified(payload),
Operation: json.RawMessage(payload),
}
+
emitted++
+
return emit(rec)
})
+
if err == nil || errors.Is(err, stop) {
return nil
}
+
return err
}
func detectNullified(operation []byte) bool {
var payload map[string]any
+
if err := json.Unmarshal(operation, &payload); err != nil {
return false
}
+
v, _ := payload["nullified"].(bool)
+
return v
}
func (s *Service) Flush(ctx context.Context) error {
_ = ctx
+
if err := s.CorruptionError(); err != nil {
return err
}
+
if s.appender == nil {
return nil
}
+
flush, err := s.appender.Flush()
+
if err != nil {
return err
}
+
if flush != nil {
if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
return err
}
}
+
return nil
}
@@ -841,6 +1078,7 @@ func (s *Service) MarkCorrupted(err error) {
if err == nil {
return
}
+
s.corrupted.Store(true)
s.corruptErr.Store(err.Error())
}
@@ -853,10 +1091,13 @@ func (s *Service) CorruptionError() error {
if !s.IsCorrupted() {
return nil
}
+
msg, _ := s.corruptErr.Load().(string)
+
if msg == "" {
msg = "data corruption detected"
}
+
return fmt.Errorf("data corruption detected: %s", msg)
}
@@ -865,15 +1106,19 @@ func extractRotationKeysFromPayload(payload map[string]any, prior []string) []st
seen := map[string]struct{}{}
add := func(v string) {
v = strings.TrimSpace(v)
+
if v == "" {
return
}
+
if _, ok := seen[v]; ok {
return
}
+
seen[v] = struct{}{}
out = append(out, v)
}
+
if arr, ok := payload["rotationKeys"].([]any); ok {
for _, v := range arr {
if s, ok := v.(string); ok {
@@ -881,17 +1126,21 @@ func extractRotationKeysFromPayload(payload map[string]any, prior []string) []st
}
}
}
+
if recovery, ok := payload["recoveryKey"].(string); ok {
add(recovery)
}
+
if signing, ok := payload["signingKey"].(string); ok {
add(signing)
}
+
if len(out) == 0 {
for _, v := range prior {
add(v)
}
}
+
return out
}
@@ -908,9 +1157,11 @@ func (s *Service) SetMetricsSink(sink MetricsSink) {
func (s *Service) Stats() Stats {
ingested := atomic.LoadUint64(&s.stats.IngestedOps)
opsPerSec := 0.0
+
if elapsed := time.Since(s.startedAt).Seconds(); elapsed > 0 {
opsPerSec = float64(ingested) / elapsed
}
+
return Stats{
IngestedOps: ingested,
Errors: atomic.LoadUint64(&s.stats.Errors),
@@ -925,12 +1176,15 @@ func (s *Service) Stats() Stats {
func countStates(store storage.Store) (uint64, error) {
var count uint64
+
if err := store.ForEachState(func(types.StateV1) error {
count++
+
return nil
}); err != nil {
return 0, err
}
+
return count, nil
}
@@ -959,34 +1213,47 @@ func newBlockAppender(log *storage.BlockLog, buffer int) *blockAppender {
queue: make(chan appendRequest, buffer),
closed: make(chan struct{}),
}
+
go ba.run()
+
return ba
}
func (b *blockAppender) run() {
defer close(b.closed)
+
for req := range b.queue {
if req.Flush {
flush, err := b.log.Flush()
+
req.Reply <- appendResult{Flush: flush, Err: err}
+
continue
}
+
ref, flush, err := b.log.Append(req.Operation.Sequence, req.Operation.DID, req.Operation.CID, req.Operation.Prev, req.Operation.CanonicalBytes)
+
req.Reply <- appendResult{Ref: ref, Flush: flush, Err: err}
}
}
func (b *blockAppender) Append(op types.ParsedOperation) (appendResult, error) {
reply := make(chan appendResult, 1)
+
b.queue <- appendRequest{Operation: op, Reply: reply}
+
res := <-reply
+
return res, res.Err
}
func (b *blockAppender) Flush() (*storage.FlushInfo, error) {
reply := make(chan appendResult, 1)
+
b.queue <- appendRequest{Flush: true, Reply: reply}
+
res := <-reply
+
return res.Flush, res.Err
}
diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go
index e01dd4e..9985367 100644
--- a/internal/ingest/service_integration_test.go
+++ b/internal/ingest/service_integration_test.go
@@ -10,7 +10,6 @@ import (
"os"
"path/filepath"
"testing"
-
"github.com/Fuwn/plutia/internal/checkpoint"
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/storage"
@@ -20,15 +19,19 @@ import (
func TestReplayIntegration(t *testing.T) {
tmp := t.TempDir()
dataDir := filepath.Join(tmp, "data")
+
if err := os.MkdirAll(dataDir, 0o755); err != nil {
t.Fatalf("mkdir data dir: %v", err)
}
keySeed := make([]byte, ed25519.SeedSize)
+
if _, err := rand.Read(keySeed); err != nil {
t.Fatalf("rand seed: %v", err)
}
+
keyPath := filepath.Join(tmp, "mirror.key")
+
if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
t.Fatalf("write mirror key: %v", err)
}
@@ -36,27 +39,35 @@ func TestReplayIntegration(t *testing.T) {
records := buildSignedRecords(t)
sourcePath := filepath.Join(tmp, "sample.ndjson")
file, err := os.Create(sourcePath)
+
if err != nil {
t.Fatalf("create source: %v", err)
}
+
for _, rec := range records {
b, _ := json.Marshal(rec)
+
if _, err := fmt.Fprintln(file, string(b)); err != nil {
t.Fatalf("write source: %v", err)
}
}
+
file.Close()
store, err := storage.OpenPebble(dataDir)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
+
if err := store.SetMode(config.ModeMirror); err != nil {
t.Fatalf("set mode: %v", err)
}
bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+
if err != nil {
t.Fatalf("open block log: %v", err)
}
@@ -73,31 +84,39 @@ func TestReplayIntegration(t *testing.T) {
MirrorPrivateKeyPath: keyPath,
}
service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath))
+
if err := service.Replay(context.Background()); err != nil {
t.Fatalf("replay: %v", err)
}
+
if err := service.Flush(context.Background()); err != nil {
t.Fatalf("flush: %v", err)
}
seq, err := store.GetGlobalSeq()
+
if err != nil {
t.Fatalf("get global seq: %v", err)
}
+
if seq != 3 {
t.Fatalf("global seq mismatch: got %d want 3", seq)
}
s, ok, err := store.GetState("did:plc:alice")
+
if err != nil {
t.Fatalf("get state: %v", err)
}
+
if !ok {
t.Fatalf("missing alice state")
}
+
if s.LatestOpSeq != 2 {
t.Fatalf("latest op seq mismatch for alice: got %d want 2", s.LatestOpSeq)
}
+
if err := service.VerifyDID(context.Background(), "did:plc:alice"); err != nil {
t.Fatalf("verify alice did: %v", err)
}
@@ -105,6 +124,7 @@ func TestReplayIntegration(t *testing.T) {
if _, err := service.Snapshot(context.Background()); err != nil {
t.Fatalf("snapshot: %v", err)
}
+
if _, ok, err := store.GetLatestCheckpoint(); err != nil || !ok {
t.Fatalf("expected latest checkpoint, err=%v ok=%v", err, ok)
}
@@ -112,14 +132,19 @@ func TestReplayIntegration(t *testing.T) {
func buildSignedRecords(t *testing.T) []types.ExportRecord {
t.Helper()
+
pub1, priv1, err := ed25519.GenerateKey(rand.Reader)
+
if err != nil {
t.Fatalf("generate key1: %v", err)
}
+
pub2, priv2, err := ed25519.GenerateKey(rand.Reader)
+
if err != nil {
t.Fatalf("generate key2: %v", err)
}
+
var out []types.ExportRecord
mk := func(seq uint64, did, prev string, pub ed25519.PublicKey, priv ed25519.PrivateKey) types.ExportRecord {
@@ -127,9 +152,11 @@ func buildSignedRecords(t *testing.T) []types.ExportRecord {
"did": did,
"didDoc": map[string]any{"id": did, "seq": seq},
}
+
if prev != "" {
payloadDoc["prev"] = prev
}
+
payloadBytes, _ := json.Marshal(payloadDoc)
canon, _ := types.CanonicalizeJSON(payloadBytes)
sig := ed25519.Sign(priv, canon)
@@ -140,34 +167,41 @@ func buildSignedRecords(t *testing.T) []types.ExportRecord {
"sigPayload": base64.RawURLEncoding.EncodeToString(canon),
"sig": base64.RawURLEncoding.EncodeToString(sig),
}
+
if prev != "" {
op["prev"] = prev
}
+
opRaw, _ := json.Marshal(op)
opCanon, _ := types.CanonicalizeJSON(opRaw)
cid := types.ComputeDigestCID(opCanon)
+
return types.ExportRecord{Seq: seq, DID: did, CID: cid, Operation: opRaw}
}
-
rec1 := mk(1, "did:plc:alice", "", pub1, priv1)
rec2 := mk(2, "did:plc:alice", rec1.CID, pub1, priv1)
rec3 := mk(3, "did:plc:bob", "", pub2, priv2)
out = append(out, rec1, rec2, rec3)
+
return out
}
func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) {
tmp := t.TempDir()
dataDir := filepath.Join(tmp, "data")
+
if err := os.MkdirAll(dataDir, 0o755); err != nil {
t.Fatalf("mkdir data dir: %v", err)
}
keySeed := make([]byte, ed25519.SeedSize)
+
if _, err := rand.Read(keySeed); err != nil {
t.Fatalf("rand seed: %v", err)
}
+
keyPath := filepath.Join(tmp, "mirror.key")
+
if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
t.Fatalf("write mirror key: %v", err)
}
@@ -175,29 +209,39 @@ func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) {
records := buildSignedRecords(t)
sourcePath := filepath.Join(tmp, "sample.ndjson")
file, err := os.Create(sourcePath)
+
if err != nil {
t.Fatalf("create source: %v", err)
}
+
for _, rec := range records {
b, _ := json.Marshal(rec)
+
if _, err := fmt.Fprintln(file, string(b)); err != nil {
t.Fatalf("write source: %v", err)
}
}
+
file.Close()
store, err := storage.OpenPebble(dataDir)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
+
if err := store.SetMode(config.ModeMirror); err != nil {
t.Fatalf("set mode: %v", err)
}
+
bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+
if err != nil {
t.Fatalf("open block log: %v", err)
}
+
cfg := config.Config{
Mode: config.ModeMirror,
DataDir: dataDir,
@@ -211,15 +255,19 @@ func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) {
PollInterval: 5,
}
service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath))
+
if err := service.Replay(context.Background()); err != nil {
t.Fatalf("replay: %v", err)
}
+
if err := service.Flush(context.Background()); err != nil {
t.Fatalf("flush: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
+
cancel()
+
if _, _, err := service.RecomputeTipAtOrBefore(ctx, "did:plc:alice", 2); err == nil {
t.Fatalf("expected cancellation error")
}
diff --git a/internal/ingest/service_order_test.go b/internal/ingest/service_order_test.go
index 0e2299c..1d7c766 100644
--- a/internal/ingest/service_order_test.go
+++ b/internal/ingest/service_order_test.go
@@ -2,24 +2,28 @@ package ingest
import (
"testing"
-
"github.com/Fuwn/plutia/internal/types"
)
func TestCollectVerifiedInOrder_OutOfOrderInput(t *testing.T) {
results := make(chan verifyResult, 3)
+
results <- verifyResult{index: 2, op: types.ParsedOperation{Sequence: 3}}
results <- verifyResult{index: 0, op: types.ParsedOperation{Sequence: 1}}
results <- verifyResult{index: 1, op: types.ParsedOperation{Sequence: 2}}
+
close(results)
ordered, err := collectVerifiedInOrder(3, results)
+
if err != nil {
t.Fatalf("collect verified: %v", err)
}
+
if len(ordered) != 3 {
t.Fatalf("ordered length mismatch: got %d want 3", len(ordered))
}
+
for i := 0; i < 3; i++ {
if ordered[i].op.Sequence != uint64(i+1) {
t.Fatalf("unexpected sequence at index %d: got %d want %d", i, ordered[i].op.Sequence, i+1)
@@ -29,8 +33,10 @@ func TestCollectVerifiedInOrder_OutOfOrderInput(t *testing.T) {
func TestCollectVerifiedInOrder_MissingResult(t *testing.T) {
results := make(chan verifyResult, 2)
+
results <- verifyResult{index: 0, op: types.ParsedOperation{Sequence: 1}}
results <- verifyResult{index: 2, op: types.ParsedOperation{Sequence: 3}}
+
close(results)
if _, err := collectVerifiedInOrder(3, results); err == nil {