diff options
| author | Fuwn <[email protected]> | 2026-02-26 15:41:45 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-26 15:41:45 -0800 |
| commit | fec9114caaa7d274e524793d5eb0cf2ef2c5af11 (patch) | |
| tree | 0897394ccdfaf6633e1a4ca8eb02bff49bb93c00 /internal/checkpoint | |
| parent | feat: add read-only PLC API compatibility endpoints (diff) | |
| download | plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.tar.xz plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.zip | |
feat: Apply Iku formatting
Diffstat (limited to 'internal/checkpoint')
| -rw-r--r-- | internal/checkpoint/checkpoint.go | 102 | ||||
| -rw-r--r-- | internal/checkpoint/checkpoint_test.go | 39 |
2 files changed, 137 insertions, 4 deletions
diff --git a/internal/checkpoint/checkpoint.go b/internal/checkpoint/checkpoint.go index c3ed0da..85c4992 100644 --- a/internal/checkpoint/checkpoint.go +++ b/internal/checkpoint/checkpoint.go @@ -15,7 +15,6 @@ import ( "sort" "strings" "time" - "github.com/Fuwn/plutia/internal/merkle" "github.com/Fuwn/plutia/internal/storage" "github.com/Fuwn/plutia/internal/types" @@ -40,27 +39,34 @@ func NewManager(store storage.Store, dataDir, keyPath string) *Manager { func (m *Manager) BuildAndStore(sequence uint64, states []types.StateV1, blockHashes []string) (types.CheckpointV1, error) { didRoot, leaves := didMerkle(states) + if err := m.writeCheckpointStateSnapshot(sequence, leaves); err != nil { return types.CheckpointV1{}, err } + return m.signAndPersist(sequence, didRoot, blockHashes) } func (m *Manager) BuildAndStoreFromStore(sequence uint64, blockHashes []string) (types.CheckpointV1, error) { cp, _, err := m.BuildAndStoreFromStoreWithMetrics(sequence, blockHashes) + return cp, err } func (m *Manager) BuildAndStoreFromStoreWithMetrics(sequence uint64, blockHashes []string) (types.CheckpointV1, BuildMetrics, error) { start := time.Now() didRoot, didCount, merkleCompute, err := m.writeCheckpointStateSnapshotFromStore(sequence) + if err != nil { return types.CheckpointV1{}, BuildMetrics{}, err } + cp, err := m.signAndPersist(sequence, didRoot, blockHashes) + if err != nil { return types.CheckpointV1{}, BuildMetrics{}, err } + return cp, BuildMetrics{ DIDCount: didCount, MerkleCompute: merkleCompute, @@ -70,12 +76,14 @@ func (m *Manager) BuildAndStoreFromStoreWithMetrics(sequence uint64, blockHashes func (m *Manager) signAndPersist(sequence uint64, didRoot string, blockHashes []string) (types.CheckpointV1, error) { privateKey, keyID, err := m.loadSigningKey() + if err != nil { return types.CheckpointV1{}, err } - blockRoot := blockMerkle(blockHashes) + blockRoot := blockMerkle(blockHashes) prev := "" + if latest, ok, err := m.store.GetLatestCheckpoint(); err == nil && ok { prev = latest.CheckpointHash } else if err != nil { @@ -92,9 +100,11 @@ func (m *Manager) signAndPersist(sequence uint64, didRoot string, blockHashes [] KeyID: keyID, } payload, err := marshalCheckpointPayload(unsigned) + if err != nil { return types.CheckpointV1{}, err } + sum := sha256.Sum256(payload) unsigned.CheckpointHash = hex.EncodeToString(sum[:]) unsigned.Signature = base64.RawURLEncoding.EncodeToString(ed25519.Sign(privateKey, payload)) @@ -102,84 +112,112 @@ func (m *Manager) signAndPersist(sequence uint64, didRoot string, blockHashes [] if err := m.store.PutCheckpoint(unsigned); err != nil { return types.CheckpointV1{}, fmt.Errorf("persist checkpoint: %w", err) } + if err := m.writeCheckpointFile(unsigned); err != nil { return types.CheckpointV1{}, err } + return unsigned, nil } func (m *Manager) BuildDIDProofAtCheckpoint(ctx context.Context, did, chainTipHash string, checkpointSeq uint64) ([]merkle.Sibling, string, bool, error) { snapshot, err := m.LoadStateSnapshot(checkpointSeq) + if err != nil { return nil, "", false, err } + leaves := make([][]byte, len(snapshot.Leaves)) index := -1 leafHashHex := "" + for i, s := range snapshot.Leaves { if err := ctx.Err(); err != nil { return nil, "", false, err } + h := merkle.HashLeaf([]byte(s.DID + s.ChainTipHash)) leaves[i] = h + if s.DID == did && s.ChainTipHash == chainTipHash { index = i leafHashHex = hex.EncodeToString(h) } } + if index < 0 { return nil, "", false, nil } + proof := merkle.BuildProof(leaves, index) + return proof, leafHashHex, true, nil } func (m *Manager) LoadStateSnapshot(sequence uint64) (types.CheckpointStateSnapshotV1, error) { path := filepath.Join(m.dataDir, "checkpoints", fmt.Sprintf("%020d.state.json", sequence)) b, err := os.ReadFile(path) + if err != nil { return types.CheckpointStateSnapshotV1{}, fmt.Errorf("read checkpoint state snapshot: %w", err) } + var snap types.CheckpointStateSnapshotV1 + if err := json.Unmarshal(b, &snap); err != nil { return types.CheckpointStateSnapshotV1{}, fmt.Errorf("unmarshal checkpoint state snapshot: %w", err) } + if snap.Sequence != sequence { return types.CheckpointStateSnapshotV1{}, fmt.Errorf("snapshot sequence mismatch: got %d want %d", snap.Sequence, sequence) } + return snap, nil } func didMerkle(states []types.StateV1) (string, []types.DIDLeaf) { sorted := append([]types.StateV1(nil), states...) + sort.Slice(sorted, func(i, j int) bool { return sorted[i].DID < sorted[j].DID }) + acc := merkle.NewAccumulator() snapshotLeaves := make([]types.DIDLeaf, 0, len(sorted)) + for _, s := range sorted { snapshotLeaves = append(snapshotLeaves, types.DIDLeaf{ DID: s.DID, ChainTipHash: s.ChainTipHash, }) + acc.AddLeafHash(merkle.HashLeaf([]byte(s.DID + s.ChainTipHash))) } + root := acc.RootDuplicateLast() + return hex.EncodeToString(root), snapshotLeaves } func blockMerkle(hashes []string) string { if len(hashes) == 0 { root := merkle.Root(nil) + return hex.EncodeToString(root) } + leaves := make([][]byte, 0, len(hashes)) + for _, h := range hashes { decoded, err := hex.DecodeString(strings.TrimSpace(h)) + if err != nil { decoded = []byte(h) } + leaves = append(leaves, merkle.HashLeaf(decoded)) } + root := merkle.Root(leaves) + return hex.EncodeToString(root) } @@ -188,33 +226,42 @@ func marshalCheckpointPayload(cp types.CheckpointV1) ([]byte, error) { clone.Signature = "" clone.CheckpointHash = "" b, err := json.Marshal(clone) + if err != nil { return nil, fmt.Errorf("marshal checkpoint payload: %w", err) } + return types.CanonicalizeJSON(b) } func (m *Manager) writeCheckpointFile(cp types.CheckpointV1) error { dir := filepath.Join(m.dataDir, "checkpoints") + if err := os.MkdirAll(dir, 0o755); err != nil { return fmt.Errorf("mkdir checkpoints: %w", err) } + path := filepath.Join(dir, fmt.Sprintf("%020d.json", cp.Sequence)) b, err := json.MarshalIndent(cp, "", " ") + if err != nil { return fmt.Errorf("marshal checkpoint: %w", err) } + if err := os.WriteFile(path, b, 0o644); err != nil { return fmt.Errorf("write checkpoint file: %w", err) } + return nil } func (m *Manager) writeCheckpointStateSnapshot(sequence uint64, leaves []types.DIDLeaf) error { dir := filepath.Join(m.dataDir, "checkpoints") + if err := os.MkdirAll(dir, 0o755); err != nil { return fmt.Errorf("mkdir checkpoints: %w", err) } + snap := types.CheckpointStateSnapshotV1{ Version: 1, Sequence: sequence, @@ -222,32 +269,41 @@ func (m *Manager) writeCheckpointStateSnapshot(sequence uint64, leaves []types.D Leaves: leaves, } b, err := json.Marshal(snap) + if err != nil { return fmt.Errorf("marshal checkpoint state snapshot: %w", err) } + path := filepath.Join(dir, fmt.Sprintf("%020d.state.json", sequence)) + if err := os.WriteFile(path, b, 0o644); err != nil { return fmt.Errorf("write checkpoint state snapshot: %w", err) } + return nil } func (m *Manager) writeCheckpointStateSnapshotFromStore(sequence uint64) (string, int, time.Duration, error) { dir := filepath.Join(m.dataDir, "checkpoints") + if err := os.MkdirAll(dir, 0o755); err != nil { return "", 0, 0, fmt.Errorf("mkdir checkpoints: %w", err) } + finalPath := filepath.Join(dir, fmt.Sprintf("%020d.state.json", sequence)) tmpPath := finalPath + ".tmp" - f, err := os.Create(tmpPath) + if err != nil { return "", 0, 0, fmt.Errorf("create checkpoint state snapshot temp file: %w", err) } + createdAt := time.Now().UTC().Format(time.RFC3339) w := bufio.NewWriterSize(f, 1<<20) + if _, err := fmt.Fprintf(w, `{"v":1,"sequence":%d,"created_at":"%s","leaves":[`, sequence, createdAt); err != nil { _ = f.Close() + return "", 0, 0, fmt.Errorf("write snapshot header: %w", err) } @@ -255,61 +311,82 @@ func (m *Manager) writeCheckpointStateSnapshotFromStore(sequence uint64) (string didCount := 0 merkleCompute := time.Duration(0) acc := merkle.NewAccumulator() + if err := m.store.ForEachState(func(s types.StateV1) error { leaf := types.DIDLeaf{ DID: s.DID, ChainTipHash: s.ChainTipHash, } b, err := json.Marshal(leaf) + if err != nil { return fmt.Errorf("marshal checkpoint leaf: %w", err) } + if !first { if _, err := w.WriteString(","); err != nil { return err } } + if _, err := w.Write(b); err != nil { return err } + first = false hashStart := time.Now() + acc.AddLeafHash(merkle.HashLeaf([]byte(leaf.DID + leaf.ChainTipHash))) + merkleCompute += time.Since(hashStart) + didCount++ + return nil }); err != nil { _ = f.Close() + return "", 0, 0, err } if _, err := w.WriteString(`]}`); err != nil { _ = f.Close() + return "", 0, 0, fmt.Errorf("write snapshot trailer: %w", err) } + if err := w.Flush(); err != nil { _ = f.Close() + return "", 0, 0, fmt.Errorf("flush snapshot writer: %w", err) } + if err := f.Sync(); err != nil { _ = f.Close() + return "", 0, 0, fmt.Errorf("sync snapshot file: %w", err) } + if err := f.Close(); err != nil { return "", 0, 0, fmt.Errorf("close snapshot file: %w", err) } + if err := os.Rename(tmpPath, finalPath); err != nil { return "", 0, 0, fmt.Errorf("rename snapshot file: %w", err) } + return hex.EncodeToString(acc.RootDuplicateLast()), didCount, merkleCompute, nil } func (m *Manager) loadSigningKey() (ed25519.PrivateKey, string, error) { data, err := os.ReadFile(m.keyPath) + if err != nil { return nil, "", fmt.Errorf("read mirror private key: %w", err) } + text := strings.TrimSpace(string(data)) + if text == "" { return nil, "", errors.New("empty mirror private key") } @@ -321,11 +398,14 @@ func (m *Manager) loadSigningKey() (ed25519.PrivateKey, string, error) { var k struct { PrivateKey string `json:"private_key"` } + if err := json.Unmarshal(data, &k); err == nil && strings.TrimSpace(k.PrivateKey) != "" { raw, err := decodeKeyString(k.PrivateKey) + if err != nil { return nil, "", err } + return keyFromRaw(raw) } @@ -335,32 +415,42 @@ func (m *Manager) loadSigningKey() (ed25519.PrivateKey, string, error) { func decodeKeyString(v string) ([]byte, error) { if strings.HasPrefix(v, "did:key:") { mb := strings.TrimPrefix(v, "did:key:") + if mb == "" || mb[0] != 'z' { return nil, errors.New("unsupported did:key format") } + decoded, err := base58.Decode(mb[1:]) + if err != nil { return nil, err } + if len(decoded) < 34 { return nil, errors.New("invalid did:key length") } + return decoded[len(decoded)-32:], nil } + if isHexString(v) { if b, err := hex.DecodeString(v); err == nil { return b, nil } } + if b, err := base64.RawURLEncoding.DecodeString(v); err == nil { return b, nil } + if b, err := base64.StdEncoding.DecodeString(v); err == nil { return b, nil } + if b, err := hex.DecodeString(v); err == nil { return b, nil } + return nil, errors.New("unknown key encoding") } @@ -368,12 +458,15 @@ func isHexString(v string) bool { if len(v) == 0 || len(v)%2 != 0 { return false } + for _, r := range v { if (r >= '0' && r <= '9') || (r >= 'a' && r <= 'f') || (r >= 'A' && r <= 'F') { continue } + return false } + return true } @@ -382,10 +475,12 @@ func keyFromRaw(raw []byte) (ed25519.PrivateKey, string, error) { case ed25519.SeedSize: pk := ed25519.NewKeyFromSeed(raw) kid := keyID(pk.Public().(ed25519.PublicKey)) + return pk, kid, nil case ed25519.PrivateKeySize: pk := ed25519.PrivateKey(raw) kid := keyID(pk.Public().(ed25519.PublicKey)) + return pk, kid, nil default: return nil, "", fmt.Errorf("invalid private key length %d", len(raw)) @@ -394,5 +489,6 @@ func keyFromRaw(raw []byte) (ed25519.PrivateKey, string, error) { func keyID(pub ed25519.PublicKey) string { sum := sha256.Sum256(pub) + return "ed25519:" + hex.EncodeToString(sum[:8]) } diff --git a/internal/checkpoint/checkpoint_test.go b/internal/checkpoint/checkpoint_test.go index 8149129..156aa9e 100644 --- a/internal/checkpoint/checkpoint_test.go +++ b/internal/checkpoint/checkpoint_test.go @@ -9,7 +9,6 @@ import ( "os" "path/filepath" "testing" - "github.com/Fuwn/plutia/internal/storage" "github.com/Fuwn/plutia/internal/types" ) @@ -17,16 +16,21 @@ import ( func TestBuildAndStoreCheckpoint(t *testing.T) { tmp := t.TempDir() store, err := storage.OpenPebble(tmp) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { t.Fatalf("generate key: %v", err) } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(priv)), 0o600); err != nil { t.Fatalf("write key: %v", err) } @@ -37,28 +41,37 @@ func TestBuildAndStoreCheckpoint(t *testing.T) { {Version: 1, DID: "did:plc:b", ChainTipHash: "tip-b"}, } cp, err := mgr.BuildAndStore(42, states, []string{"abc", "def"}) + if err != nil { t.Fatalf("build checkpoint: %v", err) } + if cp.Signature == "" || cp.CheckpointHash == "" { t.Fatalf("expected signed checkpoint") } + payload, err := marshalCheckpointPayload(cp) + if err != nil { t.Fatalf("marshal payload: %v", err) } + rawSig, err := base64.RawURLEncoding.DecodeString(cp.Signature) + if err != nil { t.Fatalf("decode signature: %v", err) } + if !ed25519.Verify(pub, payload, rawSig) { t.Fatalf("signature verification failed") } stored, ok, err := store.GetCheckpoint(cp.Sequence) + if err != nil || !ok { t.Fatalf("stored checkpoint missing: ok=%v err=%v", ok, err) } + if stored.CheckpointHash != cp.CheckpointHash { t.Fatalf("checkpoint hash mismatch") } @@ -67,33 +80,42 @@ func TestBuildAndStoreCheckpoint(t *testing.T) { func TestCheckpointRootStability(t *testing.T) { tmp := t.TempDir() store, err := storage.OpenPebble(tmp) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() if err := store.PutState(types.StateV1{Version: 1, DID: "did:plc:b", ChainTipHash: "tip-b"}); err != nil { t.Fatalf("put state b: %v", err) } + if err := store.PutState(types.StateV1{Version: 1, DID: "did:plc:a", ChainTipHash: "tip-a"}); err != nil { t.Fatalf("put state a: %v", err) } _, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { t.Fatalf("generate key: %v", err) } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(priv)), 0o600); err != nil { t.Fatalf("write key: %v", err) } mgr := NewManager(store, tmp, keyPath) cp1, err := mgr.BuildAndStoreFromStore(100, []string{"abc"}) + if err != nil { t.Fatalf("build checkpoint 1: %v", err) } + cp2, err := mgr.BuildAndStoreFromStore(200, []string{"abc"}) + if err != nil { t.Fatalf("build checkpoint 2: %v", err) } @@ -101,6 +123,7 @@ func TestCheckpointRootStability(t *testing.T) { if cp1.DIDMerkleRoot != cp2.DIDMerkleRoot { t.Fatalf("did root changed for identical state: %s vs %s", cp1.DIDMerkleRoot, cp2.DIDMerkleRoot) } + if cp1.BlockMerkleRoot != cp2.BlockMerkleRoot { t.Fatalf("block root changed for identical block set: %s vs %s", cp1.BlockMerkleRoot, cp2.BlockMerkleRoot) } @@ -109,30 +132,40 @@ func TestCheckpointRootStability(t *testing.T) { func TestBuildDIDProofAtCheckpointHonorsContextCancellation(t *testing.T) { tmp := t.TempDir() store, err := storage.OpenPebble(tmp) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() _, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { t.Fatalf("generate key: %v", err) } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(priv)), 0o600); err != nil { t.Fatalf("write key: %v", err) } + checkpointsDir := filepath.Join(tmp, "checkpoints") + if err := os.MkdirAll(checkpointsDir, 0o755); err != nil { t.Fatalf("mkdir checkpoints: %v", err) } + leaves := make([]types.DIDLeaf, 500) + for i := range leaves { leaves[i] = types.DIDLeaf{ DID: "did:plc:test-" + string(rune('a'+(i%26))), ChainTipHash: "tip", } } + snapshot := types.CheckpointStateSnapshotV1{ Version: 1, Sequence: 10, @@ -140,16 +173,20 @@ func TestBuildDIDProofAtCheckpointHonorsContextCancellation(t *testing.T) { Leaves: leaves, } b, err := json.Marshal(snapshot) + if err != nil { t.Fatalf("marshal snapshot: %v", err) } + if err := os.WriteFile(filepath.Join(checkpointsDir, "00000000000000000010.state.json"), b, 0o644); err != nil { t.Fatalf("write snapshot: %v", err) } mgr := NewManager(store, tmp, keyPath) ctx, cancel := context.WithCancel(context.Background()) + cancel() + if _, _, _, err := mgr.BuildDIDProofAtCheckpoint(ctx, "did:plc:test-a", "tip", 10); err == nil { t.Fatalf("expected context cancellation error") } |