Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,33 @@ per-cell stdout/stderr logs end up at `/tmp/devourer-regress-last/`.
- `--vm-name NAME` / `--vm-ssh USER@HOST` — enter VM mode
- `--keep-logs` — symlink the temp log dir at `/tmp/devourer-regress-last`

Environment variable equivalents: `DEVOURER_VM_NAME`, `DEVOURER_VM_SSH`.
Environment variable equivalents: `DEVOURER_VM_NAME`, `DEVOURER_VM_SSH`,
`DEVOURER_SNIFFER_IFACE`.

### `--sniffer-iface` — on-air encoding verification

When set, each matrix cell additionally captures on a third (host-local)
monitor iface and reports the decoded radiotap encoding distribution
alongside the hit count. Composes with `--full-matrix` and
`--encoding-matrix`. Intended for an AR9271 (vanilla radiotap, no
driver-side flag filtering).

```bash
sudo python3 tests/regress.py --encoding-matrix \
--tx-pid 0x8813 --rx-pid 0x0120 --channel 100 \
--vm-name devourer-testrig --vm-ssh <user>@<VM-IP> \
--sniffer-iface wlan0mon
```

Per-cell output gains a `↪ sniffer: N frames — <encoding>=N, ...` line
showing what actually flew. If the `--ldpc` injection comes back tagged
as BCC, mac80211 / the OOT driver stripped the flag before the air —
the chip-side RX never had to refuse an LDPC frame, so any
`k/d`-row-flat result for LDPC means the test setup, not the chip.

The sniffer is host-only and never moved through the VM USB
passthrough. The same helper, run standalone outside the matrix, is
`tests/sniff_air.py` (see below).

## Specialized modes

