From 78b12d81b18b5bad32bc84af8a358472e8a5ddd9 Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 27 May 2026 16:31:53 +0100 Subject: [PATCH 1/2] metrics: anchor pq sig aggregate building time at interval 2 start Record lean_pq_sig_aggregated_signatures_building_time_seconds from the beginning of the aggregation slot interval through STARK proof completion, not just the xmss aggregate call. Expose related PQ production counters and extend the /metrics API fixture contract. --- .../test_fixtures/api_endpoint.py | 3 ++ src/lean_spec/forks/lstar/spec.py | 41 +++++++++++++++- src/lean_spec/subspecs/metrics/registry.py | 34 +++++++++++++ .../forkchoice/test_store_attestations.py | 49 +++++++++++++++++++ .../subspecs/metrics/test_registry.py | 38 ++++++++++++++ 5 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 tests/lean_spec/subspecs/metrics/test_registry.py diff --git a/packages/testing/src/consensus_testing/test_fixtures/api_endpoint.py b/packages/testing/src/consensus_testing/test_fixtures/api_endpoint.py index 34a7e36c7..2e3a3614b 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/api_endpoint.py +++ b/packages/testing/src/consensus_testing/test_fixtures/api_endpoint.py @@ -170,6 +170,9 @@ def _metrics_response(_store: Store, _fixture: "ApiEndpointTest") -> dict[str, A "lean_attestation_validation_time_seconds", "lean_fork_choice_reorgs_total", "lean_fork_choice_reorg_depth", + "lean_pq_sig_aggregated_signatures_total", + "lean_pq_sig_attestations_in_aggregated_signatures_total", + "lean_pq_sig_aggregated_signatures_building_time_seconds", "lean_latest_justified_slot", "lean_latest_finalized_slot", "lean_state_transition_time_seconds", diff --git a/src/lean_spec/forks/lstar/spec.py b/src/lean_spec/forks/lstar/spec.py index a65e0a59a..c9b363b31 100644 --- a/src/lean_spec/forks/lstar/spec.py +++ b/src/lean_spec/forks/lstar/spec.py @@ -1,5 +1,6 @@ """Lstar fork — identity and construction facade.""" +import time from collections 
import defaultdict from collections.abc import Iterable, Sequence, Set as AbstractSet from typing import Any, ClassVar @@ -31,6 +32,7 @@ JUSTIFICATION_LOOKBACK_SLOTS, MAX_ATTESTATIONS_DATA, ) +from lean_spec.subspecs.metrics.registry import registry as metrics from lean_spec.subspecs.observability import ( observe_on_attestation, observe_on_block, @@ -1602,7 +1604,12 @@ def update_safe_target(self, store: LstarStore) -> LstarStore: # The head and attestation pools remain unchanged. return store.model_copy(update={"safe_target": safe_target}) - def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregatedAttestation]]: + def aggregate( + self, + store: LstarStore, + *, + aggregation_interval_start: float | None = None, + ) -> tuple[LstarStore, list[SignedAggregatedAttestation]]: """Turn raw validator votes into compact aggregated attestations. Validators cast individual signatures over gossip. Before those @@ -1630,6 +1637,12 @@ def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregate - Consumed gossip signatures are removed. - Newly produced proofs are recorded for future reuse. + + ``aggregation_interval_start`` (``perf_counter`` value) anchors + ``lean_pq_sig_aggregated_signatures_building_time_seconds``: wall time + from the beginning of the aggregation slot interval (interval 2) until + each STARK proof completes. Pass it from ``tick_interval`` on the + aggregator-duty path only. 
""" validators = store.states[store.head].validators gossip_sigs = store.attestation_signatures @@ -1720,6 +1733,10 @@ def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregate message=hash_tree_root(data), slot=data.slot, ) + self._observe_aggregated_signature_produced( + proof, + aggregation_interval_start=aggregation_interval_start, + ) new_aggregates.append(SignedAggregatedAttestation(data=data, proof=proof)) # ── Store bookkeeping ──────────────────────────────────────── @@ -1743,6 +1760,22 @@ def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregate } ), new_aggregates + def _observe_aggregated_signature_produced( + self, + proof: TypeOneMultiSignature, + *, + aggregation_interval_start: float | None, + ) -> None: + """Record PQ-signature production metrics for one aggregate.""" + if aggregation_interval_start is not None: + metrics.lean_pq_sig_aggregated_signatures_building_time_seconds.observe( + time.perf_counter() - aggregation_interval_start + ) + metrics.lean_pq_sig_aggregated_signatures_total.inc() + metrics.lean_pq_sig_attestations_in_aggregated_signatures_total.inc( + len(proof.participants.to_validator_indices()) + ) + def tick_interval( self, store: LstarStore, @@ -1766,7 +1799,11 @@ def tick_interval( if current_interval == Interval(0) and has_proposal: store = self.accept_new_attestations(store) elif current_interval == Interval(2) and is_aggregator: - store, new_aggregates = self.aggregate(store) + interval_start = time.perf_counter() + store, new_aggregates = self.aggregate( + store, + aggregation_interval_start=interval_start, + ) elif current_interval == Interval(3): store = self.update_safe_target(store) elif current_interval == Interval(4): diff --git a/src/lean_spec/subspecs/metrics/registry.py b/src/lean_spec/subspecs/metrics/registry.py index d8306a4e2..cd0f9836d 100644 --- a/src/lean_spec/subspecs/metrics/registry.py +++ b/src/lean_spec/subspecs/metrics/registry.py @@ -45,6 +45,9 @@ 
REORG_DEPTH_BUCKETS = (1, 2, 3, 5, 7, 10, 20, 30, 50, 100) """Block count. Reorg depths above 10 are rare and signal network issues.""" +PQ_SIG_AGGREGATED_BUILDING_TIME_BUCKETS = (0.1, 0.25, 0.5, 0.75, 1, 1.25, 1.5, 2, 4) +"""Seconds. Aggregated-signature STARK prove latency from interval-2 start.""" + class _NoOpMetric: """ @@ -129,6 +132,14 @@ class MetricsRegistry: lean_fork_choice_reorg_depth: Histogram | _NoOpMetric = _NOOP """Number of blocks rolled back during each reorg event.""" + # PQ signature (leanMetrics: PQ Signature Metrics) + lean_pq_sig_aggregated_signatures_total: Counter | _NoOpMetric = _NOOP + """Running count of aggregated signatures produced.""" + lean_pq_sig_attestations_in_aggregated_signatures_total: Counter | _NoOpMetric = _NOOP + """Running count of validator attestations folded into produced aggregates.""" + lean_pq_sig_aggregated_signatures_building_time_seconds: Histogram | _NoOpMetric = _NOOP + """Wall time from aggregation interval start until STARK proof completion.""" + # State transition lean_latest_justified_slot: Gauge | _NoOpMetric = _NOOP """Slot of the most recently justified checkpoint.""" @@ -247,6 +258,29 @@ def init( registry=reg, ) + # PQ signature (leanMetrics: PQ Signature Metrics) + self.lean_pq_sig_aggregated_signatures_total = Counter( + "lean_pq_sig_aggregated_signatures_total", + "Total number of aggregated signatures.", + registry=reg, + ) + self.lean_pq_sig_attestations_in_aggregated_signatures_total = Counter( + "lean_pq_sig_attestations_in_aggregated_signatures_total", + "Total number of attestations included into aggregated signatures.", + registry=reg, + ) + self.lean_pq_sig_aggregated_signatures_building_time_seconds = Histogram( + "lean_pq_sig_aggregated_signatures_building_time_seconds", + ( + "Wall-clock time from the start of the aggregation slot interval " + "(interval 2) until completion of the STARK proof " + "(TypeOneMultiSignature.aggregate). 
One observation per produced " + "aggregate on the aggregator-duty path." + ), + buckets=PQ_SIG_AGGREGATED_BUILDING_TIME_BUCKETS, + registry=reg, + ) + # State transition (leanMetrics: State Transition Metrics) self.lean_latest_justified_slot = Gauge( "lean_latest_justified_slot", diff --git a/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py b/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py index 496895b31..71b28a702 100644 --- a/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py @@ -814,3 +814,52 @@ def test_gossip_to_aggregation_to_storage( message=hash_tree_root(attestation_data), slot=attestation_data.slot, ) + + +def test_interval_2_records_pq_sig_building_time_metric( + key_manager: XmssKeyManager, spec: LstarSpec +) -> None: + """Interval-2 aggregation observes building-time from interval start to STARK done.""" + from prometheus_client import CollectorRegistry + + from lean_spec.subspecs.metrics.registry import registry as metrics_registry + + metrics_registry.reset() + metrics_registry._initialized = False + test_reg = CollectorRegistry() + metrics_registry.init(registry=test_reg) + + num_validators = 4 + store = make_store(num_validators=num_validators, key_manager=key_manager) + store = store.model_copy(update={"time": Interval.from_slot(Slot(1))}) + attestation_data = spec.produce_attestation_data(store, Slot(1)) + + for vid in (ValidatorIndex(1), ValidatorIndex(2)): + signed_attestation = SignedAttestation( + validator_id=vid, + data=attestation_data, + signature=key_manager.sign_attestation_data(vid, attestation_data), + ) + store = spec.on_gossip_attestation(store, signed_attestation, is_aggregator=True) + + store = store.model_copy(update={"time": Uint64(1)}) + store, aggregates = spec.tick_interval(store, has_proposal=False, is_aggregator=True) + + assert len(aggregates) == 1 + assert 
test_reg.get_sample_value("lean_pq_sig_aggregated_signatures_total") == 1.0 + assert ( + test_reg.get_sample_value("lean_pq_sig_attestations_in_aggregated_signatures_total") + == 2.0 + ) + assert ( + test_reg.get_sample_value("lean_pq_sig_aggregated_signatures_building_time_seconds_count") + == 1.0 + ) + building_sum = test_reg.get_sample_value( + "lean_pq_sig_aggregated_signatures_building_time_seconds_sum" + ) + assert building_sum is not None + assert building_sum >= 0.0 + + metrics_registry.reset() + metrics_registry._initialized = False diff --git a/tests/lean_spec/subspecs/metrics/test_registry.py b/tests/lean_spec/subspecs/metrics/test_registry.py new file mode 100644 index 000000000..84cf3dfc2 --- /dev/null +++ b/tests/lean_spec/subspecs/metrics/test_registry.py @@ -0,0 +1,38 @@ +"""Tests for the Prometheus metrics registry.""" + +from __future__ import annotations + +from collections.abc import Iterator + +import pytest +from prometheus_client import CollectorRegistry + +from lean_spec.subspecs.metrics.registry import registry + + +@pytest.fixture(autouse=True) +def _reset_registry() -> Iterator[None]: + """Ensure metrics are uninitialized before and after each test.""" + registry.reset() + registry._initialized = False + yield + registry.reset() + registry._initialized = False + + +def test_pq_sig_aggregated_signature_metrics_registered() -> None: + """PQ-signature production metrics are registered on init.""" + test_reg = CollectorRegistry() + registry.init(registry=test_reg) + + assert test_reg.get_sample_value("lean_pq_sig_aggregated_signatures_total") == 0.0 + assert ( + test_reg.get_sample_value("lean_pq_sig_attestations_in_aggregated_signatures_total") + == 0.0 + ) + assert ( + test_reg.get_sample_value( + "lean_pq_sig_aggregated_signatures_building_time_seconds_count", + ) + == 0.0 + ) From 469d906e1dcb0cf9d93f07f468782db7dbeaedae Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 27 May 2026 17:41:22 +0100 Subject: [PATCH 2/2] metrics: anchor 
aggregate building time at interval-2 wall start MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the building-time clock was captured as time.perf_counter() inside tick_interval, immediately before calling aggregate(). That measures 'time since tick_interval fired' — not 'time since the aggregation interval boundary'. Host scheduling lag before tick_interval runs was silently excluded, so the metric under-reported. Anchor instead at the wall-clock interval-2 start derived from chain config (genesis_time + store.time * MILLISECONDS_PER_INTERVAL / 1000, in unix seconds). At each STARK proof completion observe time.time() - interval_start, clamped at 0 to guard against clock skew. aggregate() no longer needs the perf_counter parameter from tick_interval. Update the integration test to use a realistic genesis_time so the recorded duration is bounded (< 60s), which would catch any future regression that re-anchors against unix epoch. --- src/lean_spec/forks/lstar/spec.py | 43 +++++++++---------- .../forkchoice/test_store_attestations.py | 22 ++++++++-- 2 files changed, 40 insertions(+), 25 deletions(-) diff --git a/src/lean_spec/forks/lstar/spec.py b/src/lean_spec/forks/lstar/spec.py index 874affbaa..b834e7253 100644 --- a/src/lean_spec/forks/lstar/spec.py +++ b/src/lean_spec/forks/lstar/spec.py @@ -32,6 +32,7 @@ INTERVALS_PER_SLOT, JUSTIFICATION_LOOKBACK_SLOTS, MAX_ATTESTATIONS_DATA, + MILLISECONDS_PER_INTERVAL, ) from lean_spec.subspecs.metrics.registry import registry as metrics from lean_spec.subspecs.observability import ( @@ -1611,12 +1612,7 @@ def update_safe_target(self, store: LstarStore) -> LstarStore: # The head and attestation pools remain unchanged. 
return store.model_copy(update={"safe_target": safe_target}) - def aggregate( - self, - store: LstarStore, - *, - aggregation_interval_start: float | None = None, - ) -> tuple[LstarStore, list[SignedAggregatedAttestation]]: + def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregatedAttestation]]: """Turn raw validator votes into compact aggregated attestations. Validators cast individual signatures over gossip. Before those @@ -1645,12 +1641,19 @@ def aggregate( - Consumed gossip signatures are removed. - Newly produced proofs are recorded for future reuse. - ``aggregation_interval_start`` (``perf_counter`` value) anchors - ``lean_pq_sig_aggregated_signatures_building_time_seconds``: wall time - from the beginning of the aggregation slot interval (interval 2) until - each STARK proof completes. Pass it from ``tick_interval`` on the - aggregator-duty path only. + ``lean_pq_sig_aggregated_signatures_building_time_seconds`` is anchored + at the wall-clock start of the current aggregation interval (derived + from ``store.config.genesis_time`` and ``store.time``) and recorded at + each STARK proof completion. ``aggregate`` is the interval-2 action, + so ``store.time`` IS that interval. """ + # Wall-clock start of the current interval. Derived from chain config + # so the metric reflects "interval boundary → proof done", independent + # of when the host scheduler actually fired tick_interval. 
+ interval_start_unix_seconds = ( + float(store.config.genesis_time) + + int(store.time) * float(MILLISECONDS_PER_INTERVAL) / 1000.0 + ) validators = store.states[store.head].validators gossip_sigs = store.attestation_signatures new = store.latest_new_aggregated_payloads @@ -1742,7 +1745,7 @@ def aggregate( ) self._observe_aggregated_signature_produced( proof, - aggregation_interval_start=aggregation_interval_start, + interval_start_unix_seconds=interval_start_unix_seconds, ) new_aggregates.append(SignedAggregatedAttestation(data=data, proof=proof)) @@ -1771,13 +1774,13 @@ def _observe_aggregated_signature_produced( self, proof: TypeOneMultiSignature, *, - aggregation_interval_start: float | None, + interval_start_unix_seconds: float, ) -> None: """Record PQ-signature production metrics for one aggregate.""" - if aggregation_interval_start is not None: - metrics.lean_pq_sig_aggregated_signatures_building_time_seconds.observe( - time.perf_counter() - aggregation_interval_start - ) + # Clamp to 0 to guard against clock skew or tests with future-dated + # genesis_time; building time is physically non-negative. 
+ elapsed = max(0.0, time.time() - interval_start_unix_seconds) + metrics.lean_pq_sig_aggregated_signatures_building_time_seconds.observe(elapsed) metrics.lean_pq_sig_aggregated_signatures_total.inc() metrics.lean_pq_sig_attestations_in_aggregated_signatures_total.inc( len(proof.participants.to_validator_indices()) @@ -1806,11 +1809,7 @@ def tick_interval( if current_interval == Interval(0) and has_proposal: store = self.accept_new_attestations(store) elif current_interval == Interval(2) and is_aggregator: - interval_start = time.perf_counter() - store, new_aggregates = self.aggregate( - store, - aggregation_interval_start=interval_start, - ) + store, new_aggregates = self.aggregate(store) elif current_interval == Interval(3): store = self.update_safe_target(store) elif current_interval == Interval(4): diff --git a/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py b/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py index 1601eb7f5..c11e7de13 100644 --- a/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py @@ -2,6 +2,8 @@ from __future__ import annotations +import time + import pytest from consensus_testing.keys import XmssKeyManager from prometheus_client import CollectorRegistry @@ -820,13 +822,25 @@ def test_gossip_to_aggregation_to_storage( def test_interval_2_records_pq_sig_building_time_metric( key_manager: XmssKeyManager, spec: LstarSpec ) -> None: - """Interval-2 aggregation observes building-time from interval start to STARK done.""" + """Interval-2 aggregation observes building-time from interval-2 wall-clock start. + + The metric anchors at ``genesis_time + store.time * interval_duration``. + The elapsed value at proof completion should be small (under one slot) + when ``genesis_time`` is set to roughly "now" minus interval-2 offset. 
+ """ metrics_registry.reset() test_reg = CollectorRegistry() metrics_registry.init(registry=test_reg) try: num_validators = 4 - store = make_store(num_validators=num_validators, key_manager=key_manager) + # Anchor genesis_time so interval 2 of slot 0 has just begun. + # interval-2 wall start = genesis_time + 2 * 0.8s = genesis_time + 1.6s + genesis_time = int(time.time()) - 2 + store = make_store( + num_validators=num_validators, + key_manager=key_manager, + genesis_time=genesis_time, + ) # Set time so gossip-attestation accepts votes for Slot(1); the override # below moves time back to interval 1, so the next tick lands on interval 2. store = store.model_copy(update={"time": Interval.from_slot(Slot(1))}) @@ -859,6 +873,8 @@ def test_interval_2_records_pq_sig_building_time_metric( "lean_pq_sig_aggregated_signatures_building_time_seconds_sum" ) assert building_sum is not None - assert building_sum >= 0.0 + # Anchored at interval-2 wall start (~now - 0.4s), then STARK prove + # runs. Bound at 60s catches anchoring against unix epoch. + assert 0.0 <= building_sum < 60.0 finally: metrics_registry.reset()