author     Fuwn <[email protected]>  2026-02-26 15:41:45 -0800
committer  Fuwn <[email protected]>  2026-02-26 15:41:45 -0800
commit     fec9114caaa7d274e524793d5eb0cf2ef2c5af11 (patch)
tree       0897394ccdfaf6633e1a4ca8eb02bff49bb93c00  /internal/ingest/service.go
parent     feat: add read-only PLC API compatibility endpoints (diff)
download   plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.tar.xz
           plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.zip
feat: Apply Iku formatting
Diffstat (limited to 'internal/ingest/service.go')
-rw-r--r--  internal/ingest/service.go  269
1 file changed, 268 insertions, 1 deletion
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index f6b2433..1779db9 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -15,7 +15,6 @@ import (
"sync"
"sync/atomic"
"time"
-
"github.com/Fuwn/plutia/internal/checkpoint"
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/state"
@@ -73,18 +72,23 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog
blockLog: blockLog,
startedAt: time.Now(),
}
+
if didCount, err := countStates(store); err == nil {
atomic.StoreUint64(&s.stats.DIDCount, didCount)
}
+
if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
}
+
if cfg.Mode == config.ModeMirror && blockLog != nil {
s.appender = newBlockAppender(blockLog, 1024)
+
if _, err := blockLog.ValidateAndRecover(store); err != nil {
s.MarkCorrupted(err)
}
}
+
return s
}
@@ -95,9 +99,11 @@ func (s *Service) SetMaxOps(max uint64) {
func (s *Service) Replay(ctx context.Context) error {
for {
changed, err := s.RunOnce(ctx)
+
if err != nil {
return err
}
+
if !changed {
return nil
}
@@ -106,11 +112,14 @@ func (s *Service) Replay(ctx context.Context) error {
func (s *Service) Poll(ctx context.Context) error {
ticker := time.NewTicker(s.cfg.PollInterval)
+
defer ticker.Stop()
+
for {
if _, err := s.RunOnce(ctx); err != nil {
atomic.AddUint64(&s.stats.Errors, 1)
}
+
select {
case <-ctx.Done():
return ctx.Err()
@@ -123,42 +132,59 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) {
if err := s.CorruptionError(); err != nil {
return false, err
}
+
if s.maxOps > 0 && s.runOps >= s.maxOps {
return false, nil
}
+
lastSeq, err := s.store.GetGlobalSeq()
+
if err != nil {
return false, fmt.Errorf("load global sequence: %w", err)
}
+
limit := uint64(0)
+
if s.maxOps > 0 {
remaining := s.maxOps - s.runOps
+
if remaining == 0 {
return false, nil
}
+
limit = remaining
}
+
records, err := s.client.FetchExportLimited(ctx, lastSeq, limit)
+
if err != nil {
return false, err
}
+
if len(records) == 0 {
atomic.StoreUint64(&s.stats.LagOps, 0)
+
return false, nil
}
+
latestSourceSeq := records[len(records)-1].Seq
committed, err := s.processRecords(ctx, records)
s.runOps += committed
+
if err != nil {
atomic.AddUint64(&s.stats.Errors, 1)
+
return committed > 0, err
}
+
lastCommitted := atomic.LoadUint64(&s.stats.LastSeq)
+
if latestSourceSeq > lastCommitted {
atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted)
} else {
atomic.StoreUint64(&s.stats.LagOps, 0)
}
+
return true, nil
}
@@ -177,50 +203,72 @@ type verifyResult struct {
func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
verified, err := s.verifyRecords(ctx, records)
+
if err != nil {
return 0, err
}
+
return s.commitVerified(ctx, verified)
}
func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) {
_ = ctx
workers := s.cfg.VerifyWorkers
+
if workers < 1 {
workers = 1
}
+
queues := make([]chan verifyTask, workers)
results := make(chan verifyResult, len(records))
+
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
queues[i] = make(chan verifyTask, 64)
+
wg.Add(1)
+
go func(queue <-chan verifyTask) {
defer wg.Done()
+
cache := map[string]*types.StateV1{}
+
for task := range queue {
op, err := types.ParseOperation(task.rec)
+
if err != nil {
results <- verifyResult{index: task.index, err: err}
+
continue
}
+
existing, err := s.loadExistingState(cache, op.DID)
+
if err != nil {
results <- verifyResult{index: task.index, err: err}
+
continue
}
+
if err := s.verifier.VerifyOperation(op, existing); err != nil {
atomic.AddUint64(&s.stats.VerifyFailures, 1)
+
results <- verifyResult{index: task.index, err: err}
+
continue
}
+
next, err := state.ComputeNextState(op, existing)
+
if err != nil {
results <- verifyResult{index: task.index, err: err}
+
continue
}
+
cache[op.DID] = cloneState(&next)
+
results <- verifyResult{index: task.index, op: op, state: next, newDID: existing == nil}
}
}(queues[i])
@@ -228,13 +276,17 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
for idx, rec := range records {
worker := didWorker(rec.DID, workers)
+
queues[worker] <- verifyTask{index: idx, rec: rec}
}
+
for _, q := range queues {
close(q)
}
+
wg.Wait()
close(results)
+
return collectVerifiedInOrder(len(records), results)
}
@@ -242,15 +294,21 @@ func (s *Service) loadExistingState(cache map[string]*types.StateV1, did string)
if existing, ok := cache[did]; ok {
return cloneState(existing), nil
}
+
stateVal, ok, err := s.store.GetState(did)
+
if err != nil {
return nil, err
}
+
if !ok {
cache[did] = nil
+
return nil, nil
}
+
cache[did] = cloneState(&stateVal)
+
return cloneState(&stateVal), nil
}
@@ -259,37 +317,48 @@ func collectVerifiedInOrder(total int, results <-chan verifyResult) ([]verifyRes
seen := make([]bool, total)
firstErr := error(nil)
received := 0
+
for r := range results {
received++
+
if r.index < 0 || r.index >= total {
if firstErr == nil {
firstErr = fmt.Errorf("verify worker returned out-of-range index %d", r.index)
}
+
continue
}
+
if seen[r.index] {
if firstErr == nil {
firstErr = fmt.Errorf("duplicate verify result for index %d", r.index)
}
+
continue
}
+
seen[r.index] = true
ordered[r.index] = r
+
if r.err != nil && firstErr == nil {
firstErr = r.err
}
}
+
if received != total && firstErr == nil {
firstErr = fmt.Errorf("verify result count mismatch: got %d want %d", received, total)
}
+
if firstErr != nil {
return nil, firstErr
}
+
for i := range seen {
if !seen[i] {
return nil, fmt.Errorf("missing verify result index %d", i)
}
}
+
return ordered, nil
}
@@ -297,8 +366,10 @@ func didWorker(did string, workers int) int {
if workers <= 1 {
return 0
}
+
hasher := fnv.New32a()
_, _ = hasher.Write([]byte(did))
+
return int(hasher.Sum32() % uint32(workers))
}
@@ -306,50 +377,64 @@ func cloneState(in *types.StateV1) *types.StateV1 {
if in == nil {
return nil
}
+
out := *in
out.DIDDocument = append([]byte(nil), in.DIDDocument...)
out.RotationKeys = append([]string(nil), in.RotationKeys...)
+
return &out
}
func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) {
_ = ctx
batchSize := s.cfg.CommitBatchSize
+
if batchSize < 1 {
batchSize = 1
}
+
pendingOps := make([]storage.OperationMutation, 0, batchSize)
pendingSeqs := make([]uint64, 0, batchSize)
pendingBlockHashes := map[uint64]string{}
pendingNewDIDs := map[string]struct{}{}
+
var committed uint64
commit := func() error {
if len(pendingOps) == 0 {
return nil
}
+
if s.cfg.Mode == config.ModeMirror {
flush, err := s.appender.Flush()
+
if err != nil {
return err
}
+
if flush != nil {
pendingBlockHashes[flush.BlockID] = flush.Hash
}
}
+
blockEntries := make([]storage.BlockHashEntry, 0, len(pendingBlockHashes))
+
for id, hash := range pendingBlockHashes {
blockEntries = append(blockEntries, storage.BlockHashEntry{BlockID: id, Hash: hash})
}
+
sort.Slice(blockEntries, func(i, j int) bool { return blockEntries[i].BlockID < blockEntries[j].BlockID })
if err := s.store.ApplyOperationsBatch(pendingOps, blockEntries); err != nil {
return err
}
+
lastSeq := pendingSeqs[len(pendingSeqs)-1]
+
atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
atomic.AddUint64(&s.stats.DIDCount, uint64(len(pendingNewDIDs)))
+
committed += uint64(len(pendingOps))
if s.checkpoints != nil {
@@ -364,59 +449,78 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
pendingOps = pendingOps[:0]
pendingSeqs = pendingSeqs[:0]
+
clear(pendingBlockHashes)
clear(pendingNewDIDs)
+
return nil
}
for _, v := range verified {
var ref *types.BlockRefV1
+
if s.cfg.Mode == config.ModeMirror {
if s.appender == nil {
return committed, fmt.Errorf("mirror mode requires block appender")
}
+
result, err := s.appender.Append(v.op)
+
if err != nil {
return committed, err
}
+
if result.Flush != nil {
pendingBlockHashes[result.Flush.BlockID] = result.Flush.Hash
}
+
result.Ref.Received = time.Now().UTC().Format(time.RFC3339)
ref = &result.Ref
}
+
pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
pendingSeqs = append(pendingSeqs, v.op.Sequence)
+
if v.newDID {
pendingNewDIDs[v.op.DID] = struct{}{}
}
+
if len(pendingOps) >= batchSize {
if err := commit(); err != nil {
return committed, err
}
}
}
+
if err := commit(); err != nil {
return committed, err
}
+
return committed, nil
}
func (s *Service) createCheckpoint(sequence uint64) error {
blocks, err := s.store.ListBlockHashes()
+
if err != nil {
return fmt.Errorf("list blocks for checkpoint: %w", err)
}
+
done := make(chan struct{})
usageCh := make(chan checkpointProcessUsage, 1)
+
go sampleCheckpointProcessUsage(done, usageCh)
cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(sequence, blocks)
+
close(done)
+
usage := <-usageCh
+
if err != nil {
return fmt.Errorf("build checkpoint: %w", err)
}
+
log.Printf(
"checkpoint_metrics seq=%d did_count=%d merkle_compute_ms=%d total_checkpoint_ms=%d cpu_pct=%.2f rss_peak_mb=%.2f completed_unix_ms=%d",
cp.Sequence,
@@ -428,41 +532,54 @@ func (s *Service) createCheckpoint(sequence uint64) error {
time.Now().UnixMilli(),
)
atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+
if s.metricsSink != nil {
s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
}
+
return nil
}
func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
if s.appender != nil {
flush, err := s.appender.Flush()
+
if err != nil {
return types.CheckpointV1{}, err
}
+
if flush != nil {
if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
return types.CheckpointV1{}, err
}
}
}
+
_ = ctx
seq, err := s.store.GetGlobalSeq()
+
if err != nil {
return types.CheckpointV1{}, err
}
+
blocks, err := s.store.ListBlockHashes()
+
if err != nil {
return types.CheckpointV1{}, err
}
+
cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(seq, blocks)
+
if err != nil {
return types.CheckpointV1{}, err
}
+
atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+
if s.metricsSink != nil {
s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
}
+
return cp, nil
}
@@ -474,31 +591,44 @@ type checkpointProcessUsage struct {
func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointProcessUsage) {
pid := os.Getpid()
ticker := time.NewTicker(25 * time.Millisecond)
+
defer ticker.Stop()
+
var cpuSum float64
var samples int
var rssPeak int64
+
sample := func() {
cpu, rss, err := processMetrics(pid)
+
if err != nil {
return
}
+
cpuSum += cpu
+
samples++
+
if rss > rssPeak {
rssPeak = rss
}
}
+
sample()
+
for {
select {
case <-done:
sample()
+
avg := 0.0
+
if samples > 0 {
avg = cpuSum / float64(samples)
}
+
out <- checkpointProcessUsage{CPUPercentAvg: avg, RSSPeakKB: rssPeak}
+
return
case <-ticker.C:
sample()
@@ -508,15 +638,20 @@ func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointPro
func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output()
+
if err != nil {
return 0, 0, err
}
+
fields := strings.Fields(string(out))
+
if len(fields) < 2 {
return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
}
+
cpuPct, _ = strconv.ParseFloat(fields[0], 64)
rssKB, _ = strconv.ParseInt(fields[1], 10, 64)
+
return cpuPct, rssKB, nil
}
@@ -524,51 +659,69 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
if err := s.CorruptionError(); err != nil {
return err
}
+
if s.cfg.Mode != config.ModeMirror {
_, ok, err := s.store.GetState(did)
+
if err != nil {
return err
}
+
if !ok {
return fmt.Errorf("did not found")
}
+
return nil
}
+
seqs, err := s.store.ListDIDSequences(did)
+
if err != nil {
return err
}
+
if len(seqs) == 0 {
return fmt.Errorf("no operations for did %s", did)
}
var previous *types.StateV1
+
for _, seq := range seqs {
if err := ctx.Err(); err != nil {
return err
}
+
ref, ok, err := s.store.GetOpSeqRef(seq)
+
if err != nil {
return err
}
+
if !ok {
return fmt.Errorf("missing op reference for seq %d", seq)
}
+
payload, err := s.blockLog.ReadRecord(ref)
+
if err != nil {
return err
}
+
rec := types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload}
op, err := types.ParseOperation(rec)
+
if err != nil {
return err
}
+
if err := s.verifier.VerifyOperation(op, previous); err != nil {
return fmt.Errorf("verify seq %d failed: %w", seq, err)
}
+
s := types.StateV1{DID: did, ChainTipHash: op.CID, LatestOpSeq: seq}
previous = &s
}
+
return nil
}
@@ -576,51 +729,69 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
if err := s.CorruptionError(); err != nil {
return "", nil, err
}
+
if s.cfg.Mode != config.ModeMirror {
return "", nil, errors.New("historical proof requires mirror mode")
}
+
seqs, err := s.store.ListDIDSequences(did)
+
if err != nil {
return "", nil, err
}
+
if len(seqs) == 0 {
return "", nil, fmt.Errorf("no operations for did %s", did)
}
+
filtered := make([]uint64, 0, len(seqs))
+
for _, seq := range seqs {
if seq <= sequence {
filtered = append(filtered, seq)
}
}
+
if len(filtered) == 0 {
return "", nil, fmt.Errorf("no operations for did %s at checkpoint %d", did, sequence)
}
+
sort.Slice(filtered, func(i, j int) bool { return filtered[i] < filtered[j] })
var previous *types.StateV1
var tip string
+
for _, seq := range filtered {
if err := ctx.Err(); err != nil {
return "", nil, err
}
+
ref, ok, err := s.store.GetOpSeqRef(seq)
+
if err != nil {
return "", nil, err
}
+
if !ok {
return "", nil, fmt.Errorf("missing op reference for seq %d", seq)
}
+
payload, err := s.blockLog.ReadRecord(ref)
+
if err != nil {
return "", nil, err
}
+
op, err := types.ParseOperation(types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload})
+
if err != nil {
return "", nil, err
}
+
if err := s.verifier.VerifyOperation(op, previous); err != nil {
return "", nil, fmt.Errorf("verify seq %d failed: %w", seq, err)
}
+
tip = op.CID
next := types.StateV1{
DID: did,
@@ -628,12 +799,15 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
LatestOpSeq: seq,
}
prior := []string(nil)
+
if previous != nil {
prior = append(prior, previous.RotationKeys...)
}
+
next.RotationKeys = extractRotationKeysFromPayload(op.Payload, prior)
previous = &next
}
+
return tip, filtered, nil
}
@@ -641,32 +815,44 @@ func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRec
if err := s.CorruptionError(); err != nil {
return nil, err
}
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
return nil, ErrHistoryNotStored
}
+
seqs, err := s.store.ListDIDSequences(did)
+
if err != nil {
return nil, err
}
+
if len(seqs) == 0 {
return nil, ErrDIDNotFound
}
+
out := make([]types.ExportRecord, 0, len(seqs))
+
for _, seq := range seqs {
if err := ctx.Err(); err != nil {
return nil, err
}
+
ref, ok, err := s.store.GetOpSeqRef(seq)
+
if err != nil {
return nil, err
}
+
if !ok {
return nil, fmt.Errorf("missing op reference for seq %d", seq)
}
+
payload, err := s.blockLog.ReadRecord(ref)
+
if err != nil {
return nil, err
}
+
out = append(out, types.ExportRecord{
Seq: seq,
DID: did,
@@ -676,6 +862,7 @@ func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRec
Operation: json.RawMessage(payload),
})
}
+
return out, nil
}
@@ -683,31 +870,43 @@ func (s *Service) LoadLatestDIDOperation(ctx context.Context, did string) (types
if err := s.CorruptionError(); err != nil {
return types.ExportRecord{}, err
}
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
return types.ExportRecord{}, ErrHistoryNotStored
}
+
seqs, err := s.store.ListDIDSequences(did)
+
if err != nil {
return types.ExportRecord{}, err
}
+
if len(seqs) == 0 {
return types.ExportRecord{}, ErrDIDNotFound
}
+
lastSeq := seqs[len(seqs)-1]
+
if err := ctx.Err(); err != nil {
return types.ExportRecord{}, err
}
+
ref, ok, err := s.store.GetOpSeqRef(lastSeq)
+
if err != nil {
return types.ExportRecord{}, err
}
+
if !ok {
return types.ExportRecord{}, fmt.Errorf("missing op reference for seq %d", lastSeq)
}
+
payload, err := s.blockLog.ReadRecord(ref)
+
if err != nil {
return types.ExportRecord{}, err
}
+
return types.ExportRecord{
Seq: lastSeq,
DID: did,
@@ -722,32 +921,44 @@ func (s *Service) LoadCurrentPLCData(ctx context.Context, did string) (map[strin
if err := s.CorruptionError(); err != nil {
return nil, err
}
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
state, ok, err := s.store.GetState(did)
+
if err != nil {
return nil, err
}
+
if !ok {
return nil, ErrDIDNotFound
}
+
var doc map[string]any
+
if err := json.Unmarshal(state.DIDDocument, &doc); err != nil {
return nil, err
}
+
return doc, nil
}
+
last, err := s.LoadLatestDIDOperation(ctx, did)
+
if err != nil {
return nil, err
}
+
var op map[string]any
+
if err := json.Unmarshal(last.Operation, &op); err != nil {
return nil, fmt.Errorf("decode latest operation: %w", err)
}
+
delete(op, "sig")
delete(op, "signature")
delete(op, "sigPayload")
delete(op, "signaturePayload")
+
return op, nil
}
@@ -755,16 +966,21 @@ func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int,
if err := s.CorruptionError(); err != nil {
return err
}
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
return nil
}
+
if limit <= 0 {
limit = 1000
}
+
const maxCount = 1000
+
if limit > maxCount {
limit = maxCount
}
+
afterSet := !after.IsZero()
emitted := 0
stop := errors.New("stop export iteration")
@@ -772,25 +988,33 @@ func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int,
if err := ctx.Err(); err != nil {
return err
}
+
if emitted >= limit {
return stop
}
+
if afterSet {
if strings.TrimSpace(ref.Received) == "" {
return nil
}
+
ts, err := time.Parse(time.RFC3339, ref.Received)
+
if err != nil {
return nil
}
+
if !ts.After(after) {
return nil
}
}
+
payload, err := s.blockLog.ReadRecord(ref)
+
if err != nil {
return err
}
+
rec := types.ExportRecord{
Seq: seq,
DID: ref.DID,
@@ -799,41 +1023,54 @@ func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int,
Nullified: detectNullified(payload),
Operation: json.RawMessage(payload),
}
+
emitted++
+
return emit(rec)
})
+
if err == nil || errors.Is(err, stop) {
return nil
}
+
return err
}
func detectNullified(operation []byte) bool {
var payload map[string]any
+
if err := json.Unmarshal(operation, &payload); err != nil {
return false
}
+
v, _ := payload["nullified"].(bool)
+
return v
}
func (s *Service) Flush(ctx context.Context) error {
_ = ctx
+
if err := s.CorruptionError(); err != nil {
return err
}
+
if s.appender == nil {
return nil
}
+
flush, err := s.appender.Flush()
+
if err != nil {
return err
}
+
if flush != nil {
if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
return err
}
}
+
return nil
}
@@ -841,6 +1078,7 @@ func (s *Service) MarkCorrupted(err error) {
if err == nil {
return
}
+
s.corrupted.Store(true)
s.corruptErr.Store(err.Error())
}
@@ -853,10 +1091,13 @@ func (s *Service) CorruptionError() error {
if !s.IsCorrupted() {
return nil
}
+
msg, _ := s.corruptErr.Load().(string)
+
if msg == "" {
msg = "data corruption detected"
}
+
return fmt.Errorf("data corruption detected: %s", msg)
}
@@ -865,15 +1106,19 @@ func extractRotationKeysFromPayload(payload map[string]any, prior []string) []st
seen := map[string]struct{}{}
add := func(v string) {
v = strings.TrimSpace(v)
+
if v == "" {
return
}
+
if _, ok := seen[v]; ok {
return
}
+
seen[v] = struct{}{}
out = append(out, v)
}
+
if arr, ok := payload["rotationKeys"].([]any); ok {
for _, v := range arr {
if s, ok := v.(string); ok {
@@ -881,17 +1126,21 @@ func extractRotationKeysFromPayload(payload map[string]any, prior []string) []st
}
}
}
+
if recovery, ok := payload["recoveryKey"].(string); ok {
add(recovery)
}
+
if signing, ok := payload["signingKey"].(string); ok {
add(signing)
}
+
if len(out) == 0 {
for _, v := range prior {
add(v)
}
}
+
return out
}
@@ -908,9 +1157,11 @@ func (s *Service) SetMetricsSink(sink MetricsSink) {
func (s *Service) Stats() Stats {
ingested := atomic.LoadUint64(&s.stats.IngestedOps)
opsPerSec := 0.0
+
if elapsed := time.Since(s.startedAt).Seconds(); elapsed > 0 {
opsPerSec = float64(ingested) / elapsed
}
+
return Stats{
IngestedOps: ingested,
Errors: atomic.LoadUint64(&s.stats.Errors),
@@ -925,12 +1176,15 @@ func (s *Service) Stats() Stats {
func countStates(store storage.Store) (uint64, error) {
var count uint64
+
if err := store.ForEachState(func(types.StateV1) error {
count++
+
return nil
}); err != nil {
return 0, err
}
+
return count, nil
}
@@ -959,34 +1213,47 @@ func newBlockAppender(log *storage.BlockLog, buffer int) *blockAppender {
queue: make(chan appendRequest, buffer),
closed: make(chan struct{}),
}
+
go ba.run()
+
return ba
}
func (b *blockAppender) run() {
defer close(b.closed)
+
for req := range b.queue {
if req.Flush {
flush, err := b.log.Flush()
+
req.Reply <- appendResult{Flush: flush, Err: err}
+
continue
}
+
ref, flush, err := b.log.Append(req.Operation.Sequence, req.Operation.DID, req.Operation.CID, req.Operation.Prev, req.Operation.CanonicalBytes)
+
req.Reply <- appendResult{Ref: ref, Flush: flush, Err: err}
}
}
func (b *blockAppender) Append(op types.ParsedOperation) (appendResult, error) {
reply := make(chan appendResult, 1)
+
b.queue <- appendRequest{Operation: op, Reply: reply}
+
res := <-reply
+
return res, res.Err
}
func (b *blockAppender) Flush() (*storage.FlushInfo, error) {
reply := make(chan appendResult, 1)
+
b.queue <- appendRequest{Flush: true, Reply: reply}
+
res := <-reply
+
return res.Flush, res.Err
}
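
Not part of the commit above: a minimal, self-contained sketch of the FNV-1a DID-to-worker routing that verifyRecords uses, so that every record for a given DID lands on the same verify worker queue and is processed in sequence order. The package name, sample DIDs, and worker count here are illustrative only.

package main

import (
	"fmt"
	"hash/fnv"
)

// didWorker mirrors the routing shown in the diff: hash the DID with
// FNV-1a and reduce it modulo the worker count, so all operations for
// one DID are handled by the same worker.
func didWorker(did string, workers int) int {
	if workers <= 1 {
		return 0
	}

	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(did))

	return int(hasher.Sum32() % uint32(workers))
}

func main() {
	// Hypothetical DIDs and worker count, for illustration only.
	dids := []string{"did:plc:alice", "did:plc:bob", "did:plc:carol"}

	for _, did := range dids {
		fmt.Printf("%s -> worker %d\n", did, didWorker(did, 4))
	}
}

Because the mapping is a pure function of the DID, the same DID always reaches the same worker, which is what allows each worker's per-DID state cache in verifyRecords to remain consistent across a batch.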