Expand Down
131 changes: 130 additions & 1 deletion tests/regress.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,78 @@ def _spawn_kernel_tx(
return kh.popen(cmd, stdout=fh, stderr=subprocess.STDOUT)


def _spawn_sniffer(
iface: str, channel: int, pcap_path: Path,
) -> subprocess.Popen:
"""Optional 3rd-adapter on-air verifier. Puts `iface` in monitor mode
on `channel` and runs tcpdump → pcap filtered on the canonical SA.
Lets sniff_air's radiotap parser later report what encoding actually
flew (vs what inject_beacon.py / txdemo *requested*). Host-side only;
sniffer adapters don't get moved into the VM. AR9271 is the canonical
sniffer chip — vanilla radiotap, no driver-side flag filtering."""
# Reuse the local iface_to_monitor logic via subprocess (matches the
# Local-mode path of KernelHost.iface_to_monitor; we don't go through
# the kh abstraction because the sniffer never leaves the host).
for c in [
["ip", "link", "set", iface, "down"],
["iw", "dev", iface, "set", "type", "monitor"],
["ip", "link", "set", iface, "up"],
["iw", "dev", iface, "set", "channel", str(channel)],
]:
subprocess.run(c, check=False)
fh = open(pcap_path.with_suffix(".tcpdump.log"), "w")
sa_str = ":".join(
f"{b:02x}" for b in bytes.fromhex(CANONICAL_SA.replace(":", ""))
)
return subprocess.Popen(
["tcpdump", "-i", iface, "-w", str(pcap_path), "-U", "-nn",
f"ether src {sa_str}"],
stdout=fh, stderr=subprocess.STDOUT, start_new_session=True,
)


def _summarise_sniffer_pcap(pcap_path: Path) -> str:
"""Parse the captured pcap and return a one-line summary of encoding
distribution. Returns an empty string if the pcap is missing or empty —
callers should treat that as "no on-air data" rather than a failure."""
try:
# Local import: sniff_air sits next to regress.py.
sys.path.insert(0, str(Path(__file__).resolve().parent))
import sniff_air # type: ignore
except ImportError:
return ""
if not pcap_path.exists() or pcap_path.stat().st_size == 0:
return ""
from collections import Counter
buckets: Counter = Counter()
total = 0
for frame in sniff_air._read_pcap_frames(pcap_path):
sa = sniff_air._frame_sa(frame)
if sa != sniff_air.CANONICAL_SA:
continue
total += 1
info = sniff_air._parse_radiotap(frame)
if info is None:
buckets["parse-error"] += 1
continue
if info["kind"] == "VHT":
buckets[f"VHT MCS{info['mcs']}/NSS{info['nss']} "
f"{'LDPC' if info['ldpc'] else 'BCC'} "
f"{info['bw']}MHz STBC={int(info['stbc'])}"] += 1
elif info["kind"] == "HT":
buckets[f"HT MCS{info['mcs']} "
f"{'LDPC' if info['ldpc'] else 'BCC'} "
f"{info['bw']}MHz STBC={int(info['stbc'])}"] += 1
else:
buckets["legacy"] += 1
if total == 0:
return "sniffer: 0 frames matched"
parts = [
f"{label}={n}" for label, n in buckets.most_common(3)
]
return f"sniffer: {total} frames — " + ", ".join(parts)


def _terminate(proc: subprocess.Popen, grace: float = 2.0) -> None:
if proc.poll() is not None:
return
Expand Down Expand Up @@ -733,23 +805,32 @@ def run_cell(
tmpdir: Path,
kh: KernelHost,
encoding: Optional[dict] = None,
sniffer_iface: Optional[str] = None,
) -> CellResult:
"""Run one matrix cell. State contract: always restore DUTs to a clean
baseline (host kernel-bound) on exit via try/finally.

`encoding` (optional dict, used by --encoding-matrix) passes radiotap
TX encoding flags through to the TX-side spawn: kernel TX → injector
CLI args, devourer TX → DEVOURER_TX_* env vars."""
CLI args, devourer TX → DEVOURER_TX_* env vars.

`sniffer_iface` (optional, --sniffer-iface) puts a 3rd-adapter monitor
iface into capture alongside the cell. The pcap is parsed at cell-end
via sniff_air's radiotap decoder and a one-line summary appended to
the CellResult.notes. Lets the matrix prove what encoding actually
flew, vs what inject_beacon.py / txdemo *requested*."""
cell_id = f"tx-{tx_side}_rx-{rx_side}"
tx_log = tmpdir / f"{cell_id}.tx.log"
rx_log = tmpdir / f"{cell_id}.rx.log"
sniffer_pcap = tmpdir / f"{cell_id}.sniffer.pcap" if sniffer_iface else None

# Stage 1: route DUTs to their target machines for this cell.
_ensure_dut_location(tx_dut, want_at_kernel_host=(tx_side == "kernel"), kh=kh)
_ensure_dut_location(rx_dut, want_at_kernel_host=(rx_side == "kernel"), kh=kh)

rx_proc: Optional[subprocess.Popen] = None
tx_proc: Optional[subprocess.Popen] = None
sniffer_proc: Optional[subprocess.Popen] = None
try:
# Stage 2: bring up RX side first so it's listening when TX begins.
if rx_side == "devourer":
Expand All @@ -761,6 +842,20 @@ def run_cell(
rx_proc = _spawn_kernel_rx(kh, rx_iface, channel, rx_log)
time.sleep(1.0)

# Stage 2b: 3rd-adapter sniffer (optional, host-local). Starts
# before TX so it captures the full window.
if sniffer_iface and sniffer_pcap is not None:
try:
sniffer_proc = _spawn_sniffer(
sniffer_iface, channel, sniffer_pcap
)
time.sleep(0.5)
except Exception as e:
# Sniffer is observational — never fail the cell on
# sniffer issues, just record and move on.
print(f" ! sniffer failed to start: {e}", flush=True)
sniffer_proc = None

# Stage 3: TX side.
if tx_side == "devourer":
tx_proc = _spawn_devourer_tx(
Expand Down Expand Up @@ -795,6 +890,8 @@ def run_cell(
_terminate(tx_proc)
time.sleep(1.0)
_terminate(rx_proc)
if sniffer_proc is not None:
_terminate(sniffer_proc)

if rx_side == "devourer":
hits = _count_devourer_rx_hits(rx_log)
Expand All @@ -806,18 +903,25 @@ def run_cell(
tx_attempts = _count_kernel_tx_sent(tx_log)
tx_failures = 0

notes = ""
if sniffer_pcap is not None:
notes = _summarise_sniffer_pcap(sniffer_pcap)

return CellResult(
hits=hits,
tx_attempts=tx_attempts,
tx_failures=tx_failures,
duration_s=measure_end - measure_start,
notes=notes,
)
finally:
# Restore clean baseline so the next cell starts from a known state.
if tx_proc is not None and tx_proc.poll() is None:
_terminate(tx_proc)
if rx_proc is not None and rx_proc.poll() is None:
_terminate(rx_proc)
if sniffer_proc is not None and sniffer_proc.poll() is None:
_terminate(sniffer_proc)
# Pull DUTs back to the host (so the next cell can choose freely).
if kh.is_remote:
kh.release_dut(tx_dut)
Expand All @@ -843,6 +947,7 @@ def run_matrix(
tmpdir: Path,
kh: KernelHost,
abort_on_baseline_fail: bool = True,
sniffer_iface: Optional[str] = None,
) -> dict[tuple[str, str], CellResult]:
cells = [
("kernel", "kernel"), # baseline — rig sanity
Expand All @@ -859,13 +964,16 @@ def run_matrix(
r = run_cell(
devourer_root, tx_dut, rx_dut, tx_side, rx_side,
channel, duration, tmpdir, kh,
sniffer_iface=sniffer_iface,
)
except Exception as e:
print(f" ✗ cell crashed: {e}", flush=True)
r = CellResult(hits=0, tx_attempts=0, tx_failures=0,
duration_s=0.0, notes=str(e))
results[(tx_side, rx_side)] = r
print(f" → {r.fmt(threshold)}", flush=True)
if r.notes and sniffer_iface:
print(f" ↪ {r.notes}", flush=True)
if (
(tx_side, rx_side) == ("kernel", "kernel")
and not r.passed(threshold)
Expand Down Expand Up @@ -915,6 +1023,7 @@ def run_full_matrix(
threshold: int,
tmpdir: Path,
kh: KernelHost,
sniffer_iface: Optional[str] = None,
) -> dict[tuple[str, str, str, str], CellResult]:
"""Run every ordered (TX, RX) pair of distinct DUTs across all four
driver-side combinations. Returns a dict keyed by
Expand All @@ -936,13 +1045,16 @@ def run_full_matrix(
r = run_cell(
devourer_root, tx_dut, rx_dut, tx_side, rx_side,
channel, duration, tmpdir, kh,
sniffer_iface=sniffer_iface,
)
except Exception as e:
print(f" ✗ cell crashed: {e}", flush=True)
r = CellResult(hits=0, tx_attempts=0, tx_failures=0,
duration_s=0.0, notes=str(e))
results[(tx_side, rx_side, tx_dut.vidpid, rx_dut.vidpid)] = r
print(f" → {r.fmt(threshold)}", flush=True)
if r.notes and sniffer_iface:
print(f" ↪ {r.notes}", flush=True)
return results


Expand Down Expand Up @@ -1024,6 +1136,7 @@ def run_encoding_matrix(
threshold: int,
tmpdir: Path,
kh: KernelHost,
sniffer_iface: Optional[str] = None,
) -> dict[tuple[str, str, str], CellResult]:
"""For one ordered (TX, RX) pair, iterate every driver-mode × encoding
combination. Returns dict keyed by (tx_side, rx_side, encoding_label)."""
Expand All @@ -1043,13 +1156,16 @@ def run_encoding_matrix(
r = run_cell(
devourer_root, tx_dut, rx_dut, tx_side, rx_side,
channel, duration, tmpdir, kh, encoding=enc,
sniffer_iface=sniffer_iface,
)
except Exception as e:
print(f" ✗ cell crashed: {e}", flush=True)
r = CellResult(hits=0, tx_attempts=0, tx_failures=0,
duration_s=0.0, notes=str(e))
results[(tx_side, rx_side, enc_label)] = r
print(f" → {r.fmt(threshold)}", flush=True)
if r.notes and sniffer_iface:
print(f" ↪ {r.notes}", flush=True)
return results


Expand Down Expand Up @@ -1180,6 +1296,16 @@ def main():
help="ssh target (user@host) for the VM (env: DEVOURER_VM_SSH). "
"Required if --vm-name is set.",
)
ap.add_argument(
"--sniffer-iface",
default=os.environ.get("DEVOURER_SNIFFER_IFACE", ""),
help="host monitor-mode iface to attach as a 3rd-adapter sniffer "
"(intended: AR9271). When set, each cell pcap-captures the air "
"and reports decoded encoding distribution alongside the hit "
"count — answers 'what encoding actually flew' for "
"--encoding-matrix runs. Always host-local; never moved to the "
"VM. Env: DEVOURER_SNIFFER_IFACE.",
)
args = ap.parse_args()

if args.vm_name and not args.vm_ssh:
Expand Down Expand Up @@ -1242,6 +1368,7 @@ def pick(pid_arg, default_idx):
channel=args.channel, duration=args.duration,
threshold=args.pass_threshold,
tmpdir=tmpdir, kh=kh,
sniffer_iface=args.sniffer_iface or None,
)
print()
md = emit_encoding_markdown(
Expand Down Expand Up @@ -1284,6 +1411,7 @@ def pick(pid_arg, default_idx):
channel=args.channel, duration=args.duration,
threshold=args.pass_threshold,
tmpdir=tmpdir, kh=kh,
sniffer_iface=args.sniffer_iface or None,
)
print()
md = emit_full_markdown(
Expand Down Expand Up @@ -1326,6 +1454,7 @@ def pick(pid_arg, default_idx):
threshold=args.pass_threshold,
tmpdir=tmpdir, kh=kh,
abort_on_baseline_fail=not args.no_baseline_abort,
sniffer_iface=args.sniffer_iface or None,
)
print()
md = emit_markdown(
Expand Down
Loading