aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore10
-rw-r--r--LICENSE-APACHE202
-rw-r--r--LICENSE-MIT5
-rw-r--r--PLUTIA.md369
-rw-r--r--PROMPT.md25
-rw-r--r--README.md109
-rw-r--r--cmd/plutia/main.go383
-rw-r--r--config.yaml12
-rw-r--r--go.mod41
-rw-r--r--go.sum115
-rw-r--r--internal/api/server.go287
-rw-r--r--internal/api/server_checkpoint_test.go98
-rw-r--r--internal/api/server_integration_test.go261
-rw-r--r--internal/checkpoint/checkpoint.go394
-rw-r--r--internal/checkpoint/checkpoint_test.go105
-rw-r--r--internal/config/config.go153
-rw-r--r--internal/ingest/client.go189
-rw-r--r--internal/ingest/client_test.go38
-rw-r--r--internal/ingest/service.go741
-rw-r--r--internal/ingest/service_integration_test.go157
-rw-r--r--internal/ingest/service_order_test.go39
-rw-r--r--internal/merkle/tree.go188
-rw-r--r--internal/merkle/tree_test.go46
-rw-r--r--internal/state/engine.go172
-rw-r--r--internal/storage/blocklog.go330
-rw-r--r--internal/storage/blocklog_test.go56
-rw-r--r--internal/storage/pebble_store.go373
-rw-r--r--internal/storage/pebble_store_batch_durability_test.go66
-rw-r--r--internal/storage/pebble_store_batch_test.go74
-rw-r--r--internal/storage/store.go46
-rw-r--r--internal/types/checkpoint.go13
-rw-r--r--internal/types/operation.go77
-rw-r--r--internal/types/operation_test.go35
-rw-r--r--internal/types/state.go45
-rw-r--r--internal/verify/verifier.go296
-rw-r--r--internal/verify/verifier_test.go183
-rw-r--r--pkg/proof/proof.go15
37 files changed, 5354 insertions, 394 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a217c29
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,10 @@
+bin/
+data/
+tmp-test-data/
+tmp-test-data-resolver/
+scale-test-data/
+scale-test-data-resolver/
+scale-test-data-resolver-stateonly/
+*.zst
+bench-*.out
+bench-*.time
diff --git a/LICENSE-APACHE b/LICENSE-APACHE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE-APACHE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/LICENSE-MIT b/LICENSE-MIT
new file mode 100644
index 0000000..0e77b64
--- /dev/null
+++ b/LICENSE-MIT
@@ -0,0 +1,5 @@
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/PLUTIA.md b/PLUTIA.md
deleted file mode 100644
index f0dddaf..0000000
--- a/PLUTIA.md
+++ /dev/null
@@ -1,369 +0,0 @@
-# Plutia — Verifiable PLC Mirror
-
-Repository: [https://github.com/Fuwn/plutia](https://github.com/Fuwn/plutia)
-License: MIT OR Apache-2.0
-
-## Project Overview
-
-Plutia is a high-performance, verifiable alternative mirror for plc.directory.
-
-It must support two configurable operating modes:
-
-1. Resolver Mode (lightweight, state-only)
-2. Mirror Mode (full historical archive + verifiable proofs)
-
-Plutia is intended to be a production-grade, decentralization-focused infrastructure component capable of becoming a trusted first-choice mirror to plc.directory.
-
-The implementation must prioritize:
-
-* Deterministic data handling
-* Cryptographic correctness
-* Append-only integrity guarantees
-* Efficient disk usage (single-digit GB footprint for full mirror)
-* Static Go binaries
-* Clean modular architecture
-* Reproducibility
-* Long-term maintainability
-
-The implementation language is Go.
-
----
-
-## Design Goals
-
-1. Eliminate Postgres/WAL/index overhead seen in existing plc-mirror implementations.
-2. Store historical operations in compressed append-only block files.
-3. Maintain a minimal KV state store for current DID resolution.
-4. Provide verifiable proofs of inclusion and integrity.
-5. Allow configurable verification policy.
-6. Produce signed checkpoints to prevent equivocation.
-7. Be able to operate on modest VPS instances (20–40GB disk).
-8. Be horizontally extensible later (replication / gossip optional future).
-
----
-
-## Operating Modes
-
-Configurable via YAML OR environment variables:
-
-mode: resolver | mirror
-
-### Resolver Mode
-
-Stores only:
-
-* did -> latest operation reference
-* did -> materialized DID document
-* did -> chain tip hash
-
-Does not retain full historical chain.
-
-Intended footprint: 1–3GB.
-
-### Mirror Mode
-
-Stores:
-
-* Full operation history
-* Compressed append-only operation blocks
-* Minimal index mapping op sequence and DID chains
-* Signed checkpoints
-* State KV store (same as resolver mode)
-
-Intended footprint: ~4–6GB total.
-
----
-
-## Storage Architecture
-
-### 1) Append-Only Operation Log (Mirror Mode Only)
-
-Directory layout:
-
-data/
-ops/
-000001.zst
-000002.zst
-index/
-state/
-checkpoints/
-
-Each block file:
-
-* Contains multiple operations
-* Each operation is canonicalized JSON bytes
-* Stored as:
- [varint length][operation bytes]
-* Block compressed with zstd
-* Block hash recorded in index
-
-Block size target: 4–16MB uncompressed before compression.
-
-Compression: zstd level configurable (default level 9).
-
-### 2) KV State Store
-
-Use Pebble (github.com/cockroachdb/pebble).
-
-Keys:
-
-did:{did} -> serialized current state struct
-opseq:{global_seq} -> block_ref (block_id, offset, length)
-chain:{did} -> latest op sequence
-block:{block_id} -> block hash
-meta:global_seq -> uint64
-meta:mode -> resolver | mirror
-
-Value structures must be versioned.
-
-### 3) Canonicalization
-
-Operations must be:
-
-* Deterministically encoded
-* Signature-verifiable
-* Stored exactly as received except whitespace normalization
-
-No reordering of fields that would invalidate signature material.
-
----
-
-## Verification Policy
-
-Configurable:
-
-verify: full | lazy | state-only
-
-full:
-
-* Verify each operation upon ingestion.
-* Validate signature chain.
-* Validate prev CID linkage.
-
-lazy:
-
-* Store raw ops.
-* Verify on-demand and in background.
-
-state-only:
-
-* Verify only latest operation for each DID.
-
-Verification must use ed25519 via standard Go crypto libraries.
-
----
-
-## Checkpoint System
-
-Every N operations (configurable, default 100000):
-
-Produce checkpoint:
-
-{
-sequence: uint64,
-timestamp: RFC3339,
-did_merkle_root: hash,
-block_merkle_root: hash,
-previous_checkpoint_hash: hash
-}
-
-Checkpoint is:
-
-* Serialized deterministically
-* Signed with mirror private key
-* Stored under data/checkpoints/
-* Exposed via API
-
-Merkle tree:
-
-* Leaves = hash(did + chain_tip_hash)
-* Tree hash = sha256
-
-Purpose:
-
-* Allow external clients to verify inclusion.
-* Detect equivocation.
-* Enable mirror-to-mirror comparison.
-
----
-
-## API Surface
-
-HTTP server using net/http.
-
-Endpoints:
-
-GET /health
-GET /metrics
-GET /did/{did}
-GET /did/{did}/proof
-GET /checkpoints/latest
-GET /checkpoints/{sequence}
-GET /status
-
-Resolver response includes:
-
-{
-did_document,
-chain_tip_hash,
-checkpoint_reference
-}
-
-Proof endpoint includes:
-
-* Chain proof or chain tip reference
-* Merkle inclusion proof
-* Checkpoint signature
-
----
-
-## Ingestion
-
-Source: [https://plc.directory](https://plc.directory)
-
-Plutia must support:
-
-* Poll-based ingestion
-* Replay from genesis
-* Resume from stored sequence
-* Graceful recovery
-
-Concurrency model:
-
-* Single ingestion pipeline
-* Buffered channel for block assembly
-* Background checkpointing
-
----
-
-## Configuration
-
-config.yaml:
-
-mode: mirror
-data_dir: ./data
-plc_source: [https://plc.directory](https://plc.directory)
-verify: full
-zstd_level: 9
-block_size_mb: 8
-checkpoint_interval: 100000
-listen_addr: :8080
-mirror_private_key_path: ./mirror.key
-
----
-
-## Project Structure
-
-plutia/
-cmd/plutia/main.go
-internal/config/
-internal/ingest/
-internal/storage/
-internal/state/
-internal/checkpoint/
-internal/merkle/
-internal/api/
-internal/verify/
-internal/types/
-pkg/proof/
-go.mod
-
-Clean separation:
-
-* Storage interface
-* Verification logic
-* API layer
-* Merkle logic
-* Ingestion logic
-
----
-
-## Performance Requirements
-
-* Memory footprint under 1GB for mirror mode.
-* No relational database.
-* Efficient sequential disk writes.
-* Minimal random IO.
-* Backpressure-aware ingestion.
-* Safe shutdown with flush.
-
----
-
-## Security Requirements
-
-* No unsafe deserialization.
-* Deterministic hashing.
-* Signature verification strict.
-* Refuse malformed operations.
-* No silent chain truncation.
-* Checkpoint signing key rotation supported.
-
----
-
-## CLI
-
-plutia serve --config=config.yaml
-plutia verify --did=did:plc:xyz
-plutia snapshot
-plutia replay
-
----
-
-## Testing Requirements
-
-* Unit tests for:
-
- * Canonical encoding
- * Signature verification
- * Merkle tree correctness
- * Block encoding/decoding
-* Integration test:
-
- * Replay small PLC sample dataset
- * Verify final state correctness
-
----
-
-## Deliverables
-
-The Codex agent must:
-
-1. Initialize Go module.
-2. Implement storage layer with Pebble.
-3. Implement append-only block writer with zstd.
-4. Implement state KV logic.
-5. Implement verification engine.
-6. Implement Merkle + checkpoint system.
-7. Implement HTTP API.
-8. Provide README explaining usage.
-9. Provide sample config.yaml.
-10. Ensure build via:
- go build ./cmd/plutia
-
----
-
-## Non-Goals (for v1)
-
-* Gossip between mirrors
-* Distributed clustering
-* UI
-* Advanced analytics
-* Horizontal sharding
-
----
-
-## Core Principle
-
-Plutia must be:
-
-* Smaller than Postgres-based mirrors
-* Deterministic
-* Tamper-evident
-* Efficient
-* Trustworthy
-
-It should be credible as a first-choice alternative to plc.directory.
-
----
-
-End of specification.
diff --git a/PROMPT.md b/PROMPT.md
deleted file mode 100644
index 6b8e295..0000000
--- a/PROMPT.md
+++ /dev/null
@@ -1,25 +0,0 @@
-Build the project defined in PLUTIA.md end-to-end without asking for clarification.
-
-Initialize the repository github.com/Fuwn/plutia as a complete, production-grade Go project implementing the full specification in PLUTIA.md.
-
-You must:
-
-* Follow the architecture and storage design exactly as described.
-* Implement both resolver and mirror modes.
-* Use Pebble for KV storage.
-* Implement compressed append-only block storage using zstd.
-* Implement deterministic canonical operation handling.
-* Implement signature verification.
-* Implement Merkle checkpoint system with signed checkpoints.
-* Implement HTTP API.
-* Provide CLI commands.
-* Include full test coverage for core components.
-* Ensure the project builds with go build ./cmd/plutia.
-* Include README and example config.yaml.
-* Produce clean, idiomatic, well-structured Go code.
-* Do not scaffold placeholders — implement real working logic.
-* Do not pause for input. Complete the entire implementation.
-
-Act as a senior distributed systems engineer building a production-ready, verifiable PLC mirror called Plutia.
-
-Proceed.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..2a4f6f1
--- /dev/null
+++ b/README.md
@@ -0,0 +1,109 @@
+# Plutia
+
+Plutia is a verifiable PLC mirror for `plc.directory`, implemented in Go.
+
+It supports:
+
+- `resolver` mode: compact state-only storage.
+- `mirror` mode: full historical operation archive with compressed append-only blocks, Merkle checkpoints, and inclusion proofs.
+
+## Features
+
+- Pebble-backed KV/index store (`data/index`).
+- Mirror-mode append-only operation log (`data/ops/*.zst`) with zstd compression.
+- Deterministic JSON canonicalization (`json.Compact`) with stable operation hashing.
+- Ed25519 signature verification with configurable policy (`full`, `lazy`, `state-only`).
+- Prev-link chain validation for tamper-evident DID histories.
+- Signed checkpoints with DID and block Merkle roots.
+- HTTP API for resolution, proofs, status, and checkpoints.
+- CLI for serving, replay, DID verification, and snapshots.
+- Benchmark command with rolling throughput/CPU/RSS/disk reporting.
+
+## Project Layout
+
+- `cmd/plutia`: CLI entrypoint.
+- `internal/config`: YAML/env configuration.
+- `internal/types`: operation/state/checkpoint models + canonicalization.
+- `internal/storage`: Pebble store + zstd append-only block log.
+- `internal/verify`: signature and chain-link validation.
+- `internal/state`: state materialization and KV updates.
+- `internal/merkle`: Merkle root/proof generation.
+- `internal/checkpoint`: checkpoint creation/signing/persistence.
+- `internal/ingest`: replay/poll ingestion pipeline.
+- `internal/api`: HTTP handlers.
+- `pkg/proof`: proof response structures.
+
+## Configuration
+
+Use `config.yaml` (see included example) and/or environment variables:
+
+- `PLUTIA_MODE`
+- `PLUTIA_DATA_DIR`
+- `PLUTIA_PLC_SOURCE`
+- `PLUTIA_VERIFY`
+- `PLUTIA_ZSTD_LEVEL`
+- `PLUTIA_BLOCK_SIZE_MB`
+- `PLUTIA_CHECKPOINT_INTERVAL`
+- `PLUTIA_COMMIT_BATCH_SIZE`
+- `PLUTIA_VERIFY_WORKERS`
+- `PLUTIA_LISTEN_ADDR`
+- `PLUTIA_MIRROR_PRIVATE_KEY_PATH`
+- `PLUTIA_POLL_INTERVAL`
+
+## Mirror Key
+
+Checkpoints are signed using `mirror_private_key_path`.
+Accepted key formats:
+
+- raw ed25519 seed/private key (`base64url`, `base64`, or `hex`)
+- JSON object: `{"private_key":"..."}`
+
+## CLI
+
+```bash
+plutia serve --config=config.yaml
+plutia replay --config=config.yaml
+plutia verify --config=config.yaml --did=did:plc:xyz
+plutia snapshot --config=config.yaml
+plutia bench --config=config.yaml --max-ops=200000
+```
+
+## HTTP API
+
+- `GET /health`
+- `GET /metrics`
+- `GET /status`
+- `GET /did/{did}`
+- `GET /did/{did}/proof`
+- `GET /checkpoints/latest`
+- `GET /checkpoints/{sequence}`
+
+`/did/{did}` returns `did_document`, `chain_tip_hash`, and latest checkpoint reference when available.
+
+`/did/{did}/proof` returns chain-tip reference, operation sequence references, Merkle inclusion proof, and checkpoint signature metadata.
+
+## Ingestion Behavior
+
+- Replay from genesis via `plutia replay`.
+- Poll-based incremental ingestion in `serve` mode.
+- Resume from `meta:global_seq`.
+- Parallel DID-partitioned verification with ordered commit.
+- Configurable synced commit batching (`commit_batch_size`).
+- Safe flush on shutdown.
+
+## Verification Policies
+
+- `full`: verify all operations on ingest.
+- `lazy`: ingest without immediate signature verification.
+- `state-only`: verify latest operations per DID.
+
+## Build and Test
+
+```bash
+go test ./...
+go build ./cmd/plutia
+```
+
+## License
+
+MIT OR Apache-2.0
diff --git a/cmd/plutia/main.go b/cmd/plutia/main.go
new file mode 100644
index 0000000..0849e0a
--- /dev/null
+++ b/cmd/plutia/main.go
@@ -0,0 +1,383 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+ "os"
+ "os/exec"
+ "os/signal"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/api"
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/ingest"
+ "github.com/Fuwn/plutia/internal/storage"
+)
+
+type app struct {
+ cfg config.Config
+ store *storage.PebbleStore
+ service *ingest.Service
+ apiServer *api.Server
+ checkpointM *checkpoint.Manager
+}
+
+func main() {
+ if len(os.Args) < 2 {
+ usage()
+ os.Exit(2)
+ }
+ cmd := os.Args[1]
+ switch cmd {
+ case "serve":
+ if err := runServe(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ case "replay":
+ if err := runReplay(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ case "verify":
+ if err := runVerify(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ case "snapshot":
+ if err := runSnapshot(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ case "bench":
+ if err := runBench(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ default:
+ usage()
+ os.Exit(2)
+ }
+}
+
+func runServe(args []string) error {
+ fs := flag.NewFlagSet("serve", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+ app.service.SetMaxOps(*maxOps)
+
+ ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+ defer stop()
+
+ if err := app.service.Replay(ctx); err != nil {
+ if app.service.IsCorrupted() {
+ log.Printf("starting in read-only corrupted mode: %v", err)
+ } else {
+ return fmt.Errorf("initial replay failed: %w", err)
+ }
+ }
+
+ httpSrv := &http.Server{
+ Addr: app.cfg.ListenAddr,
+ Handler: app.apiServer.Handler(),
+ }
+
+ errCh := make(chan error, 2)
+ go func() {
+ log.Printf("HTTP server listening on %s", app.cfg.ListenAddr)
+ if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ errCh <- err
+ }
+ }()
+ if !app.service.IsCorrupted() {
+ go func() {
+ if err := app.service.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) {
+ errCh <- err
+ }
+ }()
+ }
+
+ select {
+ case <-ctx.Done():
+ case err := <-errCh:
+ return err
+ }
+
+ shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+ defer cancel()
+ if err := app.service.Flush(shutdownCtx); err != nil {
+ return err
+ }
+ return httpSrv.Shutdown(shutdownCtx)
+}
+
+func runReplay(args []string) error {
+ fs := flag.NewFlagSet("replay", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+ app.service.SetMaxOps(*maxOps)
+
+ ctx := context.Background()
+ if err := app.service.Replay(ctx); err != nil {
+ return err
+ }
+ if err := app.service.Flush(ctx); err != nil {
+ return err
+ }
+ seq, _ := app.store.GetGlobalSeq()
+ fmt.Printf("replay complete at sequence %d\n", seq)
+ return nil
+}
+
+func runVerify(args []string) error {
+ fs := flag.NewFlagSet("verify", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ did := fs.String("did", "", "did to verify")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ if *did == "" {
+ return fmt.Errorf("--did is required")
+ }
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+
+ if err := app.service.VerifyDID(context.Background(), *did); err != nil {
+ return err
+ }
+ fmt.Printf("verification succeeded for %s\n", *did)
+ return nil
+}
+
+func runSnapshot(args []string) error {
+ fs := flag.NewFlagSet("snapshot", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+ cp, err := app.service.Snapshot(context.Background())
+ if err != nil {
+ return err
+ }
+ fmt.Printf("checkpoint sequence=%d hash=%s\n", cp.Sequence, cp.CheckpointHash)
+ return nil
+}
+
+func runBench(args []string) error {
+ fs := flag.NewFlagSet("bench", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ maxOps := fs.Uint64("max-ops", 200000, "max operations to ingest for benchmark")
+ interval := fs.Duration("interval", 10*time.Second, "rolling report interval")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+ app.service.SetMaxOps(*maxOps)
+
+ startSeq, err := app.store.GetGlobalSeq()
+ if err != nil {
+ return err
+ }
+ start := time.Now()
+ lastSeq := startSeq
+ pid := os.Getpid()
+
+ done := make(chan struct{})
+ go func() {
+ ticker := time.NewTicker(*interval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-done:
+ return
+ case <-ticker.C:
+ seq, err := app.store.GetGlobalSeq()
+ if err != nil {
+ continue
+ }
+ delta := seq - lastSeq
+ lastSeq = seq
+ opsSec := float64(delta) / interval.Seconds()
+ cpuPct, rssKB, _ := processMetrics(pid)
+ fmt.Printf("rolling seq=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", seq, opsSec, cpuPct, rssKB)
+ }
+ }
+ }()
+
+ ctx := context.Background()
+ replayErr := app.service.Replay(ctx)
+ close(done)
+ if replayErr != nil {
+ return replayErr
+ }
+ if err := app.service.Flush(ctx); err != nil {
+ return err
+ }
+
+ endSeq, err := app.store.GetGlobalSeq()
+ if err != nil {
+ return err
+ }
+ elapsed := time.Since(start)
+ totalOps := endSeq - startSeq
+ totalOpsSec := float64(totalOps) / elapsed.Seconds()
+ cpuPct, rssKB, _ := processMetrics(pid)
+ totalBytes, opsBytes, indexBytes, cpBytes, err := diskUsageBreakdown(app.cfg.DataDir)
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("bench_total elapsed=%s ops=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", elapsed.Round(time.Millisecond), totalOps, totalOpsSec, cpuPct, rssKB)
+ fmt.Printf("bench_disk total_bytes=%d ops_bytes=%d index_bytes=%d checkpoints_bytes=%d bytes_per_op=%.2f\n",
+ totalBytes, opsBytes, indexBytes, cpBytes, float64(totalBytes)/max(1, float64(totalOps)))
+ return nil
+}
+
+func bootstrap(path string) (*app, error) {
+ cfg, err := config.Load(path)
+ if err != nil {
+ return nil, err
+ }
+ for _, p := range []string{cfg.DataDir, filepath.Join(cfg.DataDir, "ops"), filepath.Join(cfg.DataDir, "index"), filepath.Join(cfg.DataDir, "checkpoints")} {
+ if err := os.MkdirAll(p, 0o755); err != nil {
+ return nil, fmt.Errorf("mkdir %s: %w", p, err)
+ }
+ }
+ store, err := storage.OpenPebble(cfg.DataDir)
+ if err != nil {
+ return nil, err
+ }
+ mode, err := store.GetMode()
+ if err != nil {
+ return nil, err
+ }
+ if mode == "" {
+ if err := store.SetMode(cfg.Mode); err != nil {
+ return nil, err
+ }
+ } else if mode != cfg.Mode {
+ return nil, fmt.Errorf("mode mismatch: store=%s config=%s", mode, cfg.Mode)
+ }
+
+ var blockLog *storage.BlockLog
+ if cfg.Mode == config.ModeMirror {
+ blockLog, err = storage.OpenBlockLog(cfg.DataDir, cfg.ZstdLevel, cfg.BlockSizeMB)
+ if err != nil {
+ return nil, err
+ }
+ }
+ client := ingest.NewClient(cfg.PLCSource)
+ checkpointMgr := checkpoint.NewManager(store, cfg.DataDir, cfg.MirrorPrivateKeyPath)
+ service := ingest.NewService(cfg, store, client, blockLog, checkpointMgr)
+ apiServer := api.NewServer(cfg, store, service, checkpointMgr)
+ return &app{cfg: cfg, store: store, service: service, apiServer: apiServer, checkpointM: checkpointMgr}, nil
+}
+
+func usage() {
+ fmt.Fprintf(os.Stderr, `plutia: verifiable PLC mirror
+
+Commands:
+ plutia serve --config=config.yaml [--max-ops=0]
+ plutia replay --config=config.yaml [--max-ops=0]
+ plutia verify --config=config.yaml --did=did:plc:xyz
+ plutia snapshot --config=config.yaml
+ plutia bench --config=config.yaml [--max-ops=200000] [--interval=10s]
+`)
+}
+
+func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
+ out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output()
+ if err != nil {
+ return 0, 0, err
+ }
+ fields := strings.Fields(string(out))
+ if len(fields) < 2 {
+ return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
+ }
+ cpuPct, _ = strconv.ParseFloat(fields[0], 64)
+ rssKB, _ = strconv.ParseInt(fields[1], 10, 64)
+ return cpuPct, rssKB, nil
+}
+
+func diskUsageBreakdown(dataDir string) (total, ops, index, checkpoints int64, err error) {
+ total, err = dirSize(dataDir)
+ if err != nil {
+ return 0, 0, 0, 0, err
+ }
+ ops, err = dirSize(filepath.Join(dataDir, "ops"))
+ if err != nil {
+ return 0, 0, 0, 0, err
+ }
+ index, err = dirSize(filepath.Join(dataDir, "index"))
+ if err != nil {
+ return 0, 0, 0, 0, err
+ }
+ checkpoints, err = dirSize(filepath.Join(dataDir, "checkpoints"))
+ if err != nil {
+ return 0, 0, 0, 0, err
+ }
+ return total, ops, index, checkpoints, nil
+}
+
+func dirSize(path string) (int64, error) {
+ var total int64
+ err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error {
+ if err != nil {
+ return err
+ }
+ if d.IsDir() {
+ return nil
+ }
+ info, err := d.Info()
+ if err != nil {
+ return err
+ }
+ total += info.Size()
+ return nil
+ })
+ return total, err
+}
+
+func max(a, b float64) float64 {
+ if a > b {
+ return a
+ }
+ return b
+}
diff --git a/config.yaml b/config.yaml
new file mode 100644
index 0000000..8373155
--- /dev/null
+++ b/config.yaml
@@ -0,0 +1,12 @@
+mode: mirror
+data_dir: ./data
+plc_source: https://plc.directory
+verify: full
+zstd_level: 9
+block_size_mb: 8
+checkpoint_interval: 100000
+commit_batch_size: 128
+verify_workers: 10
+listen_addr: :8080
+mirror_private_key_path: ./mirror.key
+poll_interval: 5s
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..e86efe7
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,41 @@
+module github.com/Fuwn/plutia
+
+go 1.25
+
+require (
+ github.com/cockroachdb/pebble v1.1.5
+ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0
+ github.com/fxamacker/cbor/v2 v2.9.0
+ github.com/klauspost/compress v1.17.11
+ github.com/mr-tron/base58 v1.2.0
+ gopkg.in/yaml.v3 v3.0.1
+)
+
+require (
+ github.com/DataDog/zstd v1.4.5 // indirect
+ github.com/beorn7/perks v1.0.1 // indirect
+ github.com/cespare/xxhash/v2 v2.2.0 // indirect
+ github.com/cockroachdb/errors v1.11.3 // indirect
+ github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect
+ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
+ github.com/cockroachdb/redact v1.1.5 // indirect
+ github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
+ github.com/getsentry/sentry-go v0.27.0 // indirect
+ github.com/gogo/protobuf v1.3.2 // indirect
+ github.com/golang/snappy v0.0.4 // indirect
+ github.com/kr/pretty v0.3.1 // indirect
+ github.com/kr/text v0.2.0 // indirect
+ github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
+ github.com/pkg/errors v0.9.1 // indirect
+ github.com/prometheus/client_golang v1.17.0 // indirect
+ github.com/prometheus/client_model v0.5.0 // indirect
+ github.com/prometheus/common v0.45.0 // indirect
+ github.com/prometheus/procfs v0.12.0 // indirect
+ github.com/rogpeppe/go-internal v1.10.0 // indirect
+ github.com/stretchr/testify v1.10.0 // indirect
+ github.com/x448/float16 v0.8.4 // indirect
+ golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
+ golang.org/x/sys v0.22.0 // indirect
+ golang.org/x/text v0.14.0 // indirect
+ google.golang.org/protobuf v1.33.0 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..abddae2
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,115 @@
+github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
+github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4=
+github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
+github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I=
+github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8=
+github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4=
+github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M=
+github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE=
+github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
+github.com/cockroachdb/pebble v1.1.5 h1:5AAWCBWbat0uE0blr8qzufZP5tBjkRyy/jWe1QWLnvw=
+github.com/cockroachdb/pebble v1.1.5/go.mod h1:17wO9el1YEigxkP/YtV8NtCivQDgoCyBg5c4VR/eOWo=
+github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30=
+github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
+github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo=
+github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/decred/dcrd/crypto/blake256 v1.1.0 h1:zPMNGQCm0g4QTY27fOCorQW7EryeQ/U0x++OzVrdms8=
+github.com/decred/dcrd/crypto/blake256 v1.1.0/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
+github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc=
+github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40=
+github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM=
+github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ=
+github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
+github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
+github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
+github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
+github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
+github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
+github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
+github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
+github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
+github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
+github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
+github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
+github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
+github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
+github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
+github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
+github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
+github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
+github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
+github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
+github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
+github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
+golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
+golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
+golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
+google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/internal/api/server.go b/internal/api/server.go
new file mode 100644
index 0000000..2159029
--- /dev/null
+++ b/internal/api/server.go
@@ -0,0 +1,287 @@
+package api
+
+import (
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "strconv"
+ "strings"
+
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/ingest"
+ "github.com/Fuwn/plutia/internal/merkle"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+ "github.com/Fuwn/plutia/pkg/proof"
+)
+
+// Server serves the HTTP API on top of the configured store, the ingest
+// service, and the checkpoint manager.
+type Server struct {
+ cfg config.Config
+ store storage.Store
+ ingestor *ingest.Service
+ checkpoints *checkpoint.Manager
+}
+
+// NewServer constructs a Server from its collaborators; no I/O happens here.
+func NewServer(cfg config.Config, store storage.Store, ingestor *ingest.Service, checkpoints *checkpoint.Manager) *Server {
+ return &Server{cfg: cfg, store: store, ingestor: ingestor, checkpoints: checkpoints}
+}
+
+// Handler returns the route multiplexer covering all API endpoints: health,
+// Prometheus-style metrics, status, checkpoint lookup, and DID
+// resolution/proof generation.
+func (s *Server) Handler() http.Handler {
+ mux := http.NewServeMux()
+ mux.HandleFunc("/health", s.handleHealth)
+ mux.HandleFunc("/metrics", s.handleMetrics)
+ mux.HandleFunc("/status", s.handleStatus)
+ mux.HandleFunc("/checkpoints/latest", s.handleLatestCheckpoint)
+ mux.HandleFunc("/checkpoints/", s.handleCheckpointBySequence)
+ mux.HandleFunc("/did/", s.handleDID)
+ return mux
+}
+
+// handleHealth is a liveness probe; it always responds 200 {"status":"ok"}.
+func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
+ writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
+}
+
+// handleMetrics emits a minimal Prometheus text exposition with ingest
+// counters and the current global sequence. The GetGlobalSeq error is
+// intentionally ignored: on failure seq reports as zero rather than breaking
+// the scrape.
+func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) {
+ st := s.ingestor.Stats()
+ seq, _ := s.store.GetGlobalSeq()
+ w.Header().Set("Content-Type", "text/plain; version=0.0.4")
+ _, _ = fmt.Fprintf(w, "plutia_ingested_ops %d\n", st.IngestedOps)
+ _, _ = fmt.Fprintf(w, "plutia_ingest_errors %d\n", st.Errors)
+ _, _ = fmt.Fprintf(w, "plutia_last_seq %d\n", seq)
+}
+
+// handleStatus reports mode, verify policy, global sequence, ingest stats,
+// and corruption state as JSON, attaching the corruption error message and
+// the latest checkpoint when they exist.
+func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
+ seq, err := s.store.GetGlobalSeq()
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ cp, ok, err := s.store.GetLatestCheckpoint()
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ payload := map[string]any{
+ "mode": s.cfg.Mode,
+ "verify_policy": s.cfg.VerifyPolicy,
+ "global_seq": seq,
+ "stats": s.ingestor.Stats(),
+ "corrupted": s.ingestor.IsCorrupted(),
+ }
+ // Only surface the corruption detail when the ingestor actually has one.
+ if err := s.ingestor.CorruptionError(); err != nil {
+ payload["corruption_error"] = err.Error()
+ }
+ if ok {
+ payload["latest_checkpoint"] = cp
+ }
+ writeJSON(w, http.StatusOK, payload)
+}
+
+// handleLatestCheckpoint returns the most recent stored checkpoint, or 404
+// when none has been created yet.
+func (s *Server) handleLatestCheckpoint(w http.ResponseWriter, r *http.Request) {
+ cp, ok, err := s.store.GetLatestCheckpoint()
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ if !ok {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("no checkpoint"))
+ return
+ }
+ writeJSON(w, http.StatusOK, cp)
+}
+
+// handleCheckpointBySequence serves /checkpoints/<seq>, parsing the trailing
+// path segment as a uint64 sequence number and looking up that checkpoint.
+func (s *Server) handleCheckpointBySequence(w http.ResponseWriter, r *http.Request) {
+ path := strings.TrimPrefix(r.URL.Path, "/checkpoints/")
+ if path == "" {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("missing checkpoint sequence"))
+ return
+ }
+ seq, err := strconv.ParseUint(path, 10, 64)
+ if err != nil {
+ writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid checkpoint sequence"))
+ return
+ }
+ cp, ok, err := s.store.GetCheckpoint(seq)
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ if !ok {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("checkpoint not found"))
+ return
+ }
+ writeJSON(w, http.StatusOK, cp)
+}
+
+// handleDID routes /did/<did> to the resolver and /did/<did>/proof to the
+// inclusion-proof handler, based on the path suffix.
+func (s *Server) handleDID(w http.ResponseWriter, r *http.Request) {
+ path := strings.TrimPrefix(r.URL.Path, "/did/")
+ if path == "" {
+ writeErr(w, http.StatusBadRequest, fmt.Errorf("missing did"))
+ return
+ }
+ if strings.HasSuffix(path, "/proof") {
+ did := strings.TrimSuffix(path, "/proof")
+ s.handleDIDProof(w, r, did)
+ return
+ }
+ s.handleDIDResolve(w, r, path)
+}
+
+// handleDIDResolve returns the stored DID document and chain tip hash for
+// did, annotated with a reference to the latest checkpoint when one exists.
+func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did string) {
+ state, ok, err := s.store.GetState(did)
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ if !ok {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+ return
+ }
+ cp, cpOK, err := s.store.GetLatestCheckpoint()
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ resp := map[string]any{
+ "did": did,
+ // RawMessage avoids re-encoding the stored document bytes.
+ "did_document": json.RawMessage(state.DIDDocument),
+ "chain_tip_hash": state.ChainTipHash,
+ }
+ // A missing checkpoint is not an error for plain resolution; the
+ // reference is simply omitted.
+ if cpOK {
+ resp["checkpoint_reference"] = map[string]any{
+ "sequence": cp.Sequence,
+ "checkpoint_hash": cp.CheckpointHash,
+ }
+ }
+ writeJSON(w, http.StatusOK, resp)
+}
+
+// handleDIDProof builds a Merkle inclusion proof for did against a checkpoint
+// (the latest, or one selected via ?checkpoint=N). It refuses to serve proofs
+// while the ingestor reports corruption (503), self-checks the proof against
+// the checkpoint root before responding (500 on mismatch), and responds 409
+// when the chosen checkpoint changed while the proof was being generated.
+func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did string) {
+ if err := s.ingestor.CorruptionError(); err != nil {
+ writeErr(w, http.StatusServiceUnavailable, err)
+ return
+ }
+
+ cp, verifyCheckpointUnchanged, err := s.selectCheckpointForProof(r)
+ if err != nil {
+ writeErr(w, http.StatusBadRequest, err)
+ return
+ }
+
+ // Reconstruct the DID's chain tip as of the checkpoint's sequence, not the
+ // current tip, so proofs against older checkpoints stay consistent.
+ tipHash, seqs, err := s.ingestor.RecomputeTipAtOrBefore(r.Context(), did, cp.Sequence)
+ if err != nil {
+ writeErr(w, http.StatusNotFound, err)
+ return
+ }
+ siblings, leafHash, found, err := s.checkpoints.BuildDIDProofAtCheckpoint(did, tipHash, cp.Sequence)
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ if !found {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("did not present in checkpoint state"))
+ return
+ }
+
+ leafBytes, err := hex.DecodeString(leafHash)
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid leaf hash: %w", err))
+ return
+ }
+ root, err := hex.DecodeString(cp.DIDMerkleRoot)
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid checkpoint root"))
+ return
+ }
+ // Sanity-check our own proof against the checkpoint root before serving it.
+ if !merkle.VerifyProof(leafBytes, siblings, root) {
+ writeErr(w, http.StatusInternalServerError, fmt.Errorf("inclusion proof failed consistency check"))
+ return
+ }
+
+ // Detect a checkpoint that advanced or was rewritten underneath us
+ // mid-request; the proof would no longer match what the client fetches.
+ if err := verifyCheckpointUnchanged(); err != nil {
+ writeErr(w, http.StatusConflict, err)
+ return
+ }
+
+ response := proof.DIDInclusionProof{
+ DID: did,
+ ChainTipHash: tipHash,
+ LeafHash: leafHash,
+ MerkleRoot: cp.DIDMerkleRoot,
+ Siblings: siblings,
+ CheckpointSeq: cp.Sequence,
+ CheckpointHash: cp.CheckpointHash,
+ CheckpointSig: cp.Signature,
+ CheckpointKeyID: cp.KeyID,
+ }
+ writeJSON(w, http.StatusOK, map[string]any{
+ "did": did,
+ "checkpoint_sequence": cp.Sequence,
+ "checkpoint_hash": cp.CheckpointHash,
+ "checkpoint_signature": cp.Signature,
+ "merkle_root": cp.DIDMerkleRoot,
+ "chain_tip_reference": tipHash,
+ "inclusion_proof": response,
+ "chain_operation_indices": seqs,
+ })
+}
+
+// selectCheckpointForProof picks the checkpoint to prove against — the latest
+// when the ?checkpoint query parameter is absent, otherwise the checkpoint
+// with that sequence — and returns a closure that re-reads the checkpoint so
+// the caller can confirm it did not change while the proof was being built.
+func (s *Server) selectCheckpointForProof(r *http.Request) (types.CheckpointV1, func() error, error) {
+ checkpointParam := strings.TrimSpace(r.URL.Query().Get("checkpoint"))
+ if checkpointParam == "" {
+ cp, ok, err := s.store.GetLatestCheckpoint()
+ if err != nil {
+ return types.CheckpointV1{}, nil, err
+ }
+ if !ok {
+ return types.CheckpointV1{}, nil, fmt.Errorf("no checkpoint available")
+ }
+ // For "latest", any hash difference on re-read means a newer
+ // checkpoint landed during the request.
+ return cp, func() error {
+ now, ok, err := s.store.GetLatestCheckpoint()
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return fmt.Errorf("latest checkpoint disappeared during request")
+ }
+ if now.CheckpointHash != cp.CheckpointHash {
+ return fmt.Errorf("checkpoint advanced during proof generation")
+ }
+ return nil
+ }, nil
+ }
+
+ seq, err := strconv.ParseUint(checkpointParam, 10, 64)
+ if err != nil {
+ return types.CheckpointV1{}, nil, fmt.Errorf("invalid checkpoint query parameter")
+ }
+ cp, ok, err := s.store.GetCheckpoint(seq)
+ if err != nil {
+ return types.CheckpointV1{}, nil, err
+ }
+ if !ok {
+ return types.CheckpointV1{}, nil, fmt.Errorf("checkpoint %d unavailable", seq)
+ }
+ // For a pinned sequence, re-read that same sequence and compare hashes to
+ // detect in-place mutation.
+ return cp, func() error {
+ again, ok, err := s.store.GetCheckpoint(seq)
+ if err != nil {
+ return err
+ }
+ if !ok || again.CheckpointHash != cp.CheckpointHash {
+ return fmt.Errorf("checkpoint %d changed during proof generation", seq)
+ }
+ return nil
+ }, nil
+}
+
+// writeJSON serializes v as JSON with the given status code; the encode error
+// is ignored because the header has already been written.
+func writeJSON(w http.ResponseWriter, code int, v any) {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(code)
+ _ = json.NewEncoder(w).Encode(v)
+}
+
+// writeErr renders err as {"error": "..."} with the given status code.
+func writeErr(w http.ResponseWriter, code int, err error) {
+ writeJSON(w, code, map[string]any{"error": err.Error()})
+}
diff --git a/internal/api/server_checkpoint_test.go b/internal/api/server_checkpoint_test.go
new file mode 100644
index 0000000..9a12f61
--- /dev/null
+++ b/internal/api/server_checkpoint_test.go
@@ -0,0 +1,98 @@
+package api
+
+import (
+ "net/http/httptest"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+)
+
+// TestSelectCheckpointForProofDetectsLatestAdvance verifies that when no
+// ?checkpoint parameter is given, the unchanged-checkpoint closure fails
+// once a newer latest checkpoint is stored after selection.
+func TestSelectCheckpointForProofDetectsLatestAdvance(t *testing.T) {
+ dataDir := t.TempDir()
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+
+ if err := store.PutCheckpoint(types.CheckpointV1{
+ Version: 1,
+ Sequence: 10,
+ DIDMerkleRoot: "root-10",
+ CheckpointHash: "cp-10",
+ Signature: "sig-10",
+ }); err != nil {
+ t.Fatalf("put checkpoint 10: %v", err)
+ }
+
+ srv := NewServer(config.Default(), store, nil, nil)
+ req := httptest.NewRequest("GET", "/did/did:plc:alice/proof", nil)
+ cp, verifyUnchanged, err := srv.selectCheckpointForProof(req)
+ if err != nil {
+ t.Fatalf("select checkpoint: %v", err)
+ }
+ if cp.Sequence != 10 {
+ t.Fatalf("selected unexpected checkpoint sequence: got %d want 10", cp.Sequence)
+ }
+
+ // Advance "latest" by storing a higher sequence; the closure captured
+ // above must now report the advance.
+ if err := store.PutCheckpoint(types.CheckpointV1{
+ Version: 1,
+ Sequence: 11,
+ DIDMerkleRoot: "root-11",
+ CheckpointHash: "cp-11",
+ Signature: "sig-11",
+ }); err != nil {
+ t.Fatalf("put checkpoint 11: %v", err)
+ }
+
+ if err := verifyUnchanged(); err == nil || !strings.Contains(err.Error(), "advanced") {
+ t.Fatalf("expected latest-checkpoint advance error, got: %v", err)
+ }
+}
+
+// TestSelectCheckpointForProofDetectsHistoricalMutation verifies that when a
+// specific checkpoint is pinned via ?checkpoint=N, the unchanged-checkpoint
+// closure fails if that sequence is overwritten with different contents.
+func TestSelectCheckpointForProofDetectsHistoricalMutation(t *testing.T) {
+ dataDir := t.TempDir()
+ store, err := storage.OpenPebble(filepath.Clean(dataDir))
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+
+ if err := store.PutCheckpoint(types.CheckpointV1{
+ Version: 1,
+ Sequence: 20,
+ DIDMerkleRoot: "root-20-a",
+ CheckpointHash: "cp-20-a",
+ Signature: "sig-20-a",
+ }); err != nil {
+ t.Fatalf("put checkpoint 20(a): %v", err)
+ }
+
+ srv := NewServer(config.Default(), store, nil, nil)
+ req := httptest.NewRequest("GET", "/did/did:plc:alice/proof?checkpoint=20", nil)
+ cp, verifyUnchanged, err := srv.selectCheckpointForProof(req)
+ if err != nil {
+ t.Fatalf("select historical checkpoint: %v", err)
+ }
+ if cp.CheckpointHash != "cp-20-a" {
+ t.Fatalf("selected unexpected checkpoint hash: got %s", cp.CheckpointHash)
+ }
+
+ // Overwrite the same sequence with a different hash to simulate in-place
+ // mutation of a historical checkpoint.
+ if err := store.PutCheckpoint(types.CheckpointV1{
+ Version: 1,
+ Sequence: 20,
+ DIDMerkleRoot: "root-20-b",
+ CheckpointHash: "cp-20-b",
+ Signature: "sig-20-b",
+ }); err != nil {
+ t.Fatalf("put checkpoint 20(b): %v", err)
+ }
+
+ if err := verifyUnchanged(); err == nil || !strings.Contains(err.Error(), "changed") {
+ t.Fatalf("expected historical-checkpoint changed error, got: %v", err)
+ }
+}
diff --git a/internal/api/server_integration_test.go b/internal/api/server_integration_test.go
new file mode 100644
index 0000000..1736750
--- /dev/null
+++ b/internal/api/server_integration_test.go
@@ -0,0 +1,261 @@
+package api
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/ingest"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+)
+
+// TestProofAgainstOlderCheckpointAfterFurtherIngest ingests three records
+// (creating a checkpoint every 2 ops), then requests a proof pinned to
+// checkpoint 2 and asserts the proof reflects the DID's chain tip as of that
+// checkpoint — not the current tip after the third operation.
+func TestProofAgainstOlderCheckpointAfterFurtherIngest(t *testing.T) {
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data: %v", err)
+ }
+
+ // Generate an ephemeral ed25519 mirror signing key for the run.
+ keyPath := filepath.Join(tmp, "mirror.key")
+ seed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(seed); err != nil {
+ t.Fatalf("seed: %v", err)
+ }
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil {
+ t.Fatalf("write key: %v", err)
+ }
+
+ recs := buildCheckpointScenarioRecords(t)
+ sourcePath := filepath.Join(tmp, "records.ndjson")
+ writeRecordsFile(t, sourcePath, recs)
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+
+ // CheckpointInterval: 2 ensures a checkpoint exists at sequence 2, before
+ // the third (alice-updating) record is applied.
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: sourcePath,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 2,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ PollInterval: 5,
+ }
+ cpMgr := checkpoint.NewManager(store, dataDir, keyPath)
+ svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr)
+ if err := svc.Replay(context.Background()); err != nil {
+ t.Fatalf("replay: %v", err)
+ }
+ if err := svc.Flush(context.Background()); err != nil {
+ t.Fatalf("flush: %v", err)
+ }
+
+ cp2, ok, err := store.GetCheckpoint(2)
+ if err != nil || !ok {
+ t.Fatalf("checkpoint 2 missing: ok=%v err=%v", ok, err)
+ }
+
+ ts := httptest.NewServer(NewServer(cfg, store, svc, cpMgr).Handler())
+ defer ts.Close()
+
+ url := ts.URL + "/did/" + strings.ReplaceAll("did:plc:alice", ":", "%3A") + "/proof?checkpoint=2"
+ resp, err := http.Get(url)
+ if err != nil {
+ t.Fatalf("get proof: %v", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ bodyBytes, _ := io.ReadAll(resp.Body)
+ t.Fatalf("proof status: %d body=%s", resp.StatusCode, string(bodyBytes))
+ }
+
+ var body struct {
+ CheckpointSequence uint64 `json:"checkpoint_sequence"`
+ ChainTipReference string `json:"chain_tip_reference"`
+ InclusionProof struct {
+ MerkleRoot string `json:"merkle_root"`
+ } `json:"inclusion_proof"`
+ }
+ if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
+ t.Fatalf("decode response: %v", err)
+ }
+ if body.CheckpointSequence != 2 {
+ t.Fatalf("unexpected checkpoint sequence: got %d want 2", body.CheckpointSequence)
+ }
+ // At checkpoint 2 alice's tip is still her first operation (recs[0]); the
+ // seq-3 update must not leak into the proof.
+ if body.ChainTipReference != recs[0].CID {
+ t.Fatalf("expected old tip at checkpoint=2: got %s want %s", body.ChainTipReference, recs[0].CID)
+ }
+ if body.InclusionProof.MerkleRoot != cp2.DIDMerkleRoot {
+ t.Fatalf("merkle root mismatch: got %s want %s", body.InclusionProof.MerkleRoot, cp2.DIDMerkleRoot)
+ }
+}
+
+// TestCorruptedBlockRefusesProof ingests records, snapshots, then flips a
+// byte in the on-disk ops block. On restart the service must detect the
+// corruption, and the proof endpoint must refuse with 503.
+func TestCorruptedBlockRefusesProof(t *testing.T) {
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data: %v", err)
+ }
+
+ seed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(seed); err != nil {
+ t.Fatalf("seed: %v", err)
+ }
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil {
+ t.Fatalf("write key: %v", err)
+ }
+
+ recs := buildCheckpointScenarioRecords(t)
+ sourcePath := filepath.Join(tmp, "records.ndjson")
+ writeRecordsFile(t, sourcePath, recs)
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: sourcePath,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 2,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ PollInterval: 5,
+ }
+ cpMgr := checkpoint.NewManager(store, dataDir, keyPath)
+ svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr)
+ if err := svc.Replay(context.Background()); err != nil {
+ t.Fatalf("replay: %v", err)
+ }
+ if _, err := svc.Snapshot(context.Background()); err != nil {
+ t.Fatalf("snapshot: %v", err)
+ }
+ // Close everything so the block file can be rewritten safely on disk.
+ svc.Close()
+ if err := store.Close(); err != nil {
+ t.Fatalf("close store: %v", err)
+ }
+
+ // Flip one byte in the middle of the first compressed ops block.
+ blockPath := filepath.Join(dataDir, "ops", "000001.zst")
+ b, err := os.ReadFile(blockPath)
+ if err != nil {
+ t.Fatalf("read block: %v", err)
+ }
+ b[len(b)/2] ^= 0xFF
+ if err := os.WriteFile(blockPath, b, 0o644); err != nil {
+ t.Fatalf("write corrupted block: %v", err)
+ }
+
+ store2, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open store2: %v", err)
+ }
+ defer store2.Close()
+ bl2, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open blocklog2: %v", err)
+ }
+ // The bogus file:// source guarantees no fresh ingest masks the damage.
+ svc2 := ingest.NewService(cfg, store2, ingest.NewClient("file:///nonexistent"), bl2, checkpoint.NewManager(store2, dataDir, keyPath))
+ if !svc2.IsCorrupted() {
+ t.Fatalf("expected service to detect corruption on restart")
+ }
+
+ ts := httptest.NewServer(NewServer(cfg, store2, svc2, checkpoint.NewManager(store2, dataDir, keyPath)).Handler())
+ defer ts.Close()
+ url := ts.URL + "/did/" + strings.ReplaceAll("did:plc:alice", ":", "%3A") + "/proof"
+ resp, err := http.Get(url)
+ if err != nil {
+ t.Fatalf("request proof: %v", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusServiceUnavailable {
+ t.Fatalf("expected 503 for corrupted proof, got %d", resp.StatusCode)
+ }
+}
+
+// writeRecordsFile writes recs to path as newline-delimited JSON, one
+// ExportRecord per line, failing the test on any write error.
+func writeRecordsFile(t *testing.T, path string, recs []types.ExportRecord) {
+ t.Helper()
+ f, err := os.Create(path)
+ if err != nil {
+ t.Fatalf("create records file: %v", err)
+ }
+ defer f.Close()
+ for _, rec := range recs {
+ b, _ := json.Marshal(rec)
+ if _, err := fmt.Fprintln(f, string(b)); err != nil {
+ t.Fatalf("write record: %v", err)
+ }
+ }
+}
+
+// buildCheckpointScenarioRecords fabricates three ed25519-signed export
+// records: alice's genesis op (seq 1), bob's genesis op (seq 2), and an
+// update for alice (seq 3) chained to her genesis via prev. All three share
+// one throwaway keypair.
+func buildCheckpointScenarioRecords(t *testing.T) []types.ExportRecord {
+ t.Helper()
+ pub, priv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("generate key: %v", err)
+ }
+ // mk builds one signed record: canonicalize the unsigned op, sign the
+ // canonical bytes, embed payload+signature, and derive the CID from the
+ // canonical form of the full signed op.
+ mk := func(seq uint64, did, prev string) types.ExportRecord {
+ unsigned := map[string]any{
+ "did": did,
+ "didDoc": map[string]any{"id": did, "seq": seq},
+ "publicKey": base64.RawURLEncoding.EncodeToString(pub),
+ }
+ if prev != "" {
+ unsigned["prev"] = prev
+ }
+ payload, _ := json.Marshal(unsigned)
+ canon, _ := types.CanonicalizeJSON(payload)
+ sig := ed25519.Sign(priv, canon)
+ op := map[string]any{}
+ for k, v := range unsigned {
+ op[k] = v
+ }
+ op["sigPayload"] = base64.RawURLEncoding.EncodeToString(canon)
+ op["sig"] = base64.RawURLEncoding.EncodeToString(sig)
+ raw, _ := json.Marshal(op)
+ opCanon, _ := types.CanonicalizeJSON(raw)
+ cid := types.ComputeDigestCID(opCanon)
+ return types.ExportRecord{Seq: seq, DID: did, CID: cid, Operation: raw}
+ }
+
+ r1 := mk(1, "did:plc:alice", "")
+ r2 := mk(2, "did:plc:bob", "")
+ r3 := mk(3, "did:plc:alice", r1.CID)
+ return []types.ExportRecord{r1, r2, r3}
+}
diff --git a/internal/checkpoint/checkpoint.go b/internal/checkpoint/checkpoint.go
new file mode 100644
index 0000000..840fac3
--- /dev/null
+++ b/internal/checkpoint/checkpoint.go
@@ -0,0 +1,394 @@
+package checkpoint
+
+import (
+ "bufio"
+ "crypto/ed25519"
+ "crypto/sha256"
+ "encoding/base64"
+ "encoding/hex"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/merkle"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+ "github.com/mr-tron/base58"
+)
+
+// Manager builds, signs, and persists checkpoints and their DID state
+// snapshots under dataDir, signing with the key stored at keyPath.
+type Manager struct {
+	store   storage.Store
+	dataDir string // root directory; checkpoints go in <dataDir>/checkpoints
+	keyPath string // path to the mirror's ed25519 signing key file
+}
+
+// BuildMetrics reports timing and size information for one checkpoint build.
+type BuildMetrics struct {
+	DIDCount      int           // number of DID leaves in the snapshot
+	MerkleCompute time.Duration // time spent hashing merkle leaves only
+	Total         time.Duration // wall-clock time for the whole build
+}
+
+// NewManager returns a checkpoint Manager over the given store, data
+// directory, and signing-key path.
+func NewManager(store storage.Store, dataDir, keyPath string) *Manager {
+	return &Manager{store: store, dataDir: dataDir, keyPath: keyPath}
+}
+
+// BuildAndStore builds a checkpoint at sequence from an in-memory state
+// slice: it computes the DID merkle root, writes the state snapshot file,
+// then signs and persists the checkpoint.
+func (m *Manager) BuildAndStore(sequence uint64, states []types.StateV1, blockHashes []string) (types.CheckpointV1, error) {
+	didRoot, leaves := didMerkle(states)
+	if err := m.writeCheckpointStateSnapshot(sequence, leaves); err != nil {
+		return types.CheckpointV1{}, err
+	}
+	return m.signAndPersist(sequence, didRoot, blockHashes)
+}
+
+// BuildAndStoreFromStore is BuildAndStoreFromStoreWithMetrics with the
+// metrics discarded.
+func (m *Manager) BuildAndStoreFromStore(sequence uint64, blockHashes []string) (types.CheckpointV1, error) {
+	cp, _, err := m.BuildAndStoreFromStoreWithMetrics(sequence, blockHashes)
+	return cp, err
+}
+
+// BuildAndStoreFromStoreWithMetrics builds a checkpoint at sequence by
+// streaming DID states directly from the store (rather than a caller-supplied
+// slice), then signs and persists it. It also returns build metrics.
+func (m *Manager) BuildAndStoreFromStoreWithMetrics(sequence uint64, blockHashes []string) (types.CheckpointV1, BuildMetrics, error) {
+	start := time.Now()
+	didRoot, didCount, merkleCompute, err := m.writeCheckpointStateSnapshotFromStore(sequence)
+	if err != nil {
+		return types.CheckpointV1{}, BuildMetrics{}, err
+	}
+	cp, err := m.signAndPersist(sequence, didRoot, blockHashes)
+	if err != nil {
+		return types.CheckpointV1{}, BuildMetrics{}, err
+	}
+	return cp, BuildMetrics{
+		DIDCount:      didCount,
+		MerkleCompute: merkleCompute,
+		Total:         time.Since(start),
+	}, nil
+}
+
+// signAndPersist assembles a CheckpointV1 for sequence, chains it to the
+// latest stored checkpoint via PreviousCheckpointHash, signs it with the
+// mirror key, and persists it both in the store and as a JSON file.
+func (m *Manager) signAndPersist(sequence uint64, didRoot string, blockHashes []string) (types.CheckpointV1, error) {
+	privateKey, keyID, err := m.loadSigningKey()
+	if err != nil {
+		return types.CheckpointV1{}, err
+	}
+	blockRoot := blockMerkle(blockHashes)
+
+	// The first checkpoint in the chain carries an empty previous hash.
+	prev := ""
+	if latest, ok, err := m.store.GetLatestCheckpoint(); err == nil && ok {
+		prev = latest.CheckpointHash
+	} else if err != nil {
+		return types.CheckpointV1{}, fmt.Errorf("load latest checkpoint: %w", err)
+	}
+
+	unsigned := types.CheckpointV1{
+		Version:                1,
+		Sequence:               sequence,
+		Timestamp:              time.Now().UTC().Format(time.RFC3339),
+		DIDMerkleRoot:          didRoot,
+		BlockMerkleRoot:        blockRoot,
+		PreviousCheckpointHash: prev,
+		KeyID:                  keyID,
+	}
+	// Hash and sign the canonical payload; the payload excludes the
+	// Signature and CheckpointHash fields themselves (see
+	// marshalCheckpointPayload), so both can be verified independently.
+	payload, err := marshalCheckpointPayload(unsigned)
+	if err != nil {
+		return types.CheckpointV1{}, err
+	}
+	sum := sha256.Sum256(payload)
+	unsigned.CheckpointHash = hex.EncodeToString(sum[:])
+	unsigned.Signature = base64.RawURLEncoding.EncodeToString(ed25519.Sign(privateKey, payload))
+
+	if err := m.store.PutCheckpoint(unsigned); err != nil {
+		return types.CheckpointV1{}, fmt.Errorf("persist checkpoint: %w", err)
+	}
+	if err := m.writeCheckpointFile(unsigned); err != nil {
+		return types.CheckpointV1{}, err
+	}
+	return unsigned, nil
+}
+
+// BuildDIDProofAtCheckpoint builds a merkle inclusion proof for the
+// (did, chainTipHash) leaf in the state snapshot taken at checkpointSeq.
+// It returns the sibling path, the hex leaf hash, and whether the leaf was
+// found; a missing leaf is (nil, "", false, nil), not an error.
+func (m *Manager) BuildDIDProofAtCheckpoint(did, chainTipHash string, checkpointSeq uint64) ([]merkle.Sibling, string, bool, error) {
+	snapshot, err := m.LoadStateSnapshot(checkpointSeq)
+	if err != nil {
+		return nil, "", false, err
+	}
+	// Rebuild every leaf hash in snapshot order, remembering the index of
+	// the leaf being proven.
+	leaves := make([][]byte, len(snapshot.Leaves))
+	index := -1
+	leafHashHex := ""
+	for i, s := range snapshot.Leaves {
+		h := merkle.HashLeaf([]byte(s.DID + s.ChainTipHash))
+		leaves[i] = h
+		if s.DID == did && s.ChainTipHash == chainTipHash {
+			index = i
+			leafHashHex = hex.EncodeToString(h)
+		}
+	}
+	if index < 0 {
+		return nil, "", false, nil
+	}
+	proof := merkle.BuildProof(leaves, index)
+	return proof, leafHashHex, true, nil
+}
+
+// LoadStateSnapshot reads and decodes the state snapshot file written for
+// the checkpoint at sequence, rejecting a file whose embedded sequence does
+// not match (guards against renamed or corrupted snapshot files).
+func (m *Manager) LoadStateSnapshot(sequence uint64) (types.CheckpointStateSnapshotV1, error) {
+	path := filepath.Join(m.dataDir, "checkpoints", fmt.Sprintf("%020d.state.json", sequence))
+	b, err := os.ReadFile(path)
+	if err != nil {
+		return types.CheckpointStateSnapshotV1{}, fmt.Errorf("read checkpoint state snapshot: %w", err)
+	}
+	var snap types.CheckpointStateSnapshotV1
+	if err := json.Unmarshal(b, &snap); err != nil {
+		return types.CheckpointStateSnapshotV1{}, fmt.Errorf("unmarshal checkpoint state snapshot: %w", err)
+	}
+	if snap.Sequence != sequence {
+		return types.CheckpointStateSnapshotV1{}, fmt.Errorf("snapshot sequence mismatch: got %d want %d", snap.Sequence, sequence)
+	}
+	return snap, nil
+}
+
+// didMerkle computes the DID merkle root over states sorted by DID (sorting
+// makes the root independent of input order) and returns the sorted leaves
+// for the state snapshot. The input slice is not mutated.
+func didMerkle(states []types.StateV1) (string, []types.DIDLeaf) {
+	sorted := append([]types.StateV1(nil), states...)
+	sort.Slice(sorted, func(i, j int) bool { return sorted[i].DID < sorted[j].DID })
+	acc := merkle.NewAccumulator()
+	snapshotLeaves := make([]types.DIDLeaf, 0, len(sorted))
+	for _, s := range sorted {
+		snapshotLeaves = append(snapshotLeaves, types.DIDLeaf{
+			DID:          s.DID,
+			ChainTipHash: s.ChainTipHash,
+		})
+		acc.AddLeafHash(merkle.HashLeaf([]byte(s.DID + s.ChainTipHash)))
+	}
+	root := acc.RootDuplicateLast()
+	return hex.EncodeToString(root), snapshotLeaves
+}
+
+// blockMerkle computes the hex merkle root over block-log hashes. An empty
+// input yields the root of the empty tree rather than an error.
+func blockMerkle(hashes []string) string {
+	if len(hashes) == 0 {
+		root := merkle.Root(nil)
+		return hex.EncodeToString(root)
+	}
+	leaves := make([][]byte, 0, len(hashes))
+	for _, h := range hashes {
+		decoded, err := hex.DecodeString(strings.TrimSpace(h))
+		if err != nil {
+			// Fall back to hashing the raw string bytes when the hash is
+			// not valid hex — NOTE(review): this silently accepts malformed
+			// inputs; confirm that is intentional rather than erroring.
+			decoded = []byte(h)
+		}
+		leaves = append(leaves, merkle.HashLeaf(decoded))
+	}
+	root := merkle.Root(leaves)
+	return hex.EncodeToString(root)
+}
+
+// marshalCheckpointPayload produces the canonical signing payload for a
+// checkpoint: the checkpoint JSON with the Signature and CheckpointHash
+// fields blanked, passed through canonical JSON so signing and verification
+// agree byte-for-byte.
+func marshalCheckpointPayload(cp types.CheckpointV1) ([]byte, error) {
+	clone := cp
+	clone.Signature = ""
+	clone.CheckpointHash = ""
+	b, err := json.Marshal(clone)
+	if err != nil {
+		return nil, fmt.Errorf("marshal checkpoint payload: %w", err)
+	}
+	return types.CanonicalizeJSON(b)
+}
+
+// writeCheckpointFile persists the signed checkpoint as indented JSON at
+// <dataDir>/checkpoints/<seq>.json. The file is written to a temp path and
+// renamed into place so readers never observe a partially written
+// checkpoint, matching the atomic write used for state snapshots.
+func (m *Manager) writeCheckpointFile(cp types.CheckpointV1) error {
+	dir := filepath.Join(m.dataDir, "checkpoints")
+	if err := os.MkdirAll(dir, 0o755); err != nil {
+		return fmt.Errorf("mkdir checkpoints: %w", err)
+	}
+	path := filepath.Join(dir, fmt.Sprintf("%020d.json", cp.Sequence))
+	b, err := json.MarshalIndent(cp, "", " ")
+	if err != nil {
+		return fmt.Errorf("marshal checkpoint: %w", err)
+	}
+	tmp := path + ".tmp"
+	if err := os.WriteFile(tmp, b, 0o644); err != nil {
+		return fmt.Errorf("write checkpoint file: %w", err)
+	}
+	if err := os.Rename(tmp, path); err != nil {
+		// Best-effort cleanup of the orphaned temp file.
+		_ = os.Remove(tmp)
+		return fmt.Errorf("rename checkpoint file: %w", err)
+	}
+	return nil
+}
+
+// writeCheckpointStateSnapshot persists the DID leaves for the checkpoint
+// at sequence to <dataDir>/checkpoints/<seq>.state.json.
+func (m *Manager) writeCheckpointStateSnapshot(sequence uint64, leaves []types.DIDLeaf) error {
+	dir := filepath.Join(m.dataDir, "checkpoints")
+	if err := os.MkdirAll(dir, 0o755); err != nil {
+		return fmt.Errorf("mkdir checkpoints: %w", err)
+	}
+	snap := types.CheckpointStateSnapshotV1{
+		Version:   1,
+		Sequence:  sequence,
+		CreatedAt: time.Now().UTC().Format(time.RFC3339),
+		Leaves:    leaves,
+	}
+	b, err := json.Marshal(snap)
+	if err != nil {
+		return fmt.Errorf("marshal checkpoint state snapshot: %w", err)
+	}
+	path := filepath.Join(dir, fmt.Sprintf("%020d.state.json", sequence))
+	if err := os.WriteFile(path, b, 0o644); err != nil {
+		return fmt.Errorf("write checkpoint state snapshot: %w", err)
+	}
+	return nil
+}
+
+// writeCheckpointStateSnapshotFromStore streams every DID state from the
+// store into <dataDir>/checkpoints/<seq>.state.json, accumulating the DID
+// merkle root on the fly so the states are never held in memory at once.
+// The snapshot is written to a temp file, fsynced, and atomically renamed.
+// It returns the hex DID root, the leaf count, and time spent hashing.
+func (m *Manager) writeCheckpointStateSnapshotFromStore(sequence uint64) (string, int, time.Duration, error) {
+	dir := filepath.Join(m.dataDir, "checkpoints")
+	if err := os.MkdirAll(dir, 0o755); err != nil {
+		return "", 0, 0, fmt.Errorf("mkdir checkpoints: %w", err)
+	}
+	finalPath := filepath.Join(dir, fmt.Sprintf("%020d.state.json", sequence))
+	tmpPath := finalPath + ".tmp"
+
+	f, err := os.Create(tmpPath)
+	if err != nil {
+		return "", 0, 0, fmt.Errorf("create checkpoint state snapshot temp file: %w", err)
+	}
+	// Guarantee the temp file never outlives a failed build: the deferred
+	// Remove is a harmless no-op after a successful rename, and the Close is
+	// skipped once the file has been closed on the success path.
+	closed := false
+	defer func() {
+		if !closed {
+			_ = f.Close()
+		}
+		_ = os.Remove(tmpPath)
+	}()
+
+	createdAt := time.Now().UTC().Format(time.RFC3339)
+	w := bufio.NewWriterSize(f, 1<<20)
+	if _, err := fmt.Fprintf(w, `{"v":1,"sequence":%d,"created_at":"%s","leaves":[`, sequence, createdAt); err != nil {
+		return "", 0, 0, fmt.Errorf("write snapshot header: %w", err)
+	}
+
+	first := true
+	didCount := 0
+	merkleCompute := time.Duration(0)
+	acc := merkle.NewAccumulator()
+	if err := m.store.ForEachState(func(s types.StateV1) error {
+		leaf := types.DIDLeaf{
+			DID:          s.DID,
+			ChainTipHash: s.ChainTipHash,
+		}
+		b, err := json.Marshal(leaf)
+		if err != nil {
+			return fmt.Errorf("marshal checkpoint leaf: %w", err)
+		}
+		if !first {
+			if _, err := w.WriteString(","); err != nil {
+				return err
+			}
+		}
+		if _, err := w.Write(b); err != nil {
+			return err
+		}
+		first = false
+		// Time only the merkle hashing so metrics separate hashing cost
+		// from JSON/IO cost.
+		hashStart := time.Now()
+		acc.AddLeafHash(merkle.HashLeaf([]byte(leaf.DID + leaf.ChainTipHash)))
+		merkleCompute += time.Since(hashStart)
+		didCount++
+		return nil
+	}); err != nil {
+		return "", 0, 0, err
+	}
+
+	if _, err := w.WriteString(`]}`); err != nil {
+		return "", 0, 0, fmt.Errorf("write snapshot trailer: %w", err)
+	}
+	if err := w.Flush(); err != nil {
+		return "", 0, 0, fmt.Errorf("flush snapshot writer: %w", err)
+	}
+	// Sync before rename so the rename never publishes a file whose bytes
+	// are not yet durable.
+	if err := f.Sync(); err != nil {
+		return "", 0, 0, fmt.Errorf("sync snapshot file: %w", err)
+	}
+	if err := f.Close(); err != nil {
+		return "", 0, 0, fmt.Errorf("close snapshot file: %w", err)
+	}
+	closed = true
+	if err := os.Rename(tmpPath, finalPath); err != nil {
+		return "", 0, 0, fmt.Errorf("rename snapshot file: %w", err)
+	}
+	return hex.EncodeToString(acc.RootDuplicateLast()), didCount, merkleCompute, nil
+}
+
+// loadSigningKey reads the mirror's ed25519 private key from m.keyPath.
+// Two file formats are accepted: a bare key string (did:key, hex, or
+// base64 — see decodeKeyString), or a JSON object with a "private_key"
+// field holding such a string. Returns the key and its derived key ID.
+func (m *Manager) loadSigningKey() (ed25519.PrivateKey, string, error) {
+	data, err := os.ReadFile(m.keyPath)
+	if err != nil {
+		return nil, "", fmt.Errorf("read mirror private key: %w", err)
+	}
+	text := strings.TrimSpace(string(data))
+	if text == "" {
+		return nil, "", errors.New("empty mirror private key")
+	}
+
+	if raw, err := decodeKeyString(text); err == nil {
+		return keyFromRaw(raw)
+	}
+
+	var k struct {
+		PrivateKey string `json:"private_key"`
+	}
+	if err := json.Unmarshal(data, &k); err == nil && strings.TrimSpace(k.PrivateKey) != "" {
+		// Decode the trimmed value: the emptiness check above already uses
+		// the trimmed form, and stray whitespace inside the JSON string
+		// would otherwise make every decoder reject the key.
+		raw, err := decodeKeyString(strings.TrimSpace(k.PrivateKey))
+		if err != nil {
+			return nil, "", err
+		}
+		return keyFromRaw(raw)
+	}
+
+	return nil, "", errors.New("unsupported private key format")
+}
+
+// decodeKeyString decodes a private-key string, trying the supported
+// encodings in order: did:key (base58btc multibase, key material in the
+// final 32 bytes), hex, raw base64url, then standard base64.
+func decodeKeyString(v string) ([]byte, error) {
+	if strings.HasPrefix(v, "did:key:") {
+		mb := strings.TrimPrefix(v, "did:key:")
+		// 'z' is the multibase prefix for base58btc; other bases are not
+		// supported here.
+		if mb == "" || mb[0] != 'z' {
+			return nil, errors.New("unsupported did:key format")
+		}
+		decoded, err := base58.Decode(mb[1:])
+		if err != nil {
+			return nil, err
+		}
+		if len(decoded) < 34 {
+			return nil, errors.New("invalid did:key length")
+		}
+		// A multicodec prefix precedes the key; keep the trailing 32 bytes.
+		return decoded[len(decoded)-32:], nil
+	}
+	if isHexString(v) {
+		if b, err := hex.DecodeString(v); err == nil {
+			return b, nil
+		}
+	}
+	if b, err := base64.RawURLEncoding.DecodeString(v); err == nil {
+		return b, nil
+	}
+	if b, err := base64.StdEncoding.DecodeString(v); err == nil {
+		return b, nil
+	}
+	// No final hex retry: isHexString accepts exactly the strings that
+	// hex.DecodeString accepts, so a second attempt could never succeed.
+	return nil, errors.New("unknown key encoding")
+}
+
+// isHexString reports whether v is a non-empty, even-length string made
+// only of hexadecimal digits (either case).
+func isHexString(v string) bool {
+	if len(v) == 0 || len(v)%2 != 0 {
+		return false
+	}
+	for _, r := range v {
+		if (r >= '0' && r <= '9') || (r >= 'a' && r <= 'f') || (r >= 'A' && r <= 'F') {
+			continue
+		}
+		return false
+	}
+	return true
+}
+
+// keyFromRaw builds an ed25519 private key from raw bytes, accepting either
+// a 32-byte seed or a full 64-byte private key, and returns the key with
+// its derived key ID.
+func keyFromRaw(raw []byte) (ed25519.PrivateKey, string, error) {
+	switch len(raw) {
+	case ed25519.SeedSize:
+		pk := ed25519.NewKeyFromSeed(raw)
+		kid := keyID(pk.Public().(ed25519.PublicKey))
+		return pk, kid, nil
+	case ed25519.PrivateKeySize:
+		pk := ed25519.PrivateKey(raw)
+		kid := keyID(pk.Public().(ed25519.PublicKey))
+		return pk, kid, nil
+	default:
+		return nil, "", fmt.Errorf("invalid private key length %d", len(raw))
+	}
+}
+
+// keyID derives a short identifier for a public key: "ed25519:" followed by
+// the first 8 bytes of the key's SHA-256 digest in hex.
+func keyID(pub ed25519.PublicKey) string {
+	sum := sha256.Sum256(pub)
+	return "ed25519:" + hex.EncodeToString(sum[:8])
+}
diff --git a/internal/checkpoint/checkpoint_test.go b/internal/checkpoint/checkpoint_test.go
new file mode 100644
index 0000000..9ca9b2a
--- /dev/null
+++ b/internal/checkpoint/checkpoint_test.go
@@ -0,0 +1,105 @@
+package checkpoint
+
+import (
+ "crypto/ed25519"
+ "crypto/rand"
+ "encoding/base64"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+)
+
+// TestBuildAndStoreCheckpoint builds a checkpoint from two in-memory states
+// and asserts it is signed, verifiable with the public key, and persisted
+// in the store under its sequence.
+func TestBuildAndStoreCheckpoint(t *testing.T) {
+	tmp := t.TempDir()
+	store, err := storage.OpenPebble(tmp)
+	if err != nil {
+		t.Fatalf("open pebble: %v", err)
+	}
+	defer store.Close()
+
+	pub, priv, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		t.Fatalf("generate key: %v", err)
+	}
+	keyPath := filepath.Join(tmp, "mirror.key")
+	if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(priv)), 0o600); err != nil {
+		t.Fatalf("write key: %v", err)
+	}
+
+	mgr := NewManager(store, tmp, keyPath)
+	states := []types.StateV1{
+		{Version: 1, DID: "did:plc:a", ChainTipHash: "tip-a"},
+		{Version: 1, DID: "did:plc:b", ChainTipHash: "tip-b"},
+	}
+	cp, err := mgr.BuildAndStore(42, states, []string{"abc", "def"})
+	if err != nil {
+		t.Fatalf("build checkpoint: %v", err)
+	}
+	if cp.Signature == "" || cp.CheckpointHash == "" {
+		t.Fatalf("expected signed checkpoint")
+	}
+	// Re-derive the signing payload and verify the stored signature.
+	payload, err := marshalCheckpointPayload(cp)
+	if err != nil {
+		t.Fatalf("marshal payload: %v", err)
+	}
+	rawSig, err := base64.RawURLEncoding.DecodeString(cp.Signature)
+	if err != nil {
+		t.Fatalf("decode signature: %v", err)
+	}
+	if !ed25519.Verify(pub, payload, rawSig) {
+		t.Fatalf("signature verification failed")
+	}
+
+	stored, ok, err := store.GetCheckpoint(cp.Sequence)
+	if err != nil || !ok {
+		t.Fatalf("stored checkpoint missing: ok=%v err=%v", ok, err)
+	}
+	if stored.CheckpointHash != cp.CheckpointHash {
+		t.Fatalf("checkpoint hash mismatch")
+	}
+}
+
+// TestCheckpointRootStability checks determinism: two checkpoints built from
+// the same store contents and block set must have identical DID and block
+// merkle roots regardless of insertion order or sequence number.
+func TestCheckpointRootStability(t *testing.T) {
+	tmp := t.TempDir()
+	store, err := storage.OpenPebble(tmp)
+	if err != nil {
+		t.Fatalf("open pebble: %v", err)
+	}
+	defer store.Close()
+
+	// Insert out of DID order to confirm the root does not depend on it.
+	if err := store.PutState(types.StateV1{Version: 1, DID: "did:plc:b", ChainTipHash: "tip-b"}); err != nil {
+		t.Fatalf("put state b: %v", err)
+	}
+	if err := store.PutState(types.StateV1{Version: 1, DID: "did:plc:a", ChainTipHash: "tip-a"}); err != nil {
+		t.Fatalf("put state a: %v", err)
+	}
+
+	_, priv, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		t.Fatalf("generate key: %v", err)
+	}
+	keyPath := filepath.Join(tmp, "mirror.key")
+	if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(priv)), 0o600); err != nil {
+		t.Fatalf("write key: %v", err)
+	}
+
+	mgr := NewManager(store, tmp, keyPath)
+	cp1, err := mgr.BuildAndStoreFromStore(100, []string{"abc"})
+	if err != nil {
+		t.Fatalf("build checkpoint 1: %v", err)
+	}
+	cp2, err := mgr.BuildAndStoreFromStore(200, []string{"abc"})
+	if err != nil {
+		t.Fatalf("build checkpoint 2: %v", err)
+	}
+
+	if cp1.DIDMerkleRoot != cp2.DIDMerkleRoot {
+		t.Fatalf("did root changed for identical state: %s vs %s", cp1.DIDMerkleRoot, cp2.DIDMerkleRoot)
+	}
+	if cp1.BlockMerkleRoot != cp2.BlockMerkleRoot {
+		t.Fatalf("block root changed for identical block set: %s vs %s", cp1.BlockMerkleRoot, cp2.BlockMerkleRoot)
+	}
+}
diff --git a/internal/config/config.go b/internal/config/config.go
new file mode 100644
index 0000000..5e6e467
--- /dev/null
+++ b/internal/config/config.go
@@ -0,0 +1,153 @@
+package config
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "runtime"
+ "strconv"
+ "strings"
+ "time"
+
+ "gopkg.in/yaml.v3"
+)
+
+// Operating modes and signature-verification policies accepted by Config.
+const (
+	ModeResolver = "resolver"
+	ModeMirror   = "mirror"
+
+	VerifyFull      = "full"
+	VerifyLazy      = "lazy"
+	VerifyStateOnly = "state-only"
+)
+
+// Config holds all runtime settings. Values come from Default, may be
+// overridden by a YAML file, and finally by PLUTIA_* environment variables
+// (see Load and applyEnv). Validate enforces the allowed ranges.
+type Config struct {
+	Mode                 string        `yaml:"mode"`
+	DataDir              string        `yaml:"data_dir"`
+	PLCSource            string        `yaml:"plc_source"`
+	VerifyPolicy         string        `yaml:"verify"`
+	ZstdLevel            int           `yaml:"zstd_level"`
+	BlockSizeMB          int           `yaml:"block_size_mb"`
+	CheckpointInterval   uint64        `yaml:"checkpoint_interval"`
+	CommitBatchSize      int           `yaml:"commit_batch_size"`
+	VerifyWorkers        int           `yaml:"verify_workers"`
+	ListenAddr           string        `yaml:"listen_addr"`
+	MirrorPrivateKeyPath string        `yaml:"mirror_private_key_path"`
+	PollInterval         time.Duration `yaml:"poll_interval"`
+}
+
+// Default returns the built-in configuration used when neither a config
+// file nor environment overrides supply a value.
+func Default() Config {
+	return Config{
+		Mode:                 ModeMirror,
+		DataDir:              "./data",
+		PLCSource:            "https://plc.directory",
+		VerifyPolicy:         VerifyFull,
+		ZstdLevel:            9,
+		BlockSizeMB:          8,
+		CheckpointInterval:   100000,
+		CommitBatchSize:      128,
+		VerifyWorkers:        runtime.NumCPU(),
+		ListenAddr:           ":8080",
+		MirrorPrivateKeyPath: "./mirror.key",
+		PollInterval:         5 * time.Second,
+	}
+}
+
+// Load builds the effective configuration: defaults first, then the YAML
+// file at path (skipped when path is empty), then environment overrides,
+// then validation. Any failure returns the zero Config and an error.
+func Load(path string) (Config, error) {
+	cfg := Default()
+	if path != "" {
+		b, err := os.ReadFile(path)
+		if err != nil {
+			return Config{}, fmt.Errorf("read config: %w", err)
+		}
+		if err := yaml.Unmarshal(b, &cfg); err != nil {
+			return Config{}, fmt.Errorf("parse config: %w", err)
+		}
+	}
+	applyEnv(&cfg)
+	if err := cfg.Validate(); err != nil {
+		return Config{}, err
+	}
+	return cfg, nil
+}
+
+// applyEnv overrides cfg fields from PLUTIA_* environment variables. Unset
+// or whitespace-only variables leave the field untouched; values that fail
+// to parse are silently ignored (the prior value wins).
+func applyEnv(cfg *Config) {
+	setString := func(key string, dst *string) {
+		if v := strings.TrimSpace(os.Getenv(key)); v != "" {
+			*dst = v
+		}
+	}
+	setInt := func(key string, dst *int) {
+		if v := strings.TrimSpace(os.Getenv(key)); v != "" {
+			if n, err := strconv.Atoi(v); err == nil {
+				*dst = n
+			}
+		}
+	}
+	setUint64 := func(key string, dst *uint64) {
+		if v := strings.TrimSpace(os.Getenv(key)); v != "" {
+			if n, err := strconv.ParseUint(v, 10, 64); err == nil {
+				*dst = n
+			}
+		}
+	}
+	setDuration := func(key string, dst *time.Duration) {
+		if v := strings.TrimSpace(os.Getenv(key)); v != "" {
+			if d, err := time.ParseDuration(v); err == nil {
+				*dst = d
+			}
+		}
+	}
+
+	setString("PLUTIA_MODE", &cfg.Mode)
+	setString("PLUTIA_DATA_DIR", &cfg.DataDir)
+	setString("PLUTIA_PLC_SOURCE", &cfg.PLCSource)
+	setString("PLUTIA_VERIFY", &cfg.VerifyPolicy)
+	setInt("PLUTIA_ZSTD_LEVEL", &cfg.ZstdLevel)
+	setInt("PLUTIA_BLOCK_SIZE_MB", &cfg.BlockSizeMB)
+	setInt("PLUTIA_COMMIT_BATCH_SIZE", &cfg.CommitBatchSize)
+	setInt("PLUTIA_VERIFY_WORKERS", &cfg.VerifyWorkers)
+	setUint64("PLUTIA_CHECKPOINT_INTERVAL", &cfg.CheckpointInterval)
+	setString("PLUTIA_LISTEN_ADDR", &cfg.ListenAddr)
+	setString("PLUTIA_MIRROR_PRIVATE_KEY_PATH", &cfg.MirrorPrivateKeyPath)
+	setDuration("PLUTIA_POLL_INTERVAL", &cfg.PollInterval)
+}
+
+// Validate checks that every field holds an allowed value, returning the
+// first violation found. It does not mutate the config.
+func (c Config) Validate() error {
+	if c.Mode != ModeResolver && c.Mode != ModeMirror {
+		return fmt.Errorf("invalid mode %q", c.Mode)
+	}
+	switch c.VerifyPolicy {
+	case VerifyFull, VerifyLazy, VerifyStateOnly:
+	default:
+		return fmt.Errorf("invalid verify policy %q", c.VerifyPolicy)
+	}
+	if c.DataDir == "" {
+		return errors.New("data_dir is required")
+	}
+	if c.PLCSource == "" {
+		return errors.New("plc_source is required")
+	}
+	if c.ZstdLevel < 1 || c.ZstdLevel > 22 {
+		return fmt.Errorf("zstd_level must be between 1 and 22, got %d", c.ZstdLevel)
+	}
+	if c.BlockSizeMB < 4 || c.BlockSizeMB > 16 {
+		return fmt.Errorf("block_size_mb must be between 4 and 16, got %d", c.BlockSizeMB)
+	}
+	if c.CheckpointInterval == 0 {
+		return errors.New("checkpoint_interval must be > 0")
+	}
+	if c.CommitBatchSize <= 0 || c.CommitBatchSize > 4096 {
+		return fmt.Errorf("commit_batch_size must be between 1 and 4096, got %d", c.CommitBatchSize)
+	}
+	if c.VerifyWorkers <= 0 || c.VerifyWorkers > 1024 {
+		return fmt.Errorf("verify_workers must be between 1 and 1024, got %d", c.VerifyWorkers)
+	}
+	if c.ListenAddr == "" {
+		return errors.New("listen_addr is required")
+	}
+	if c.PollInterval <= 0 {
+		return errors.New("poll_interval must be > 0")
+	}
+	return nil
+}
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
new file mode 100644
index 0000000..0f9ad43
--- /dev/null
+++ b/internal/ingest/client.go
@@ -0,0 +1,189 @@
+package ingest
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/types"
+)
+
+// Client fetches PLC export records either from an HTTP directory endpoint
+// or from a local file source (see FetchExportLimited).
+type Client struct {
+	source string // base URL, file:// URL, or plain file path
+	http   *http.Client
+}
+
+// NewClient builds a Client for source with a dedicated HTTP transport
+// (connection pooling, HTTP/2, dial/TLS timeouts) and a 60s overall
+// request timeout. A trailing slash on source is stripped.
+func NewClient(source string) *Client {
+	transport := &http.Transport{
+		Proxy:                 http.ProxyFromEnvironment,
+		DialContext:           (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
+		ForceAttemptHTTP2:     true,
+		MaxIdleConns:          256,
+		MaxIdleConnsPerHost:   64,
+		IdleConnTimeout:       90 * time.Second,
+		TLSHandshakeTimeout:   10 * time.Second,
+		ExpectContinueTimeout: 1 * time.Second,
+	}
+	return &Client{
+		source: strings.TrimRight(source, "/"),
+		http: &http.Client{
+			Timeout:   60 * time.Second,
+			Transport: transport,
+		},
+	}
+}
+
+// FetchExport fetches all export records after the given sequence with no
+// record limit.
+func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportRecord, error) {
+	return c.FetchExportLimited(ctx, after, 0)
+}
+
+// FetchExportLimited fetches export records with sequence > after, returning
+// at most limit records (0 means unlimited). Sources that look like local
+// files (file:// URLs or *.ndjson/*.json paths) are read from disk; anything
+// else is treated as an HTTP directory and queried via GET <source>/export.
+func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) {
+	if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") {
+		return c.fetchFromFile(after, limit)
+	}
+	u, err := url.Parse(c.source)
+	if err != nil {
+		return nil, fmt.Errorf("parse plc source: %w", err)
+	}
+	u.Path = strings.TrimRight(u.Path, "/") + "/export"
+	q := u.Query()
+	q.Set("after", fmt.Sprintf("%d", after))
+	u.RawQuery = q.Encode()
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
+	if err != nil {
+		return nil, fmt.Errorf("new request: %w", err)
+	}
+	resp, err := c.http.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("fetch export: %w", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		// Include a bounded slice of the body for diagnostics.
+		b, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))
+		return nil, fmt.Errorf("export response %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
+	}
+	return decodeExportBody(resp.Body, limit)
+}
+
+// decodeExportBody decodes an export payload from r, sniffing the format
+// from the first non-space byte: '[' means one JSON array, anything else is
+// treated as NDJSON (one record per line). At most limit records are
+// returned when limit > 0. A truncated final NDJSON line at EOF is
+// tolerated and silently dropped; malformed lines elsewhere are errors.
+func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
+	br := bufio.NewReader(r)
+	first, err := peekFirstNonSpace(br)
+	if err != nil {
+		// An all-whitespace/empty body is an empty export, not an error.
+		if err == io.EOF {
+			return nil, nil
+		}
+		return nil, err
+	}
+
+	if first == '[' {
+		b, err := io.ReadAll(br)
+		if err != nil {
+			return nil, fmt.Errorf("read export body: %w", err)
+		}
+		trimmed := bytes.TrimSpace(b)
+		var records []types.ExportRecord
+		if err := json.Unmarshal(trimmed, &records); err != nil {
+			return nil, fmt.Errorf("decode export json array: %w", err)
+		}
+		if limit > 0 && uint64(len(records)) > limit {
+			records = records[:limit]
+		}
+		return records, nil
+	}
+	out := make([]types.ExportRecord, 0, 1024)
+	for {
+		line, err := br.ReadBytes('\n')
+		isEOF := errors.Is(err, io.EOF)
+		if err != nil && !isEOF {
+			return nil, fmt.Errorf("read ndjson line: %w", err)
+		}
+		// Stop before decoding once the limit is reached; the caller will
+		// refetch the remainder using the last committed sequence.
+		if limit > 0 && uint64(len(out)) >= limit {
+			return out, nil
+		}
+		trimmed := bytes.TrimSpace(line)
+		if len(trimmed) > 0 {
+			var rec types.ExportRecord
+			if err := json.Unmarshal(trimmed, &rec); err != nil {
+				// Only the very last line may be partial (stream cut off).
+				if isEOF && isTrailingNDJSONPartial(err) {
+					return out, nil
+				}
+				return nil, fmt.Errorf("decode ndjson line: %w", err)
+			}
+			out = append(out, rec)
+		}
+		if isEOF {
+			break
+		}
+	}
+	return out, nil
+}
+
+// isTrailingNDJSONPartial reports whether err looks like the result of
+// decoding a truncated JSON value (a record cut off mid-stream).
+// NOTE(review): the substring match covers cases where encoding/json
+// reports "unexpected end of JSON input" without wrapping
+// io.ErrUnexpectedEOF; message matching may be fragile across Go versions.
+func isTrailingNDJSONPartial(err error) bool {
+	if !errors.Is(err, io.ErrUnexpectedEOF) && !strings.Contains(err.Error(), "unexpected end of JSON input") {
+		return false
+	}
+	return true
+}
+
+// peekFirstNonSpace consumes leading whitespace from br and returns the
+// first non-space byte, leaving that byte unread so decoding can start
+// from it. Returns io.EOF when the stream holds only whitespace.
+func peekFirstNonSpace(br *bufio.Reader) (byte, error) {
+	for {
+		b, err := br.ReadByte()
+		if err != nil {
+			return 0, err
+		}
+		if !isSpace(b) {
+			if err := br.UnreadByte(); err != nil {
+				return 0, fmt.Errorf("unread byte: %w", err)
+			}
+			return b, nil
+		}
+	}
+}
+
+// isSpace reports whether b is one of the ASCII whitespace bytes skipped
+// when sniffing the export body format.
+func isSpace(b byte) bool {
+	switch b {
+	case ' ', '\n', '\t', '\r':
+		return true
+	default:
+		return false
+	}
+}
+
+// fetchFromFile reads export records from a local file source, then applies
+// the after/limit filtering client-side (a file, unlike the HTTP endpoint,
+// cannot filter server-side). A "file://" prefix is stripped if present.
+func (c *Client) fetchFromFile(after uint64, limit uint64) ([]types.ExportRecord, error) {
+	path := c.source
+	if strings.HasPrefix(path, "file://") {
+		path = strings.TrimPrefix(path, "file://")
+	}
+	path = filepath.Clean(path)
+	b, err := os.ReadFile(path)
+	if err != nil {
+		return nil, fmt.Errorf("read source file: %w", err)
+	}
+	// Decode everything (limit 0) so the after-filter sees all records.
+	recs, err := decodeExportBody(bytes.NewReader(b), 0)
+	if err != nil {
+		return nil, err
+	}
+	out := make([]types.ExportRecord, 0, len(recs))
+	for _, r := range recs {
+		if r.Seq <= after {
+			continue
+		}
+		out = append(out, r)
+		if limit > 0 && uint64(len(out)) >= limit {
+			break
+		}
+	}
+	return out, nil
+}
diff --git a/internal/ingest/client_test.go b/internal/ingest/client_test.go
new file mode 100644
index 0000000..ff80225
--- /dev/null
+++ b/internal/ingest/client_test.go
@@ -0,0 +1,38 @@
+package ingest
+
+import (
+ "strings"
+ "testing"
+)
+
+// TestDecodeExportBody_IgnoresTrailingPartialNDJSONLine feeds an NDJSON
+// stream whose final line is truncated and expects the two complete
+// records back with the partial tail silently dropped.
+func TestDecodeExportBody_IgnoresTrailingPartialNDJSONLine(t *testing.T) {
+	body := strings.Join([]string{
+		`{"seq":1,"did":"did:plc:alice","cid":"cid1","operation":{"x":1}}`,
+		`{"seq":2,"did":"did:plc:bob","cid":"cid2","operation":{"x":2}}`,
+		`{"seq":3,"did":"did:plc:carol","cid":"cid3","operation":{"x":3}`,
+	}, "\n")
+
+	records, err := decodeExportBody(strings.NewReader(body), 0)
+	if err != nil {
+		t.Fatalf("decode export body: %v", err)
+	}
+	if len(records) != 2 {
+		t.Fatalf("record count mismatch: got %d want 2", len(records))
+	}
+	if records[0].Seq != 1 || records[1].Seq != 2 {
+		t.Fatalf("unexpected sequences: got [%d %d], want [1 2]", records[0].Seq, records[1].Seq)
+	}
+}
+
+// TestDecodeExportBody_FailsOnMalformedNonTrailingNDJSONLine expects a hard
+// error when a malformed line appears in the middle of the stream — only a
+// truncated final line may be tolerated.
+func TestDecodeExportBody_FailsOnMalformedNonTrailingNDJSONLine(t *testing.T) {
+	body := strings.Join([]string{
+		`{"seq":1,"did":"did:plc:alice","cid":"cid1","operation":{"x":1}}`,
+		`{"seq":2,"did":"did:plc:bob","cid":"cid2","operation":{"x":2}`,
+		`{"seq":3,"did":"did:plc:carol","cid":"cid3","operation":{"x":3}}`,
+	}, "\n")
+
+	_, err := decodeExportBody(strings.NewReader(body), 0)
+	if err == nil {
+		t.Fatalf("expected malformed middle line to fail")
+	}
+}
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
new file mode 100644
index 0000000..8451246
--- /dev/null
+++ b/internal/ingest/service.go
@@ -0,0 +1,741 @@
+package ingest
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "hash/fnv"
+ "log"
+ "os"
+ "os/exec"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/state"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+ "github.com/Fuwn/plutia/internal/verify"
+)
+
+// Stats are cumulative ingest counters; fields are updated with
+// sync/atomic operations (see RunOnce, Poll, commitVerified).
+type Stats struct {
+	IngestedOps uint64 `json:"ingested_ops"`
+	Errors      uint64 `json:"errors"`
+	LastSeq     uint64 `json:"last_seq"`
+}
+
+// Service drives the ingest pipeline: fetch export records, verify them,
+// apply state transitions, append blocks (mirror mode), and cut checkpoints.
+type Service struct {
+	cfg         config.Config
+	store       storage.Store
+	client      *Client
+	verifier    *verify.Verifier
+	engine      *state.Engine
+	checkpoints *checkpoint.Manager
+	blockLog    *storage.BlockLog
+	appender    *blockAppender // mirror mode only; nil otherwise
+	stats       Stats
+	maxOps      uint64 // optional op budget across RunOnce calls; 0 = unlimited
+	runOps      uint64 // ops committed so far against maxOps
+	// corrupted/corruptErr record a detected block-log corruption; once set
+	// (see NewService), RunOnce refuses to ingest.
+	corrupted  atomic.Bool
+	corruptErr atomic.Value
+}
+
+// NewService wires up an ingest Service. In mirror mode with a block log it
+// also creates a block appender and validates/recovers the block log,
+// marking the service corrupted (rather than failing construction) when
+// recovery reports an error.
+func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service {
+	s := &Service{
+		cfg:         cfg,
+		store:       store,
+		client:      client,
+		verifier:    verify.New(cfg.VerifyPolicy),
+		engine:      state.New(store, cfg.Mode),
+		checkpoints: checkpointMgr,
+		blockLog:    blockLog,
+	}
+	if cfg.Mode == config.ModeMirror && blockLog != nil {
+		s.appender = newBlockAppender(blockLog, 1024)
+		if _, err := blockLog.ValidateAndRecover(store); err != nil {
+			s.MarkCorrupted(err)
+		}
+	}
+	return s
+}
+
+// SetMaxOps caps the total number of operations this service will commit
+// across RunOnce calls; 0 means unlimited.
+func (s *Service) SetMaxOps(max uint64) {
+	s.maxOps = max
+}
+
+// Replay runs ingest cycles back-to-back until a cycle makes no progress
+// (caught up) or fails.
+func (s *Service) Replay(ctx context.Context) error {
+	for {
+		changed, err := s.RunOnce(ctx)
+		if err != nil {
+			return err
+		}
+		if !changed {
+			return nil
+		}
+	}
+}
+
+// Poll runs one ingest cycle immediately and then one per PollInterval
+// tick until ctx is cancelled. Cycle errors are counted in stats and
+// swallowed so polling continues; only ctx.Err() terminates the loop.
+func (s *Service) Poll(ctx context.Context) error {
+	ticker := time.NewTicker(s.cfg.PollInterval)
+	defer ticker.Stop()
+	for {
+		if _, err := s.RunOnce(ctx); err != nil {
+			atomic.AddUint64(&s.stats.Errors, 1)
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+	}
+}
+
+// RunOnce performs one ingest cycle: refuse if the service is marked
+// corrupted, honor the optional maxOps budget, fetch records after the
+// last committed global sequence, then verify and commit them. It returns
+// true when any records were committed (even if the cycle then errored
+// part-way through).
+func (s *Service) RunOnce(ctx context.Context) (bool, error) {
+	if err := s.CorruptionError(); err != nil {
+		return false, err
+	}
+	if s.maxOps > 0 && s.runOps >= s.maxOps {
+		return false, nil
+	}
+	lastSeq, err := s.store.GetGlobalSeq()
+	if err != nil {
+		return false, fmt.Errorf("load global sequence: %w", err)
+	}
+	// Translate the remaining budget into a fetch limit (0 = unlimited).
+	limit := uint64(0)
+	if s.maxOps > 0 {
+		remaining := s.maxOps - s.runOps
+		if remaining == 0 {
+			return false, nil
+		}
+		limit = remaining
+	}
+	records, err := s.client.FetchExportLimited(ctx, lastSeq, limit)
+	if err != nil {
+		return false, err
+	}
+	if len(records) == 0 {
+		return false, nil
+	}
+	committed, err := s.processRecords(ctx, records)
+	s.runOps += committed
+	if err != nil {
+		atomic.AddUint64(&s.stats.Errors, 1)
+		// Report partial progress so callers know some batches landed.
+		return committed > 0, err
+	}
+	return true, nil
+}
+
+// verifyTask is one export record queued to a verify worker, tagged with
+// its position in the fetched batch so order can be restored.
+type verifyTask struct {
+	index int
+	rec   types.ExportRecord
+}
+
+// verifyResult is the outcome of verifying one record: the parsed operation
+// and resulting state on success, or err on failure; index matches the
+// record's position in the batch.
+type verifyResult struct {
+	index int
+	op    types.ParsedOperation
+	state types.StateV1
+	err   error
+}
+
+// processRecords verifies the fetched batch in parallel and then commits
+// the verified results in order, returning the number committed.
+func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
+	verified, err := s.verifyRecords(ctx, records)
+	if err != nil {
+		return 0, err
+	}
+	return s.commitVerified(ctx, verified)
+}
+
+// verifyRecords verifies records concurrently. Records are sharded across
+// workers by DID hash (didWorker), so every operation for a given DID is
+// handled sequentially by the same worker, which carries that DID's
+// evolving state forward in a local cache. Results come back in the
+// original record order.
+func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) {
+	_ = ctx
+	workers := s.cfg.VerifyWorkers
+	if workers < 1 {
+		workers = 1
+	}
+	queues := make([]chan verifyTask, workers)
+	// Buffer for the whole batch so workers never block on the result send.
+	results := make(chan verifyResult, len(records))
+	var wg sync.WaitGroup
+
+	for i := 0; i < workers; i++ {
+		queues[i] = make(chan verifyTask, 64)
+		wg.Add(1)
+		go func(queue <-chan verifyTask) {
+			defer wg.Done()
+			// Per-worker cache of the latest state per DID, seeded lazily
+			// from the store and advanced as ops verify.
+			cache := map[string]*types.StateV1{}
+			for task := range queue {
+				op, err := types.ParseOperation(task.rec)
+				if err != nil {
+					results <- verifyResult{index: task.index, err: err}
+					continue
+				}
+				existing, err := s.loadExistingState(cache, op.DID)
+				if err != nil {
+					results <- verifyResult{index: task.index, err: err}
+					continue
+				}
+				if err := s.verifier.VerifyOperation(op, existing); err != nil {
+					results <- verifyResult{index: task.index, err: err}
+					continue
+				}
+				next, err := state.ComputeNextState(op, existing)
+				if err != nil {
+					results <- verifyResult{index: task.index, err: err}
+					continue
+				}
+				cache[op.DID] = cloneState(&next)
+				results <- verifyResult{index: task.index, op: op, state: next}
+			}
+		}(queues[i])
+	}
+
+	for idx, rec := range records {
+		worker := didWorker(rec.DID, workers)
+		queues[worker] <- verifyTask{index: idx, rec: rec}
+	}
+	for _, q := range queues {
+		close(q)
+	}
+	wg.Wait()
+	close(results)
+	return collectVerifiedInOrder(len(records), results)
+}
+
+// loadExistingState returns the current state for did, consulting the
+// worker-local cache first and the store on a miss. A missing DID is cached
+// as nil and returned as (nil, nil). Callers always receive a clone, so
+// they can mutate the result without disturbing the cache.
+func (s *Service) loadExistingState(cache map[string]*types.StateV1, did string) (*types.StateV1, error) {
+	if existing, ok := cache[did]; ok {
+		return cloneState(existing), nil
+	}
+	stateVal, ok, err := s.store.GetState(did)
+	if err != nil {
+		return nil, err
+	}
+	if !ok {
+		cache[did] = nil
+		return nil, nil
+	}
+	cache[did] = cloneState(&stateVal)
+	return cloneState(&stateVal), nil
+}
+
+// collectVerifiedInOrder drains the results channel into record order,
+// surfacing the first verification error and defending against worker bugs
+// (out-of-range, duplicate, or missing indices, or a short result count).
+// NOTE: "first" error is by arrival order on the channel, not by record
+// index.
+func collectVerifiedInOrder(total int, results <-chan verifyResult) ([]verifyResult, error) {
+	ordered := make([]verifyResult, total)
+	seen := make([]bool, total)
+	firstErr := error(nil)
+	received := 0
+	for r := range results {
+		received++
+		if r.index < 0 || r.index >= total {
+			if firstErr == nil {
+				firstErr = fmt.Errorf("verify worker returned out-of-range index %d", r.index)
+			}
+			continue
+		}
+		if seen[r.index] {
+			if firstErr == nil {
+				firstErr = fmt.Errorf("duplicate verify result for index %d", r.index)
+			}
+			continue
+		}
+		seen[r.index] = true
+		ordered[r.index] = r
+		if r.err != nil && firstErr == nil {
+			firstErr = r.err
+		}
+	}
+	if received != total && firstErr == nil {
+		firstErr = fmt.Errorf("verify result count mismatch: got %d want %d", received, total)
+	}
+	if firstErr != nil {
+		return nil, firstErr
+	}
+	for i := range seen {
+		if !seen[i] {
+			return nil, fmt.Errorf("missing verify result index %d", i)
+		}
+	}
+	return ordered, nil
+}
+
+// didWorker maps a DID to a verify-worker index using an FNV-1a hash, so
+// all operations for the same DID land on the same worker.
+func didWorker(did string, workers int) int {
+	if workers <= 1 {
+		return 0
+	}
+	hasher := fnv.New32a()
+	_, _ = hasher.Write([]byte(did)) // fnv.Write never returns an error
+	return int(hasher.Sum32() % uint32(workers))
+}
+
+func cloneState(in *types.StateV1) *types.StateV1 {
+ if in == nil {
+ return nil
+ }
+ out := *in
+ out.DIDDocument = append([]byte(nil), in.DIDDocument...)
+ out.RotationKeys = append([]string(nil), in.RotationKeys...)
+ return &out
+}
+
// commitVerified persists already-verified operations to the store in batches
// of cfg.CommitBatchSize, returning how many operations were durably applied.
// In mirror mode each operation is first appended to the block log and its
// block reference stored alongside the state; block hashes produced by log
// flushes are committed in the same store batch. The ctx parameter is
// currently unused.
func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) {
	_ = ctx // reserved for future cancellation support
	batchSize := s.cfg.CommitBatchSize
	if batchSize < 1 {
		batchSize = 1
	}
	pendingOps := make([]storage.OperationMutation, 0, batchSize)
	pendingSeqs := make([]uint64, 0, batchSize)
	pendingBlockHashes := map[uint64]string{}
	var committed uint64

	// commit flushes the pending batch: in mirror mode it first forces a
	// block-log flush so partially filled blocks get hashed too, then writes
	// states and block hashes in one store batch, updates stats, and creates
	// any checkpoints that fall inside this batch.
	commit := func() error {
		if len(pendingOps) == 0 {
			return nil
		}
		if s.cfg.Mode == config.ModeMirror {
			flush, err := s.appender.Flush()
			if err != nil {
				return err
			}
			if flush != nil {
				pendingBlockHashes[flush.BlockID] = flush.Hash
			}
		}
		blockEntries := make([]storage.BlockHashEntry, 0, len(pendingBlockHashes))
		for id, hash := range pendingBlockHashes {
			blockEntries = append(blockEntries, storage.BlockHashEntry{BlockID: id, Hash: hash})
		}
		// Map iteration order is random; sort for deterministic writes.
		sort.Slice(blockEntries, func(i, j int) bool { return blockEntries[i].BlockID < blockEntries[j].BlockID })

		if err := s.store.ApplyOperationsBatch(pendingOps, blockEntries); err != nil {
			return err
		}
		lastSeq := pendingSeqs[len(pendingSeqs)-1]
		atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
		atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
		committed += uint64(len(pendingOps))

		if s.checkpoints != nil {
			// Create a checkpoint for every sequence in this batch that lands
			// on a CheckpointInterval boundary.
			for _, seq := range pendingSeqs {
				if seq%s.cfg.CheckpointInterval == 0 {
					if err := s.createCheckpoint(seq); err != nil {
						return err
					}
				}
			}
		}

		// Reset the batch buffers but keep their capacity for the next round.
		pendingOps = pendingOps[:0]
		pendingSeqs = pendingSeqs[:0]
		clear(pendingBlockHashes)
		return nil
	}

	for _, v := range verified {
		var ref *types.BlockRefV1
		if s.cfg.Mode == config.ModeMirror {
			if s.appender == nil {
				return committed, fmt.Errorf("mirror mode requires block appender")
			}
			result, err := s.appender.Append(v.op)
			if err != nil {
				return committed, err
			}
			if result.Flush != nil {
				// The append rolled the block log over; remember the finished
				// block's hash for the next store commit.
				pendingBlockHashes[result.Flush.BlockID] = result.Flush.Hash
			}
			result.Ref.Received = time.Now().UTC().Format(time.RFC3339)
			ref = &result.Ref
		}
		pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
		pendingSeqs = append(pendingSeqs, v.op.Sequence)
		if len(pendingOps) >= batchSize {
			if err := commit(); err != nil {
				return committed, err
			}
		}
	}
	// Flush whatever remains below batchSize.
	if err := commit(); err != nil {
		return committed, err
	}
	return committed, nil
}
+
// createCheckpoint builds and stores a checkpoint at sequence while a
// background goroutine samples this process's CPU and RSS; the sampled usage
// is logged together with the checkpoint build metrics.
func (s *Service) createCheckpoint(sequence uint64) error {
	blocks, err := s.store.ListBlockHashes()
	if err != nil {
		return fmt.Errorf("list blocks for checkpoint: %w", err)
	}
	// Start sampling for the duration of the build; the sampler takes one
	// final sample and replies on usageCh once done is closed.
	done := make(chan struct{})
	usageCh := make(chan checkpointProcessUsage, 1)
	go sampleCheckpointProcessUsage(done, usageCh)

	cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(sequence, blocks)
	close(done)
	// Always receive the usage so the sampler goroutine cannot leak, even on
	// build failure.
	usage := <-usageCh
	if err != nil {
		return fmt.Errorf("build checkpoint: %w", err)
	}
	log.Printf(
		"checkpoint_metrics seq=%d did_count=%d merkle_compute_ms=%d total_checkpoint_ms=%d cpu_pct=%.2f rss_peak_mb=%.2f completed_unix_ms=%d",
		cp.Sequence,
		metrics.DIDCount,
		metrics.MerkleCompute.Milliseconds(),
		metrics.Total.Milliseconds(),
		usage.CPUPercentAvg,
		float64(usage.RSSPeakKB)/1024.0,
		time.Now().UnixMilli(),
	)
	return nil
}
+
// Snapshot flushes any buffered block-log data, persists the resulting block
// hash, then builds and stores a checkpoint at the current global sequence.
// The ctx parameter is currently unused.
func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
	if s.appender != nil {
		// Force a block-log flush so the checkpoint covers every appended op.
		flush, err := s.appender.Flush()
		if err != nil {
			return types.CheckpointV1{}, err
		}
		if flush != nil {
			if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
				return types.CheckpointV1{}, err
			}
		}
	}
	_ = ctx // reserved for future cancellation support
	seq, err := s.store.GetGlobalSeq()
	if err != nil {
		return types.CheckpointV1{}, err
	}
	blocks, err := s.store.ListBlockHashes()
	if err != nil {
		return types.CheckpointV1{}, err
	}
	cp, err := s.checkpoints.BuildAndStoreFromStore(seq, blocks)
	if err != nil {
		return types.CheckpointV1{}, err
	}
	return cp, nil
}
+
// checkpointProcessUsage summarizes process resource consumption sampled
// while a checkpoint was being built.
type checkpointProcessUsage struct {
	CPUPercentAvg float64 // mean of the sampled `ps` pcpu readings
	RSSPeakKB     int64   // highest sampled resident set size, in KiB
}
+
// sampleCheckpointProcessUsage polls this process's CPU and RSS via `ps`
// every 25ms until done is closed, then takes one final sample and sends the
// aggregated usage on out exactly once. Failed samples (e.g. `ps` missing or
// malformed output) are silently skipped, so the average may cover fewer
// ticks than elapsed.
func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointProcessUsage) {
	pid := os.Getpid()
	ticker := time.NewTicker(25 * time.Millisecond)
	defer ticker.Stop()
	var cpuSum float64
	var samples int
	var rssPeak int64
	sample := func() {
		cpu, rss, err := processMetrics(pid)
		if err != nil {
			return // best effort: ignore sampling failures
		}
		cpuSum += cpu
		samples++
		if rss > rssPeak {
			rssPeak = rss
		}
	}
	sample() // baseline sample before the first tick
	for {
		select {
		case <-done:
			sample() // capture the final state before reporting
			avg := 0.0
			if samples > 0 {
				avg = cpuSum / float64(samples)
			}
			out <- checkpointProcessUsage{CPUPercentAvg: avg, RSSPeakKB: rssPeak}
			return
		case <-ticker.C:
			sample()
		}
	}
}
+
// processMetrics reports the instantaneous CPU percentage and resident set
// size (in KiB) of pid, as read from `ps -p PID -o pcpu=,rss=`. It is
// platform-dependent (requires a POSIX `ps` on PATH) and is used only for
// best-effort telemetry.
func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
	out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output()
	if err != nil {
		return 0, 0, err
	}
	fields := strings.Fields(string(out))
	if len(fields) < 2 {
		return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
	}
	// Fix: parse failures were previously discarded with `_`, silently
	// reporting zeros for malformed rows; surface them so the caller skips
	// the bad sample instead of averaging in a bogus zero.
	cpuPct, err = strconv.ParseFloat(fields[0], 64)
	if err != nil {
		return 0, 0, fmt.Errorf("parse pcpu %q: %w", fields[0], err)
	}
	rssKB, err = strconv.ParseInt(fields[1], 10, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("parse rss %q: %w", fields[1], err)
	}
	return cpuPct, rssKB, nil
}
+
+func (s *Service) VerifyDID(ctx context.Context, did string) error {
+ _ = ctx
+ if err := s.CorruptionError(); err != nil {
+ return err
+ }
+ if s.cfg.Mode != config.ModeMirror {
+ _, ok, err := s.store.GetState(did)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return fmt.Errorf("did not found")
+ }
+ return nil
+ }
+ seqs, err := s.store.ListDIDSequences(did)
+ if err != nil {
+ return err
+ }
+ if len(seqs) == 0 {
+ return fmt.Errorf("no operations for did %s", did)
+ }
+
+ var previous *types.StateV1
+ for _, seq := range seqs {
+ ref, ok, err := s.store.GetOpSeqRef(seq)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return fmt.Errorf("missing op reference for seq %d", seq)
+ }
+ payload, err := s.blockLog.ReadRecord(ref)
+ if err != nil {
+ return err
+ }
+ rec := types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload}
+ op, err := types.ParseOperation(rec)
+ if err != nil {
+ return err
+ }
+ if err := s.verifier.VerifyOperation(op, previous); err != nil {
+ return fmt.Errorf("verify seq %d failed: %w", seq, err)
+ }
+ s := types.StateV1{DID: did, ChainTipHash: op.CID, LatestOpSeq: seq}
+ previous = &s
+ }
+ return nil
+}
+
// RecomputeTipAtOrBefore replays did's archived operations up to and
// including sequence, re-verifying each against its predecessor, and returns
// the resulting chain-tip CID plus the sequence numbers that were replayed.
// Only available in mirror mode, where the raw operations are retained.
// The ctx parameter is currently unused.
func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequence uint64) (string, []uint64, error) {
	_ = ctx // reserved for future cancellation support
	if err := s.CorruptionError(); err != nil {
		return "", nil, err
	}
	if s.cfg.Mode != config.ModeMirror {
		return "", nil, errors.New("historical proof requires mirror mode")
	}
	seqs, err := s.store.ListDIDSequences(did)
	if err != nil {
		return "", nil, err
	}
	if len(seqs) == 0 {
		return "", nil, fmt.Errorf("no operations for did %s", did)
	}
	// Keep only operations at or before the requested checkpoint sequence.
	filtered := make([]uint64, 0, len(seqs))
	for _, seq := range seqs {
		if seq <= sequence {
			filtered = append(filtered, seq)
		}
	}
	if len(filtered) == 0 {
		return "", nil, fmt.Errorf("no operations for did %s at checkpoint %d", did, sequence)
	}
	// Replay must happen in ascending order; don't rely on store ordering.
	sort.Slice(filtered, func(i, j int) bool { return filtered[i] < filtered[j] })

	var previous *types.StateV1
	var tip string
	for _, seq := range filtered {
		ref, ok, err := s.store.GetOpSeqRef(seq)
		if err != nil {
			return "", nil, err
		}
		if !ok {
			return "", nil, fmt.Errorf("missing op reference for seq %d", seq)
		}
		payload, err := s.blockLog.ReadRecord(ref)
		if err != nil {
			return "", nil, err
		}
		op, err := types.ParseOperation(types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload})
		if err != nil {
			return "", nil, err
		}
		if err := s.verifier.VerifyOperation(op, previous); err != nil {
			return "", nil, fmt.Errorf("verify seq %d failed: %w", seq, err)
		}
		tip = op.CID
		next := types.StateV1{
			DID:          did,
			ChainTipHash: op.CID,
			LatestOpSeq:  seq,
		}
		// Carry rotation keys forward so checks on the next operation see the
		// keys in effect at this point in the chain.
		prior := []string(nil)
		if previous != nil {
			prior = append(prior, previous.RotationKeys...)
		}
		next.RotationKeys = extractRotationKeysFromPayload(op.Payload, prior)
		previous = &next
	}
	return tip, filtered, nil
}
+
+func (s *Service) Flush(ctx context.Context) error {
+ _ = ctx
+ if err := s.CorruptionError(); err != nil {
+ return err
+ }
+ if s.appender == nil {
+ return nil
+ }
+ flush, err := s.appender.Flush()
+ if err != nil {
+ return err
+ }
+ if flush != nil {
+ if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
// MarkCorrupted latches the service into a corrupted state, recording err's
// message for later reporting via CorruptionError. A nil err is ignored.
func (s *Service) MarkCorrupted(err error) {
	if err == nil {
		return
	}
	s.corrupted.Store(true)
	s.corruptErr.Store(err.Error())
}
+
// IsCorrupted reports whether corruption has been flagged via MarkCorrupted.
func (s *Service) IsCorrupted() bool {
	return s.corrupted.Load()
}
+
+func (s *Service) CorruptionError() error {
+ if !s.IsCorrupted() {
+ return nil
+ }
+ msg, _ := s.corruptErr.Load().(string)
+ if msg == "" {
+ msg = "data corruption detected"
+ }
+ return fmt.Errorf("data corruption detected: %s", msg)
+}
+
// extractRotationKeysFromPayload collects the rotation-capable keys declared
// in an operation payload — trimmed, de-duplicated, in declaration order: the
// "rotationKeys" array first, then "recoveryKey", then "signingKey". When the
// payload declares none, the prior state's keys are carried forward instead.
func extractRotationKeysFromPayload(payload map[string]any, prior []string) []string {
	keys := make([]string, 0, len(prior)+4)
	seen := map[string]struct{}{}
	appendKey := func(raw string) {
		k := strings.TrimSpace(raw)
		if k == "" {
			return
		}
		if _, dup := seen[k]; dup {
			return
		}
		seen[k] = struct{}{}
		keys = append(keys, k)
	}
	if list, ok := payload["rotationKeys"].([]any); ok {
		for _, entry := range list {
			if key, isString := entry.(string); isString {
				appendKey(key)
			}
		}
	}
	if key, ok := payload["recoveryKey"].(string); ok {
		appendKey(key)
	}
	if key, ok := payload["signingKey"].(string); ok {
		appendKey(key)
	}
	if len(keys) == 0 {
		for _, key := range prior {
			appendKey(key)
		}
	}
	return keys
}
+
// Close stops the background block appender, if one is running, waiting for
// its queue to drain. The store is not closed here.
func (s *Service) Close() {
	if s.appender != nil {
		s.appender.Close()
	}
}
+
+func (s *Service) Stats() Stats {
+ return Stats{
+ IngestedOps: atomic.LoadUint64(&s.stats.IngestedOps),
+ Errors: atomic.LoadUint64(&s.stats.Errors),
+ LastSeq: atomic.LoadUint64(&s.stats.LastSeq),
+ }
+}
+
// appendResult is the blockAppender worker's reply to one request: the block
// reference for an appended operation, flush info when a block was finalized
// as a side effect, and any error encountered.
type appendResult struct {
	Ref   types.BlockRefV1
	Flush *storage.FlushInfo
	Err   error
}
+
// appendRequest is one unit of work for the blockAppender worker: either an
// operation to append or, when Flush is set, a request to flush the log.
// Reply must be buffered (capacity 1) so the worker never blocks on it.
type appendRequest struct {
	Operation types.ParsedOperation
	Reply     chan appendResult
	Flush     bool
}
+
// blockAppender serializes all block-log writes through a single worker
// goroutine so callers never touch the log concurrently. closed is closed by
// the worker on exit; once makes Close idempotent.
type blockAppender struct {
	log    *storage.BlockLog
	queue  chan appendRequest
	once   sync.Once
	closed chan struct{}
}
+
+func newBlockAppender(log *storage.BlockLog, buffer int) *blockAppender {
+ ba := &blockAppender{
+ log: log,
+ queue: make(chan appendRequest, buffer),
+ closed: make(chan struct{}),
+ }
+ go ba.run()
+ return ba
+}
+
// run is the appender's single-writer loop: it services queued requests in
// FIFO order until the queue is closed, then signals exit by closing b.closed.
// Every request gets exactly one reply.
func (b *blockAppender) run() {
	defer close(b.closed)
	for req := range b.queue {
		if req.Flush {
			flush, err := b.log.Flush()
			req.Reply <- appendResult{Flush: flush, Err: err}
			continue
		}
		ref, flush, err := b.log.Append(req.Operation.Sequence, req.Operation.DID, req.Operation.CID, req.Operation.Prev, req.Operation.CanonicalBytes)
		req.Reply <- appendResult{Ref: ref, Flush: flush, Err: err}
	}
}
+
+func (b *blockAppender) Append(op types.ParsedOperation) (appendResult, error) {
+ reply := make(chan appendResult, 1)
+ b.queue <- appendRequest{Operation: op, Reply: reply}
+ res := <-reply
+ return res, res.Err
+}
+
+func (b *blockAppender) Flush() (*storage.FlushInfo, error) {
+ reply := make(chan appendResult, 1)
+ b.queue <- appendRequest{Flush: true, Reply: reply}
+ res := <-reply
+ return res.Flush, res.Err
+}
+
// Close shuts the request queue and waits for the writer goroutine to drain
// it and exit. Safe to call multiple times; later calls are no-ops.
func (b *blockAppender) Close() {
	b.once.Do(func() {
		close(b.queue)
		<-b.closed
	})
}
diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go
new file mode 100644
index 0000000..d7bd1b6
--- /dev/null
+++ b/internal/ingest/service_integration_test.go
@@ -0,0 +1,157 @@
+package ingest
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+)
+
// TestReplayIntegration exercises the full mirror-mode pipeline end to end:
// it writes three signed operations to an NDJSON source, replays them through
// a real Pebble store and block log, then checks the global sequence, the
// per-DID state, chain re-verification, and checkpoint creation.
func TestReplayIntegration(t *testing.T) {
	tmp := t.TempDir()
	dataDir := filepath.Join(tmp, "data")
	if err := os.MkdirAll(dataDir, 0o755); err != nil {
		t.Fatalf("mkdir data dir: %v", err)
	}

	// Mirror signing key: a random ed25519 seed, stored base64url-encoded.
	keySeed := make([]byte, ed25519.SeedSize)
	if _, err := rand.Read(keySeed); err != nil {
		t.Fatalf("rand seed: %v", err)
	}
	keyPath := filepath.Join(tmp, "mirror.key")
	if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
		t.Fatalf("write mirror key: %v", err)
	}

	// Write the fixture operations as one JSON record per line (NDJSON).
	records := buildSignedRecords(t)
	sourcePath := filepath.Join(tmp, "sample.ndjson")
	file, err := os.Create(sourcePath)
	if err != nil {
		t.Fatalf("create source: %v", err)
	}
	for _, rec := range records {
		b, _ := json.Marshal(rec)
		if _, err := fmt.Fprintln(file, string(b)); err != nil {
			t.Fatalf("write source: %v", err)
		}
	}
	file.Close()

	store, err := storage.OpenPebble(dataDir)
	if err != nil {
		t.Fatalf("open pebble: %v", err)
	}
	defer store.Close()
	if err := store.SetMode(config.ModeMirror); err != nil {
		t.Fatalf("set mode: %v", err)
	}

	bl, err := storage.OpenBlockLog(dataDir, 3, 4)
	if err != nil {
		t.Fatalf("open block log: %v", err)
	}

	cfg := config.Config{
		Mode:                 config.ModeMirror,
		DataDir:              dataDir,
		PLCSource:            sourcePath,
		VerifyPolicy:         config.VerifyFull,
		ZstdLevel:            3,
		BlockSizeMB:          4,
		CheckpointInterval:   2,
		ListenAddr:           ":0",
		MirrorPrivateKeyPath: keyPath,
	}
	service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath))
	if err := service.Replay(context.Background()); err != nil {
		t.Fatalf("replay: %v", err)
	}
	if err := service.Flush(context.Background()); err != nil {
		t.Fatalf("flush: %v", err)
	}

	// The fixture contains sequences 1..3, so the global tip must be 3.
	seq, err := store.GetGlobalSeq()
	if err != nil {
		t.Fatalf("get global seq: %v", err)
	}
	if seq != 3 {
		t.Fatalf("global seq mismatch: got %d want 3", seq)
	}

	// Alice has two operations (seq 1 and 2); her latest must be 2.
	s, ok, err := store.GetState("did:plc:alice")
	if err != nil {
		t.Fatalf("get state: %v", err)
	}
	if !ok {
		t.Fatalf("missing alice state")
	}
	if s.LatestOpSeq != 2 {
		t.Fatalf("latest op seq mismatch for alice: got %d want 2", s.LatestOpSeq)
	}
	if err := service.VerifyDID(context.Background(), "did:plc:alice"); err != nil {
		t.Fatalf("verify alice did: %v", err)
	}

	// A snapshot must produce a retrievable latest checkpoint.
	if _, err := service.Snapshot(context.Background()); err != nil {
		t.Fatalf("snapshot: %v", err)
	}
	if _, ok, err := store.GetLatestCheckpoint(); err != nil || !ok {
		t.Fatalf("expected latest checkpoint, err=%v ok=%v", err, ok)
	}
}
+
// buildSignedRecords fabricates a minimal valid export: two chained
// operations for did:plc:alice signed by one ed25519 key and one genesis
// operation for did:plc:bob signed by another. Each record's CID is the
// digest of its canonicalized operation JSON, and the signature covers the
// canonicalized payload carried in sigPayload.
func buildSignedRecords(t *testing.T) []types.ExportRecord {
	t.Helper()
	pub1, priv1, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generate key1: %v", err)
	}
	pub2, priv2, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generate key2: %v", err)
	}
	var out []types.ExportRecord

	// mk builds one signed record; prev, when non-empty, links the operation
	// to its predecessor's CID in both the payload and the envelope.
	mk := func(seq uint64, did, prev string, pub ed25519.PublicKey, priv ed25519.PrivateKey) types.ExportRecord {
		payloadDoc := map[string]any{
			"did":    did,
			"didDoc": map[string]any{"id": did, "seq": seq},
		}
		if prev != "" {
			payloadDoc["prev"] = prev
		}
		payloadBytes, _ := json.Marshal(payloadDoc)
		canon, _ := types.CanonicalizeJSON(payloadBytes)
		sig := ed25519.Sign(priv, canon)
		op := map[string]any{
			"did":        did,
			"didDoc":     payloadDoc["didDoc"],
			"publicKey":  base64.RawURLEncoding.EncodeToString(pub),
			"sigPayload": base64.RawURLEncoding.EncodeToString(canon),
			"sig":        base64.RawURLEncoding.EncodeToString(sig),
		}
		if prev != "" {
			op["prev"] = prev
		}
		opRaw, _ := json.Marshal(op)
		opCanon, _ := types.CanonicalizeJSON(opRaw)
		cid := types.ComputeDigestCID(opCanon)
		return types.ExportRecord{Seq: seq, DID: did, CID: cid, Operation: opRaw}
	}

	rec1 := mk(1, "did:plc:alice", "", pub1, priv1)
	rec2 := mk(2, "did:plc:alice", rec1.CID, pub1, priv1)
	rec3 := mk(3, "did:plc:bob", "", pub2, priv2)
	out = append(out, rec1, rec2, rec3)
	return out
}
diff --git a/internal/ingest/service_order_test.go b/internal/ingest/service_order_test.go
new file mode 100644
index 0000000..0e2299c
--- /dev/null
+++ b/internal/ingest/service_order_test.go
@@ -0,0 +1,39 @@
+package ingest
+
+import (
+ "testing"
+
+ "github.com/Fuwn/plutia/internal/types"
+)
+
+func TestCollectVerifiedInOrder_OutOfOrderInput(t *testing.T) {
+ results := make(chan verifyResult, 3)
+ results <- verifyResult{index: 2, op: types.ParsedOperation{Sequence: 3}}
+ results <- verifyResult{index: 0, op: types.ParsedOperation{Sequence: 1}}
+ results <- verifyResult{index: 1, op: types.ParsedOperation{Sequence: 2}}
+ close(results)
+
+ ordered, err := collectVerifiedInOrder(3, results)
+ if err != nil {
+ t.Fatalf("collect verified: %v", err)
+ }
+ if len(ordered) != 3 {
+ t.Fatalf("ordered length mismatch: got %d want 3", len(ordered))
+ }
+ for i := 0; i < 3; i++ {
+ if ordered[i].op.Sequence != uint64(i+1) {
+ t.Fatalf("unexpected sequence at index %d: got %d want %d", i, ordered[i].op.Sequence, i+1)
+ }
+ }
+}
+
+func TestCollectVerifiedInOrder_MissingResult(t *testing.T) {
+ results := make(chan verifyResult, 2)
+ results <- verifyResult{index: 0, op: types.ParsedOperation{Sequence: 1}}
+ results <- verifyResult{index: 2, op: types.ParsedOperation{Sequence: 3}}
+ close(results)
+
+ if _, err := collectVerifiedInOrder(3, results); err == nil {
+ t.Fatalf("expected missing result error")
+ }
+}
diff --git a/internal/merkle/tree.go b/internal/merkle/tree.go
new file mode 100644
index 0000000..ff97217
--- /dev/null
+++ b/internal/merkle/tree.go
@@ -0,0 +1,188 @@
+package merkle
+
+import (
+ "crypto/sha256"
+ "encoding/hex"
+)
+
// Sibling is one step of a Merkle inclusion proof: the sibling node's
// hex-encoded hash and whether it sits to the left of the running hash.
type Sibling struct {
	Hash string `json:"hash"`
	Left bool   `json:"left"`
}
+
+func HashLeaf(leaf []byte) []byte {
+ s := sha256.Sum256(leaf)
+ return s[:]
+}
+
+func Root(leaves [][]byte) []byte {
+ if len(leaves) == 0 {
+ empty := sha256.Sum256(nil)
+ return empty[:]
+ }
+ level := cloneLevel(leaves)
+ for len(level) > 1 {
+ next := make([][]byte, 0, (len(level)+1)/2)
+ for i := 0; i < len(level); i += 2 {
+ left := level[i]
+ right := left
+ if i+1 < len(level) {
+ right = level[i+1]
+ }
+ next = append(next, hashPair(left, right))
+ }
+ level = next
+ }
+ return level[0]
+}
+
// BuildProof returns the Merkle inclusion proof for the leaf at index: one
// Sibling per tree level, recording the neighbour's hex hash and which side
// it is on. On odd-sized levels the last node is its own sibling, mirroring
// Root's duplicate-last pairing so VerifyProof reproduces the same root.
// Returns nil for an empty tree or an out-of-range index.
func BuildProof(leaves [][]byte, index int) []Sibling {
	if len(leaves) == 0 || index < 0 || index >= len(leaves) {
		return nil
	}
	proof := make([]Sibling, 0)
	level := cloneLevel(leaves)
	idx := index
	for len(level) > 1 {
		if idx%2 == 0 {
			// Left child: sibling is to the right (or itself at a level edge).
			sib := idx + 1
			if sib >= len(level) {
				sib = idx
			}
			proof = append(proof, Sibling{Hash: hex.EncodeToString(level[sib]), Left: false})
		} else {
			// Right child: sibling is to the left.
			sib := idx - 1
			proof = append(proof, Sibling{Hash: hex.EncodeToString(level[sib]), Left: true})
		}

		// Collapse this level into its parents, duplicating an odd tail.
		next := make([][]byte, 0, (len(level)+1)/2)
		for i := 0; i < len(level); i += 2 {
			left := level[i]
			right := left
			if i+1 < len(level) {
				right = level[i+1]
			}
			next = append(next, hashPair(left, right))
		}
		idx /= 2
		level = next
	}
	return proof
}
+
+func VerifyProof(leafHash []byte, proof []Sibling, root []byte) bool {
+ cur := append([]byte(nil), leafHash...)
+ for _, s := range proof {
+ sib, err := hex.DecodeString(s.Hash)
+ if err != nil {
+ return false
+ }
+ var combined []byte
+ if s.Left {
+ combined = append(append([]byte(nil), sib...), cur...)
+ } else {
+ combined = append(append([]byte(nil), cur...), sib...)
+ }
+ h := sha256.Sum256(combined)
+ cur = h[:]
+ }
+ return hex.EncodeToString(cur) == hex.EncodeToString(root)
+}
+
// cloneLevel deep-copies a level of node hashes so later mutation cannot
// corrupt the caller's slices; nil entries stay nil (the accumulator relies
// on nil to mean "level unoccupied").
func cloneLevel(in [][]byte) [][]byte {
	out := make([][]byte, len(in))
	for i, node := range in {
		out[i] = append([]byte(nil), node...)
	}
	return out
}
+
// Accumulator incrementally computes a Merkle root in O(log n) memory. Each
// levels[i] holds the pending (not-yet-paired) subtree hash covering 2^i
// leaves, or nil when that level is unoccupied; pendingCount tracks how many
// levels are occupied.
type Accumulator struct {
	levels       [][]byte
	pendingCount int
}
+
// NewAccumulator returns an empty accumulator, pre-sized for trees of up to
// 2^32 leaves without reallocating the level slice.
func NewAccumulator() *Accumulator {
	return &Accumulator{levels: make([][]byte, 0, 32)}
}
+
// AddLeafHash folds one leaf hash into the accumulator using binary-counter
// style carries: the hash is placed in the lowest empty level, merging with
// each occupied level on the way up (like incrementing a binary number).
// Empty input is ignored; the caller's slice is copied, never retained.
func (a *Accumulator) AddLeafHash(leafHash []byte) {
	if len(leafHash) == 0 {
		return
	}
	cur := append([]byte(nil), leafHash...)
	level := 0
	for {
		if level >= len(a.levels) {
			a.levels = append(a.levels, nil)
		}
		if a.levels[level] == nil {
			// Empty slot: park the carry here and stop.
			a.levels[level] = cur
			a.pendingCount++
			break
		}
		// Occupied: merge (existing on the left) and carry one level up.
		cur = hashPair(a.levels[level], cur)
		a.levels[level] = nil
		a.pendingCount--
		level++
	}
}
+
// RootDuplicateLast finalizes the accumulator into a root that matches
// Root(leaves) with its duplicate-last-node padding, without mutating the
// accumulator (it works on a clone, so more leaves may be added afterwards).
// While more than one subtree is pending, the lowest (smallest) pending
// subtree is paired with itself and carried upward, merging with occupied
// levels exactly as AddLeafHash does. An empty accumulator yields SHA-256 of
// the empty string.
func (a *Accumulator) RootDuplicateLast() []byte {
	if a.pendingCount == 0 {
		empty := sha256.Sum256(nil)
		return empty[:]
	}
	clone := &Accumulator{
		levels:       cloneLevel(a.levels),
		pendingCount: a.pendingCount,
	}
	for clone.pendingCount > 1 {
		// Take the smallest pending subtree out of its slot...
		level := clone.lowestPendingLevel()
		cur := clone.levels[level]
		clone.levels[level] = nil
		clone.pendingCount--

		// ...pad it by pairing it with itself, then carry it upward,
		// merging with any occupied level it meets.
		cur = hashPair(cur, cur)
		level++
		for {
			if level >= len(clone.levels) {
				clone.levels = append(clone.levels, nil)
			}
			if clone.levels[level] == nil {
				clone.levels[level] = cur
				clone.pendingCount++
				break
			}
			cur = hashPair(clone.levels[level], cur)
			clone.levels[level] = nil
			clone.pendingCount--
			level++
		}
	}
	return clone.highestPendingHash()
}
+
+func hashPair(left, right []byte) []byte {
+ h := sha256.Sum256(append(append([]byte(nil), left...), right...))
+ return h[:]
+}
+
+func (a *Accumulator) lowestPendingLevel() int {
+ for i := 0; i < len(a.levels); i++ {
+ if a.levels[i] != nil {
+ return i
+ }
+ }
+ return 0
+}
+
+func (a *Accumulator) highestPendingHash() []byte {
+ for i := len(a.levels) - 1; i >= 0; i-- {
+ if a.levels[i] != nil {
+ return a.levels[i]
+ }
+ }
+ empty := sha256.Sum256(nil)
+ return empty[:]
+}
diff --git a/internal/merkle/tree_test.go b/internal/merkle/tree_test.go
new file mode 100644
index 0000000..9fa791d
--- /dev/null
+++ b/internal/merkle/tree_test.go
@@ -0,0 +1,46 @@
+package merkle
+
+import (
+ "encoding/hex"
+ "fmt"
+ "testing"
+)
+
+func TestRootAndProof(t *testing.T) {
+ inputs := [][]byte{HashLeaf([]byte("a")), HashLeaf([]byte("b")), HashLeaf([]byte("c"))}
+ root := Root(inputs)
+ if len(root) == 0 {
+ t.Fatalf("expected root")
+ }
+
+ for i := range inputs {
+ proof := BuildProof(inputs, i)
+ if !VerifyProof(inputs[i], proof, root) {
+ t.Fatalf("proof verification failed for index %d", i)
+ }
+ }
+}
+
// TestRootEmpty pins the empty-tree root to SHA-256 of the empty string.
func TestRootEmpty(t *testing.T) {
	r := Root(nil)
	if got := hex.EncodeToString(r); got != "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" {
		t.Fatalf("unexpected empty root: %s", got)
	}
}
+
+func TestAccumulatorRootMatchesRoot(t *testing.T) {
+ for n := 1; n <= 128; n++ {
+ leaves := make([][]byte, 0, n)
+ acc := NewAccumulator()
+ for i := 0; i < n; i++ {
+ leaf := HashLeaf([]byte(fmt.Sprintf("leaf-%d", i)))
+ leaves = append(leaves, leaf)
+ acc.AddLeafHash(leaf)
+ }
+ want := hex.EncodeToString(Root(leaves))
+ got := hex.EncodeToString(acc.RootDuplicateLast())
+ if got != want {
+ t.Fatalf("root mismatch n=%d got=%s want=%s", n, got, want)
+ }
+ }
+}
diff --git a/internal/state/engine.go b/internal/state/engine.go
new file mode 100644
index 0000000..4ecdaca
--- /dev/null
+++ b/internal/state/engine.go
@@ -0,0 +1,172 @@
+package state
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+)
+
// Engine applies verified PLC operations to the backing store, keeping DID
// state, chain heads, and sequence indexes consistent. mode selects
// mirror-specific behavior such as persisting operation block references.
type Engine struct {
	store storage.Store
	mode  string
}
+
// New returns an Engine that writes to store under the given mode.
func New(store storage.Store, mode string) *Engine {
	return &Engine{store: store, mode: mode}
}
+
// Apply computes the next state for op's DID and persists it. When the store
// is a PebbleStore the whole update is written as one atomic batch; otherwise
// it falls back to individual writes — state, chain head, sequence index,
// optional block ref, and the global sequence last.
func (e *Engine) Apply(op types.ParsedOperation, blockRef *types.BlockRefV1) error {
	existing, ok, err := e.store.GetState(op.DID)
	if err != nil {
		return fmt.Errorf("load state: %w", err)
	}
	var prior *types.StateV1
	if ok {
		prior = &existing
	}
	next, err := ComputeNextState(op, prior)
	if err != nil {
		return err
	}
	// Fast path: Pebble can apply everything in a single atomic batch.
	if pebbleStore, ok := e.store.(*storage.PebbleStore); ok {
		includeOpRef := e.mode == config.ModeMirror && blockRef != nil
		if err := pebbleStore.ApplyOperationBatch(next, blockRef, includeOpRef); err != nil {
			return fmt.Errorf("apply operation batch: %w", err)
		}
		return nil
	}
	if err := e.store.PutState(next); err != nil {
		return fmt.Errorf("put state: %w", err)
	}
	if err := e.store.SetChainHead(op.DID, op.Sequence); err != nil {
		return fmt.Errorf("set chain head: %w", err)
	}
	if err := e.store.AddDIDSequence(op.DID, op.Sequence); err != nil {
		return fmt.Errorf("append did sequence: %w", err)
	}
	// Only mirror mode retains raw operations, so only it records block refs.
	if e.mode == config.ModeMirror && blockRef != nil {
		if err := e.store.PutOpSeqRef(op.Sequence, *blockRef); err != nil {
			return fmt.Errorf("put opseq ref: %w", err)
		}
	}
	if err := e.store.SetGlobalSeq(op.Sequence); err != nil {
		return fmt.Errorf("set global seq: %w", err)
	}
	return nil
}
+
+func ComputeNextState(op types.ParsedOperation, existing *types.StateV1) (types.StateV1, error) {
+ if existing != nil && op.Sequence <= existing.LatestOpSeq {
+ if existing.ChainTipHash == op.CID {
+ return *existing, nil
+ }
+ return types.StateV1{}, fmt.Errorf("non-monotonic sequence for %s: got %d <= %d", op.DID, op.Sequence, existing.LatestOpSeq)
+ }
+ doc, err := materializeDIDDocument(op.DID, op.Payload)
+ if err != nil {
+ return types.StateV1{}, err
+ }
+ next := types.StateV1{
+ Version: 1,
+ DID: op.DID,
+ DIDDocument: doc,
+ ChainTipHash: op.CID,
+ LatestOpSeq: op.Sequence,
+ UpdatedAt: time.Now().UTC(),
+ }
+ next.RotationKeys = extractRotationKeys(op.Payload)
+ if len(next.RotationKeys) == 0 && existing != nil {
+ next.RotationKeys = append([]string(nil), existing.RotationKeys...)
+ }
+ return next, nil
+}
+
// materializeDIDDocument produces canonical JSON for did's DID document from
// an operation payload. Precedence: an embedded document field (first match
// of did_document, didDoc, document, didDocument), then a document derived
// from alsoKnownAs, and finally one synthesized from individual legacy-style
// fields (type, handle, service, signingKey).
func materializeDIDDocument(did string, payload map[string]any) ([]byte, error) {
	// Prefer an explicitly embedded document, trying the known field
	// spellings in priority order.
	for _, k := range []string{"did_document", "didDoc", "document", "didDocument"} {
		if v, ok := payload[k]; ok {
			b, err := json.Marshal(v)
			if err != nil {
				return nil, fmt.Errorf("marshal did document field %s: %w", k, err)
			}
			return types.CanonicalizeJSON(b)
		}
	}
	if v, ok := payload["alsoKnownAs"]; ok {
		doc := map[string]any{
			"id":          did,
			"alsoKnownAs": v,
		}
		b, err := json.Marshal(doc)
		if err != nil {
			return nil, fmt.Errorf("marshal derived did document: %w", err)
		}
		return types.CanonicalizeJSON(b)
	}

	// Synthesize a minimal document from individual legacy-style fields.
	doc := map[string]any{"id": did}
	if typ, _ := payload["type"].(string); typ == "plc_tombstone" || typ == "tombstone" {
		doc["deactivated"] = true
	}
	if handle, ok := payload["handle"].(string); ok && handle != "" {
		doc["alsoKnownAs"] = []string{"at://" + handle}
	}
	if serviceEndpoint, ok := payload["service"].(string); ok && serviceEndpoint != "" {
		doc["service"] = []map[string]any{
			{
				"id":              "#atproto_pds",
				"type":            "AtprotoPersonalDataServer",
				"serviceEndpoint": serviceEndpoint,
			},
		}
	}
	if signingKey, ok := payload["signingKey"].(string); ok && signingKey != "" {
		doc["verificationMethod"] = []map[string]any{
			{
				"id":                 "#atproto",
				"type":               "Multikey",
				"controller":         did,
				"publicKeyMultibase": signingKey,
			},
		}
		doc["authentication"] = []string{"#atproto"}
	}
	b, err := json.Marshal(doc)
	if err != nil {
		return nil, fmt.Errorf("marshal synthesized did document: %w", err)
	}
	return types.CanonicalizeJSON(b)
}
+
// extractRotationKeys gathers rotation-capable keys from a payload in a
// stable order — the "rotationKeys" array first, then "recoveryKey", then
// "signingKey" — skipping empty strings and duplicates. Always returns a
// non-nil (possibly empty) slice.
func extractRotationKeys(payload map[string]any) []string {
	out := make([]string, 0, 4)
	seen := map[string]struct{}{}
	appendKey := func(k string) {
		if k == "" {
			return
		}
		if _, dup := seen[k]; dup {
			return
		}
		seen[k] = struct{}{}
		out = append(out, k)
	}

	if list, ok := payload["rotationKeys"].([]any); ok {
		for _, entry := range list {
			if k, isString := entry.(string); isString {
				appendKey(k)
			}
		}
	}
	if k, ok := payload["recoveryKey"].(string); ok {
		appendKey(k)
	}
	if k, ok := payload["signingKey"].(string); ok {
		appendKey(k)
	}
	return out
}
diff --git a/internal/storage/blocklog.go b/internal/storage/blocklog.go
new file mode 100644
index 0000000..879d3bd
--- /dev/null
+++ b/internal/storage/blocklog.go
@@ -0,0 +1,330 @@
+package storage
+
+import (
+ "bytes"
+ "crypto/sha256"
+ "encoding/binary"
+ "encoding/hex"
+ "fmt"
+ "os"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/types"
+ "github.com/klauspost/compress/zstd"
+)
+
// FlushInfo describes a block that was just flushed to disk.
type FlushInfo struct {
	BlockID uint64 // numeric id of the flushed block file (NNNNNN.zst)
	Hash    string // hex-encoded SHA-256 of the compressed block bytes
}

