Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ def _metrics_response(_store: Store, _fixture: "ApiEndpointTest") -> dict[str, A
"lean_attestation_validation_time_seconds",
"lean_fork_choice_reorgs_total",
"lean_fork_choice_reorg_depth",
"lean_pq_sig_aggregated_signatures_total",
"lean_pq_sig_attestations_in_aggregated_signatures_total",
"lean_pq_sig_aggregated_signatures_building_time_seconds",
"lean_attestation_aggregate_coverage_validators",
"lean_attestation_aggregate_coverage_subnets",
"lean_attestation_aggregate_coverage_diff_validators",
Expand Down
36 changes: 36 additions & 0 deletions src/lean_spec/forks/lstar/spec.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Lstar fork — identity and construction facade."""

import math
import time
from collections import defaultdict
from collections.abc import Iterable, Sequence, Set as AbstractSet
from typing import Any, ClassVar
Expand Down Expand Up @@ -31,7 +32,9 @@
INTERVALS_PER_SLOT,
JUSTIFICATION_LOOKBACK_SLOTS,
MAX_ATTESTATIONS_DATA,
MILLISECONDS_PER_INTERVAL,
)
from lean_spec.subspecs.metrics.registry import registry as metrics
from lean_spec.subspecs.observability import (
observe_on_attestation,
observe_on_block,
Expand Down Expand Up @@ -1637,7 +1640,20 @@ def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregate

- Consumed gossip signatures are removed.
- Newly produced proofs are recorded for future reuse.

``lean_pq_sig_aggregated_signatures_building_time_seconds`` is anchored
at the wall-clock start of the current aggregation interval (derived
from ``store.config.genesis_time`` and ``store.time``) and recorded at
each STARK proof completion. ``aggregate`` is the interval-2 action,
so ``store.time`` IS that interval.
"""
# Wall-clock start of the current interval. Derived from chain config
# so the metric reflects "interval boundary → proof done", independent
# of when the host scheduler actually fired tick_interval.
interval_start_unix_seconds = (
float(store.config.genesis_time)
+ int(store.time) * float(MILLISECONDS_PER_INTERVAL) / 1000.0
)
validators = store.states[store.head].validators
gossip_sigs = store.attestation_signatures
new = store.latest_new_aggregated_payloads
Expand Down Expand Up @@ -1727,6 +1743,10 @@ def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregate
message=hash_tree_root(data),
slot=data.slot,
)
self._observe_aggregated_signature_produced(
proof,
interval_start_unix_seconds=interval_start_unix_seconds,
)
new_aggregates.append(SignedAggregatedAttestation(data=data, proof=proof))

# ── Store bookkeeping ────────────────────────────────────────
Expand All @@ -1750,6 +1770,22 @@ def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregate
}
), new_aggregates

def _observe_aggregated_signature_produced(
self,
proof: TypeOneMultiSignature,
*,
interval_start_unix_seconds: float,
) -> None:
"""Record PQ-signature production metrics for one aggregate."""
# Clamp to 0 to guard against clock skew or tests with future-dated
# genesis_time; building time is physically non-negative.
elapsed = max(0.0, time.time() - interval_start_unix_seconds)
metrics.lean_pq_sig_aggregated_signatures_building_time_seconds.observe(elapsed)
metrics.lean_pq_sig_aggregated_signatures_total.inc()
metrics.lean_pq_sig_attestations_in_aggregated_signatures_total.inc(
len(proof.participants.to_validator_indices())
)

