aboutsummaryrefslogtreecommitdiff
path: root/internal/checkpoint
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-26 15:41:45 -0800
committerFuwn <[email protected]>2026-02-26 15:41:45 -0800
commitfec9114caaa7d274e524793d5eb0cf2ef2c5af11 (patch)
tree0897394ccdfaf6633e1a4ca8eb02bff49bb93c00 /internal/checkpoint
parentfeat: add read-only PLC API compatibility endpoints (diff)
downloadplutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.tar.xz
plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.zip
feat: Apply Iku formatting
Diffstat (limited to 'internal/checkpoint')
-rw-r--r--internal/checkpoint/checkpoint.go102
-rw-r--r--internal/checkpoint/checkpoint_test.go39
2 files changed, 137 insertions, 4 deletions
diff --git a/internal/checkpoint/checkpoint.go b/internal/checkpoint/checkpoint.go
index c3ed0da..85c4992 100644
--- a/internal/checkpoint/checkpoint.go
+++ b/internal/checkpoint/checkpoint.go
@@ -15,7 +15,6 @@ import (
"sort"
"strings"
"time"
-
"github.com/Fuwn/plutia/internal/merkle"
"github.com/Fuwn/plutia/internal/storage"
"github.com/Fuwn/plutia/internal/types"
@@ -40,27 +39,34 @@ func NewManager(store storage.Store, dataDir, keyPath string) *Manager {
func (m *Manager) BuildAndStore(sequence uint64, states []types.StateV1, blockHashes []string) (types.CheckpointV1, error) {
didRoot, leaves := didMerkle(states)
+
if err := m.writeCheckpointStateSnapshot(sequence, leaves); err != nil {
return types.CheckpointV1{}, err
}
+
return m.signAndPersist(sequence, didRoot, blockHashes)
}
func (m *Manager) BuildAndStoreFromStore(sequence uint64, blockHashes []string) (types.CheckpointV1, error) {
cp, _, err := m.BuildAndStoreFromStoreWithMetrics(sequence, blockHashes)
+
return cp, err
}
func (m *Manager) BuildAndStoreFromStoreWithMetrics(sequence uint64, blockHashes []string) (types.CheckpointV1, BuildMetrics, error) {
start := time.Now()
didRoot, didCount, merkleCompute, err := m.writeCheckpointStateSnapshotFromStore(sequence)
+
if err != nil {
return types.CheckpointV1{}, BuildMetrics{}, err
}
+
cp, err := m.signAndPersist(sequence, didRoot, blockHashes)
+
if err != nil {
return types.CheckpointV1{}, BuildMetrics{}, err
}
+
return cp, BuildMetrics{
DIDCount: didCount,
MerkleCompute: merkleCompute,
@@ -70,12 +76,14 @@ func (m *Manager) BuildAndStoreFromStoreWithMetrics(sequence uint64, blockHashes
func (m *Manager) signAndPersist(sequence uint64, didRoot string, blockHashes []string) (types.CheckpointV1, error) {
privateKey, keyID, err := m.loadSigningKey()
+
if err != nil {
return types.CheckpointV1{}, err
}
- blockRoot := blockMerkle(blockHashes)
+ blockRoot := blockMerkle(blockHashes)
prev := ""
+
if latest, ok, err := m.store.GetLatestCheckpoint(); err == nil && ok {
prev = latest.CheckpointHash
} else if err != nil {
@@ -92,9 +100,11 @@ func (m *Manager) signAndPersist(sequence uint64, didRoot string, blockHashes []
KeyID: keyID,
}
payload, err := marshalCheckpointPayload(unsigned)
+
if err != nil {
return types.CheckpointV1{}, err
}
+
sum := sha256.Sum256(payload)
unsigned.CheckpointHash = hex.EncodeToString(sum[:])
unsigned.Signature = base64.RawURLEncoding.EncodeToString(ed25519.Sign(privateKey, payload))
@@ -102,84 +112,112 @@ func (m *Manager) signAndPersist(sequence uint64, didRoot string, blockHashes []
if err := m.store.PutCheckpoint(unsigned); err != nil {
return types.CheckpointV1{}, fmt.Errorf("persist checkpoint: %w", err)
}
+
if err := m.writeCheckpointFile(unsigned); err != nil {
return types.CheckpointV1{}, err
}
+
return unsigned, nil
}
func (m *Manager) BuildDIDProofAtCheckpoint(ctx context.Context, did, chainTipHash string, checkpointSeq uint64) ([]merkle.Sibling, string, bool, error) {
snapshot, err := m.LoadStateSnapshot(checkpointSeq)
+
if err != nil {
return nil, "", false, err
}
+
leaves := make([][]byte, len(snapshot.Leaves))
index := -1
leafHashHex := ""
+
for i, s := range snapshot.Leaves {
if err := ctx.Err(); err != nil {
return nil, "", false, err
}
+
h := merkle.HashLeaf([]byte(s.DID + s.ChainTipHash))
leaves[i] = h
+
if s.DID == did && s.ChainTipHash == chainTipHash {
index = i
leafHashHex = hex.EncodeToString(h)
}
}
+
if index < 0 {
return nil, "", false, nil
}
+
proof := merkle.BuildProof(leaves, index)
+
return proof, leafHashHex, true, nil
}
func (m *Manager) LoadStateSnapshot(sequence uint64) (types.CheckpointStateSnapshotV1, error) {
path := filepath.Join(m.dataDir, "checkpoints", fmt.Sprintf("%020d.state.json", sequence))
b, err := os.ReadFile(path)
+
if err != nil {
return types.CheckpointStateSnapshotV1{}, fmt.Errorf("read checkpoint state snapshot: %w", err)
}
+
var snap types.CheckpointStateSnapshotV1
+
if err := json.Unmarshal(b, &snap); err != nil {
return types.CheckpointStateSnapshotV1{}, fmt.Errorf("unmarshal checkpoint state snapshot: %w", err)
}
+
if snap.Sequence != sequence {
return types.CheckpointStateSnapshotV1{}, fmt.Errorf("snapshot sequence mismatch: got %d want %d", snap.Sequence, sequence)
}
+
return snap, nil
}
func didMerkle(states []types.StateV1) (string, []types.DIDLeaf) {
sorted := append([]types.StateV1(nil), states...)
+
sort.Slice(sorted, func(i, j int) bool { return sorted[i].DID < sorted[j].DID })
+
acc := merkle.NewAccumulator()
snapshotLeaves := make([]types.DIDLeaf, 0, len(sorted))
+
for _, s := range sorted {
snapshotLeaves = append(snapshotLeaves, types.DIDLeaf{
DID: s.DID,
ChainTipHash: s.ChainTipHash,
})
+
acc.AddLeafHash(merkle.HashLeaf([]byte(s.DID + s.ChainTipHash)))
}
+
root := acc.RootDuplicateLast()
+
return hex.EncodeToString(root), snapshotLeaves
}
func blockMerkle(hashes []string) string {
if len(hashes) == 0 {
root := merkle.Root(nil)
+
return hex.EncodeToString(root)
}
+
leaves := make([][]byte, 0, len(hashes))
+
for _, h := range hashes {
decoded, err := hex.DecodeString(strings.TrimSpace(h))
+
if err != nil {
decoded = []byte(h)
}
+
leaves = append(leaves, merkle.HashLeaf(decoded))
}
+
root := merkle.Root(leaves)
+
return hex.EncodeToString(root)
}
@@ -188,33 +226,42 @@ func marshalCheckpointPayload(cp types.CheckpointV1) ([]byte, error) {
clone.Signature = ""
clone.CheckpointHash = ""
b, err := json.Marshal(clone)
+
if err != nil {
return nil, fmt.Errorf("marshal checkpoint payload: %w", err)
}
+
return types.CanonicalizeJSON(b)
}
func (m *Manager) writeCheckpointFile(cp types.CheckpointV1) error {
dir := filepath.Join(m.dataDir, "checkpoints")
+
if err := os.MkdirAll(dir, 0o755); err != nil {
return fmt.Errorf("mkdir checkpoints: %w", err)
}
+
path := filepath.Join(dir, fmt.Sprintf("%020d.json", cp.Sequence))
b, err := json.MarshalIndent(cp, "", " ")
+
if err != nil {
return fmt.Errorf("marshal checkpoint: %w", err)
}
+
if err := os.WriteFile(path, b, 0o644); err != nil {
return fmt.Errorf("write checkpoint file: %w", err)
}
+
return nil
}
func (m *Manager) writeCheckpointStateSnapshot(sequence uint64, leaves []types.DIDLeaf) error {
dir := filepath.Join(m.dataDir, "checkpoints")
+
if err := os.MkdirAll(dir, 0o755); err != nil {
return fmt.Errorf("mkdir checkpoints: %w", err)
}
+
snap := types.CheckpointStateSnapshotV1{
Version: 1,
Sequence: sequence,
@@ -222,32 +269,41 @@ func (m *Manager) writeCheckpointStateSnapshot(sequence uint64, leaves []types.D
Leaves: leaves,
}
b, err := json.Marshal(snap)
+
if err != nil {
return fmt.Errorf("marshal checkpoint state snapshot: %w", err)
}
+
path := filepath.Join(dir, fmt.Sprintf("%020d.state.json", sequence))
+
if err := os.WriteFile(path, b, 0o644); err != nil {
return fmt.Errorf("write checkpoint state snapshot: %w", err)
}
+
return nil
}
func (m *Manager) writeCheckpointStateSnapshotFromStore(sequence uint64) (string, int, time.Duration, error) {
dir := filepath.Join(m.dataDir, "checkpoints")
+
if err := os.MkdirAll(dir, 0o755); err != nil {
return "", 0, 0, fmt.Errorf("mkdir checkpoints: %w", err)
}
+
finalPath := filepath.Join(dir, fmt.Sprintf("%020d.state.json", sequence))
tmpPath := finalPath + ".tmp"
-
f, err := os.Create(tmpPath)
+
if err != nil {
return "", 0, 0, fmt.Errorf("create checkpoint state snapshot temp file: %w", err)
}
+
createdAt := time.Now().UTC().Format(time.RFC3339)
w := bufio.NewWriterSize(f, 1<<20)
+
if _, err := fmt.Fprintf(w, `{"v":1,"sequence":%d,"created_at":"%s","leaves":[`, sequence, createdAt); err != nil {
_ = f.Close()
+
return "", 0, 0, fmt.Errorf("write snapshot header: %w", err)
}
@@ -255,61 +311,82 @@ func (m *Manager) writeCheckpointStateSnapshotFromStore(sequence uint64) (string
didCount := 0
merkleCompute := time.Duration(0)
acc := merkle.NewAccumulator()
+
if err := m.store.ForEachState(func(s types.StateV1) error {
leaf := types.DIDLeaf{
DID: s.DID,
ChainTipHash: s.ChainTipHash,
}
b, err := json.Marshal(leaf)
+
if err != nil {
return fmt.Errorf("marshal checkpoint leaf: %w", err)
}
+
if !first {
if _, err := w.WriteString(","); err != nil {
return err
}
}
+
if _, err := w.Write(b); err != nil {
return err
}
+
first = false
hashStart := time.Now()
+
acc.AddLeafHash(merkle.HashLeaf([]byte(leaf.DID + leaf.ChainTipHash)))
+
merkleCompute += time.Since(hashStart)
+
didCount++
+
return nil
}); err != nil {
_ = f.Close()
+
return "", 0, 0, err
}
if _, err := w.WriteString(`]}`); err != nil {
_ = f.Close()
+
return "", 0, 0, fmt.Errorf("write snapshot trailer: %w", err)
}
+
if err := w.Flush(); err != nil {
_ = f.Close()
+
return "", 0, 0, fmt.Errorf("flush snapshot writer: %w", err)
}
+
if err := f.Sync(); err != nil {
_ = f.Close()
+
return "", 0, 0, fmt.Errorf("sync snapshot file: %w", err)
}
+
if err := f.Close(); err != nil {
return "", 0, 0, fmt.Errorf("close snapshot file: %w", err)
}
+
if err := os.Rename(tmpPath, finalPath); err != nil {
return "", 0, 0, fmt.Errorf("rename snapshot file: %w", err)
}
+
return hex.EncodeToString(acc.RootDuplicateLast()), didCount, merkleCompute, nil
}
func (m *Manager) loadSigningKey() (ed25519.PrivateKey, string, error) {
data, err := os.ReadFile(m.keyPath)
+
if err != nil {
return nil, "", fmt.Errorf("read mirror private key: %w", err)
}
+
text := strings.TrimSpace(string(data))
+
if text == "" {
return nil, "", errors.New("empty mirror private key")
}
@@ -321,11 +398,14 @@ func (m *Manager) loadSigningKey() (ed25519.PrivateKey, string, error) {
var k struct {
PrivateKey string `json:"private_key"`
}
+
if err := json.Unmarshal(data, &k); err == nil && strings.TrimSpace(k.PrivateKey) != "" {
raw, err := decodeKeyString(k.PrivateKey)
+
if err != nil {
return nil, "", err
}
+
return keyFromRaw(raw)
}
@@ -335,32 +415,42 @@ func (m *Manager) loadSigningKey() (ed25519.PrivateKey, string, error) {
func decodeKeyString(v string) ([]byte, error) {
if strings.HasPrefix(v, "did:key:") {
mb := strings.TrimPrefix(v, "did:key:")
+
if mb == "" || mb[0] != 'z' {
return nil, errors.New("unsupported did:key format")
}
+
decoded, err := base58.Decode(mb[1:])
+
if err != nil {
return nil, err
}
+
if len(decoded) < 34 {
return nil, errors.New("invalid did:key length")
}
+
return decoded[len(decoded)-32:], nil
}
+
if isHexString(v) {
if b, err := hex.DecodeString(v); err == nil {
return b, nil
}
}
+
if b, err := base64.RawURLEncoding.DecodeString(v); err == nil {
return b, nil
}
+
if b, err := base64.StdEncoding.DecodeString(v); err == nil {
return b, nil
}
+
if b, err := hex.DecodeString(v); err == nil {
return b, nil
}
+
return nil, errors.New("unknown key encoding")
}
@@ -368,12 +458,15 @@ func isHexString(v string) bool {
if len(v) == 0 || len(v)%2 != 0 {
return false
}
+
for _, r := range v {
if (r >= '0' && r <= '9') || (r >= 'a' && r <= 'f') || (r >= 'A' && r <= 'F') {
continue
}
+
return false
}
+
return true
}
@@ -382,10 +475,12 @@ func keyFromRaw(raw []byte) (ed25519.PrivateKey, string, error) {
case ed25519.SeedSize:
pk := ed25519.NewKeyFromSeed(raw)
kid := keyID(pk.Public().(ed25519.PublicKey))
+
return pk, kid, nil
case ed25519.PrivateKeySize:
pk := ed25519.PrivateKey(raw)
kid := keyID(pk.Public().(ed25519.PublicKey))
+
return pk, kid, nil
default:
return nil, "", fmt.Errorf("invalid private key length %d", len(raw))
@@ -394,5 +489,6 @@ func keyFromRaw(raw []byte) (ed25519.PrivateKey, string, error) {
func keyID(pub ed25519.PublicKey) string {
sum := sha256.Sum256(pub)
+
return "ed25519:" + hex.EncodeToString(sum[:8])
}
diff --git a/internal/checkpoint/checkpoint_test.go b/internal/checkpoint/checkpoint_test.go
index 8149129..156aa9e 100644
--- a/internal/checkpoint/checkpoint_test.go
+++ b/internal/checkpoint/checkpoint_test.go
@@ -9,7 +9,6 @@ import (
"os"
"path/filepath"
"testing"
-
"github.com/Fuwn/plutia/internal/storage"
"github.com/Fuwn/plutia/internal/types"
)
@@ -17,16 +16,21 @@ import (
func TestBuildAndStoreCheckpoint(t *testing.T) {
tmp := t.TempDir()
store, err := storage.OpenPebble(tmp)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
pub, priv, err := ed25519.GenerateKey(rand.Reader)
+
if err != nil {
t.Fatalf("generate key: %v", err)
}
+
keyPath := filepath.Join(tmp, "mirror.key")
+
if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(priv)), 0o600); err != nil {
t.Fatalf("write key: %v", err)
}
@@ -37,28 +41,37 @@ func TestBuildAndStoreCheckpoint(t *testing.T) {
{Version: 1, DID: "did:plc:b", ChainTipHash: "tip-b"},
}
cp, err := mgr.BuildAndStore(42, states, []string{"abc", "def"})
+
if err != nil {
t.Fatalf("build checkpoint: %v", err)
}
+
if cp.Signature == "" || cp.CheckpointHash == "" {
t.Fatalf("expected signed checkpoint")
}
+
payload, err := marshalCheckpointPayload(cp)
+
if err != nil {
t.Fatalf("marshal payload: %v", err)
}
+
rawSig, err := base64.RawURLEncoding.DecodeString(cp.Signature)
+
if err != nil {
t.Fatalf("decode signature: %v", err)
}
+
if !ed25519.Verify(pub, payload, rawSig) {
t.Fatalf("signature verification failed")
}
stored, ok, err := store.GetCheckpoint(cp.Sequence)
+
if err != nil || !ok {
t.Fatalf("stored checkpoint missing: ok=%v err=%v", ok, err)
}
+
if stored.CheckpointHash != cp.CheckpointHash {
t.Fatalf("checkpoint hash mismatch")
}
@@ -67,33 +80,42 @@ func TestBuildAndStoreCheckpoint(t *testing.T) {
func TestCheckpointRootStability(t *testing.T) {
tmp := t.TempDir()
store, err := storage.OpenPebble(tmp)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
if err := store.PutState(types.StateV1{Version: 1, DID: "did:plc:b", ChainTipHash: "tip-b"}); err != nil {
t.Fatalf("put state b: %v", err)
}
+
if err := store.PutState(types.StateV1{Version: 1, DID: "did:plc:a", ChainTipHash: "tip-a"}); err != nil {
t.Fatalf("put state a: %v", err)
}
_, priv, err := ed25519.GenerateKey(rand.Reader)
+
if err != nil {
t.Fatalf("generate key: %v", err)
}
+
keyPath := filepath.Join(tmp, "mirror.key")
+
if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(priv)), 0o600); err != nil {
t.Fatalf("write key: %v", err)
}
mgr := NewManager(store, tmp, keyPath)
cp1, err := mgr.BuildAndStoreFromStore(100, []string{"abc"})
+
if err != nil {
t.Fatalf("build checkpoint 1: %v", err)
}
+
cp2, err := mgr.BuildAndStoreFromStore(200, []string{"abc"})
+
if err != nil {
t.Fatalf("build checkpoint 2: %v", err)
}
@@ -101,6 +123,7 @@ func TestCheckpointRootStability(t *testing.T) {
if cp1.DIDMerkleRoot != cp2.DIDMerkleRoot {
t.Fatalf("did root changed for identical state: %s vs %s", cp1.DIDMerkleRoot, cp2.DIDMerkleRoot)
}
+
if cp1.BlockMerkleRoot != cp2.BlockMerkleRoot {
t.Fatalf("block root changed for identical block set: %s vs %s", cp1.BlockMerkleRoot, cp2.BlockMerkleRoot)
}
@@ -109,30 +132,40 @@ func TestCheckpointRootStability(t *testing.T) {
func TestBuildDIDProofAtCheckpointHonorsContextCancellation(t *testing.T) {
tmp := t.TempDir()
store, err := storage.OpenPebble(tmp)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
_, priv, err := ed25519.GenerateKey(rand.Reader)
+
if err != nil {
t.Fatalf("generate key: %v", err)
}
+
keyPath := filepath.Join(tmp, "mirror.key")
+
if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(priv)), 0o600); err != nil {
t.Fatalf("write key: %v", err)
}
+
checkpointsDir := filepath.Join(tmp, "checkpoints")
+
if err := os.MkdirAll(checkpointsDir, 0o755); err != nil {
t.Fatalf("mkdir checkpoints: %v", err)
}
+
leaves := make([]types.DIDLeaf, 500)
+
for i := range leaves {
leaves[i] = types.DIDLeaf{
DID: "did:plc:test-" + string(rune('a'+(i%26))),
ChainTipHash: "tip",
}
}
+
snapshot := types.CheckpointStateSnapshotV1{
Version: 1,
Sequence: 10,
@@ -140,16 +173,20 @@ func TestBuildDIDProofAtCheckpointHonorsContextCancellation(t *testing.T) {
Leaves: leaves,
}
b, err := json.Marshal(snapshot)
+
if err != nil {
t.Fatalf("marshal snapshot: %v", err)
}
+
if err := os.WriteFile(filepath.Join(checkpointsDir, "00000000000000000010.state.json"), b, 0o644); err != nil {
t.Fatalf("write snapshot: %v", err)
}
mgr := NewManager(store, tmp, keyPath)
ctx, cancel := context.WithCancel(context.Background())
+
cancel()
+
if _, _, _, err := mgr.BuildDIDProofAtCheckpoint(ctx, "did:plc:test-a", "tip", 10); err == nil {
t.Fatalf("expected context cancellation error")
}