// IntegrityReport summarizes what ValidateAndRecover verified and repaired.
type IntegrityReport struct {
	VerifiedBlocks   int      // blocks whose on-disk hash matched the indexed hash
	RemovedTempFiles []string // leftover ".tmp" files deleted during recovery
	RemovedOrphans   []uint64 // block files deleted because the index never recorded them
}

// BlockLog is an append-only log of canonical operation records, buffered in
// memory and flushed as zstd-compressed block files under <dataDir>/ops.
// Append and Flush are safe for concurrent use.
type BlockLog struct {
	dir        string // ops directory holding NNNNNN.zst block files
	zstdLevel  int    // zstd compression level applied at flush time
	targetSize int    // flush threshold for the in-memory buffer, in bytes

	mu      sync.Mutex   // guards buf and blockID
	buf     bytes.Buffer // pending records for the block currently being built
	blockID uint64       // id the buffered records will be flushed under
}
+
+func OpenBlockLog(dataDir string, zstdLevel int, targetMB int) (*BlockLog, error) {
+ dir := filepath.Join(dataDir, "ops")
+ if err := os.MkdirAll(dir, 0o755); err != nil {
+ return nil, fmt.Errorf("mkdir ops: %w", err)
+ }
+ nextID, err := detectNextBlockID(dir)
+ if err != nil {
+ return nil, err
+ }
+ return &BlockLog{
+ dir: dir,
+ zstdLevel: zstdLevel,
+ targetSize: targetMB * 1024 * 1024,
+ blockID: nextID,
+ }, nil
+}
+
+func (l *BlockLog) Append(seq uint64, did, cid, prev string, canonical []byte) (types.BlockRefV1, *FlushInfo, error) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
+ record := encodeRecord(canonical)
+ offset := uint64(l.buf.Len())
+ if _, err := l.buf.Write(record); err != nil {
+ return types.BlockRefV1{}, nil, fmt.Errorf("buffer write: %w", err)
+ }
+
+ ref := types.BlockRefV1{
+ Version: 1,
+ BlockID: l.blockID,
+ Offset: offset,
+ Length: uint64(len(record)),
+ OpSeq: seq,
+ DID: did,
+ CID: cid,
+ PrevCID: prev,
+ OpHash: types.ComputeDigestCID(canonical),
+ Received: "",
+ }
+
+ if l.buf.Len() < l.targetSize {
+ return ref, nil, nil
+ }
+ flush, err := l.flushLocked()
+ if err != nil {
+ return types.BlockRefV1{}, nil, err
+ }
+ return ref, flush, nil
+}
+
// Flush forces any buffered records to disk as a compressed block file.
// It returns a nil FlushInfo (and nil error) when nothing was buffered.
func (l *BlockLog) Flush() (*FlushInfo, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.flushLocked()
}
+
+func (l *BlockLog) flushLocked() (*FlushInfo, error) {
+ if l.buf.Len() == 0 {
+ return nil, nil
+ }
+ encLevel := zstd.EncoderLevelFromZstd(l.zstdLevel)
+ enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(encLevel))
+ if err != nil {
+ return nil, fmt.Errorf("zstd encoder: %w", err)
+ }
+ compressed := enc.EncodeAll(l.buf.Bytes(), nil)
+ if err := enc.Close(); err != nil {
+ return nil, fmt.Errorf("close zstd encoder: %w", err)
+ }
+
+ path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", l.blockID))
+ tmpPath := path + ".tmp"
+ f, err := os.Create(tmpPath)
+ if err != nil {
+ return nil, fmt.Errorf("create temp block file: %w", err)
+ }
+ half := len(compressed) / 2
+ if _, err := f.Write(compressed[:half]); err != nil {
+ _ = f.Close()
+ return nil, fmt.Errorf("write temp block first half: %w", err)
+ }
+ if sleepMs, err := strconv.Atoi(strings.TrimSpace(os.Getenv("PLUTIA_TEST_SLOW_FLUSH_MS"))); err == nil && sleepMs > 0 {
+ time.Sleep(time.Duration(sleepMs) * time.Millisecond)
+ }
+ if _, err := f.Write(compressed[half:]); err != nil {
+ _ = f.Close()
+ return nil, fmt.Errorf("write temp block second half: %w", err)
+ }
+ if err := f.Sync(); err != nil {
+ _ = f.Close()
+ return nil, fmt.Errorf("sync temp block file: %w", err)
+ }
+ if err := f.Close(); err != nil {
+ return nil, fmt.Errorf("close temp block file: %w", err)
+ }
+ if err := os.Rename(tmpPath, path); err != nil {
+ return nil, fmt.Errorf("rename temp block file: %w", err)
+ }
+ sum := sha256.Sum256(compressed)
+ hash := hex.EncodeToString(sum[:])
+
+ l.blockID++
+ l.buf.Reset()
+ return &FlushInfo{BlockID: l.blockID - 1, Hash: hash}, nil
+}
+
// ValidateAndRecover reconciles the on-disk block files with the hashes the
// index recorded. It deletes leftover ".tmp" files from interrupted flushes,
// deletes block files the index never committed (orphans), verifies the
// SHA-256 of every indexed block against the stored hash, and fails hard if
// an indexed block file is missing or corrupt. The report lists everything
// it verified and removed.
func (l *BlockLog) ValidateAndRecover(store Store) (*IntegrityReport, error) {
	report := &IntegrityReport{
		RemovedTempFiles: make([]string, 0),
		RemovedOrphans:   make([]uint64, 0),
	}
	entries, err := os.ReadDir(l.dir)
	if err != nil {
		return nil, fmt.Errorf("read ops dir: %w", err)
	}
	storedEntries, err := store.ListBlockHashEntries()
	if err != nil {
		return nil, fmt.Errorf("list stored block hashes: %w", err)
	}
	stored := make(map[uint64]string, len(storedEntries))
	for _, e := range storedEntries {
		stored[e.BlockID] = e.Hash
	}

	// First pass over the directory: drop ".tmp" leftovers and collect the
	// surviving NNNNNN.zst files keyed by block id.
	files := make(map[uint64]string)
	for _, e := range entries {
		name := e.Name()
		if strings.HasSuffix(name, ".tmp") {
			path := filepath.Join(l.dir, name)
			if err := os.Remove(path); err != nil {
				return nil, fmt.Errorf("remove temp block file %s: %w", name, err)
			}
			report.RemovedTempFiles = append(report.RemovedTempFiles, name)
			continue
		}
		if !strings.HasSuffix(name, ".zst") {
			continue
		}
		base := strings.TrimSuffix(name, ".zst")
		id, err := strconv.Atoi(base)
		if err != nil {
			// Not a numbered block file; ignore unrelated names.
			continue
		}
		files[uint64(id)] = filepath.Join(l.dir, name)
	}

	// Second pass: every surviving file must be indexed (otherwise it is an
	// orphan from a flush whose index commit never happened) and must hash
	// to exactly what the index recorded.
	for id, path := range files {
		expected, ok := stored[id]
		if !ok {
			if err := os.Remove(path); err != nil {
				return nil, fmt.Errorf("remove orphan block %d: %w", id, err)
			}
			report.RemovedOrphans = append(report.RemovedOrphans, id)
			continue
		}
		actual, err := fileHash(path)
		if err != nil {
			return nil, fmt.Errorf("hash block %d: %w", id, err)
		}
		if actual != expected {
			return nil, fmt.Errorf("block hash mismatch block=%d expected=%s got=%s", id, expected, actual)
		}
		report.VerifiedBlocks++
	}

	// Finally, the index must not reference a block file we no longer have;
	// that would mean committed operations are unrecoverable.
	for _, e := range storedEntries {
		if _, ok := files[e.BlockID]; !ok {
			return nil, fmt.Errorf("missing block file for indexed block %d", e.BlockID)
		}
	}

	return report, nil
}
+
+func (l *BlockLog) ReadRecord(ref types.BlockRefV1) ([]byte, error) {
+ path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", ref.BlockID))
+ compressed, err := os.ReadFile(path)
+ if err != nil {
+ return nil, fmt.Errorf("read block file: %w", err)
+ }
+ dec, err := zstd.NewReader(nil)
+ if err != nil {
+ return nil, fmt.Errorf("zstd reader: %w", err)
+ }
+ decompressed, err := dec.DecodeAll(compressed, nil)
+ if err != nil {
+ return nil, fmt.Errorf("decode block: %w", err)
+ }
+ dec.Close()
+ if ref.Offset+ref.Length > uint64(len(decompressed)) {
+ return nil, fmt.Errorf("record bounds out of range")
+ }
+ record := decompressed[ref.Offset : ref.Offset+ref.Length]
+ _, payload, err := decodeRecord(record)
+ if err != nil {
+ return nil, err
+ }
+ return payload, nil
+}
+
+func (l *BlockLog) IterateBlockRecords(blockID uint64, fn func(offset uint64, payload []byte) error) error {
+ path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", blockID))
+ compressed, err := os.ReadFile(path)
+ if err != nil {
+ return fmt.Errorf("read block file: %w", err)
+ }
+ dec, err := zstd.NewReader(nil)
+ if err != nil {
+ return fmt.Errorf("zstd reader: %w", err)
+ }
+ decompressed, err := dec.DecodeAll(compressed, nil)
+ if err != nil {
+ return fmt.Errorf("decode block: %w", err)
+ }
+ dec.Close()
+
+ var offset uint64
+ for offset < uint64(len(decompressed)) {
+ start := offset
+ length, n := binary.Uvarint(decompressed[offset:])
+ if n <= 0 {
+ return fmt.Errorf("invalid varint at offset %d", offset)
+ }
+ offset += uint64(n)
+ if offset+length > uint64(len(decompressed)) {
+ return fmt.Errorf("record out of bounds at offset %d", start)
+ }
+ payload := decompressed[offset : offset+length]
+ offset += length
+ if err := fn(start, payload); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
// encodeRecord frames payload as a length-prefixed record: a uvarint byte
// count followed by the payload bytes.
func encodeRecord(payload []byte) []byte {
	var header [binary.MaxVarintLen64]byte
	hlen := binary.PutUvarint(header[:], uint64(len(payload)))
	record := make([]byte, 0, hlen+len(payload))
	record = append(record, header[:hlen]...)
	return append(record, payload...)
}
+
// decodeRecord parses one length-prefixed record and returns the declared
// payload length plus the payload bytes. The record must consist of exactly
// the varint header and the payload — trailing or missing bytes are errors.
func decodeRecord(b []byte) (uint64, []byte, error) {
	size, hlen := binary.Uvarint(b)
	switch {
	case hlen <= 0:
		return 0, nil, fmt.Errorf("invalid varint record")
	case uint64(hlen)+size != uint64(len(b)):
		return 0, nil, fmt.Errorf("record length mismatch")
	}
	return size, b[hlen:], nil
}
+
// detectNextBlockID scans dir for NNNNNN.zst block files and returns the id
// the next flush should use: one past the highest existing id, or 1 for a
// directory with no block files.
func detectNextBlockID(dir string) (uint64, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return 0, fmt.Errorf("read ops dir: %w", err)
	}
	highest := 0
	found := false
	for _, entry := range entries {
		name := entry.Name()
		if !strings.HasSuffix(name, ".zst") {
			continue
		}
		id, convErr := strconv.Atoi(strings.TrimSuffix(name, ".zst"))
		if convErr != nil {
			// Unrelated .zst file; skip it.
			continue
		}
		if !found || id > highest {
			highest = id
			found = true
		}
	}
	if !found {
		return 1, nil
	}
	return uint64(highest + 1), nil
}
+
// fileHash returns the hex-encoded SHA-256 digest of the file at path.
func fileHash(path string) (string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return "", err
	}
	digest := sha256.Sum256(data)
	return hex.EncodeToString(digest[:]), nil
}
diff --git a/internal/storage/blocklog_test.go b/internal/storage/blocklog_test.go
new file mode 100644
index 0000000..23045cc
--- /dev/null
+++ b/internal/storage/blocklog_test.go
@@ -0,0 +1,56 @@
+package storage
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+)
+
// TestBlockLogAppendReadFlush covers the happy path: two appends stay
// buffered (no automatic flush under a 4 MB target), an explicit Flush
// writes block 000001.zst, and both records read back through their refs.
func TestBlockLogAppendReadFlush(t *testing.T) {
	tmp := t.TempDir()
	log, err := OpenBlockLog(tmp, 3, 4)
	if err != nil {
		t.Fatalf("OpenBlockLog: %v", err)
	}

	payload1 := []byte(`{"op":1}`)
	payload2 := []byte(`{"op":2}`)
	ref1, flush, err := log.Append(1, "did:plc:a", "cid1", "", payload1)
	if err != nil {
		t.Fatalf("append 1: %v", err)
	}
	if flush != nil {
		t.Fatalf("unexpected automatic flush")
	}
	ref2, _, err := log.Append(2, "did:plc:a", "cid2", "cid1", payload2)
	if err != nil {
		t.Fatalf("append 2: %v", err)
	}

	flushed, err := log.Flush()
	if err != nil {
		t.Fatalf("flush: %v", err)
	}
	if flushed == nil {
		t.Fatalf("expected flushed block")
	}
	// Block numbering starts at 1 for a fresh directory.
	blockFile := filepath.Join(tmp, "ops", "000001.zst")
	if _, err := os.Stat(blockFile); err != nil {
		t.Fatalf("expected block file: %v", err)
	}

	// Both refs must round-trip even though they were issued before flush.
	got1, err := log.ReadRecord(ref1)
	if err != nil {
		t.Fatalf("read record1: %v", err)
	}
	if string(got1) != string(payload1) {
		t.Fatalf("payload1 mismatch: got %s want %s", got1, payload1)
	}
	got2, err := log.ReadRecord(ref2)
	if err != nil {
		t.Fatalf("read record2: %v", err)
	}
	if string(got2) != string(payload2) {
		t.Fatalf("payload2 mismatch: got %s want %s", got2, payload2)
	}
}
diff --git a/internal/storage/pebble_store.go b/internal/storage/pebble_store.go
new file mode 100644
index 0000000..5e15d7e
--- /dev/null
+++ b/internal/storage/pebble_store.go
@@ -0,0 +1,373 @@
+package storage
+
+import (
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "path/filepath"
+ "sort"
+
+ "github.com/Fuwn/plutia/internal/types"
+ "github.com/cockroachdb/pebble"
+)
+
// Compile-time check that PebbleStore satisfies the Store interface.
var _ Store = (*PebbleStore)(nil)