def tick_interval(
self,
store: LstarStore,
Expand Down
34 changes: 34 additions & 0 deletions src/lean_spec/subspecs/metrics/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
REORG_DEPTH_BUCKETS = (1, 2, 3, 5, 7, 10, 20, 30, 50, 100)
"""Block count. Reorg depths above 10 are rare and signal network issues."""

PQ_SIG_AGGREGATED_BUILDING_TIME_BUCKETS = (0.1, 0.25, 0.5, 0.75, 1, 1.25, 1.5, 2, 4)
"""Seconds. Aggregated-signature STARK prove latency from interval-2 start."""

# Section labels for attestation aggregate coverage gauges. These match the
# names printed in slot/report logs: timely, late, block, combined,
# agg_start_new, proposal_payloads, proposal_gossip, and proposal_combined.
Expand Down Expand Up @@ -154,6 +157,14 @@ class MetricsRegistry:
lean_attestation_aggregate_coverage_diff_validators: Gauge | _NoOpMetric = _NOOP
"""Validator coverage delta between block payloads and timely pre-merge payloads."""

# PQ signature (leanMetrics: PQ Signature Metrics)
lean_pq_sig_aggregated_signatures_total: Counter | _NoOpMetric = _NOOP
"""Running count of aggregated signatures produced."""
lean_pq_sig_attestations_in_aggregated_signatures_total: Counter | _NoOpMetric = _NOOP
"""Running count of validator attestations folded into produced aggregates."""
lean_pq_sig_aggregated_signatures_building_time_seconds: Histogram | _NoOpMetric = _NOOP
"""Wall time from aggregation interval start until STARK proof completion."""

# State transition
lean_latest_justified_slot: Gauge | _NoOpMetric = _NOOP
"""Slot of the most recently justified checkpoint."""
Expand Down Expand Up @@ -316,6 +327,29 @@ def init(
direction=direction,
).set(0)

# PQ signature (leanMetrics: PQ Signature Metrics)
self.lean_pq_sig_aggregated_signatures_total = Counter(
"lean_pq_sig_aggregated_signatures_total",
"Total number of aggregated signatures.",
registry=reg,
)
self.lean_pq_sig_attestations_in_aggregated_signatures_total = Counter(
"lean_pq_sig_attestations_in_aggregated_signatures_total",
"Total number of attestations included into aggregated signatures.",
registry=reg,
)
self.lean_pq_sig_aggregated_signatures_building_time_seconds = Histogram(
"lean_pq_sig_aggregated_signatures_building_time_seconds",
(
"Wall-clock time from the start of the aggregation slot interval "
"(interval 2) until completion of the STARK proof "
"(TypeOneMultiSignature.aggregate). One observation per produced "
"aggregate on the aggregator-duty path."
),
buckets=PQ_SIG_AGGREGATED_BUILDING_TIME_BUCKETS,
registry=reg,
)

# State transition (leanMetrics: State Transition Metrics)
self.lean_latest_justified_slot = Gauge(
"lean_latest_justified_slot",
Expand Down
65 changes: 65 additions & 0 deletions tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

from __future__ import annotations

import time

import pytest
from consensus_testing.keys import XmssKeyManager
from prometheus_client import CollectorRegistry

from lean_spec.forks.lstar import AttestationSignatureEntry
from lean_spec.forks.lstar.containers.attestation import (
Expand All @@ -14,6 +17,7 @@
from lean_spec.forks.lstar.spec import LstarSpec
from lean_spec.subspecs.chain.clock import Interval
from lean_spec.subspecs.chain.config import INTERVALS_PER_SLOT
from lean_spec.subspecs.metrics.registry import registry as metrics_registry
from lean_spec.subspecs.ssz.hash import hash_tree_root
from lean_spec.subspecs.xmss.aggregation import TypeOneMultiSignature
from lean_spec.types import (
Expand Down Expand Up @@ -813,3 +817,64 @@ def test_gossip_to_aggregation_to_storage(
message=hash_tree_root(attestation_data),
slot=attestation_data.slot,
)


def test_interval_2_records_pq_sig_building_time_metric(
key_manager: XmssKeyManager, spec: LstarSpec
) -> None:
"""Interval-2 aggregation observes building-time from interval-2 wall-clock start.

The metric anchors at ``genesis_time + store.time * interval_duration``.
The elapsed value at proof completion should be small (under one slot)
when ``genesis_time`` is set to roughly "now" minus interval-2 offset.
"""
metrics_registry.reset()
test_reg = CollectorRegistry()
metrics_registry.init(registry=test_reg)
try:
num_validators = 4
# Anchor genesis_time so interval 2 of slot 0 has just begun.
# interval-2 wall start = genesis_time + 2 * 0.8s = genesis_time + 1.6s
genesis_time = int(time.time()) - 2
store = make_store(
num_validators=num_validators,
key_manager=key_manager,
genesis_time=genesis_time,
)
# Set time so gossip-attestation accepts votes for Slot(1); the override
# below moves time back to interval 1, so the next tick lands on interval 2.
store = store.model_copy(update={"time": Interval.from_slot(Slot(1))})
attestation_data = spec.produce_attestation_data(store, Slot(1))

for vid in (ValidatorIndex(1), ValidatorIndex(2)):
signed_attestation = SignedAttestation(
validator_id=vid,
data=attestation_data,
signature=key_manager.sign_attestation_data(vid, attestation_data),
)
store = spec.on_gossip_attestation(store, signed_attestation, is_aggregator=True)

store = store.model_copy(update={"time": Interval(1)})
store, aggregates = spec.tick_interval(store, has_proposal=False, is_aggregator=True)

assert len(aggregates) == 1
assert test_reg.get_sample_value("lean_pq_sig_aggregated_signatures_total") == 1.0
assert (
test_reg.get_sample_value("lean_pq_sig_attestations_in_aggregated_signatures_total")
== 2.0
)
assert (
test_reg.get_sample_value(
"lean_pq_sig_aggregated_signatures_building_time_seconds_count"
)
== 1.0
)
building_sum = test_reg.get_sample_value(
"lean_pq_sig_aggregated_signatures_building_time_seconds_sum"
)
assert building_sum is not None
# Anchored at interval-2 wall start (~now - 0.4s), then STARK prove
# runs. Bound at 60s catches anchoring against unix epoch.
assert 0.0 <= building_sum < 60.0
finally:
metrics_registry.reset()
19 changes: 17 additions & 2 deletions tests/lean_spec/subspecs/metrics/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,25 @@
def _reset_registry() -> Iterator[None]:
"""Ensure metrics are uninitialized before and after each test."""
registry.reset()
registry._initialized = False
yield
registry.reset()
registry._initialized = False


def test_pq_sig_aggregated_signature_metrics_registered() -> None:
"""PQ-signature production metrics are registered on init."""
test_reg = CollectorRegistry()
registry.init(registry=test_reg)

assert test_reg.get_sample_value("lean_pq_sig_aggregated_signatures_total") == 0.0
assert (
test_reg.get_sample_value("lean_pq_sig_attestations_in_aggregated_signatures_total") == 0.0
)
assert (
test_reg.get_sample_value(
"lean_pq_sig_aggregated_signatures_building_time_seconds_count",
)
== 0.0
)


def test_attestation_aggregate_coverage_metrics_registered() -> None:
Expand Down
Loading