fix(adar1000): correct 1-based channel indexing in setters (issue #90)

The four channel-indexed ADAR1000 setters (adarSetRxPhase, adarSetTxPhase,
adarSetRxVgaGain, adarSetTxVgaGain) computed their register offset as
`(channel & 0x03) * stride`, which silently aliased CH4 (channel=4 ->
mask=0) onto CH1 and shifted CH1..CH3 by one. The API contract (1-based
CH1..CH4) is documented in ADAR1000_AGC.cpp:76 and matches the ADI
datasheet; every existing caller already passes `ch + 1`.

Fix: subtract 1 before masking -- `((channel - 1) & 0x03) * stride` --
and reject `channel < 1 || channel > 4` early with a DIAG message so a
future stale 0-based caller fails loudly instead of writing to CH4.

Adds TestTier1Adar1000ChannelRegisterRoundTrip (9 tests) which closes
the loop independently of the driver:
  - parses the ADI register map directly from ADAR1000_Manager.h,
  - verifies the datasheet stride invariants (gain=1, phase=2),
  - auto-discovers every C++ TU under MCU_LIB_DIR / MCU_CODE_DIR so a
    new caller cannot silently escape the round-trip check,
  - asserts every caller's channel argument evaluates to {1,2,3,4} for
    ch in {0,1,2,3} (catches bare 0-based or literal-0 callers at CI
    time before the runtime bounds-check would silently drop them),
  - round-trips each (caller, ch) through the helper arithmetic and
    checks the final address equals REG_CH{ch+1}_*.

Adversarially validated: reverting any one helper, all four helpers,
corrupting the parsed register map, injecting a bare-ch caller, and
auto-discovering a literal-0 caller in a fresh TU each cause the
expected (and only the expected) test to fail.

Stacked on fix/adar1000-vm-tables (PR #107).
This commit is contained in:
Jason
2026-04-18 06:39:07 +05:45
parent 7c91a3e0b9
commit 582476fa0d
2 changed files with 448 additions and 6 deletions
@@ -868,11 +868,22 @@ void ADAR1000Manager::adarSetRamBypass(uint8_t deviceIndex, uint8_t broadcast) {
} }
void ADAR1000Manager::adarSetRxPhase(uint8_t deviceIndex, uint8_t channel, uint8_t phase, uint8_t broadcast) { void ADAR1000Manager::adarSetRxPhase(uint8_t deviceIndex, uint8_t channel, uint8_t phase, uint8_t broadcast) {
// channel is 1-based (CH1..CH4) per API contract documented in
// ADAR1000_AGC.cpp and matching ADI datasheet terminology.
// Reject out-of-range early so a stale 0-based caller does not
// silently wrap to ((0-1) & 0x03) == 3 and write to CH4.
// See issue #90.
if (channel < 1 || channel > 4) {
DIAG("BF", "adarSetRxPhase: channel %u out of range [1..4], ignored", channel);
return;
}
uint8_t i_val = VM_I[phase % 128]; uint8_t i_val = VM_I[phase % 128];
uint8_t q_val = VM_Q[phase % 128]; uint8_t q_val = VM_Q[phase % 128];
uint32_t mem_addr_i = REG_CH1_RX_PHS_I + (channel & 0x03) * 2; // Subtract 1 to convert 1-based channel to 0-based register offset
uint32_t mem_addr_q = REG_CH1_RX_PHS_Q + (channel & 0x03) * 2; // before masking. See issue #90.
uint32_t mem_addr_i = REG_CH1_RX_PHS_I + ((channel - 1) & 0x03) * 2;
uint32_t mem_addr_q = REG_CH1_RX_PHS_Q + ((channel - 1) & 0x03) * 2;
adarWrite(deviceIndex, mem_addr_i, i_val, broadcast); adarWrite(deviceIndex, mem_addr_i, i_val, broadcast);
adarWrite(deviceIndex, mem_addr_q, q_val, broadcast); adarWrite(deviceIndex, mem_addr_q, q_val, broadcast);
@@ -880,11 +891,16 @@ void ADAR1000Manager::adarSetRxPhase(uint8_t deviceIndex, uint8_t channel, uint8
} }
void ADAR1000Manager::adarSetTxPhase(uint8_t deviceIndex, uint8_t channel, uint8_t phase, uint8_t broadcast) { void ADAR1000Manager::adarSetTxPhase(uint8_t deviceIndex, uint8_t channel, uint8_t phase, uint8_t broadcast) {
// channel is 1-based (CH1..CH4). See issue #90.
if (channel < 1 || channel > 4) {
DIAG("BF", "adarSetTxPhase: channel %u out of range [1..4], ignored", channel);
return;
}
uint8_t i_val = VM_I[phase % 128]; uint8_t i_val = VM_I[phase % 128];
uint8_t q_val = VM_Q[phase % 128]; uint8_t q_val = VM_Q[phase % 128];
uint32_t mem_addr_i = REG_CH1_TX_PHS_I + (channel & 0x03) * 2; uint32_t mem_addr_i = REG_CH1_TX_PHS_I + ((channel - 1) & 0x03) * 2;
uint32_t mem_addr_q = REG_CH1_TX_PHS_Q + (channel & 0x03) * 2; uint32_t mem_addr_q = REG_CH1_TX_PHS_Q + ((channel - 1) & 0x03) * 2;
adarWrite(deviceIndex, mem_addr_i, i_val, broadcast); adarWrite(deviceIndex, mem_addr_i, i_val, broadcast);
adarWrite(deviceIndex, mem_addr_q, q_val, broadcast); adarWrite(deviceIndex, mem_addr_q, q_val, broadcast);
@@ -892,13 +908,23 @@ void ADAR1000Manager::adarSetTxPhase(uint8_t deviceIndex, uint8_t channel, uint8
} }
void ADAR1000Manager::adarSetRxVgaGain(uint8_t deviceIndex, uint8_t channel, uint8_t gain, uint8_t broadcast) { void ADAR1000Manager::adarSetRxVgaGain(uint8_t deviceIndex, uint8_t channel, uint8_t gain, uint8_t broadcast) {
uint32_t mem_addr = REG_CH1_RX_GAIN + (channel & 0x03); // channel is 1-based (CH1..CH4). See issue #90.
if (channel < 1 || channel > 4) {
DIAG("BF", "adarSetRxVgaGain: channel %u out of range [1..4], ignored", channel);
return;
}
uint32_t mem_addr = REG_CH1_RX_GAIN + ((channel - 1) & 0x03);
adarWrite(deviceIndex, mem_addr, gain, broadcast); adarWrite(deviceIndex, mem_addr, gain, broadcast);
adarWrite(deviceIndex, REG_LOAD_WORKING, 0x1, broadcast); adarWrite(deviceIndex, REG_LOAD_WORKING, 0x1, broadcast);
} }
void ADAR1000Manager::adarSetTxVgaGain(uint8_t deviceIndex, uint8_t channel, uint8_t gain, uint8_t broadcast) { void ADAR1000Manager::adarSetTxVgaGain(uint8_t deviceIndex, uint8_t channel, uint8_t gain, uint8_t broadcast) {
uint32_t mem_addr = REG_CH1_TX_GAIN + (channel & 0x03); // channel is 1-based (CH1..CH4). See issue #90.
if (channel < 1 || channel > 4) {
DIAG("BF", "adarSetTxVgaGain: channel %u out of range [1..4], ignored", channel);
return;
}
uint32_t mem_addr = REG_CH1_TX_GAIN + ((channel - 1) & 0x03);
adarWrite(deviceIndex, mem_addr, gain, broadcast); adarWrite(deviceIndex, mem_addr, gain, broadcast);
adarWrite(deviceIndex, REG_LOAD_WORKING, LD_WRK_REGS_LDTX_OVERRIDE, broadcast); adarWrite(deviceIndex, REG_LOAD_WORKING, LD_WRK_REGS_LDTX_OVERRIDE, broadcast);
} }
@@ -26,12 +26,14 @@ layers agree (because both could be wrong).
from __future__ import annotations from __future__ import annotations
import ast
import os import os
import re import re
import struct import struct
import subprocess import subprocess
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from typing import ClassVar
import pytest import pytest
@@ -625,6 +627,420 @@ class TestTier1AgcCrossLayerInvariant:
) )
# ===================================================================
# ADAR1000 channel→register round-trip invariant (issue #90)
# ===================================================================
#
# Ground-truth invariant crossing three system layers:
# Chip (datasheet) -> Driver (MCU helpers) -> Application (callers).
#
# For every logical element ch in {0,1,2,3} (hardware channels CH1..CH4),
# the round-trip
# caller_expr(ch) --> helper_offset(channel) * stride --> base + off
# must land on the physical register REG_CH{ch+1}_* defined in the ADI
# ADAR1000 register map parsed from ADAR1000_Manager.h.
#
# Catches:
# * #90 channel rotation regardless of which side is fixed (caller OR helper).
# * Wrong stride (e.g. phase written with stride 1 instead of 2).
# * Bad mask (e.g. `channel & 0x07`, `channel & 0x01`).
# * Wrong base register in a helper.
# * New setter added with mismatched convention.
# * Caller moved to a file the test no longer scans (fails loudly).
#
# Cannot be defeated by:
# * Renaming/refactoring helper layout: the setter coverage test
# (`test_helper_sites_exist_for_all_setters`) catches missing parse.
# * Changing 0x03 to 3 or adding a named constant: the offset is
# evaluated symbolically via AST, not matched by regex.
def _parse_adar_register_map(header_text):
"""Extract `#define REG_CHn_(RX|TX)_(GAIN|PHS_I|PHS_Q)` values."""
regs = {}
for m in re.finditer(
r"^#define\s+(REG_CH[1-4]_(?:RX|TX)_(?:GAIN|PHS_I|PHS_Q))\s+(0x[0-9A-Fa-f]+)",
header_text,
re.MULTILINE,
):
regs[m.group(1)] = int(m.group(2), 16)
return regs
def _safe_eval_int_expr(expr, **variables):
"""
Evaluate a small integer expression with +, -, *, &, |, ^, ~, <<, >>.
Python's & / | / ^ / ~ / << / >> have the same semantics as C for the
operand widths we care about here (uint8_t after the mask makes the
result fit in 0..3). No floating point, no function calls, no names
outside ``variables``.
SECURITY: ``expr`` MUST come from a trusted source -- specifically,
C/C++ source text under version control in this repository (e.g.
arguments parsed out of ``main.cpp``/``ADAR1000_AGC.cpp``). Although
the AST whitelist below rejects function calls, attribute access,
subscripts, and any name not in ``variables``, ``eval`` is still
invoked on the compiled tree. Do NOT pass user-supplied / network /
GUI input here.
"""
tree = ast.parse(expr, mode="eval")
allowed = (
ast.Expression, ast.BinOp, ast.UnaryOp, ast.Constant,
ast.Name, ast.Load,
ast.Add, ast.Sub, ast.Mult, ast.Mod, ast.FloorDiv,
ast.BitAnd, ast.BitOr, ast.BitXor,
ast.USub, ast.UAdd, ast.Invert,
ast.LShift, ast.RShift,
)
for node in ast.walk(tree):
if not isinstance(node, allowed):
raise ValueError(
f"disallowed AST node {type(node).__name__!s} in `{expr}`"
)
return eval(
compile(tree, "<expr>", "eval"),
{"__builtins__": {}},
variables,
)
def _extract_adar_helper_sites(manager_cpp, setter_names):
"""
For each setter, locate the body of ``void ADAR1000Manager::<setter>``
and return a list of (setter, base_register, offset_expr_c, stride)
for every ``REG_CHn_XXX + <expr>`` memory-address assignment.
"""
sites = []
for setter in setter_names:
m = re.search(
rf"void\s+ADAR1000Manager::{setter}\s*\([^)]*\)\s*\{{(.+?)^\}}",
manager_cpp,
re.MULTILINE | re.DOTALL,
)
if not m:
continue
body = m.group(1)
for access in re.finditer(
r"=\s*(REG_CH[1-4]_(?:RX|TX)_(?:GAIN|PHS_I|PHS_Q))\s*\+\s*([^;]+);",
body,
):
base = access.group(1)
rhs = access.group(2).strip()
# Trailing `* <integer>` = stride multiplier (2 for phase I/Q).
stride_match = re.match(r"(.+?)\s*\*\s*(\d+)\s*$", rhs)
if stride_match:
offset_expr = stride_match.group(1).strip()
stride = int(stride_match.group(2))
else:
offset_expr = rhs
stride = 1
sites.append((setter, base, offset_expr, stride))
return sites
# Method-definition line pattern: `[qualifier...] <ret-type> <Class>::<setter>(`
# Covers: plain `void X::f(`, `inline void X::f(`, `static bool X::f(`, etc.
_DEFN_RE = re.compile(
r"^\s*(?:inline\s+|static\s+|virtual\s+|constexpr\s+|explicit\s+)*"
r"(?:void|bool|uint\w+|int\w*|auto)\s+\S+::\w+\s*\("
)
def _extract_adar_caller_sites(sources, setter):
"""
Find every call ``<obj>.<setter>(dev, <channel_expr>, ...)`` across
``sources = [(filename, text), ...]``. Returns (filename, line_no,
channel_expr) for each. Skips function declarations/definitions.
Arg list up to matching `)`: restricted to a single line. All existing
call sites fit on one line; a future multi-line refactor would drop
callers from the scan, which the round-trip test surfaces loudly via
`assert callers` (rather than silently missing a site).
"""
out = []
call_re = re.compile(rf"\b{setter}\s*\(([^;]*?)\)\s*;")
for filename, text in sources:
for line_no, line in enumerate(text.splitlines(), start=1):
# Skip method definition / declaration lines.
if _DEFN_RE.match(line):
continue
cm = call_re.search(line)
if not cm:
continue
args = _split_top_level_commas(cm.group(1))
if len(args) < 2:
continue
channel_expr = args[1].strip()
out.append((filename, line_no, channel_expr))
return out
def _split_top_level_commas(text):
"""Split on commas that sit at paren-depth 0 (ignores nested calls)."""
parts, depth, cur = [], 0, []
for ch in text:
if ch == "(":
depth += 1
cur.append(ch)
elif ch == ")":
depth -= 1
cur.append(ch)
elif ch == "," and depth == 0:
parts.append("".join(cur))
cur = []
else:
cur.append(ch)
if cur:
parts.append("".join(cur))
return parts
class TestTier1Adar1000ChannelRegisterRoundTrip:
"""
Cross-layer round-trip: caller channel expr -> helper offset formula
-> physical register address must equal REG_CH{ch+1}_* for every
caller and every ch in {0,1,2,3}.
See module-level block comment above and upstream issue #90.
"""
_SETTERS = (
"adarSetRxPhase",
"adarSetTxPhase",
"adarSetRxVgaGain",
"adarSetTxVgaGain",
)
# Register base -> stride override. Parsed values of stride are
# trusted; this table is the independent ground truth for cross-check.
_EXPECTED_STRIDE: ClassVar[dict[str, int]] = {
"REG_CH1_RX_GAIN": 1,
"REG_CH1_TX_GAIN": 1,
"REG_CH1_RX_PHS_I": 2,
"REG_CH1_RX_PHS_Q": 2,
"REG_CH1_TX_PHS_I": 2,
"REG_CH1_TX_PHS_Q": 2,
}
@classmethod
def setup_class(cls):
cls.header_txt = (cp.MCU_LIB_DIR / "ADAR1000_Manager.h").read_text()
cls.manager_txt = (cp.MCU_LIB_DIR / "ADAR1000_Manager.cpp").read_text()
cls.reg_map = _parse_adar_register_map(cls.header_txt)
cls.helper_sites = _extract_adar_helper_sites(
cls.manager_txt, cls._SETTERS,
)
# Auto-discover every C++ TU under the MCU tree so a new caller
# added to e.g. a future ``ADAR1000_Calibration.cpp`` cannot
# silently escape the round-trip check (issue #90 reviewer note).
# Exclude any path containing a ``tests`` segment so this test
# does not parse its own fixtures. The resulting list is
# deterministic (sorted) for reproducible parametrization.
scanned = []
seen = set()
for root in (cp.MCU_LIB_DIR, cp.MCU_CODE_DIR):
for path in sorted(root.rglob("*.cpp")):
if "tests" in path.parts:
continue
if path in seen:
continue
seen.add(path)
scanned.append((path.name, path.read_text()))
cls.sources = scanned
# Sanity: the two TUs known to call ADAR1000 setters at the time
# of issue #90 must be in scope. If a future refactor renames or
# moves them this assert fires loudly rather than silently
# passing an empty round-trip.
scanned_names = {n for (n, _) in scanned}
for required in ("ADAR1000_AGC.cpp", "main.cpp", "ADAR1000_Manager.cpp"):
assert required in scanned_names, (
f"Auto-discovery missed `{required}`; check MCU_LIB_DIR / "
f"MCU_CODE_DIR roots in contract_parser.py."
)
# ---------- Tier A: chip ground truth ----------------------------
def test_register_map_gain_stride_is_one_per_channel(self):
"""Datasheet invariant: RX/TX VGA gain registers are 1 byte apart."""
for kind in ("RX_GAIN", "TX_GAIN"):
for n in range(1, 4):
delta = (
self.reg_map[f"REG_CH{n+1}_{kind}"]
- self.reg_map[f"REG_CH{n}_{kind}"]
)
assert delta == 1, (
f"ADAR1000 register map invariant broken: "
f"REG_CH{n+1}_{kind} - REG_CH{n}_{kind} = {delta}, "
f"datasheet says 1. Either the header was mis-edited "
f"or ADI released a part with a different map."
)
def test_register_map_phase_stride_is_two_per_channel(self):
"""Datasheet invariant: phase I/Q pairs occupy 2 bytes per channel."""
for kind in ("RX_PHS_I", "RX_PHS_Q", "TX_PHS_I", "TX_PHS_Q"):
for n in range(1, 4):
delta = (
self.reg_map[f"REG_CH{n+1}_{kind}"]
- self.reg_map[f"REG_CH{n}_{kind}"]
)
assert delta == 2, (
f"ADAR1000 register map invariant broken: "
f"REG_CH{n+1}_{kind} - REG_CH{n}_{kind} = {delta}, "
f"datasheet says 2."
)
# ---------- Tier B: driver parses cleanly -------------------------
def test_helper_sites_exist_for_all_setters(self):
"""Every channel-indexed setter must parse at least one register access."""
found = {s for (s, _, _, _) in self.helper_sites}
missing = set(self._SETTERS) - found
assert not missing, (
f"Helper parse failed for: {sorted(missing)}. "
f"Either a setter was renamed (update _SETTERS), moved out of "
f"ADAR1000_Manager.cpp (extend scan scope), or the register-"
f"access form changed beyond `REG_CHn_XXX + <expr>`. "
f"DO NOT weaken this test without reviewing issue #90."
)
def test_helper_parsed_stride_matches_datasheet(self):
"""Parsed helper strides must match the datasheet register spacing."""
for setter, base, offset_expr, stride in self.helper_sites:
expected = self._EXPECTED_STRIDE.get(base)
assert expected is not None, (
f"{setter} writes to unrecognised base `{base}`. "
f"If ADI added a new channel-indexed register block, "
f"extend _EXPECTED_STRIDE with its datasheet stride."
)
assert stride == expected, (
f"{setter} helper uses stride {stride} for `{base}` "
f"(`{offset_expr} * {stride}`), datasheet says {expected}. "
f"Writes will overlap or skip channels."
)
# ---------- Tier C: round-trip to physical register ---------------
def test_all_callers_pass_one_based_channel(self):
"""
INVARIANT: every caller's channel argument must, for ch in
{0,1,2,3}, evaluate to a 1-based ADI channel index in {1,2,3,4}.
The bug fixed in #90 was that helpers used ``channel & 0x03``
directly, so a caller passing bare ``ch`` (0..3) appeared to
work for ch=0..2 and silently aliased ch=3 onto CH4-then-CH1.
After the fix, helpers do ``(channel - 1) & 0x03`` and reject
``channel < 1 || channel > 4``. A future caller written as
``adarSetRxPhase(dev, ch, ...)`` (bare 0-based) or
``adarSetRxPhase(dev, 0, ...)`` (literal 0) would silently be
dropped by the bounds-check at runtime; this test catches it at
CI time instead.
The check intentionally lives one tier above the round-trip test
so the failure message points the reader at the API contract
(1-based per ADI datasheet & ADAR1000_AGC.cpp:76) rather than at
a register-arithmetic mismatch.
"""
offenders = []
for setter in self._SETTERS:
callers = _extract_adar_caller_sites(self.sources, setter)
for filename, line_no, ch_expr in callers:
for ch in range(4):
try:
channel_val = _safe_eval_int_expr(ch_expr, ch=ch)
except (NameError, KeyError, ValueError) as e:
offenders.append(
f" - {filename}:{line_no} {setter}("
f"…, `{ch_expr}`, …) -- ch={ch}: "
f"unparseable ({e})"
)
continue
if channel_val not in (1, 2, 3, 4):
offenders.append(
f" - {filename}:{line_no} {setter}("
f"…, `{ch_expr}`, …) -- ch={ch}: "
f"channel={channel_val}, expected 1..4"
)
assert not offenders, (
"ADAR1000 1-based channel API contract violated. The fix "
"for issue #90 requires every caller to pass channel in "
"{1,2,3,4} (CH1..CH4 per ADI datasheet). Bare 0-based ch "
"or a literal 0 will be silently dropped by the helper's "
"bounds check. Offenders:\n" + "\n".join(offenders)
)
@pytest.mark.parametrize(
"setter",
[
"adarSetRxPhase",
"adarSetTxPhase",
"adarSetRxVgaGain",
"adarSetTxVgaGain",
],
)
def test_round_trip_lands_on_intended_physical_channel(self, setter):
"""
INVARIANT: for every caller of ``<setter>`` and every logical ch
in {0,1,2,3}, the effective register address equals
REG_CH{ch+1}_*. Catches #90 regardless of fix direction.
"""
callers = _extract_adar_caller_sites(self.sources, setter)
assert callers, (
f"No callers of `{setter}` found. Either the test scope is "
f"incomplete (extend `setup_class.sources`) or the symbol was "
f"inlined/removed. A blind test is a dangerous test — "
f"investigate before weakening."
)
helpers = [
(b, e, s) for (nm, b, e, s) in self.helper_sites if nm == setter
]
assert helpers, f"helper body for `{setter}` not parseable"
errors = []
for filename, line_no, ch_expr in callers:
for ch in range(4):
try:
channel_val = _safe_eval_int_expr(ch_expr, ch=ch)
except (NameError, KeyError, ValueError) as e:
pytest.fail(
f"{filename}:{line_no}: caller channel expression "
f"`{ch_expr}` uses symbol outside {{ch}} or a "
f"disallowed operator ({e}). Extend "
f"_safe_eval_int_expr variables or rewrite the "
f"call site with a supported expression."
)
for base_sym, offset_expr, stride in helpers:
try:
offset = _safe_eval_int_expr(
offset_expr, channel=channel_val,
)
except (NameError, KeyError, ValueError) as e:
pytest.fail(
f"helper `{setter}` offset expr "
f"`{offset_expr}` uses symbol outside "
f"{{channel}} or a disallowed operator ({e}). "
f"Extend _safe_eval_int_expr variables if new "
f"driver state is introduced."
)
final = self.reg_map[base_sym] + offset * stride
expected_sym = base_sym.replace("CH1", f"CH{ch + 1}")
expected = self.reg_map[expected_sym]
if final != expected:
errors.append(
f" - {filename}:{line_no} {setter} "
f"caller `{ch_expr}` | ch={ch} -> "
f"channel={channel_val} -> "
f"`{base_sym} + ({offset_expr})"
f"{' * ' + str(stride) if stride != 1 else ''}`"
f" = 0x{final:03X} "
f"(expected {expected_sym} = 0x{expected:03X})"
)
assert not errors, (
f"ADAR1000 channel round-trip FAILED for {setter} "
f"({len(errors)} mismatches) — writes routed to wrong physical "
f"channel. This is issue #90.\n" + "\n".join(errors)
)
class TestTier1DataPacketLayout: class TestTier1DataPacketLayout:
"""Verify data packet byte layout matches between Python and Verilog.""" """Verify data packet byte layout matches between Python and Verilog."""