// PebbleStore implements Store on top of a single Pebble key/value database.
type PebbleStore struct {
	db *pebble.DB
}

// OpenPebble opens (or creates) the Pebble index under <dataDir>/index.
func OpenPebble(dataDir string) (*PebbleStore, error) {
	indexDir := filepath.Join(dataDir, "index")
	db, err := pebble.Open(indexDir, &pebble.Options{})
	if err != nil {
		return nil, fmt.Errorf("open pebble: %w", err)
	}
	return &PebbleStore{db: db}, nil
}
+
// Close releases the underlying Pebble database.
func (p *PebbleStore) Close() error { return p.db.Close() }

// GetMode returns the stored operating mode, or "" when none has been set.
func (p *PebbleStore) GetMode() (string, error) {
	v, ok, err := p.getString(metaKey("mode"))
	return v, okOrErr(ok, err)
}

// SetMode durably records the operating mode.
func (p *PebbleStore) SetMode(mode string) error {
	return p.db.Set(metaKey("mode"), []byte(mode), pebble.Sync)
}

// GetGlobalSeq returns the highest applied operation sequence, or 0 when the
// store is empty (a missing key is not an error).
func (p *PebbleStore) GetGlobalSeq() (uint64, error) {
	v, ok, err := p.getUint64(metaKey("global_seq"))
	if err != nil {
		return 0, err
	}
	if !ok {
		return 0, nil
	}
	return v, nil
}

