diff options
Diffstat (limited to 'internal/ingest')
| -rw-r--r-- | internal/ingest/client.go | 83 | ||||
| -rw-r--r-- | internal/ingest/client_retry_test.go | 12 | ||||
| -rw-r--r-- | internal/ingest/client_test.go | 6 | ||||
| -rw-r--r-- | internal/ingest/service.go | 269 | ||||
| -rw-r--r-- | internal/ingest/service_integration_test.go | 52 | ||||
| -rw-r--r-- | internal/ingest/service_order_test.go | 8 |
6 files changed, 422 insertions, 8 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go index d25b73f..12d8bd6 100644 --- a/internal/ingest/client.go +++ b/internal/ingest/client.go @@ -17,7 +17,6 @@ import ( "strconv" "strings" "time" - "github.com/Fuwn/plutia/internal/types" ) @@ -39,17 +38,21 @@ func NewClient(source string, opts ...ClientOptions) *Client { BaseDelay: 250 * time.Millisecond, MaxDelay: 10 * time.Second, } + if len(opts) > 0 { if opts[0].MaxAttempts > 0 { cfg.MaxAttempts = opts[0].MaxAttempts } + if opts[0].BaseDelay > 0 { cfg.BaseDelay = opts[0].BaseDelay } + if opts[0].MaxDelay > 0 { cfg.MaxDelay = opts[0].MaxDelay } } + transport := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext, @@ -60,6 +63,7 @@ func NewClient(source string, opts ...ClientOptions) *Client { TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, } + return &Client{ source: strings.TrimRight(source, "/"), opts: cfg, @@ -80,45 +84,63 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin } u, err := url.Parse(c.source) + if err != nil { return nil, fmt.Errorf("parse plc source: %w", err) } + u.Path = strings.TrimRight(u.Path, "/") + "/export" q := u.Query() + q.Set("after", fmt.Sprintf("%d", after)) - u.RawQuery = q.Encode() + u.RawQuery = q.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { return nil, fmt.Errorf("new request: %w", err) } + maxAttempts := c.opts.MaxAttempts + if maxAttempts < 1 { maxAttempts = 1 } + var lastErr error + for attempt := 1; attempt <= maxAttempts; attempt++ { records, retryAfter, retryable, err := c.fetchExportOnce(req, limit) + if err == nil { return records, nil } + lastErr = err + if !retryable || attempt == maxAttempts || ctx.Err() != nil { break } + delay := retryAfter + if delay <= 0 { delay = c.backoffDelay(attempt) } + log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err) + timer := time.NewTimer(delay) + select { case <-ctx.Done(): timer.Stop() + return nil, ctx.Err() case <-timer.C: } } + return nil, lastErr } @@ -133,11 +155,15 @@ func (e httpStatusError) Error() string { func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) { reqClone := req.Clone(req.Context()) + reqClone.Header.Set("Accept-Encoding", "gzip") + resp, err := c.http.Do(reqClone) + if err != nil { return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err) } + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { @@ -145,12 +171,16 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor body := strings.TrimSpace(string(b)) retryDelay := parseRetryAfter(resp.Header.Get("Retry-After")) err := httpStatusError{StatusCode: resp.StatusCode, Body: body} + return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err } + records, err := decodeExportBody(resp.Body, limit) + if err != nil { return nil, 0, false, err } + return records, 0, false, nil } @@ -158,23 +188,31 @@ func (c *Client) backoffDelay(attempt int) time.Duration { if attempt < 1 { attempt = 1 } + delay := c.opts.BaseDelay + if delay <= 0 { delay = 250 * time.Millisecond } + maxDelay := c.opts.MaxDelay + if maxDelay <= 0 { maxDelay = 10 * time.Second } + for i := 1; i < attempt; i++ { delay *= 2 + if delay >= maxDelay { return maxDelay } } + if delay > maxDelay { return maxDelay } + return delay } @@ -189,18 +227,23 @@ func shouldRetryStatus(status int) bool { func parseRetryAfter(v string) time.Duration { v = strings.TrimSpace(v) + if v == "" { return 0 } + if secs, err := strconv.Atoi(v); err == nil && secs > 0 { return time.Duration(secs) * time.Second } + if ts, err := http.ParseTime(v); err == nil { delay := time.Until(ts) + if delay > 0 { return delay } } + return 0 } @@ -208,63 +251,85 @@ func isTransientNetworkErr(err error) bool { if err == nil { return false } + var netErr net.Error + if errors.As(err, &netErr) { return true } + return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) } func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { br := bufio.NewReader(r) first, err := peekFirstNonSpace(br) + if err != nil { if err == io.EOF { return nil, nil } + return nil, err } if first == '[' { b, err := io.ReadAll(br) + if err != nil { return nil, fmt.Errorf("read export body: %w", err) } + trimmed := bytes.TrimSpace(b) + var records []types.ExportRecord + if err := json.Unmarshal(trimmed, &records); err != nil { return nil, fmt.Errorf("decode export json array: %w", err) } + if limit > 0 && uint64(len(records)) > limit { records = records[:limit] } + return records, nil } + out := make([]types.ExportRecord, 0, 1024) + for { line, err := br.ReadBytes('\n') isEOF := errors.Is(err, io.EOF) + if err != nil && !isEOF { return nil, fmt.Errorf("read ndjson line: %w", err) } + if limit > 0 && uint64(len(out)) >= limit { return out, nil } + trimmed := bytes.TrimSpace(line) + if len(trimmed) > 0 { var rec types.ExportRecord + if err := json.Unmarshal(trimmed, &rec); err != nil { if isEOF && isTrailingNDJSONPartial(err) { return out, nil } + return nil, fmt.Errorf("decode ndjson line: %w", err) } + out = append(out, rec) } + if isEOF { break } } + return out, nil } @@ -272,19 +337,23 @@ func isTrailingNDJSONPartial(err error) bool { if !errors.Is(err, io.ErrUnexpectedEOF) && !strings.Contains(err.Error(), "unexpected end of JSON input") { return false } + return true } func peekFirstNonSpace(br *bufio.Reader) (byte, error) { for { b, err := br.ReadByte() + if err != nil { return 0, err } + if !isSpace(b) { if err := br.UnreadByte(); err != nil { return 0, fmt.Errorf("unread byte: %w", err) } + return b, nil } } @@ -301,27 +370,37 @@ func isSpace(b byte) bool { func (c *Client) fetchFromFile(after uint64, limit uint64) ([]types.ExportRecord, error) { path := c.source + if strings.HasPrefix(path, "file://") { path = strings.TrimPrefix(path, "file://") } + path = filepath.Clean(path) b, err := os.ReadFile(path) + if err != nil { return nil, fmt.Errorf("read source file: %w", err) } + recs, err := decodeExportBody(bytes.NewReader(b), 0) + if err != nil { return nil, err } + out := make([]types.ExportRecord, 0, len(recs)) + for _, r := range recs { if r.Seq <= after { continue } + out = append(out, r) + if limit > 0 && uint64(len(out)) >= limit { break } } + return out, nil } diff --git a/internal/ingest/client_retry_test.go b/internal/ingest/client_retry_test.go index a23fceb..b019ac6 100644 --- a/internal/ingest/client_retry_test.go +++ b/internal/ingest/client_retry_test.go @@ -12,15 +12,20 @@ import ( func TestFetchExportLimitedRetries429ThenSucceeds(t *testing.T) { var attempts atomic.Int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { n := attempts.Add(1) + if n <= 2 { w.Header().Set("Retry-After", "0") http.Error(w, "rate limited", http.StatusTooManyRequests) + return } + _, _ = fmt.Fprintln(w, `{"seq":1,"did":"did:plc:alice","cid":"cid1","operation":{"x":1}}`) })) + defer ts.Close() client := NewClient(ts.URL, ClientOptions{ @@ -29,12 +34,15 @@ func TestFetchExportLimitedRetries429ThenSucceeds(t *testing.T) { MaxDelay: 2 * time.Millisecond, }) records, err := client.FetchExportLimited(context.Background(), 0, 0) + if err != nil { t.Fatalf("fetch export: %v", err) } + if len(records) != 1 { t.Fatalf("record count mismatch: got %d want 1", len(records)) } + if got := attempts.Load(); got != 3 { t.Fatalf("attempt count mismatch: got %d want 3", got) } @@ -42,10 +50,12 @@ func TestFetchExportLimitedRetries429ThenSucceeds(t *testing.T) { func TestFetchExportLimitedDoesNotRetry400(t *testing.T) { var attempts atomic.Int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { attempts.Add(1) http.Error(w, "bad request", http.StatusBadRequest) })) + defer ts.Close() client := NewClient(ts.URL, ClientOptions{ @@ -54,9 +64,11 @@ func TestFetchExportLimitedDoesNotRetry400(t *testing.T) { MaxDelay: 2 * time.Millisecond, }) _, err := client.FetchExportLimited(context.Background(), 0, 0) + if err == nil { t.Fatalf("expected error for 400 response") } + if got := attempts.Load(); got != 1 { t.Fatalf("unexpected retries on 400: got attempts=%d want 1", got) } diff --git a/internal/ingest/client_test.go b/internal/ingest/client_test.go index ff80225..50595ec 100644 --- a/internal/ingest/client_test.go +++ b/internal/ingest/client_test.go @@ -11,14 +11,16 @@ func TestDecodeExportBody_IgnoresTrailingPartialNDJSONLine(t *testing.T) { `{"seq":2,"did":"did:plc:bob","cid":"cid2","operation":{"x":2}}`, `{"seq":3,"did":"did:plc:carol","cid":"cid3","operation":{"x":3}`, }, "\n") - records, err := decodeExportBody(strings.NewReader(body), 0) + if err != nil { t.Fatalf("decode export body: %v", err) } + if len(records) != 2 { t.Fatalf("record count mismatch: got %d want 2", len(records)) } + if records[0].Seq != 1 || records[1].Seq != 2 { t.Fatalf("unexpected sequences: got [%d %d], want [1 2]", records[0].Seq, records[1].Seq) } @@ -30,8 +32,8 @@ func TestDecodeExportBody_FailsOnMalformedNonTrailingNDJSONLine(t *testing.T) { `{"seq":2,"did":"did:plc:bob","cid":"cid2","operation":{"x":2}`, `{"seq":3,"did":"did:plc:carol","cid":"cid3","operation":{"x":3}}`, }, "\n") - _, err := decodeExportBody(strings.NewReader(body), 0) + if err == nil { t.Fatalf("expected malformed middle line to fail") } diff --git a/internal/ingest/service.go b/internal/ingest/service.go index f6b2433..1779db9 100644 --- a/internal/ingest/service.go +++ b/internal/ingest/service.go @@ -15,7 +15,6 @@ import ( "sync" "sync/atomic" "time" - "github.com/Fuwn/plutia/internal/checkpoint" "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/state" @@ -73,18 +72,23 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog blockLog: blockLog, startedAt: time.Now(), } + if didCount, err := countStates(store); err == nil { atomic.StoreUint64(&s.stats.DIDCount, didCount) } + if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok { atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence) } + if cfg.Mode == config.ModeMirror && blockLog != nil { s.appender = newBlockAppender(blockLog, 1024) + if _, err := blockLog.ValidateAndRecover(store); err != nil { s.MarkCorrupted(err) } } + return s } @@ -95,9 +99,11 @@ func (s *Service) SetMaxOps(max uint64) { func (s *Service) Replay(ctx context.Context) error { for { changed, err := s.RunOnce(ctx) + if err != nil { return err } + if !changed { return nil } @@ -106,11 +112,14 @@ func (s *Service) Replay(ctx context.Context) error { func (s *Service) Poll(ctx context.Context) error { ticker := time.NewTicker(s.cfg.PollInterval) + defer ticker.Stop() + for { if _, err := s.RunOnce(ctx); err != nil { atomic.AddUint64(&s.stats.Errors, 1) } + select { case <-ctx.Done(): return ctx.Err() @@ -123,42 +132,59 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) { if err := s.CorruptionError(); err != nil { return false, err } + if s.maxOps > 0 && s.runOps >= s.maxOps { return false, nil } + lastSeq, err := s.store.GetGlobalSeq() + if err != nil { return false, fmt.Errorf("load global sequence: %w", err) } + limit := uint64(0) + if s.maxOps > 0 { remaining := s.maxOps - s.runOps + if remaining == 0 { return false, nil } + limit = remaining } + records, err := s.client.FetchExportLimited(ctx, lastSeq, limit) + if err != nil { return false, err } + if len(records) == 0 { atomic.StoreUint64(&s.stats.LagOps, 0) + return false, nil } + latestSourceSeq := records[len(records)-1].Seq committed, err := s.processRecords(ctx, records) s.runOps += committed + if err != nil { atomic.AddUint64(&s.stats.Errors, 1) + return committed > 0, err } + lastCommitted := atomic.LoadUint64(&s.stats.LastSeq) + if latestSourceSeq > lastCommitted { atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted) } else { atomic.StoreUint64(&s.stats.LagOps, 0) } + return true, nil } @@ -177,50 +203,72 @@ type verifyResult struct { func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) { verified, err := s.verifyRecords(ctx, records) + if err != nil { return 0, err } + return s.commitVerified(ctx, verified) } func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) { _ = ctx workers := s.cfg.VerifyWorkers + if workers < 1 { workers = 1 } + queues := make([]chan verifyTask, workers) results := make(chan verifyResult, len(records)) + var wg sync.WaitGroup for i := 0; i < workers; i++ { queues[i] = make(chan verifyTask, 64) + wg.Add(1) + go func(queue <-chan verifyTask) { defer wg.Done() + cache := map[string]*types.StateV1{} + for task := range queue { op, err := types.ParseOperation(task.rec) + if err != nil { results <- verifyResult{index: task.index, err: err} + continue } + existing, err := s.loadExistingState(cache, op.DID) + if err != nil { results <- verifyResult{index: task.index, err: err} + continue } + if err := s.verifier.VerifyOperation(op, existing); err != nil { atomic.AddUint64(&s.stats.VerifyFailures, 1) + results <- verifyResult{index: task.index, err: err} + continue } + next, err := state.ComputeNextState(op, existing) + if err != nil { results <- verifyResult{index: task.index, err: err} + continue } + cache[op.DID] = cloneState(&next) + results <- verifyResult{index: task.index, op: op, state: next, newDID: existing == nil} } }(queues[i]) @@ -228,13 +276,17 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor for idx, rec := range records { worker := didWorker(rec.DID, workers) + queues[worker] <- verifyTask{index: idx, rec: rec} } + for _, q := range queues { close(q) } + wg.Wait() close(results) + return collectVerifiedInOrder(len(records), results) } @@ -242,15 +294,21 @@ func (s *Service) loadExistingState(cache map[string]*types.StateV1, did string) if existing, ok := cache[did]; ok { return cloneState(existing), nil } + stateVal, ok, err := s.store.GetState(did) + if err != nil { return nil, err } + if !ok { cache[did] = nil + return nil, nil } + cache[did] = cloneState(&stateVal) + return cloneState(&stateVal), nil } @@ -259,37 +317,48 @@ func collectVerifiedInOrder(total int, results <-chan verifyResult) ([]verifyRes seen := make([]bool, total) firstErr := error(nil) received := 0 + for r := range results { received++ + if r.index < 0 || r.index >= total { if firstErr == nil { firstErr = fmt.Errorf("verify worker returned out-of-range index %d", r.index) } + continue } + if seen[r.index] { if firstErr == nil { firstErr = fmt.Errorf("duplicate verify result for index %d", r.index) } + continue } + seen[r.index] = true ordered[r.index] = r + if r.err != nil && firstErr == nil { firstErr = r.err } } + if received != total && firstErr == nil { firstErr = fmt.Errorf("verify result count mismatch: got %d want %d", received, total) } + if firstErr != nil { return nil, firstErr } + for i := range seen { if !seen[i] { return nil, fmt.Errorf("missing verify result index %d", i) } } + return ordered, nil } @@ -297,8 +366,10 @@ func didWorker(did string, workers int) int { if workers <= 1 { return 0 } + hasher := fnv.New32a() _, _ = hasher.Write([]byte(did)) + return int(hasher.Sum32() % uint32(workers)) } @@ -306,50 +377,64 @@ func cloneState(in *types.StateV1) *types.StateV1 { if in == nil { return nil } + out := *in out.DIDDocument = append([]byte(nil), in.DIDDocument...) out.RotationKeys = append([]string(nil), in.RotationKeys...) + return &out } func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) { _ = ctx batchSize := s.cfg.CommitBatchSize + if batchSize < 1 { batchSize = 1 } + pendingOps := make([]storage.OperationMutation, 0, batchSize) pendingSeqs := make([]uint64, 0, batchSize) pendingBlockHashes := map[uint64]string{} pendingNewDIDs := map[string]struct{}{} + var committed uint64 commit := func() error { if len(pendingOps) == 0 { return nil } + if s.cfg.Mode == config.ModeMirror { flush, err := s.appender.Flush() + if err != nil { return err } + if flush != nil { pendingBlockHashes[flush.BlockID] = flush.Hash } } + blockEntries := make([]storage.BlockHashEntry, 0, len(pendingBlockHashes)) + for id, hash := range pendingBlockHashes { blockEntries = append(blockEntries, storage.BlockHashEntry{BlockID: id, Hash: hash}) } + sort.Slice(blockEntries, func(i, j int) bool { return blockEntries[i].BlockID < blockEntries[j].BlockID }) if err := s.store.ApplyOperationsBatch(pendingOps, blockEntries); err != nil { return err } + lastSeq := pendingSeqs[len(pendingSeqs)-1] + atomic.StoreUint64(&s.stats.LastSeq, lastSeq) atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps))) atomic.AddUint64(&s.stats.DIDCount, uint64(len(pendingNewDIDs))) + committed += uint64(len(pendingOps)) if s.checkpoints != nil { @@ -364,59 +449,78 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) ( pendingOps = pendingOps[:0] pendingSeqs = pendingSeqs[:0] + clear(pendingBlockHashes) clear(pendingNewDIDs) + return nil } for _, v := range verified { var ref *types.BlockRefV1 + if s.cfg.Mode == config.ModeMirror { if s.appender == nil { return committed, fmt.Errorf("mirror mode requires block appender") } + result, err := s.appender.Append(v.op) + if err != nil { return committed, err } + if result.Flush != nil { pendingBlockHashes[result.Flush.BlockID] = result.Flush.Hash } + result.Ref.Received = time.Now().UTC().Format(time.RFC3339) ref = &result.Ref } + pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref}) pendingSeqs = append(pendingSeqs, v.op.Sequence) + if v.newDID { pendingNewDIDs[v.op.DID] = struct{}{} } + if len(pendingOps) >= batchSize { if err := commit(); err != nil { return committed, err } } } + if err := commit(); err != nil { return committed, err } + return committed, nil } func (s *Service) createCheckpoint(sequence uint64) error { blocks, err := s.store.ListBlockHashes() + if err != nil { return fmt.Errorf("list blocks for checkpoint: %w", err) } + done := make(chan struct{}) usageCh := make(chan checkpointProcessUsage, 1) + go sampleCheckpointProcessUsage(done, usageCh) cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(sequence, blocks) + close(done) + usage := <-usageCh + if err != nil { return fmt.Errorf("build checkpoint: %w", err) } + log.Printf( "checkpoint_metrics seq=%d did_count=%d merkle_compute_ms=%d total_checkpoint_ms=%d cpu_pct=%.2f rss_peak_mb=%.2f completed_unix_ms=%d", cp.Sequence, @@ -428,41 +532,54 @@ func (s *Service) createCheckpoint(sequence uint64) error { time.Now().UnixMilli(), ) atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence) + if s.metricsSink != nil { s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence) } + return nil } func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) { if s.appender != nil { flush, err := s.appender.Flush() + if err != nil { return types.CheckpointV1{}, err } + if flush != nil { if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil { return types.CheckpointV1{}, err } } } + _ = ctx seq, err := s.store.GetGlobalSeq() + if err != nil { return types.CheckpointV1{}, err } + blocks, err := s.store.ListBlockHashes() + if err != nil { return types.CheckpointV1{}, err } + cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(seq, blocks) + if err != nil { return types.CheckpointV1{}, err } + atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence) + if s.metricsSink != nil { s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence) } + return cp, nil } @@ -474,31 +591,44 @@ type checkpointProcessUsage struct { func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointProcessUsage) { pid := os.Getpid() ticker := time.NewTicker(25 * time.Millisecond) + defer ticker.Stop() + var cpuSum float64 var samples int var rssPeak int64 + sample := func() { cpu, rss, err := processMetrics(pid) + if err != nil { return } + cpuSum += cpu + samples++ + if rss > rssPeak { rssPeak = rss } } + sample() + for { select { case <-done: sample() + avg := 0.0 + if samples > 0 { avg = cpuSum / float64(samples) } + out <- checkpointProcessUsage{CPUPercentAvg: avg, RSSPeakKB: rssPeak} + return case <-ticker.C: sample() @@ -508,15 +638,20 @@ func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointPro func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) { out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output() + if err != nil { return 0, 0, err } + fields := strings.Fields(string(out)) + if len(fields) < 2 { return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out))) } + cpuPct, _ = strconv.ParseFloat(fields[0], 64) rssKB, _ = strconv.ParseInt(fields[1], 10, 64) + return cpuPct, rssKB, nil } @@ -524,51 +659,69 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error { if err := s.CorruptionError(); err != nil { return err } + if s.cfg.Mode != config.ModeMirror { _, ok, err := s.store.GetState(did) + if err != nil { return err } + if !ok { return fmt.Errorf("did not found") } + return nil } + seqs, err := s.store.ListDIDSequences(did) + if err != nil { return err } + if len(seqs) == 0 { return fmt.Errorf("no operations for did %s", did) } var previous *types.StateV1 + for _, seq := range seqs { if err := ctx.Err(); err != nil { return err } + ref, ok, err := s.store.GetOpSeqRef(seq) + if err != nil { return err } + if !ok { return fmt.Errorf("missing op reference for seq %d", seq) } + payload, err := s.blockLog.ReadRecord(ref) + if err != nil { return err } + rec := types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload} op, err := types.ParseOperation(rec) + if err != nil { return err } + if err := s.verifier.VerifyOperation(op, previous); err != nil { return fmt.Errorf("verify seq %d failed: %w", seq, err) } + s := types.StateV1{DID: did, ChainTipHash: op.CID, LatestOpSeq: seq} previous = &s } + return nil } @@ -576,51 +729,69 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen if err := s.CorruptionError(); err != nil { return "", nil, err } + if s.cfg.Mode != config.ModeMirror { return "", nil, errors.New("historical proof requires mirror mode") } + seqs, err := s.store.ListDIDSequences(did) + if err != nil { return "", nil, err } + if len(seqs) == 0 { return "", nil, fmt.Errorf("no operations for did %s", did) } + filtered := make([]uint64, 0, len(seqs)) + for _, seq := range seqs { if seq <= sequence { filtered = append(filtered, seq) } } + if len(filtered) == 0 { return "", nil, fmt.Errorf("no operations for did %s at checkpoint %d", did, sequence) } + sort.Slice(filtered, func(i, j int) bool { return filtered[i] < filtered[j] }) var previous *types.StateV1 var tip string + for _, seq := range filtered { if err := ctx.Err(); err != nil { return "", nil, err } + ref, ok, err := s.store.GetOpSeqRef(seq) + if err != nil { return "", nil, err } + if !ok { return "", nil, fmt.Errorf("missing op reference for seq %d", seq) } + payload, err := s.blockLog.ReadRecord(ref) + if err != nil { return "", nil, err } + op, err := types.ParseOperation(types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload}) + if err != nil { return "", nil, err } + if err := s.verifier.VerifyOperation(op, previous); err != nil { return "", nil, fmt.Errorf("verify seq %d failed: %w", seq, err) } + tip = op.CID next := types.StateV1{ DID: did, @@ -628,12 +799,15 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen LatestOpSeq: seq, } prior := []string(nil) + if previous != nil { prior = append(prior, previous.RotationKeys...) } + next.RotationKeys = extractRotationKeysFromPayload(op.Payload, prior) previous = &next } + return tip, filtered, nil } @@ -641,32 +815,44 @@ func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRec if err := s.CorruptionError(); err != nil { return nil, err } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { return nil, ErrHistoryNotStored } + seqs, err := s.store.ListDIDSequences(did) + if err != nil { return nil, err } + if len(seqs) == 0 { return nil, ErrDIDNotFound } + out := make([]types.ExportRecord, 0, len(seqs)) + for _, seq := range seqs { if err := ctx.Err(); err != nil { return nil, err } + ref, ok, err := s.store.GetOpSeqRef(seq) + if err != nil { return nil, err } + if !ok { return nil, fmt.Errorf("missing op reference for seq %d", seq) } + payload, err := s.blockLog.ReadRecord(ref) + if err != nil { return nil, err } + out = append(out, types.ExportRecord{ Seq: seq, DID: did, @@ -676,6 +862,7 @@ func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRec Operation: json.RawMessage(payload), }) } + return out, nil } @@ -683,31 +870,43 @@ func (s *Service) LoadLatestDIDOperation(ctx context.Context, did string) (types if err := s.CorruptionError(); err != nil { return types.ExportRecord{}, err } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { return types.ExportRecord{}, ErrHistoryNotStored } + seqs, err := s.store.ListDIDSequences(did) + if err != nil { return types.ExportRecord{}, err } + if len(seqs) == 0 { return types.ExportRecord{}, ErrDIDNotFound } + lastSeq := seqs[len(seqs)-1] + if err := ctx.Err(); err != nil { return types.ExportRecord{}, err } + ref, ok, err := s.store.GetOpSeqRef(lastSeq) + if err != nil { return types.ExportRecord{}, err } + if !ok { return types.ExportRecord{}, fmt.Errorf("missing op reference for seq %d", lastSeq) } + payload, err := s.blockLog.ReadRecord(ref) + if err != nil { return types.ExportRecord{}, err } + return types.ExportRecord{ Seq: lastSeq, DID: did, @@ -722,32 +921,44 @@ func (s *Service) LoadCurrentPLCData(ctx context.Context, did string) (map[strin if err := s.CorruptionError(); err != nil { return nil, err } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { state, ok, err := s.store.GetState(did) + if err != nil { return nil, err } + if !ok { return nil, ErrDIDNotFound } + var doc map[string]any + if err := json.Unmarshal(state.DIDDocument, &doc); err != nil { return nil, err } + return doc, nil } + last, err := s.LoadLatestDIDOperation(ctx, did) + if err != nil { return nil, err } + var op map[string]any + if err := json.Unmarshal(last.Operation, &op); err != nil { return nil, fmt.Errorf("decode latest operation: %w", err) } + delete(op, "sig") delete(op, "signature") delete(op, "sigPayload") delete(op, "signaturePayload") + return op, nil } @@ -755,16 +966,21 @@ func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int, if err := s.CorruptionError(); err != nil { return err } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { return nil } + if limit <= 0 { limit = 1000 } + const maxCount = 1000 + if limit > maxCount { limit = maxCount } + afterSet := !after.IsZero() emitted := 0 stop := errors.New("stop export iteration") @@ -772,25 +988,33 @@ func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int, if err := ctx.Err(); err != nil { return err } + if emitted >= limit { return stop } + if afterSet { if strings.TrimSpace(ref.Received) == "" { return nil } + ts, err := time.Parse(time.RFC3339, ref.Received) + if err != nil { return nil } + if !ts.After(after) { return nil } } + payload, err := s.blockLog.ReadRecord(ref) + if err != nil { return err } + rec := types.ExportRecord{ Seq: seq, DID: ref.DID, @@ -799,41 +1023,54 @@ func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int, Nullified: detectNullified(payload), Operation: json.RawMessage(payload), } + emitted++ + return emit(rec) }) + if err == nil || errors.Is(err, stop) { return nil } + return err } func detectNullified(operation []byte) bool { var payload map[string]any + if err := json.Unmarshal(operation, &payload); err != nil { return false } + v, _ := payload["nullified"].(bool) + return v } func (s *Service) Flush(ctx context.Context) error { _ = ctx + if err := s.CorruptionError(); err != nil { return err } + if s.appender == nil { return nil } + flush, err := s.appender.Flush() + if err != nil { return err } + if flush != nil { if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil { return err } } + return nil } @@ -841,6 +1078,7 @@ func (s *Service) MarkCorrupted(err error) { if err == nil { return } + s.corrupted.Store(true) s.corruptErr.Store(err.Error()) } @@ -853,10 +1091,13 @@ func (s *Service) CorruptionError() error { if !s.IsCorrupted() { return nil } + msg, _ := s.corruptErr.Load().(string) + if msg == "" { msg = "data corruption detected" } + return fmt.Errorf("data corruption detected: %s", msg) } @@ -865,15 +1106,19 @@ func extractRotationKeysFromPayload(payload map[string]any, prior []string) []st seen := map[string]struct{}{} add := func(v string) { v = strings.TrimSpace(v) + if v == "" { return } + if _, ok := seen[v]; ok { return } + seen[v] = struct{}{} out = append(out, v) } + if arr, ok := payload["rotationKeys"].([]any); ok { for _, v := range arr { if s, ok := v.(string); ok { @@ -881,17 +1126,21 @@ func extractRotationKeysFromPayload(payload map[string]any, prior []string) []st } } } + if recovery, ok := payload["recoveryKey"].(string); ok { add(recovery) } + if signing, ok := payload["signingKey"].(string); ok { add(signing) } + if len(out) == 0 { for _, v := range prior { add(v) } } + return out } @@ -908,9 +1157,11 @@ func (s *Service) SetMetricsSink(sink MetricsSink) { func (s *Service) Stats() Stats { ingested := atomic.LoadUint64(&s.stats.IngestedOps) opsPerSec := 0.0 + if elapsed := time.Since(s.startedAt).Seconds(); elapsed > 0 { opsPerSec = float64(ingested) / elapsed } + return Stats{ IngestedOps: ingested, Errors: atomic.LoadUint64(&s.stats.Errors), @@ -925,12 +1176,15 @@ func (s *Service) Stats() Stats { func countStates(store storage.Store) (uint64, error) { var count uint64 + if err := store.ForEachState(func(types.StateV1) error { count++ + return nil }); err != nil { return 0, err } + return count, nil } @@ -959,34 +1213,47 @@ func newBlockAppender(log *storage.BlockLog, buffer int) *blockAppender { queue: make(chan appendRequest, buffer), closed: make(chan struct{}), } + go ba.run() + return ba } func (b *blockAppender) run() { defer close(b.closed) + for req := range b.queue { if req.Flush { flush, err := b.log.Flush() + req.Reply <- appendResult{Flush: flush, Err: err} + continue } + ref, flush, err := b.log.Append(req.Operation.Sequence, req.Operation.DID, req.Operation.CID, req.Operation.Prev, req.Operation.CanonicalBytes) + req.Reply <- appendResult{Ref: ref, Flush: flush, Err: err} } } func (b *blockAppender) Append(op types.ParsedOperation) (appendResult, error) { reply := make(chan appendResult, 1) + b.queue <- appendRequest{Operation: op, Reply: reply} + res := <-reply + return res, res.Err } func (b *blockAppender) Flush() (*storage.FlushInfo, error) { reply := make(chan appendResult, 1) + b.queue <- appendRequest{Flush: true, Reply: reply} + res := <-reply + return res.Flush, res.Err } diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go index e01dd4e..9985367 100644 --- a/internal/ingest/service_integration_test.go +++ b/internal/ingest/service_integration_test.go @@ -10,7 +10,6 @@ import ( "os" "path/filepath" "testing" - "github.com/Fuwn/plutia/internal/checkpoint" "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/storage" @@ -20,15 +19,19 @@ import ( func TestReplayIntegration(t *testing.T) { tmp := t.TempDir() dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { t.Fatalf("mkdir data dir: %v", err) } keySeed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(keySeed); err != nil { t.Fatalf("rand seed: %v", err) } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil { t.Fatalf("write mirror key: %v", err) } @@ -36,27 +39,35 @@ func TestReplayIntegration(t *testing.T) { records := buildSignedRecords(t) sourcePath := filepath.Join(tmp, "sample.ndjson") file, err := os.Create(sourcePath) + if err != nil { t.Fatalf("create source: %v", err) } + for _, rec := range records { b, _ := json.Marshal(rec) + if _, err := fmt.Fprintln(file, string(b)); err != nil { t.Fatalf("write source: %v", err) } } + file.Close() store, err := storage.OpenPebble(dataDir) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() + if err := store.SetMode(config.ModeMirror); err != nil { t.Fatalf("set mode: %v", err) } bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { t.Fatalf("open block log: %v", err) } @@ -73,31 +84,39 @@ func TestReplayIntegration(t *testing.T) { MirrorPrivateKeyPath: keyPath, } service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath)) + if err := service.Replay(context.Background()); err != nil { t.Fatalf("replay: %v", err) } + if err := service.Flush(context.Background()); err != nil { t.Fatalf("flush: %v", err) } seq, err := store.GetGlobalSeq() + if err != nil { t.Fatalf("get global seq: %v", err) } + if seq != 3 { t.Fatalf("global seq mismatch: got %d want 3", seq) } s, ok, err := store.GetState("did:plc:alice") + if err != nil { t.Fatalf("get state: %v", err) } + if !ok { t.Fatalf("missing alice state") } + if s.LatestOpSeq != 2 { t.Fatalf("latest op seq mismatch for alice: got %d want 2", s.LatestOpSeq) } + if err := service.VerifyDID(context.Background(), "did:plc:alice"); err != nil { t.Fatalf("verify alice did: %v", err) } @@ -105,6 +124,7 @@ func TestReplayIntegration(t *testing.T) { if _, err := service.Snapshot(context.Background()); err != nil { t.Fatalf("snapshot: %v", err) } + if _, ok, err := store.GetLatestCheckpoint(); err != nil || !ok { t.Fatalf("expected latest checkpoint, err=%v ok=%v", err, ok) } @@ -112,14 +132,19 @@ func TestReplayIntegration(t *testing.T) { func buildSignedRecords(t *testing.T) []types.ExportRecord { t.Helper() + pub1, priv1, err := ed25519.GenerateKey(rand.Reader) + if err != nil { t.Fatalf("generate key1: %v", err) } + pub2, priv2, err := ed25519.GenerateKey(rand.Reader) + if err != nil { t.Fatalf("generate key2: %v", err) } + var out []types.ExportRecord mk := func(seq uint64, did, prev string, pub ed25519.PublicKey, priv ed25519.PrivateKey) types.ExportRecord { @@ -127,9 +152,11 @@ func buildSignedRecords(t *testing.T) []types.ExportRecord { "did": did, "didDoc": map[string]any{"id": did, "seq": seq}, } + if prev != "" { payloadDoc["prev"] = prev } + payloadBytes, _ := json.Marshal(payloadDoc) canon, _ := types.CanonicalizeJSON(payloadBytes) sig := ed25519.Sign(priv, canon) @@ -140,34 +167,41 @@ func buildSignedRecords(t *testing.T) []types.ExportRecord { "sigPayload": base64.RawURLEncoding.EncodeToString(canon), "sig": base64.RawURLEncoding.EncodeToString(sig), } + if prev != "" { op["prev"] = prev } + opRaw, _ := json.Marshal(op) opCanon, _ := types.CanonicalizeJSON(opRaw) cid := types.ComputeDigestCID(opCanon) + return types.ExportRecord{Seq: seq, DID: did, CID: cid, Operation: opRaw} } - rec1 := mk(1, "did:plc:alice", "", pub1, priv1) rec2 := mk(2, "did:plc:alice", rec1.CID, pub1, priv1) rec3 := mk(3, "did:plc:bob", "", pub2, priv2) out = append(out, rec1, rec2, rec3) + return out } func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) { tmp := t.TempDir() dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { t.Fatalf("mkdir data dir: %v", err) } keySeed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(keySeed); err != nil { t.Fatalf("rand seed: %v", err) } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil { t.Fatalf("write mirror key: %v", err) } @@ -175,29 +209,39 @@ func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) { records := buildSignedRecords(t) sourcePath := filepath.Join(tmp, "sample.ndjson") file, err := os.Create(sourcePath) + if err != nil { t.Fatalf("create source: %v", err) } + for _, rec := range records { b, _ := json.Marshal(rec) + if _, err := fmt.Fprintln(file, string(b)); err != nil { t.Fatalf("write source: %v", err) } } + file.Close() store, err := storage.OpenPebble(dataDir) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() + if err := store.SetMode(config.ModeMirror); err != nil { t.Fatalf("set mode: %v", err) } + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { t.Fatalf("open block log: %v", err) } + cfg := config.Config{ Mode: config.ModeMirror, DataDir: dataDir, @@ -211,15 +255,19 @@ func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) { PollInterval: 5, } service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath)) + if err := service.Replay(context.Background()); err != nil { t.Fatalf("replay: %v", err) } + if err := service.Flush(context.Background()); err != nil { t.Fatalf("flush: %v", err) } ctx, cancel := context.WithCancel(context.Background()) + cancel() + if _, _, err := service.RecomputeTipAtOrBefore(ctx, "did:plc:alice", 2); err == nil { t.Fatalf("expected cancellation error") } diff --git a/internal/ingest/service_order_test.go b/internal/ingest/service_order_test.go index 0e2299c..1d7c766 100644 --- a/internal/ingest/service_order_test.go +++ b/internal/ingest/service_order_test.go @@ -2,24 +2,28 @@ package ingest import ( "testing" - "github.com/Fuwn/plutia/internal/types" ) func TestCollectVerifiedInOrder_OutOfOrderInput(t *testing.T) { results := make(chan verifyResult, 3) + results <- verifyResult{index: 2, op: types.ParsedOperation{Sequence: 3}} results <- verifyResult{index: 0, op: types.ParsedOperation{Sequence: 1}} results <- verifyResult{index: 1, op: types.ParsedOperation{Sequence: 2}} + close(results) ordered, err := collectVerifiedInOrder(3, results) + if err != nil { t.Fatalf("collect verified: %v", err) } + if len(ordered) != 3 { t.Fatalf("ordered length mismatch: got %d want 3", len(ordered)) } + for i := 0; i < 3; i++ { if ordered[i].op.Sequence != uint64(i+1) { t.Fatalf("unexpected sequence at index %d: got %d want %d", i, ordered[i].op.Sequence, i+1) @@ -29,8 +33,10 @@ func TestCollectVerifiedInOrder_OutOfOrderInput(t *testing.T) { func TestCollectVerifiedInOrder_MissingResult(t *testing.T) { results := make(chan verifyResult, 2) + results <- verifyResult{index: 0, op: types.ParsedOperation{Sequence: 1}} results <- verifyResult{index: 2, op: types.ParsedOperation{Sequence: 3}} + close(results) if _, err := collectVerifiedInOrder(3, results); err == nil { |