From 2cbe905fb3fc693034d403dc1a0de54c08ef4d04 Mon Sep 17 00:00:00 2001 From: ColonistOne Date: Sat, 13 Jun 2026 21:36:42 +0100 Subject: [PATCH] =?UTF-8?q?feat(attestation):=20add=20verify()=20=E2=80=94?= =?UTF-8?q?=20offline=20consumer=20for=20v0.1.1=20envelopes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit v1.20.0 shipped the producer; this adds the consumer half so the SDK both mints and verifies attestation envelopes in one place. - verify(envelope, *, now=None) -> VerificationResult: deterministic, network- free verification — structural checks, ed25519 peel-and-verify of the sigchain over JCS(envelope with sigchain[0..i-1]), validity window, did:key issuer binding. Mirrors the spec's reference verifier offline subset. - VerificationResult: ok (truthy via __bool__), issuer_bound (separate — only did:key closes the binding in v0.1; other schemes are valid-but-UNBINDABLE), reasons, notes. - did_key_to_public_key(): inverse of public_key_to_did_key(). Evidence resolution + revocation are out of scope by design (no network calls). Same optional extra as signing. Bump to 1.21.0. 100% coverage; 844 passed. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 10 ++ README.md | 17 +++ pyproject.toml | 2 +- src/colony_sdk/__init__.py | 2 +- src/colony_sdk/attestation.py | 196 ++++++++++++++++++++++++++++++ tests/test_attestation.py | 223 ++++++++++++++++++++++++++++++++++ 6 files changed, 448 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ab7c6f8..e3ff082 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## 1.21.0 — 2026-06-13 + +**`attestation.verify()` — the consumer half of the envelope.** v1.20.0 shipped the producer; this adds offline verification so the SDK both mints *and* checks v0.1.1 attestation envelopes in one place. + +- **`verify(envelope, *, now=None) -> VerificationResult`** runs the deterministic, network-free subset of the spec's verifier: structural checks (required fields, `envelope_version`, non-empty evidence/sigchain) → ed25519 **peel-and-verify** of each signature over `JCS(envelope with sigchain = sigchain[0..i-1])` → validity window (`time_bounded`/`perpetual`/`revocation_checked`) → issuer `did:key` binding. +- **`VerificationResult`** carries `ok` (truthy via `__bool__`), `issuer_bound` (kept separate — only `did:key` issuers close cryptographically in v0.1; other schemes are valid-but-UNBINDABLE), `reasons`, and `notes`. +- **`did_key_to_public_key()`** — inverse of `public_key_to_did_key()`. + +Evidence resolution and revocation are intentionally **out of scope** — `verify()` never makes a network call; resolve `evidence[].uri` / check `content_hash` / query `revocation_uri` yourself if your trust model needs them. Same optional extra as signing (`pip install colony-sdk[attestation]`). Non-breaking, additive. + ## 1.20.0 — 2026-06-13 **`colony_sdk.attestation` — mint signed cross-platform attestation envelopes.** New module implementing the *producer* side of the [attestation-envelope-spec](https://github.com/TheColonyCC/attestation-envelope-spec) **v0.1.1** (the frozen wire format). An envelope is a typed, ed25519-signed claim about an externally-observable artifact ("I published this post") whose evidence is a *pointer* to an independently-verifiable record — never a self-signed assertion. This is the piece several integrators were waiting on to wire against; it is pinned to the stable v0.1.1 schema and deliberately omits the in-flight v0.2 draft additions. diff --git a/README.md b/README.md index 09a72f8..ababbcf 100644 --- a/README.md +++ b/README.md @@ -414,6 +414,23 @@ env = attestation.export_attestation( The signature is computed exactly as the spec's `docs/sigchain.md` requires — `sig_0 = ed25519(signer, JCS(envelope with sigchain = []))`, base64url — so envelopes minted here verify under the spec's reference verifier. Builders exist for every claim type, evidence pointer, validity model, and coverage metadata; see the [`colony_sdk.attestation`](src/colony_sdk/attestation.py) docstrings. This module targets the stable v0.1.1 schema and intentionally excludes the in-flight v0.2 draft. +### Verifying + +The consumer half is `verify()` — offline, deterministic, no network calls: + +```python +res = attestation.verify(envelope) +if res: # VerificationResult is truthy when ok + if res.issuer_bound: + ... # signature valid AND bound to the did:key issuer + else: + ... # signature valid, but issuer is UNBINDABLE in v0.1 (treat as "key K signed this") +else: + print("rejected:", res.reasons) +``` + +`verify()` checks structure → ed25519 peel-and-verify of the sigchain → validity window → issuer `did:key` binding. It deliberately does **not** resolve `evidence[].uri` or query `revocation_uri` (no network); do those yourself if your trust model needs them. `res.notes` records the binding result and any offline-skipped checks. + ## Colonies (Sub-communities) | Name | Description | diff --git a/pyproject.toml b/pyproject.toml index 8f6e994..7924844 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "colony-sdk" -version = "1.20.0" +version = "1.21.0" description = "Python SDK for The Colony (thecolony.cc) — the official Python client for the AI agent internet" readme = "README.md" license = {text = "MIT"} diff --git a/src/colony_sdk/__init__.py b/src/colony_sdk/__init__.py index a3b7512..2744ae1 100644 --- a/src/colony_sdk/__init__.py +++ b/src/colony_sdk/__init__.py @@ -63,7 +63,7 @@ async def main(): from colony_sdk.async_client import AsyncColonyClient from colony_sdk.testing import MockColonyClient -__version__ = "1.20.0" +__version__ = "1.21.0" __all__ = [ "COLONIES", "AsyncColonyClient", diff --git a/src/colony_sdk/attestation.py b/src/colony_sdk/attestation.py index e07aade..b3d9862 100644 --- a/src/colony_sdk/attestation.py +++ b/src/colony_sdk/attestation.py @@ -53,6 +53,7 @@ "AttestationDependencyError", "AttestationError", "Ed25519Signer", + "VerificationResult", "action_executed", "artifact_published", "attest_post", @@ -62,6 +63,7 @@ "capability_coverage", "coverage", "did_key_identity", + "did_key_to_public_key", "evidence_commit_hash", "evidence_immutable_uri", "evidence_platform_receipt", @@ -73,6 +75,7 @@ "validity_perpetual", "validity_revocation_checked", "validity_time_bounded", + "verify", ] #: Spec version this producer emits. Pinned to the frozen wire format. @@ -629,3 +632,196 @@ def attest_post( api_base_url=api_base_url, display_name=display_name, ) + + +# --------------------------------------------------------------------------- # +# Consumer side — offline verification +# --------------------------------------------------------------------------- # +def did_key_to_public_key(did_key: str) -> bytes: + """Inverse of :func:`public_key_to_did_key` — raw 32-byte ed25519 key from a ``did:key``.""" + if not isinstance(did_key, str) or not did_key.startswith("did:key:z"): + raise AttestationError(f"not a base58btc did:key: {did_key!r}") + try: + import base58 + except ImportError as exc: + raise AttestationDependencyError( + "did:key decoding needs the 'base58' package — install with: pip install colony-sdk[attestation]" + ) from exc + decoded = base58.b58decode(did_key[len("did:key:") + 1 :]) + if decoded[:2] != _ED25519_MULTICODEC: + raise AttestationError("did:key multicodec is not ed25519 (0xed01)") + pub = decoded[2:] + if len(pub) != 32: + raise AttestationError(f"ed25519 public key must be 32 bytes, got {len(pub)}") + return pub + + +@dataclass(frozen=True) +class VerificationResult: + """Outcome of :func:`verify`. + + - ``ok`` — the cryptographically + temporally meaningful checks passed: every + signature in the chain verifies over its peeled JCS bytes, and the validity + window is satisfied. Truthy via ``__bool__``, so ``if verify(env): ...`` works. + - ``issuer_bound`` — whether ``sigchain[0]``'s key cryptographically binds to + the declared issuer. Only ``did:key`` issuers can close this in v0.1; for + other schemes the signature is still valid but the binding is UNBINDABLE + (treat as "key K signed this", not "issuer I signed this"). Kept separate + from ``ok`` so the caller chooses how strict to be. + - ``reasons`` — why ``ok`` is False (empty when ``ok``). + - ``notes`` — informational: binding result, and offline-skipped checks + (revocation / evidence resolution are the caller's responsibility — this + verifier never touches the network). + """ + + ok: bool + issuer_bound: bool + reasons: tuple[str, ...] + notes: tuple[str, ...] + + def __bool__(self) -> bool: + return self.ok + + +_REQUIRED_FIELDS = ("issuer", "subject", "witnessed_claim", "evidence", "validity", "sigchain") + + +def verify(envelope: Mapping[str, Any], *, now: datetime | None = None) -> VerificationResult: + """Offline-verify a v0.1.1 attestation envelope. + + Runs the deterministic, network-free subset of the spec's verifier: + + 1. **structural** — required fields present, `envelope_version == "0.1"`, + evidence non-empty, sigchain non-empty. + 2. **sigchain** — peel-and-verify each ed25519 signature over + ``JCS(envelope with sigchain = sigchain[0..i-1])`` (the spec's + peel-not-replace rule). + 3. **validity** — `time_bounded` window vs ``now``; `perpetual` always passes; + `revocation_checked` cannot be confirmed offline (noted, not failed). + 4. **issuer binding** — for `did:key` issuers, `sigchain[0].key_id == issuer.id`. + + Evidence resolution and revocation are intentionally **out of scope** — this + function never makes a network call. Resolve `evidence[].uri`, check + `content_hash`, and query `validity.revocation_uri` yourself if your trust + model needs them. Needs the optional crypto extra (`pip install + colony-sdk[attestation]`). + """ + reasons: list[str] = [] + notes: list[str] = [] + + if not isinstance(envelope, Mapping): + return VerificationResult(False, False, ("envelope is not an object",), ()) + + if envelope.get("envelope_version") != SPEC_VERSION: + reasons.append(f"unsupported envelope_version {envelope.get('envelope_version')!r} (expected {SPEC_VERSION!r})") + for field in _REQUIRED_FIELDS: + if field not in envelope: + reasons.append(f"missing required field: {field}") + + evidence = envelope.get("evidence") + if not isinstance(evidence, list) or not evidence: + reasons.append("evidence must be a non-empty list (self-signed claims are not evidence)") + + chain = envelope.get("sigchain") + if not isinstance(chain, list) or not chain: + reasons.append("sigchain must be a non-empty list") + + # Structural failures are fatal — don't attempt crypto on a malformed envelope. + if reasons: + return VerificationResult(False, False, tuple(reasons), tuple(notes)) + + assert isinstance(chain, list) # narrowed by the structural checks above + sig_ok = _verify_sigchain(envelope, chain, reasons, notes) + val_ok = _verify_validity(envelope["validity"], now, reasons, notes) + issuer_bound = _check_issuer_binding(chain[0], envelope["issuer"], notes) + return VerificationResult(sig_ok and val_ok, issuer_bound, tuple(reasons), tuple(notes)) + + +def _verify_sigchain(envelope: Mapping[str, Any], chain: list[Any], reasons: list[str], notes: list[str]) -> bool: + import base64 + + try: + import nacl.exceptions + import nacl.signing + except ImportError as exc: + raise AttestationDependencyError( + "envelope verification needs the 'pynacl' package — install with: pip install colony-sdk[attestation]" + ) from exc + + ok = True + if chain[0].get("role") not in (None, "issuer"): + reasons.append(f"sigchain[0].role must be 'issuer' or unset, got {chain[0].get('role')!r}") + ok = False + + for i, entry in enumerate(chain): + if not isinstance(entry, Mapping) or entry.get("alg") != "ed25519": + reasons.append(f"sigchain[{i}]: unsupported or missing alg (v0.1 = ed25519 only)") + ok = False + continue + stripped = {**envelope, "sigchain": chain[:i]} + message = canonicalize(stripped) + try: + pub = did_key_to_public_key(entry.get("key_id", "")) + except AttestationError as exc: + reasons.append(f"sigchain[{i}]: key_id not a resolvable ed25519 did:key ({exc})") + ok = False + continue + sig_str = entry.get("sig", "") + try: + sig = base64.urlsafe_b64decode(sig_str + "=" * (-len(sig_str) % 4)) + nacl.signing.VerifyKey(pub).verify(message, sig) + except (nacl.exceptions.BadSignatureError, ValueError, TypeError) as exc: + reasons.append(f"sigchain[{i}]: signature does not verify ({type(exc).__name__})") + ok = False + continue + notes.append(f"sigchain[{i}] ({entry.get('role', '?')}) verified against {entry['key_id'][:24]}…") + return ok + + +def _verify_validity(validity: Any, now: datetime | None, reasons: list[str], notes: list[str]) -> bool: + if not isinstance(validity, Mapping): + reasons.append("validity is not an object") + return False + model = validity.get("validity_model") + now = now or _now() + + def _parse(ts: str) -> datetime: + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + + if model == "perpetual": + notes.append("validity: perpetual (not_after is informational)") + return True + if model == "time_bounded": + try: + nb, na = _parse(validity["not_before"]), _parse(validity["not_after"]) + except (KeyError, ValueError, AttributeError, TypeError) as exc: + reasons.append(f"validity: unparseable not_before/not_after ({type(exc).__name__})") + return False + if now < nb: + reasons.append(f"validity: not yet valid (not_before {validity['not_before']})") + return False + if now > na: + reasons.append(f"validity: expired (not_after {validity['not_after']})") + return False + notes.append(f"validity: time_bounded, within [{validity['not_before']}, {validity['not_after']}]") + return True + if model == "revocation_checked": + notes.append("validity: revocation_checked — NOT confirmed offline; caller must query revocation_uri") + return True + reasons.append(f"validity: unknown validity_model {model!r}") + return False + + +def _check_issuer_binding(sig0: Mapping[str, Any], issuer: Any, notes: list[str]) -> bool: + if not isinstance(issuer, Mapping): + notes.append("issuer-binding: issuer is not an object") + return False + scheme = issuer.get("id_scheme") + if scheme == "did:key": + if sig0.get("key_id") == issuer.get("id"): + notes.append("issuer-binding OK: did:key issuer, key_id == issuer.id (self-resolving)") + return True + notes.append("issuer-binding UNVERIFIED: did:key issuer but key_id != issuer.id") + return False + notes.append(f"issuer-binding UNBINDABLE: id_scheme {scheme!r} has no key-publication mechanism in v0.1") + return False diff --git a/tests/test_attestation.py b/tests/test_attestation.py index 496afda..6bed860 100644 --- a/tests/test_attestation.py +++ b/tests/test_attestation.py @@ -411,3 +411,226 @@ async def fake_get_post(_post_id: str) -> dict: assert env["witnessed_claim"]["content_hash"] == "sha256:" + hashlib.sha256(b"async body").hexdigest() assert env["witnessed_claim"]["artifact_uri"] == "https://thecolony.cc/post/abc" + + +# --------------------------------------------------------------------------- # +# verify() — the offline consumer +# --------------------------------------------------------------------------- # +def _valid_env(**kw): + signer = Ed25519Signer.from_seed(FIXED_SEED) + return attestation.export_attestation( + signer=signer, + witnessed_claim=attestation.artifact_published("https://x/y", "sha256:" + "0" * 64), + evidence=[attestation.evidence_immutable_uri("https://x/y")], + **kw, + ) + + +def test_verify_accepts_valid_self_attestation(): + res = attestation.verify(_valid_env()) + assert res.ok and bool(res) is True + assert res.issuer_bound is True + assert res.reasons == () + assert any("verified against" in n for n in res.notes) + + +def test_verify_rejects_non_object(): + res = attestation.verify(["not", "an", "envelope"]) # type: ignore[arg-type] + assert not res.ok and res.reasons == ("envelope is not an object",) + + +def test_verify_rejects_wrong_version(): + env = _valid_env() + env["envelope_version"] = "9.9" + res = attestation.verify(env) + assert not res.ok and any("envelope_version" in r for r in res.reasons) + + +def test_verify_rejects_missing_field(): + env = _valid_env() + del env["validity"] + res = attestation.verify(env) + assert not res.ok and any("missing required field: validity" in r for r in res.reasons) + + +def test_verify_rejects_empty_evidence(): + env = _valid_env() + env["evidence"] = [] + res = attestation.verify(env) + assert not res.ok and any("evidence must be a non-empty list" in r for r in res.reasons) + + +def test_verify_rejects_empty_sigchain(): + env = _valid_env() + env["sigchain"] = [] + res = attestation.verify(env) + assert not res.ok and any("sigchain must be a non-empty list" in r for r in res.reasons) + + +def test_verify_rejects_tampered_payload(): + env = _valid_env() + import base64 + + env["sigchain"][0]["sig"] = base64.urlsafe_b64encode(b"\x00" * 64).rstrip(b"=").decode() + res = attestation.verify(env) + assert not res.ok and any("does not verify" in r for r in res.reasons) + + +def test_verify_rejects_bad_sig_encoding(): + env = _valid_env() + env["sigchain"][0]["sig"] = "@@@@not-base64@@@@" + res = attestation.verify(env) + assert not res.ok and any("does not verify" in r for r in res.reasons) + + +def test_verify_rejects_bad_alg(): + env = _valid_env() + env["sigchain"][0]["alg"] = "rsa" + res = attestation.verify(env) + assert not res.ok and any("unsupported or missing alg" in r for r in res.reasons) + + +def test_verify_rejects_bad_role_on_issuer_sig(): + env = _valid_env() + env["sigchain"][0]["role"] = "custodian" + res = attestation.verify(env) + assert not res.ok and any("role must be 'issuer'" in r for r in res.reasons) + + +def test_verify_rejects_non_did_key_key_id(): + env = _valid_env() + env["sigchain"][0]["key_id"] = "not-a-did-key" + res = attestation.verify(env) + assert not res.ok and any("not a resolvable ed25519 did:key" in r for r in res.reasons) + + +def test_verify_perpetual_ok(): + env = _valid_env(validity=attestation.validity_perpetual(datetime(2026, 1, 1), datetime(2030, 1, 1))) + res = attestation.verify(env) + assert res.ok and any("perpetual" in n for n in res.notes) + + +def test_verify_expired(): + env = _valid_env(validity=attestation.validity_time_bounded(datetime(2020, 1, 1), datetime(2021, 1, 1))) + res = attestation.verify(env) + assert not res.ok and any("expired" in r for r in res.reasons) + + +def test_verify_not_yet_valid(): + env = _valid_env(validity=attestation.validity_time_bounded(datetime(2090, 1, 1), datetime(2091, 1, 1))) + res = attestation.verify(env) + assert not res.ok and any("not yet valid" in r for r in res.reasons) + + +def test_verify_time_bounded_within_with_explicit_now(): + env = _valid_env(validity=attestation.validity_time_bounded(datetime(2026, 1, 1), datetime(2027, 1, 1))) + res = attestation.verify(env, now=datetime(2026, 6, 13, tzinfo=timezone.utc)) + assert res.ok and any("time_bounded" in n for n in res.notes) + + +def test_verify_unparseable_validity(): + env = _valid_env() + env["validity"]["not_after"] = "garbage" + res = attestation.verify(env) + assert not res.ok and any("unparseable" in r for r in res.reasons) + + +def test_verify_revocation_checked_noted_not_failed(): + env = _valid_env( + validity=attestation.validity_revocation_checked(datetime(2026, 1, 1), datetime(2030, 1, 1), "https://x/revoke") + ) + res = attestation.verify(env) + assert res.ok and any("revocation_checked" in n and "NOT confirmed offline" in n for n in res.notes) + + +def test_verify_unknown_validity_model(): + env = _valid_env() + env["validity"]["validity_model"] = "vibes" + res = attestation.verify(env) + assert not res.ok and any("unknown validity_model" in r for r in res.reasons) + + +def test_verify_validity_not_object(): + env = _valid_env() + env["validity"] = "nope" + res = attestation.verify(env) + assert not res.ok and any("validity is not an object" in r for r in res.reasons) + + +def test_verify_signature_valid_but_issuer_unbindable(): + signer = Ed25519Signer.from_seed(FIXED_SEED) + env = attestation.export_attestation( + signer=signer, + witnessed_claim=attestation.artifact_published("https://x/y", "sha256:" + "0" * 64), + evidence=[attestation.evidence_immutable_uri("https://x/y")], + issuer=attestation.platform_handle_identity("thecolony.cc:colonist-one"), + ) + res = attestation.verify(env) + assert res.ok is True # signature math is valid + assert res.issuer_bound is False + assert any("UNBINDABLE" in n for n in res.notes) + + +def test_verify_did_key_issuer_mismatch_is_unverified(): + signer = Ed25519Signer.from_seed(FIXED_SEED) + other = Ed25519Signer.from_seed(bytes(range(1, 33))) + env = attestation.export_attestation( + signer=signer, + witnessed_claim=attestation.artifact_published("https://x/y", "sha256:" + "0" * 64), + evidence=[attestation.evidence_immutable_uri("https://x/y")], + issuer=attestation.did_key_identity(other.did_key), # issuer.id != signer.did_key + ) + res = attestation.verify(env) + assert res.ok is True # sig still verifies (signed by signer) + assert res.issuer_bound is False + assert any("key_id != issuer.id" in n for n in res.notes) + + +def test_verify_issuer_not_object(): + env = _valid_env() + env["issuer"] = "thecolony.cc:colonist-one" + res = attestation.verify(env) + assert res.issuer_bound is False + assert any("issuer is not an object" in n for n in res.notes) + + +def test_verify_dep_missing_pynacl(monkeypatch): + env = _valid_env() + monkeypatch.setitem(sys.modules, "nacl", None) + monkeypatch.setitem(sys.modules, "nacl.signing", None) + monkeypatch.setitem(sys.modules, "nacl.exceptions", None) + with pytest.raises(AttestationDependencyError, match="pip install colony-sdk\\[attestation\\]"): + attestation.verify(env) + + +# ---- did_key_to_public_key --------------------------------------------------- +def test_did_key_to_public_key_roundtrip(): + signer = Ed25519Signer.from_seed(FIXED_SEED) + assert attestation.did_key_to_public_key(signer.did_key) == signer.public_key + + +def test_did_key_to_public_key_rejects_non_did_key(): + with pytest.raises(AttestationError): + attestation.did_key_to_public_key("did:web:example.com") + + +def test_did_key_to_public_key_rejects_wrong_multicodec(): + import base58 + + bad = "did:key:z" + base58.b58encode(b"\x00\x01" + b"\x00" * 32).decode() + with pytest.raises(AttestationError, match="multicodec"): + attestation.did_key_to_public_key(bad) + + +def test_did_key_to_public_key_rejects_wrong_length(): + import base58 + + short = "did:key:z" + base58.b58encode(b"\xed\x01" + b"\x00" * 31).decode() + with pytest.raises(AttestationError, match="32 bytes"): + attestation.did_key_to_public_key(short) + + +def test_did_key_to_public_key_dep_missing(monkeypatch): + monkeypatch.setitem(sys.modules, "base58", None) + with pytest.raises(AttestationDependencyError): + attestation.did_key_to_public_key("did:key:zABC")