// SetGlobalSeq durably records the highest applied operation sequence as an
// 8-byte big-endian value.
func (p *PebbleStore) SetGlobalSeq(seq uint64) error {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, seq)
	return p.db.Set(metaKey("global_seq"), b, pebble.Sync)
}
+
// PutState durably writes the JSON-encoded state for state.DID.
func (p *PebbleStore) PutState(state types.StateV1) error {
	b, err := json.Marshal(state)
	if err != nil {
		return fmt.Errorf("marshal state: %w", err)
	}
	return p.db.Set(didKey(state.DID), b, pebble.Sync)
}

// ApplyOperationBatch applies a single operation mutation atomically; it is a
// convenience wrapper around ApplyOperationsBatch. The op-seq ref is indexed
// only when includeOpRef is true.
func (p *PebbleStore) ApplyOperationBatch(state types.StateV1, ref *types.BlockRefV1, includeOpRef bool) error {
	var opRef *types.BlockRefV1
	if includeOpRef {
		opRef = ref
	}
	return p.ApplyOperationsBatch([]OperationMutation{{State: state, Ref: opRef}}, nil)
}

// GetState loads the state for did; ok is false when the DID is unknown.
func (p *PebbleStore) GetState(did string) (types.StateV1, bool, error) {
	b, ok, err := p.getBytes(didKey(did))
	if err != nil || !ok {
		return types.StateV1{}, ok, err
	}
	var s types.StateV1
	if err := json.Unmarshal(b, &s); err != nil {
		return types.StateV1{}, false, fmt.Errorf("unmarshal state: %w", err)
	}
	return s, true, nil
}
+
// ListStates returns every stored DID state sorted by DID.
func (p *PebbleStore) ListStates() ([]types.StateV1, error) {
	states := make([]types.StateV1, 0)
	if err := p.ForEachState(func(s types.StateV1) error {
		states = append(states, s)
		return nil
	}); err != nil {
		return nil, err
	}
	sort.Slice(states, func(i, j int) bool { return states[i].DID < states[j].DID })
	return states, nil
}

// ForEachState streams every stored DID state to fn. The bounds "did:".."did;"
// cover exactly the did-key namespace (';' is the byte after ':').
func (p *PebbleStore) ForEachState(fn func(types.StateV1) error) error {
	iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("did:"), UpperBound: []byte("did;")})
	if err != nil {
		return fmt.Errorf("new iterator: %w", err)
	}
	defer iter.Close()

	for iter.First(); iter.Valid(); iter.Next() {
		var s types.StateV1
		if err := json.Unmarshal(iter.Value(), &s); err != nil {
			return fmt.Errorf("unmarshal state: %w", err)
		}
		if err := fn(s); err != nil {
			return err
		}
	}
	if err := iter.Error(); err != nil {
		return fmt.Errorf("iterate states: %w", err)
	}
	return nil
}
+
// ApplyOperationsBatch commits a set of per-DID state mutations, optional
// op-seq block refs, and block-hash entries in one atomic, synced Pebble
// batch, then advances the global sequence to the last mutation's
// LatestOpSeq. Because everything commits together, a crash can never
// observe a partially applied batch.
func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes []BlockHashEntry) error {
	if len(ops) == 0 && len(blockHashes) == 0 {
		return nil
	}
	batch := p.db.NewBatch()
	defer batch.Close()

	for _, op := range ops {
		stateBytes, err := json.Marshal(op.State)
		if err != nil {
			return fmt.Errorf("marshal state: %w", err)
		}
		seqBytes := make([]byte, 8)
		binary.BigEndian.PutUint64(seqBytes, op.State.LatestOpSeq)

		// State document, chain head, and per-DID sequence marker for this DID.
		if err := batch.Set(didKey(op.State.DID), stateBytes, nil); err != nil {
			return err
		}
		if err := batch.Set(chainKey(op.State.DID), seqBytes, nil); err != nil {
			return err
		}
		if err := batch.Set(didOpKey(op.State.DID, op.State.LatestOpSeq), []byte{1}, nil); err != nil {
			return err
		}
		// The op-seq ref is optional; nil means the caller is not indexing it.
		if op.Ref != nil {
			refBytes, err := json.Marshal(op.Ref)
			if err != nil {
				return fmt.Errorf("marshal opseq ref: %w", err)
			}
			if err := batch.Set(opSeqKey(op.State.LatestOpSeq), refBytes, nil); err != nil {
				return err
			}
		}
	}

	for _, bh := range blockHashes {
		if err := batch.Set(blockKey(bh.BlockID), []byte(bh.Hash), nil); err != nil {
			return err
		}
	}

	// Advance the global sequence pointer in the same batch so it commits
	// atomically with the states it describes.
	if len(ops) > 0 {
		lastSeq := ops[len(ops)-1].State.LatestOpSeq
		seqBytes := make([]byte, 8)
		binary.BigEndian.PutUint64(seqBytes, lastSeq)
		if err := batch.Set(metaKey("global_seq"), seqBytes, nil); err != nil {
			return err
		}
	}

	return batch.Commit(pebble.Sync)
}
+
// SetChainHead records the latest operation sequence for did.
func (p *PebbleStore) SetChainHead(did string, seq uint64) error {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, seq)
	return p.db.Set(chainKey(did), b, pebble.Sync)
}

// GetChainHead returns the latest operation sequence for did; ok is false
// when the DID has no recorded chain head.
func (p *PebbleStore) GetChainHead(did string) (uint64, bool, error) {
	return p.getUint64(chainKey(did))
}

// AddDIDSequence marks seq as part of did's operation history. The value is
// a placeholder byte; only key presence matters.
func (p *PebbleStore) AddDIDSequence(did string, seq uint64) error {
	return p.db.Set(didOpKey(did, seq), []byte{1}, pebble.Sync)
}

// ListDIDSequences returns every operation sequence recorded for did in
// ascending order. The upper bound appends 0xFF to the prefix to cap the
// scan at the end of the per-DID key range.
func (p *PebbleStore) ListDIDSequences(did string) ([]uint64, error) {
	prefix := []byte("didop:" + did + ":")
	upper := append(append([]byte(nil), prefix...), 0xFF)
	iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: prefix, UpperBound: upper})
	if err != nil {
		return nil, fmt.Errorf("new iterator: %w", err)
	}
	defer iter.Close()

	seqs := make([]uint64, 0)
	for iter.First(); iter.Valid(); iter.Next() {
		key := iter.Key()
		if len(key) < len(prefix)+8 {
			// Defensive: skip malformed keys rather than panic on decode.
			continue
		}
		seq := binary.BigEndian.Uint64(key[len(prefix):])
		seqs = append(seqs, seq)
	}
	if err := iter.Error(); err != nil {
		return nil, fmt.Errorf("iterate did op sequences: %w", err)
	}
	sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
	return seqs, nil
}

// PutOpSeqRef durably maps a global operation sequence to its block ref.
func (p *PebbleStore) PutOpSeqRef(seq uint64, ref types.BlockRefV1) error {
	b, err := json.Marshal(ref)
	if err != nil {
		return fmt.Errorf("marshal opseq ref: %w", err)
	}
	return p.db.Set(opSeqKey(seq), b, pebble.Sync)
}

// GetOpSeqRef returns the block ref stored for seq; ok is false when absent.
func (p *PebbleStore) GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error) {
	b, ok, err := p.getBytes(opSeqKey(seq))
	if err != nil || !ok {
		return types.BlockRefV1{}, ok, err
	}
	var ref types.BlockRefV1
	if err := json.Unmarshal(b, &ref); err != nil {
		return types.BlockRefV1{}, false, fmt.Errorf("unmarshal opseq ref: %w", err)
	}
	return ref, true, nil
}
+
// PutBlockHash durably records the hash of a flushed block file.
func (p *PebbleStore) PutBlockHash(blockID uint64, hash string) error {
	return p.db.Set(blockKey(blockID), []byte(hash), pebble.Sync)
}

// GetBlockHash returns the recorded hash for blockID; ok is false when absent.
func (p *PebbleStore) GetBlockHash(blockID uint64) (string, bool, error) {
	return p.getString(blockKey(blockID))
}

// ListBlockHashes returns all recorded block hashes in key order, which is
// block-id order because ids are encoded big-endian.
func (p *PebbleStore) ListBlockHashes() ([]string, error) {
	iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")})
	if err != nil {
		return nil, fmt.Errorf("new iterator: %w", err)
	}
	defer iter.Close()

	hashes := make([]string, 0)
	for iter.First(); iter.Valid(); iter.Next() {
		hashes = append(hashes, string(iter.Value()))
	}
	if err := iter.Error(); err != nil {
		return nil, fmt.Errorf("iterate blocks: %w", err)
	}
	return hashes, nil
}

// ListBlockHashEntries returns every (block id, hash) pair sorted by id.
func (p *PebbleStore) ListBlockHashEntries() ([]BlockHashEntry, error) {
	iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")})
	if err != nil {
		return nil, fmt.Errorf("new iterator: %w", err)
	}
	defer iter.Close()

	out := make([]BlockHashEntry, 0)
	for iter.First(); iter.Valid(); iter.Next() {
		key := iter.Key()
		if len(key) < len("block:")+8 {
			// Defensive: skip malformed keys rather than panic on decode.
			continue
		}
		id := binary.BigEndian.Uint64(key[len("block:"):])
		out = append(out, BlockHashEntry{BlockID: id, Hash: string(iter.Value())})
	}
	if err := iter.Error(); err != nil {
		return nil, fmt.Errorf("iterate block entries: %w", err)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].BlockID < out[j].BlockID })
	return out, nil
}
+
// PutCheckpoint stores cp under its sequence and then updates the
// latest-checkpoint pointer. NOTE(review): the two writes are separate Sets,
// not a batch — a crash between them leaves the pointer one checkpoint
// behind (the checkpoint itself is still retrievable by sequence).
func (p *PebbleStore) PutCheckpoint(cp types.CheckpointV1) error {
	b, err := json.Marshal(cp)
	if err != nil {
		return fmt.Errorf("marshal checkpoint: %w", err)
	}
	if err := p.db.Set(checkpointKey(cp.Sequence), b, pebble.Sync); err != nil {
		return err
	}
	latest := make([]byte, 8)
	binary.BigEndian.PutUint64(latest, cp.Sequence)
	return p.db.Set(metaKey("latest_checkpoint"), latest, pebble.Sync)
}

// GetCheckpoint loads the checkpoint at sequence; ok is false when absent.
func (p *PebbleStore) GetCheckpoint(sequence uint64) (types.CheckpointV1, bool, error) {
	b, ok, err := p.getBytes(checkpointKey(sequence))
	if err != nil || !ok {
		return types.CheckpointV1{}, ok, err
	}
	var cp types.CheckpointV1
	if err := json.Unmarshal(b, &cp); err != nil {
		return types.CheckpointV1{}, false, fmt.Errorf("unmarshal checkpoint: %w", err)
	}
	return cp, true, nil
}

// GetLatestCheckpoint follows the latest-checkpoint pointer; ok is false
// when no checkpoint has ever been stored.
func (p *PebbleStore) GetLatestCheckpoint() (types.CheckpointV1, bool, error) {
	seq, ok, err := p.getUint64(metaKey("latest_checkpoint"))
	if err != nil || !ok {
		return types.CheckpointV1{}, ok, err
	}
	return p.GetCheckpoint(seq)
}
+
// getBytes fetches key, translating pebble.ErrNotFound into ok=false. The
// value is copied out because Pebble's buffer is only valid until the closer
// is closed.
func (p *PebbleStore) getBytes(key []byte) ([]byte, bool, error) {
	v, closer, err := p.db.Get(key)
	if err != nil {
		if err == pebble.ErrNotFound {
			return nil, false, nil
		}
		return nil, false, err
	}
	defer closer.Close()
	return append([]byte(nil), v...), true, nil
}

// getString is getBytes with a string conversion.
func (p *PebbleStore) getString(key []byte) (string, bool, error) {
	b, ok, err := p.getBytes(key)
	if err != nil || !ok {
		return "", ok, err
	}
	return string(b), true, nil
}

// getUint64 is getBytes decoding an 8-byte big-endian value; any other
// stored length is treated as corruption.
func (p *PebbleStore) getUint64(key []byte) (uint64, bool, error) {
	b, ok, err := p.getBytes(key)
	if err != nil || !ok {
		return 0, ok, err
	}
	if len(b) != 8 {
		return 0, false, fmt.Errorf("invalid uint64 value length for %q", key)
	}
	return binary.BigEndian.Uint64(b), true, nil
}
+
// Key namespaces for the Pebble index. Every key is "<prefix>:<suffix>";
// numeric suffixes are encoded as 8 big-endian bytes so that lexicographic
// key order matches numeric order.

func didKey(did string) []byte { return []byte("did:" + did) }

func chainKey(did string) []byte { return []byte("chain:" + did) }

func metaKey(k string) []byte { return []byte("meta:" + k) }

func checkpointKey(s uint64) []byte { return append([]byte("checkpoint:"), u64bytes(s)...) }

func opSeqKey(s uint64) []byte { return append([]byte("opseq:"), u64bytes(s)...) }

func blockKey(id uint64) []byte { return append([]byte("block:"), u64bytes(id)...) }

func didOpKey(did string, seq uint64) []byte {
	return append([]byte("didop:"+did+":"), u64bytes(seq)...)
}

// u64bytes encodes v as 8 big-endian bytes.
func u64bytes(v uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], v)
	return buf[:]
}
+
// okOrErr adapts a (found, err) pair to a plain error for accessors whose
// zero return value already encodes "not found" (e.g. GetMode returning "").
// A missing key is deliberately not an error. The previous version had a
// dead `if !ok { return nil }` branch followed by an identical `return nil`;
// the behavior is simply "pass the error through".
func okOrErr(ok bool, err error) error {
	_ = ok // the found flag is intentionally ignored here
	return err
}
diff --git a/internal/storage/pebble_store_batch_durability_test.go b/internal/storage/pebble_store_batch_durability_test.go
new file mode 100644
index 0000000..9d881c3
--- /dev/null
+++ b/internal/storage/pebble_store_batch_durability_test.go
@@ -0,0 +1,66 @@
+package storage
+
+import (
+ "testing"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/types"
+)
+
// TestApplyOperationsBatch_DurabilityBetweenBatches commits one synced batch,
// closes the store, and reopens it — standing in for a crash before any later
// batch — then checks that the committed global sequence survived and that no
// ref exists for a sequence that was never committed.
func TestApplyOperationsBatch_DurabilityBetweenBatches(t *testing.T) {
	tmp := t.TempDir()
	store, err := OpenPebble(tmp)
	if err != nil {
		t.Fatalf("open pebble: %v", err)
	}

	// makeMutation builds a minimal valid mutation for one DID at seq.
	makeMutation := func(did string, seq uint64) OperationMutation {
		return OperationMutation{
			State: types.StateV1{
				Version:      1,
				DID:          did,
				DIDDocument:  []byte(`{"id":"` + did + `"}`),
				ChainTipHash: "sha256:tip",
				LatestOpSeq:  seq,
				UpdatedAt:    time.Now().UTC(),
			},
			Ref: &types.BlockRefV1{
				Version: 1,
				BlockID: 1,
				OpSeq:   seq,
				DID:     did,
				CID:     "sha256:cid",
			},
		}
	}

	if err := store.ApplyOperationsBatch([]OperationMutation{
		makeMutation("did:plc:a", 1),
		makeMutation("did:plc:b", 2),
	}, []BlockHashEntry{{BlockID: 1, Hash: "abc"}}); err != nil {
		t.Fatalf("apply first batch: %v", err)
	}
	if err := store.Close(); err != nil {
		t.Fatalf("close store: %v", err)
	}

	// Simulate a crash before the second batch commit by reopening without applying it.
	reopened, err := OpenPebble(tmp)
	if err != nil {
		t.Fatalf("reopen pebble: %v", err)
	}
	defer reopened.Close()

	seq, err := reopened.GetGlobalSeq()
	if err != nil {
		t.Fatalf("get global seq: %v", err)
	}
	if seq != 2 {
		t.Fatalf("global seq mismatch after simulated crash: got %d want 2", seq)
	}
	if _, ok, err := reopened.GetOpSeqRef(3); err != nil {
		t.Fatalf("get opseq 3: %v", err)
	} else if ok {
		t.Fatalf("unexpected opseq ref for uncommitted sequence 3")
	}
}
diff --git a/internal/storage/pebble_store_batch_test.go b/internal/storage/pebble_store_batch_test.go
new file mode 100644
index 0000000..85afc92
--- /dev/null
+++ b/internal/storage/pebble_store_batch_test.go
@@ -0,0 +1,74 @@
+package storage
+
+import (
+ "testing"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/types"
+)
+
// TestPebbleStoreApplyOperationBatch checks that a single atomic mutation
// populates every derived index: the state document, the chain head, the
// per-DID sequence set, the op-seq ref, and the global sequence counter.
func TestPebbleStoreApplyOperationBatch(t *testing.T) {
	tmp := t.TempDir()
	store, err := OpenPebble(tmp)
	if err != nil {
		t.Fatalf("open pebble: %v", err)
	}
	defer store.Close()

	state := types.StateV1{
		Version:      1,
		DID:          "did:plc:test",
		DIDDocument:  []byte(`{"id":"did:plc:test"}`),
		ChainTipHash: "bafy-state-tip",
		LatestOpSeq:  42,
		UpdatedAt:    time.Now().UTC(),
	}
	ref := &types.BlockRefV1{
		Version: 1,
		BlockID: 9,
		Offset:  123,
		Length:  456,
		OpSeq:   42,
		DID:     "did:plc:test",
		CID:     "bafy-op-cid",
		OpHash:  "bafy-op-hash",
	}

	// includeOpRef=true means the op-seq ref must be indexed too.
	if err := store.ApplyOperationBatch(state, ref, true); err != nil {
		t.Fatalf("apply operation batch: %v", err)
	}

	gotState, ok, err := store.GetState(state.DID)
	if err != nil || !ok {
		t.Fatalf("get state: ok=%v err=%v", ok, err)
	}
	if gotState.ChainTipHash != state.ChainTipHash || gotState.LatestOpSeq != state.LatestOpSeq {
		t.Fatalf("state mismatch: got tip=%s seq=%d", gotState.ChainTipHash, gotState.LatestOpSeq)
	}

	head, ok, err := store.GetChainHead(state.DID)
	if err != nil || !ok || head != state.LatestOpSeq {
		t.Fatalf("chain head mismatch: head=%d ok=%v err=%v", head, ok, err)
	}

	seqs, err := store.ListDIDSequences(state.DID)
	if err != nil {
		t.Fatalf("list did sequences: %v", err)
	}
	if len(seqs) != 1 || seqs[0] != state.LatestOpSeq {
		t.Fatalf("did sequence mismatch: %v", seqs)
	}

	gotRef, ok, err := store.GetOpSeqRef(state.LatestOpSeq)
	if err != nil || !ok {
		t.Fatalf("get opseq ref: ok=%v err=%v", ok, err)
	}
	if gotRef.BlockID != ref.BlockID || gotRef.CID != ref.CID {
		t.Fatalf("op ref mismatch: got block=%d cid=%s", gotRef.BlockID, gotRef.CID)
	}

	globalSeq, err := store.GetGlobalSeq()
	if err != nil || globalSeq != state.LatestOpSeq {
		t.Fatalf("global seq mismatch: seq=%d err=%v", globalSeq, err)
	}
}
diff --git a/internal/storage/store.go b/internal/storage/store.go
new file mode 100644
index 0000000..59055b7
--- /dev/null
+++ b/internal/storage/store.go
@@ -0,0 +1,46 @@
+package storage
+
+import "github.com/Fuwn/plutia/internal/types"
+
// BlockHashEntry pairs a block file id with the hex SHA-256 hash of its
// compressed bytes as recorded in the index.
type BlockHashEntry struct {
	BlockID uint64
	Hash    string
}

// OperationMutation is one unit of work for ApplyOperationsBatch: the new
// state for a DID plus, optionally, the block ref of the operation that
// produced it (nil when the caller does not index per-sequence refs).
type OperationMutation struct {
	State types.StateV1
	Ref   *types.BlockRefV1
}

// Store is the durable index behind the resolver: DID states, chain heads,
// per-DID and global sequences, block hashes, and checkpoints.
type Store interface {
	Close() error

	// Operating mode ("" when unset).
	GetMode() (string, error)
	SetMode(mode string) error

	// Highest applied global operation sequence (0 when empty).
	GetGlobalSeq() (uint64, error)
	SetGlobalSeq(seq uint64) error

	// Per-DID materialized state, plus the atomic batch entry point.
	PutState(state types.StateV1) error
	GetState(did string) (types.StateV1, bool, error)
	ListStates() ([]types.StateV1, error)
	ForEachState(fn func(types.StateV1) error) error
	ApplyOperationsBatch(ops []OperationMutation, blockHashes []BlockHashEntry) error

	// Per-DID chain bookkeeping.
	SetChainHead(did string, seq uint64) error
	GetChainHead(did string) (uint64, bool, error)
	AddDIDSequence(did string, seq uint64) error
	ListDIDSequences(did string) ([]uint64, error)

	// Global sequence -> block ref mapping.
	PutOpSeqRef(seq uint64, ref types.BlockRefV1) error
	GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error)

	// Block file hashes used for integrity checking.
	PutBlockHash(blockID uint64, hash string) error
	GetBlockHash(blockID uint64) (string, bool, error)
	ListBlockHashes() ([]string, error)
	ListBlockHashEntries() ([]BlockHashEntry, error)

	// Checkpoints, addressed by sequence plus a latest pointer.
	PutCheckpoint(cp types.CheckpointV1) error
	GetCheckpoint(sequence uint64) (types.CheckpointV1, bool, error)
	GetLatestCheckpoint() (types.CheckpointV1, bool, error)
}
diff --git a/internal/types/checkpoint.go b/internal/types/checkpoint.go
new file mode 100644
index 0000000..7f5f41a
--- /dev/null
+++ b/internal/types/checkpoint.go
@@ -0,0 +1,13 @@
+package types
+
// DIDLeaf is one leaf of the DID merkle tree: a DID together with the hash
// of the tip of its operation chain.
type DIDLeaf struct {
	DID          string
	ChainTipHash string
}

// CheckpointStateSnapshotV1 is the serialized snapshot of every DID's chain
// tip at a given checkpoint sequence.
type CheckpointStateSnapshotV1 struct {
	Version   uint8     `json:"v"`
	Sequence  uint64    `json:"sequence"`
	CreatedAt string    `json:"created_at"`
	Leaves    []DIDLeaf `json:"leaves"`
}
diff --git a/internal/types/operation.go b/internal/types/operation.go
new file mode 100644
index 0000000..2facb82
--- /dev/null
+++ b/internal/types/operation.go
@@ -0,0 +1,77 @@
+package types
+
+import (
+ "bytes"
+ "crypto/sha256"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+)
+
// ExportRecord mirrors a line/event from plc.directory export stream.
type ExportRecord struct {
	Seq       uint64          `json:"seq"`                 // export sequence number
	DID       string          `json:"did"`                 // subject DID of the operation
	CreatedAt string          `json:"createdAt,omitempty"` // upstream timestamp, if present
	CID       string          `json:"cid,omitempty"`       // upstream CID; may be empty
	Nullified bool            `json:"nullified,omitempty"` // set when upstream nullified this op
	Operation json.RawMessage `json:"operation"`           // raw operation document, decoded lazily
}

// ParsedOperation is the decoded, canonicalized form of an ExportRecord.
type ParsedOperation struct {
	DID            string         // subject DID
	CanonicalBytes []byte         // compacted canonical JSON of the operation
	Payload        map[string]any // decoded operation fields
	Sequence       uint64         // copied from the record's Seq
	CID            string         // record CID, or a digest CID computed when absent
	Prev           string         // "prev" field from the payload, "" for genesis ops
	RawRecord      ExportRecord   // original record for callers that need it
}
+
// CanonicalizeJSON validates raw and returns it compacted (all insignificant
// whitespace removed). Key order is preserved — this canonical form compacts,
// it does not sort.
func CanonicalizeJSON(raw []byte) ([]byte, error) {
	data := bytes.TrimSpace(raw)
	switch {
	case len(data) == 0:
		return nil, fmt.Errorf("empty json payload")
	case !json.Valid(data):
		return nil, fmt.Errorf("invalid json payload")
	}
	var compacted bytes.Buffer
	if err := json.Compact(&compacted, data); err != nil {
		return nil, fmt.Errorf("compact json: %w", err)
	}
	return compacted.Bytes(), nil
}
+
+func ParseOperation(rec ExportRecord) (ParsedOperation, error) {
+ if rec.DID == "" {
+ return ParsedOperation{}, fmt.Errorf("missing did")
+ }
+ canonical, err := CanonicalizeJSON(rec.Operation)
+ if err != nil {
+ return ParsedOperation{}, err
+ }
+ payload := map[string]any{}
+ if err := json.Unmarshal(canonical, &payload); err != nil {
+ return ParsedOperation{}, fmt.Errorf("decode operation: %w", err)
+ }
+ prev, _ := payload["prev"].(string)
+ cid := rec.CID
+ if cid == "" {
+ cid = ComputeDigestCID(canonical)
+ }
+ return ParsedOperation{
+ DID: rec.DID,
+ CanonicalBytes: canonical,
+ Payload: payload,
+ Sequence: rec.Seq,
+ CID: cid,
+ Prev: prev,
+ RawRecord: rec,
+ }, nil
+}
+
// ComputeDigestCID returns a pseudo-CID for payload: the hex SHA-256 digest
// with a "sha256:" prefix.
func ComputeDigestCID(payload []byte) string {
	digest := sha256.Sum256(payload)
	return fmt.Sprintf("sha256:%x", digest)
}
diff --git a/internal/types/operation_test.go b/internal/types/operation_test.go
new file mode 100644
index 0000000..a0cbc38
--- /dev/null
+++ b/internal/types/operation_test.go
@@ -0,0 +1,35 @@
+package types
+
+import (
+ "encoding/json"
+ "testing"
+)
+
// TestCanonicalizeJSON checks that whitespace is stripped while key order is
// preserved (canonicalization compacts; it does not sort keys).
func TestCanonicalizeJSON(t *testing.T) {
	raw := []byte(` { "z": 1, "a" : { "b" : 2 } } `)
	canon, err := CanonicalizeJSON(raw)
	if err != nil {
		t.Fatalf("CanonicalizeJSON returned error: %v", err)
	}
	if got, want := string(canon), `{"z":1,"a":{"b":2}}`; got != want {
		t.Fatalf("canonical mismatch: got %s want %s", got, want)
	}
}

// TestParseOperationComputesCID checks that a record without an upstream CID
// receives a digest CID computed from its canonical bytes.
func TestParseOperationComputesCID(t *testing.T) {
	rec := ExportRecord{
		Seq:       10,
		DID:       "did:plc:test",
		Operation: json.RawMessage(`{"didDoc":{"id":"did:plc:test"},"sig":"abc","publicKey":"def"}`),
	}
	op, err := ParseOperation(rec)
	if err != nil {
		t.Fatalf("ParseOperation returned error: %v", err)
	}
	if op.CID == "" {
		t.Fatalf("expected computed CID")
	}
	if op.Sequence != rec.Seq {
		t.Fatalf("seq mismatch: got %d want %d", op.Sequence, rec.Seq)
	}
}
diff --git a/internal/types/state.go b/internal/types/state.go
new file mode 100644
index 0000000..cba1235
--- /dev/null
+++ b/internal/types/state.go
@@ -0,0 +1,45 @@
+package types
+
+import "time"
+
// StateV1 is versioned for KV compatibility.
// It is the materialized view of one DID: its current document, the tip of
// its operation chain, and verification bookkeeping.
type StateV1 struct {
	Version      uint8     `json:"v"`              // schema version
	DID          string    `json:"did"`            // subject DID
	DIDDocument  []byte    `json:"did_document"`   // current DID document (JSON bytes)
	ChainTipHash string    `json:"chain_tip_hash"` // CID/hash of the latest operation
	RotationKeys []string  `json:"rotation_keys,omitempty"`
	LatestOpSeq  uint64    `json:"latest_op_seq"` // global sequence of the latest op
	UpdatedAt    time.Time `json:"updated_at"`
}

// BlockRefV1 locates one operation record inside a compressed block file:
// the block id plus the record's byte range in the decompressed stream,
// together with identifiers needed to verify and chain the operation.
type BlockRefV1 struct {
	Version  uint8  `json:"v"`
	BlockID  uint64 `json:"block_id"` // NNNNNN.zst file the record lives in
	Offset   uint64 `json:"offset"`   // byte offset in the decompressed stream
	Length   uint64 `json:"length"`   // record length incl. varint header
	OpSeq    uint64 `json:"op_seq"`   // global operation sequence
	DID      string `json:"did"`
	CID      string `json:"cid"`
	PrevCID  string `json:"prev_cid"`
	OpHash   string `json:"op_hash"`
	Received string `json:"received"`
}

// CheckpointReference is a lightweight pointer to a checkpoint by sequence
// and hash.
type CheckpointReference struct {
	Sequence       uint64 `json:"sequence"`
	CheckpointHash string `json:"checkpoint_hash"`
}

// CheckpointV1 is serialized deterministically for signing.
type CheckpointV1 struct {
	Version                uint8  `json:"v"`
	Sequence               uint64 `json:"sequence"`
	Timestamp              string `json:"timestamp"`
	DIDMerkleRoot          string `json:"did_merkle_root"`
	BlockMerkleRoot        string `json:"block_merkle_root"`
	PreviousCheckpointHash string `json:"previous_checkpoint_hash"`
	KeyID                  string `json:"key_id"`
	Signature              string `json:"signature"`
	CheckpointHash         string `json:"checkpoint_hash"`
}
diff --git a/internal/verify/verifier.go b/internal/verify/verifier.go
new file mode 100644
index 0000000..ae648d3
--- /dev/null
+++ b/internal/verify/verifier.go
@@ -0,0 +1,296 @@
package verify

import (
	"crypto/ecdsa"
	"crypto/ed25519"
	"crypto/elliptic"
	"crypto/sha256"
	"encoding/base64"
	"encoding/binary"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"math/big"
	"sort"
	"strings"

	"github.com/Fuwn/plutia/internal/config"
	"github.com/Fuwn/plutia/internal/types"
	"github.com/decred/dcrd/dcrec/secp256k1/v4"
	secpECDSA "github.com/decred/dcrd/dcrec/secp256k1/v4/ecdsa"
	"github.com/fxamacker/cbor/v2"
	"github.com/mr-tron/base58"
)
+
// Verifier checks PLC operations according to the configured verification
// policy (see internal/config for the policy constants).
type Verifier struct {
	Policy string
}

// New returns a Verifier that applies the given policy.
func New(policy string) *Verifier {
	verifier := Verifier{Policy: policy}
	return &verifier
}
+
+func (v *Verifier) ShouldVerify(existing *types.StateV1, seq uint64) bool {
+ switch v.Policy {
+ case config.VerifyLazy:
+ return false
+ case config.VerifyStateOnly:
+ if existing == nil {
+ return true
+ }
+ return seq >= existing.LatestOpSeq
+ default:
+ return true
+ }
+}
+
// VerifyOperation validates op against the recorded state for its DID.
//
// It returns nil immediately when ShouldVerify says the policy allows
// skipping. Otherwise it enforces prev-linkage against the current chain tip
// (when one exists) and checks the operation signature: the signed bytes come
// from signaturePayload, and candidate keys are the previously accepted
// rotation keys followed by any keys embedded in the payload itself. The
// first key that verifies wins; if none do, an error is returned.
func (v *Verifier) VerifyOperation(op types.ParsedOperation, existing *types.StateV1) error {
	if !v.ShouldVerify(existing, op.Sequence) {
		return nil
	}

	// Chain linkage: a non-genesis operation must reference the current tip.
	if existing != nil && existing.ChainTipHash != "" {
		if op.Prev == "" {
			return errors.New("missing prev on non-genesis operation")
		}
		if op.Prev != existing.ChainTipHash {
			return fmt.Errorf("prev linkage mismatch: got %s want %s", op.Prev, existing.ChainTipHash)
		}
	}

	sig, ok := findString(op.Payload, "sig", "signature")
	if !ok || strings.TrimSpace(sig) == "" {
		return errors.New("missing signature")
	}
	// Known rotation keys are tried before keys declared by the payload.
	pubKeys := make([]string, 0, 8)
	if existing != nil && len(existing.RotationKeys) > 0 {
		pubKeys = append(pubKeys, existing.RotationKeys...)
	}
	pubKeys = append(pubKeys, extractPublicKeys(op.Payload)...)
	if len(pubKeys) == 0 {
		return errors.New("missing verification public key")
	}
	sigBytes, err := decodeFlexible(sig)
	if err != nil {
		return fmt.Errorf("decode signature: %w", err)
	}
	payload, err := signaturePayload(op.Payload)
	if err != nil {
		return err
	}
	for _, key := range pubKeys {
		vk, err := decodePublicKey(key)
		if err != nil {
			// Undecodable candidate: skip it and try the next key.
			continue
		}
		switch vk.Algo {
		case "ed25519":
			if ed25519.Verify(vk.Ed25519, payload, sigBytes) {
				return nil
			}
		case "secp256k1":
			if verifySecp256k1(vk.Secp256k1, payload, sigBytes) {
				return nil
			}
		case "p256":
			if verifyP256(vk.P256, payload, sigBytes) {
				return nil
			}
		}
	}
	return errors.New("signature verification failed")
}
+
+func signaturePayload(m map[string]any) ([]byte, error) {
+ if raw, ok := findString(m, "sigPayload", "signaturePayload"); ok && raw != "" {
+ decoded, err := decodeFlexible(raw)
+ if err == nil && json.Valid(decoded) {
+ return types.CanonicalizeJSON(decoded)
+ }
+ }
+
+ clone := make(map[string]any, len(m))
+ for k, v := range m {
+ switch k {
+ case "sig", "signature", "sigPayload", "signaturePayload":
+ continue
+ default:
+ clone[k] = v
+ }
+ }
+ encMode, err := cbor.CanonicalEncOptions().EncMode()
+ if err != nil {
+ return nil, fmt.Errorf("init canonical cbor encoder: %w", err)
+ }
+ b, err := encMode.Marshal(clone)
+ if err != nil {
+ return nil, fmt.Errorf("marshal signature payload cbor: %w", err)
+ }
+ return b, nil
+}
+
// findString returns the first of keys present in m with a string value,
// together with whether one was found. Keys holding non-string values are
// skipped.
func findString(m map[string]any, keys ...string) (string, bool) {
	for _, k := range keys {
		if v, ok := m[k].(string); ok {
			return v, true
		}
	}
	return "", false
}

// extractPublicKeys collects candidate verification keys from an operation
// payload: every rotationKeys entry, the first well-known single-key field,
// and every string value under verificationMethods (atproto first). Values
// are trimmed and de-duplicated, and the result is returned in deterministic
// order.
func extractPublicKeys(payload map[string]any) []string {
	out := make([]string, 0, 6)
	seen := map[string]struct{}{}
	add := func(v string) {
		v = strings.TrimSpace(v)
		if v == "" {
			return
		}
		if _, ok := seen[v]; ok {
			return
		}
		seen[v] = struct{}{}
		out = append(out, v)
	}
	if arr, ok := payload["rotationKeys"].([]any); ok {
		for _, v := range arr {
			if s, ok := v.(string); ok {
				add(s)
			}
		}
	}
	if v, ok := findString(payload, "publicKey", "verificationMethod", "signingKey", "recoveryKey"); ok {
		add(v)
	}
	if vm, ok := payload["verificationMethods"].(map[string]any); ok {
		if v, ok := vm["atproto"].(string); ok {
			add(v)
		}
		// Iterate the remaining methods in sorted key order: Go map iteration
		// is randomized, which previously made the key try-order (and hence
		// verification work) nondeterministic across runs.
		names := make([]string, 0, len(vm))
		for name := range vm {
			names = append(names, name)
		}
		sort.Strings(names)
		for _, name := range names {
			if s, ok := vm[name].(string); ok {
				add(s)
			}
		}
	}
	return out
}
+
// verificationKey is a decoded public key tagged with its algorithm; exactly
// one of the key fields, the one matching Algo, is populated.
type verificationKey struct {
	Algo      string // "ed25519", "secp256k1", or "p256"
	Ed25519   ed25519.PublicKey
	Secp256k1 *secp256k1.PublicKey
	P256      *ecdsa.PublicKey
}
+
// decodePublicKey parses a verification key in either supported shape:
//
//   - a did:key using base58btc multibase (leading 'z') with a uvarint
//     multicodec prefix selecting ed25519, secp256k1, or P-256; or
//   - a bare key in any decodeFlexible encoding, identified by trial:
//     32 bytes => ed25519, then a secp256k1 parse, then a compressed
//     P-256 point.
func decodePublicKey(value string) (verificationKey, error) {
	if strings.HasPrefix(value, "did:key:") {
		mb := strings.TrimPrefix(value, "did:key:")
		if mb == "" || mb[0] != 'z' {
			return verificationKey{}, errors.New("did:key must be multibase base58btc")
		}
		decoded, err := base58.Decode(mb[1:])
		if err != nil {
			return verificationKey{}, fmt.Errorf("decode did:key base58: %w", err)
		}
		if len(decoded) < 3 {
			return verificationKey{}, errors.New("invalid did:key length")
		}
		// The multicodec code is a uvarint prefix; the key bytes follow it.
		code, n := binary.Uvarint(decoded)
		if n <= 0 || n >= len(decoded) {
			return verificationKey{}, errors.New("invalid did:key multicodec prefix")
		}
		keyBytes := decoded[n:]
		switch code {
		case 0xED: // multicodec ed25519-pub
			if len(keyBytes) != ed25519.PublicKeySize {
				return verificationKey{}, errors.New("invalid did:key ed25519 length")
			}
			return verificationKey{
				Algo:    "ed25519",
				Ed25519: ed25519.PublicKey(keyBytes),
			}, nil
		case 0xE7: // multicodec secp256k1-pub
			pub, err := secp256k1.ParsePubKey(keyBytes)
			if err != nil {
				return verificationKey{}, fmt.Errorf("parse secp256k1 did:key: %w", err)
			}
			return verificationKey{Algo: "secp256k1", Secp256k1: pub}, nil
		case 0x1200: // multicodec p256-pub (compressed point)
			x, y := elliptic.UnmarshalCompressed(elliptic.P256(), keyBytes)
			if x == nil || y == nil {
				return verificationKey{}, errors.New("parse p256 did:key: invalid compressed key")
			}
			return verificationKey{
				Algo: "p256",
				P256: &ecdsa.PublicKey{Curve: elliptic.P256(), X: x, Y: y},
			}, nil
		default:
			return verificationKey{}, errors.New("unsupported did:key multicodec")
		}
	}
	// Bare key: decode, then identify the algorithm by trial. NOTE(review):
	// a 33-byte key is tried as secp256k1 before P-256, so the outcome for
	// compressed keys depends on which curve accepts the point first.
	b, err := decodeFlexible(value)
	if err != nil {
		return verificationKey{}, fmt.Errorf("decode public key: %w", err)
	}
	if len(b) == ed25519.PublicKeySize {
		return verificationKey{Algo: "ed25519", Ed25519: ed25519.PublicKey(b)}, nil
	}
	pub, err := secp256k1.ParsePubKey(b)
	if err == nil {
		return verificationKey{Algo: "secp256k1", Secp256k1: pub}, nil
	}
	if x, y := elliptic.UnmarshalCompressed(elliptic.P256(), b); x != nil && y != nil {
		return verificationKey{
			Algo: "p256",
			P256: &ecdsa.PublicKey{Curve: elliptic.P256(), X: x, Y: y},
		}, nil
	}
	return verificationKey{}, fmt.Errorf("invalid public key length/type: %d", len(b))
}
+
// verifySecp256k1 reports whether sig is a valid ECDSA-secp256k1 signature
// over sha256(payload). Both the 64-byte compact (r||s) form and DER
// encoding are accepted.
func verifySecp256k1(pub *secp256k1.PublicKey, payload, sig []byte) bool {
	if pub == nil {
		return false
	}
	var parsed *secpECDSA.Signature
	if len(sig) == 64 {
		// Compact form: first 32 bytes are r, last 32 are s.
		// NOTE(review): SetByteSlice reduces out-of-range values mod N and
		// its overflow flag is ignored here — consider rejecting overflow
		// explicitly to forbid non-canonical encodings.
		var r, s secp256k1.ModNScalar
		r.SetByteSlice(sig[:32])
		s.SetByteSlice(sig[32:])
		parsed = secpECDSA.NewSignature(&r, &s)
	} else {
		der, err := secpECDSA.ParseDERSignature(sig)
		if err != nil {
			return false
		}
		parsed = der
	}
	sum := sha256.Sum256(payload)
	return parsed.Verify(sum[:], pub)
}
+
+func verifyP256(pub *ecdsa.PublicKey, payload, sig []byte) bool {
+ if pub == nil {
+ return false
+ }
+ sum := sha256.Sum256(payload)
+ if len(sig) == 64 {
+ r := new(big.Int).SetBytes(sig[:32])
+ s := new(big.Int).SetBytes(sig[32:])
+ return ecdsa.Verify(pub, sum[:], r, s)
+ }
+ return ecdsa.VerifyASN1(pub, sum[:], sig)
+}
+
+func decodeFlexible(v string) ([]byte, error) {
+ if b, err := base64.RawURLEncoding.DecodeString(v); err == nil {
+ return b, nil
+ }
+ if b, err := base64.StdEncoding.DecodeString(v); err == nil {
+ return b, nil
+ }
+ if b, err := hex.DecodeString(v); err == nil {
+ return b, nil
+ }
+ return nil, errors.New("unsupported encoding")
+}
diff --git a/internal/verify/verifier_test.go b/internal/verify/verifier_test.go
new file mode 100644
index 0000000..0b38411
--- /dev/null
+++ b/internal/verify/verifier_test.go
@@ -0,0 +1,183 @@
+package verify
+
+import (
+ "crypto/ecdsa"
+ "crypto/ed25519"
+ "crypto/elliptic"
+ "crypto/rand"
+ "crypto/sha256"
+ "encoding/base64"
+ "encoding/json"
+ "testing"
+
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/types"
+ "github.com/decred/dcrd/dcrec/secp256k1/v4"
+ secpECDSA "github.com/decred/dcrd/dcrec/secp256k1/v4/ecdsa"
+ "github.com/fxamacker/cbor/v2"
+ "github.com/mr-tron/base58"
+)
+
// TestVerifyOperationValidSignature checks that a well-formed ed25519
// operation carrying an explicit sigPayload verifies under the full policy.
func TestVerifyOperationValidSignature(t *testing.T) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generate key: %v", err)
	}
	// The signed bytes are supplied out-of-band via sigPayload, so the
	// verifier checks exactly these (canonicalized) JSON bytes.
	payloadDoc := []byte(`{"did":"did:plc:alice","didDoc":{"id":"did:plc:alice"}}`)
	sig := ed25519.Sign(priv, payloadDoc)
	opJSON := map[string]any{
		"did":        "did:plc:alice",
		"didDoc":     map[string]any{"id": "did:plc:alice"},
		"publicKey":  base64.RawURLEncoding.EncodeToString(pub),
		"sigPayload": base64.RawURLEncoding.EncodeToString(payloadDoc),
		"sig":        base64.RawURLEncoding.EncodeToString(sig),
	}
	raw, _ := json.Marshal(opJSON)
	op, err := types.ParseOperation(types.ExportRecord{
		Seq:       1,
		DID:       "did:plc:alice",
		Operation: raw,
	})
	if err != nil {
		t.Fatalf("parse operation: %v", err)
	}
	v := New(config.VerifyFull)
	// existing == nil: genesis case, so no prev-linkage check applies.
	if err := v.VerifyOperation(op, nil); err != nil {
		t.Fatalf("verify operation: %v", err)
	}
}
+
// TestVerifyOperationPrevMismatch checks that an operation whose prev does
// not match the recorded chain tip is rejected even when its signature is
// otherwise valid.
func TestVerifyOperationPrevMismatch(t *testing.T) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generate key: %v", err)
	}
	payloadDoc := []byte(`{"did":"did:plc:alice","didDoc":{"id":"did:plc:alice"},"prev":"sha256:wrong"}`)
	sig := ed25519.Sign(priv, payloadDoc)
	opJSON := map[string]any{
		"did":        "did:plc:alice",
		"didDoc":     map[string]any{"id": "did:plc:alice"},
		"prev":       "sha256:wrong",
		"publicKey":  base64.RawURLEncoding.EncodeToString(pub),
		"sigPayload": base64.RawURLEncoding.EncodeToString(payloadDoc),
		"sig":        base64.RawURLEncoding.EncodeToString(sig),
	}
	raw, _ := json.Marshal(opJSON)
	op, err := types.ParseOperation(types.ExportRecord{Seq: 2, DID: "did:plc:alice", Operation: raw})
	if err != nil {
		t.Fatalf("parse operation: %v", err)
	}
	v := New(config.VerifyFull)
	// The recorded tip ("sha256:right") differs from the op's prev
	// ("sha256:wrong"), so verification must fail on linkage.
	existing := &types.StateV1{DID: "did:plc:alice", ChainTipHash: "sha256:right", LatestOpSeq: 1}
	if err := v.VerifyOperation(op, existing); err == nil {
		t.Fatalf("expected prev mismatch error")
	}
}
+
// TestVerifyOperationSecp256k1 checks the did:key secp256k1 path: the key is
// published as a did:key and the signature covers the canonical CBOR of the
// operation minus its sig field, mirroring signaturePayload.
func TestVerifyOperationSecp256k1(t *testing.T) {
	priv, err := secp256k1.GeneratePrivateKey()
	if err != nil {
		t.Fatalf("generate secp256k1 key: %v", err)
	}
	pubKey := priv.PubKey()
	// {0xE7, 0x01} is the uvarint multicodec prefix for secp256k1-pub.
	didKeyBytes := append([]byte{0xE7, 0x01}, pubKey.SerializeCompressed()...)
	didKey := "did:key:z" + base58.Encode(didKeyBytes)

	// The unsigned operation: same fields as opJSON below, without "sig".
	unsigned := map[string]any{
		"type":       "create",
		"prev":       nil,
		"handle":     "alice.example.com",
		"service":    "https://example.com",
		"signingKey": didKey,
	}
	enc, err := cbor.CanonicalEncOptions().EncMode()
	if err != nil {
		t.Fatalf("init cbor encoder: %v", err)
	}
	payload, err := enc.Marshal(unsigned)
	if err != nil {
		t.Fatalf("marshal cbor: %v", err)
	}
	sum := sha256.Sum256(payload)
	sig := secpECDSA.Sign(priv, sum[:]).Serialize()

	opJSON := map[string]any{
		"type":       "create",
		"prev":       nil,
		"handle":     "alice.example.com",
		"service":    "https://example.com",
		"signingKey": didKey,
		"sig":        base64.RawURLEncoding.EncodeToString(sig),
	}
	raw, _ := json.Marshal(opJSON)
	op, err := types.ParseOperation(types.ExportRecord{
		Seq:       1,
		DID:       "did:plc:alice",
		Operation: raw,
	})
	if err != nil {
		t.Fatalf("parse operation: %v", err)
	}
	v := New(config.VerifyFull)
	if err := v.VerifyOperation(op, nil); err != nil {
		t.Fatalf("verify operation: %v", err)
	}
}
+
// TestVerifyOperationP256 checks the did:key P-256 path using a raw 64-byte
// r||s signature over the canonical CBOR of the operation minus its sig
// field.
func TestVerifyOperationP256(t *testing.T) {
	priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		t.Fatalf("generate p256 key: %v", err)
	}
	compressed := elliptic.MarshalCompressed(priv.Curve, priv.PublicKey.X, priv.PublicKey.Y)
	// {0x80, 0x24} is uvarint(0x1200), the multicodec prefix for p256-pub.
	didKeyBytes := append([]byte{0x80, 0x24}, compressed...)
	didKey := "did:key:z" + base58.Encode(didKeyBytes)

	// The unsigned operation: same fields as opJSON below, without "sig".
	unsigned := map[string]any{
		"type":       "create",
		"prev":       nil,
		"handle":     "alice.example.com",
		"service":    "https://example.com",
		"signingKey": didKey,
	}
	enc, err := cbor.CanonicalEncOptions().EncMode()
	if err != nil {
		t.Fatalf("init cbor encoder: %v", err)
	}
	payload, err := enc.Marshal(unsigned)
	if err != nil {
		t.Fatalf("marshal cbor: %v", err)
	}
	sum := sha256.Sum256(payload)
	r, s, err := ecdsa.Sign(rand.Reader, priv, sum[:])
	if err != nil {
		t.Fatalf("sign p256: %v", err)
	}
	// Compact r||s encoding, each component left-padded to 32 bytes.
	sig := make([]byte, 64)
	rb := r.Bytes()
	sb := s.Bytes()
	copy(sig[32-len(rb):32], rb)
	copy(sig[64-len(sb):], sb)

	opJSON := map[string]any{
		"type":       "create",
		"prev":       nil,
		"handle":     "alice.example.com",
		"service":    "https://example.com",
		"signingKey": didKey,
		"sig":        base64.RawURLEncoding.EncodeToString(sig),
	}
	raw, _ := json.Marshal(opJSON)
	op, err := types.ParseOperation(types.ExportRecord{
		Seq:       1,
		DID:       "did:plc:alice",
		Operation: raw,
	})
	if err != nil {
		t.Fatalf("parse operation: %v", err)
	}
	v := New(config.VerifyFull)
	if err := v.VerifyOperation(op, nil); err != nil {
		t.Fatalf("verify operation: %v", err)
	}
}
diff --git a/pkg/proof/proof.go b/pkg/proof/proof.go
new file mode 100644
index 0000000..14ad1ac
--- /dev/null
+++ b/pkg/proof/proof.go
@@ -0,0 +1,15 @@
+package proof
+
+import "github.com/Fuwn/plutia/internal/merkle"
+
// DIDInclusionProof bundles everything a client needs to verify that a DID's
// chain tip is committed to by a signed checkpoint: the Merkle authentication
// path from the DID leaf to the root, plus the checkpoint's identity, hash,
// and signature.
type DIDInclusionProof struct {
	DID             string           `json:"did"`            // subject DID
	ChainTipHash    string           `json:"chain_tip_hash"` // tip hash the leaf commits to
	LeafHash        string           `json:"leaf_hash"`      // Merkle leaf hash for the DID
	MerkleRoot      string           `json:"merkle_root"`    // root the sibling path resolves to
	Siblings        []merkle.Sibling `json:"siblings"`       // authentication path from leaf to root
	CheckpointSeq   uint64           `json:"checkpoint_sequence"` // sequence of the committing checkpoint
	CheckpointHash  string           `json:"checkpoint_hash"`
	CheckpointSig   string           `json:"checkpoint_signature"`
	CheckpointKeyID string           `json:"checkpoint_key_id"` // key that produced the signature
}