diff --git a/9_Firmware/9_1_Microcontroller/9_1_1_C_Cpp_Libraries/ADAR1000_Manager.cpp b/9_Firmware/9_1_Microcontroller/9_1_1_C_Cpp_Libraries/ADAR1000_Manager.cpp index e8a49fc..0e7c906 100644 --- a/9_Firmware/9_1_Microcontroller/9_1_1_C_Cpp_Libraries/ADAR1000_Manager.cpp +++ b/9_Firmware/9_1_Microcontroller/9_1_1_C_Cpp_Libraries/ADAR1000_Manager.cpp @@ -868,11 +868,22 @@ void ADAR1000Manager::adarSetRamBypass(uint8_t deviceIndex, uint8_t broadcast) { } void ADAR1000Manager::adarSetRxPhase(uint8_t deviceIndex, uint8_t channel, uint8_t phase, uint8_t broadcast) { + // channel is 1-based (CH1..CH4) per API contract documented in + // ADAR1000_AGC.cpp and matching ADI datasheet terminology. + // Reject out-of-range early so a stale 0-based caller does not + // silently wrap to ((0-1) & 0x03) == 3 and write to CH4. + // See issue #90. + if (channel < 1 || channel > 4) { + DIAG("BF", "adarSetRxPhase: channel %u out of range [1..4], ignored", channel); + return; + } uint8_t i_val = VM_I[phase % 128]; uint8_t q_val = VM_Q[phase % 128]; - uint32_t mem_addr_i = REG_CH1_RX_PHS_I + (channel & 0x03) * 2; - uint32_t mem_addr_q = REG_CH1_RX_PHS_Q + (channel & 0x03) * 2; + // Subtract 1 to convert 1-based channel to 0-based register offset + // before masking. See issue #90. + uint32_t mem_addr_i = REG_CH1_RX_PHS_I + ((channel - 1) & 0x03) * 2; + uint32_t mem_addr_q = REG_CH1_RX_PHS_Q + ((channel - 1) & 0x03) * 2; adarWrite(deviceIndex, mem_addr_i, i_val, broadcast); adarWrite(deviceIndex, mem_addr_q, q_val, broadcast); @@ -880,11 +891,16 @@ void ADAR1000Manager::adarSetRxPhase(uint8_t deviceIndex, uint8_t channel, uint8 } void ADAR1000Manager::adarSetTxPhase(uint8_t deviceIndex, uint8_t channel, uint8_t phase, uint8_t broadcast) { + // channel is 1-based (CH1..CH4). See issue #90. + if (channel < 1 || channel > 4) { + DIAG("BF", "adarSetTxPhase: channel %u out of range [1..4], ignored", channel); + return; + } uint8_t i_val = VM_I[phase % 128]; uint8_t q_val = VM_Q[phase % 128]; - uint32_t mem_addr_i = REG_CH1_TX_PHS_I + (channel & 0x03) * 2; - uint32_t mem_addr_q = REG_CH1_TX_PHS_Q + (channel & 0x03) * 2; + uint32_t mem_addr_i = REG_CH1_TX_PHS_I + ((channel - 1) & 0x03) * 2; + uint32_t mem_addr_q = REG_CH1_TX_PHS_Q + ((channel - 1) & 0x03) * 2; adarWrite(deviceIndex, mem_addr_i, i_val, broadcast); adarWrite(deviceIndex, mem_addr_q, q_val, broadcast); @@ -892,13 +908,23 @@ void ADAR1000Manager::adarSetTxPhase(uint8_t deviceIndex, uint8_t channel, uint8 } void ADAR1000Manager::adarSetRxVgaGain(uint8_t deviceIndex, uint8_t channel, uint8_t gain, uint8_t broadcast) { - uint32_t mem_addr = REG_CH1_RX_GAIN + (channel & 0x03); + // channel is 1-based (CH1..CH4). See issue #90. + if (channel < 1 || channel > 4) { + DIAG("BF", "adarSetRxVgaGain: channel %u out of range [1..4], ignored", channel); + return; + } + uint32_t mem_addr = REG_CH1_RX_GAIN + ((channel - 1) & 0x03); adarWrite(deviceIndex, mem_addr, gain, broadcast); adarWrite(deviceIndex, REG_LOAD_WORKING, 0x1, broadcast); } void ADAR1000Manager::adarSetTxVgaGain(uint8_t deviceIndex, uint8_t channel, uint8_t gain, uint8_t broadcast) { - uint32_t mem_addr = REG_CH1_TX_GAIN + (channel & 0x03); + // channel is 1-based (CH1..CH4). See issue #90. + if (channel < 1 || channel > 4) { + DIAG("BF", "adarSetTxVgaGain: channel %u out of range [1..4], ignored", channel); + return; + } + uint32_t mem_addr = REG_CH1_TX_GAIN + ((channel - 1) & 0x03); adarWrite(deviceIndex, mem_addr, gain, broadcast); adarWrite(deviceIndex, REG_LOAD_WORKING, LD_WRK_REGS_LDTX_OVERRIDE, broadcast); } diff --git a/9_Firmware/tests/cross_layer/test_cross_layer_contract.py b/9_Firmware/tests/cross_layer/test_cross_layer_contract.py index 53b1554..d73c282 100644 --- a/9_Firmware/tests/cross_layer/test_cross_layer_contract.py +++ b/9_Firmware/tests/cross_layer/test_cross_layer_contract.py @@ -26,12 +26,14 @@ layers agree (because both could be wrong). from __future__ import annotations +import ast import os import re import struct import subprocess import tempfile from pathlib import Path +from typing import ClassVar import pytest @@ -625,6 +627,420 @@ class TestTier1AgcCrossLayerInvariant: ) +# =================================================================== +# ADAR1000 channel→register round-trip invariant (issue #90) +# =================================================================== +# +# Ground-truth invariant crossing three system layers: +# Chip (datasheet) -> Driver (MCU helpers) -> Application (callers). +# +# For every logical element ch in {0,1,2,3} (hardware channels CH1..CH4), +# the round-trip +# caller_expr(ch) --> helper_offset(channel) * stride --> base + off +# must land on the physical register REG_CH{ch+1}_* defined in the ADI +# ADAR1000 register map parsed from ADAR1000_Manager.h. +# +# Catches: +# * #90 channel rotation regardless of which side is fixed (caller OR helper). +# * Wrong stride (e.g. phase written with stride 1 instead of 2). +# * Bad mask (e.g. `channel & 0x07`, `channel & 0x01`). +# * Wrong base register in a helper. +# * New setter added with mismatched convention. +# * Caller moved to a file the test no longer scans (fails loudly). +# +# Cannot be defeated by: +# * Renaming/refactoring helper layout: the setter coverage test +# (`test_helper_sites_exist_for_all_setters`) catches missing parse. +# * Changing 0x03 to 3 or adding a named constant: the offset is +# evaluated symbolically via AST, not matched by regex. + + +def _parse_adar_register_map(header_text): + """Extract `#define REG_CHn_(RX|TX)_(GAIN|PHS_I|PHS_Q)` values.""" + regs = {} + for m in re.finditer( + r"^#define\s+(REG_CH[1-4]_(?:RX|TX)_(?:GAIN|PHS_I|PHS_Q))\s+(0x[0-9A-Fa-f]+)", + header_text, + re.MULTILINE, + ): + regs[m.group(1)] = int(m.group(2), 16) + return regs + + +def _safe_eval_int_expr(expr, **variables): + """ + Evaluate a small integer expression with +, -, *, &, |, ^, ~, <<, >>. + Python's & / | / ^ / ~ / << / >> have the same semantics as C for the + operand widths we care about here (uint8_t after the mask makes the + result fit in 0..3). No floating point, no function calls, no names + outside ``variables``. + + SECURITY: ``expr`` MUST come from a trusted source -- specifically, + C/C++ source text under version control in this repository (e.g. + arguments parsed out of ``main.cpp``/``ADAR1000_AGC.cpp``). Although + the AST whitelist below rejects function calls, attribute access, + subscripts, and any name not in ``variables``, ``eval`` is still + invoked on the compiled tree. Do NOT pass user-supplied / network / + GUI input here. + """ + tree = ast.parse(expr, mode="eval") + allowed = ( + ast.Expression, ast.BinOp, ast.UnaryOp, ast.Constant, + ast.Name, ast.Load, + ast.Add, ast.Sub, ast.Mult, ast.Mod, ast.FloorDiv, + ast.BitAnd, ast.BitOr, ast.BitXor, + ast.USub, ast.UAdd, ast.Invert, + ast.LShift, ast.RShift, + ) + for node in ast.walk(tree): + if not isinstance(node, allowed): + raise ValueError( + f"disallowed AST node {type(node).__name__!s} in `{expr}`" + ) + return eval( + compile(tree, "", "eval"), + {"__builtins__": {}}, + variables, + ) + + +def _extract_adar_helper_sites(manager_cpp, setter_names): + """ + For each setter, locate the body of ``void ADAR1000Manager::`` + and return a list of (setter, base_register, offset_expr_c, stride) + for every ``REG_CHn_XXX + `` memory-address assignment. + """ + sites = [] + for setter in setter_names: + m = re.search( + rf"void\s+ADAR1000Manager::{setter}\s*\([^)]*\)\s*\{{(.+?)^\}}", + manager_cpp, + re.MULTILINE | re.DOTALL, + ) + if not m: + continue + body = m.group(1) + for access in re.finditer( + r"=\s*(REG_CH[1-4]_(?:RX|TX)_(?:GAIN|PHS_I|PHS_Q))\s*\+\s*([^;]+);", + body, + ): + base = access.group(1) + rhs = access.group(2).strip() + # Trailing `* ` = stride multiplier (2 for phase I/Q). + stride_match = re.match(r"(.+?)\s*\*\s*(\d+)\s*$", rhs) + if stride_match: + offset_expr = stride_match.group(1).strip() + stride = int(stride_match.group(2)) + else: + offset_expr = rhs + stride = 1 + sites.append((setter, base, offset_expr, stride)) + return sites + + +# Method-definition line pattern: `[qualifier...] ::(` +# Covers: plain `void X::f(`, `inline void X::f(`, `static bool X::f(`, etc. +_DEFN_RE = re.compile( + r"^\s*(?:inline\s+|static\s+|virtual\s+|constexpr\s+|explicit\s+)*" + r"(?:void|bool|uint\w+|int\w*|auto)\s+\S+::\w+\s*\(" +) + + +def _extract_adar_caller_sites(sources, setter): + """ + Find every call ``.(dev, , ...)`` across + ``sources = [(filename, text), ...]``. Returns (filename, line_no, + channel_expr) for each. Skips function declarations/definitions. + + Arg list up to matching `)`: restricted to a single line. All existing + call sites fit on one line; a future multi-line refactor would drop + callers from the scan, which the round-trip test surfaces loudly via + `assert callers` (rather than silently missing a site). + """ + out = [] + call_re = re.compile(rf"\b{setter}\s*\(([^;]*?)\)\s*;") + for filename, text in sources: + for line_no, line in enumerate(text.splitlines(), start=1): + # Skip method definition / declaration lines. + if _DEFN_RE.match(line): + continue + cm = call_re.search(line) + if not cm: + continue + args = _split_top_level_commas(cm.group(1)) + if len(args) < 2: + continue + channel_expr = args[1].strip() + out.append((filename, line_no, channel_expr)) + return out + + +def _split_top_level_commas(text): + """Split on commas that sit at paren-depth 0 (ignores nested calls).""" + parts, depth, cur = [], 0, [] + for ch in text: + if ch == "(": + depth += 1 + cur.append(ch) + elif ch == ")": + depth -= 1 + cur.append(ch) + elif ch == "," and depth == 0: + parts.append("".join(cur)) + cur = [] + else: + cur.append(ch) + if cur: + parts.append("".join(cur)) + return parts + + +class TestTier1Adar1000ChannelRegisterRoundTrip: + """ + Cross-layer round-trip: caller channel expr -> helper offset formula + -> physical register address must equal REG_CH{ch+1}_* for every + caller and every ch in {0,1,2,3}. + + See module-level block comment above and upstream issue #90. + """ + + _SETTERS = ( + "adarSetRxPhase", + "adarSetTxPhase", + "adarSetRxVgaGain", + "adarSetTxVgaGain", + ) + + # Register base -> stride override. Parsed values of stride are + # trusted; this table is the independent ground truth for cross-check. + _EXPECTED_STRIDE: ClassVar[dict[str, int]] = { + "REG_CH1_RX_GAIN": 1, + "REG_CH1_TX_GAIN": 1, + "REG_CH1_RX_PHS_I": 2, + "REG_CH1_RX_PHS_Q": 2, + "REG_CH1_TX_PHS_I": 2, + "REG_CH1_TX_PHS_Q": 2, + } + + @classmethod + def setup_class(cls): + cls.header_txt = (cp.MCU_LIB_DIR / "ADAR1000_Manager.h").read_text() + cls.manager_txt = (cp.MCU_LIB_DIR / "ADAR1000_Manager.cpp").read_text() + cls.reg_map = _parse_adar_register_map(cls.header_txt) + cls.helper_sites = _extract_adar_helper_sites( + cls.manager_txt, cls._SETTERS, + ) + # Auto-discover every C++ TU under the MCU tree so a new caller + # added to e.g. a future ``ADAR1000_Calibration.cpp`` cannot + # silently escape the round-trip check (issue #90 reviewer note). + # Exclude any path containing a ``tests`` segment so this test + # does not parse its own fixtures. The resulting list is + # deterministic (sorted) for reproducible parametrization. + scanned = [] + seen = set() + for root in (cp.MCU_LIB_DIR, cp.MCU_CODE_DIR): + for path in sorted(root.rglob("*.cpp")): + if "tests" in path.parts: + continue + if path in seen: + continue + seen.add(path) + scanned.append((path.name, path.read_text())) + cls.sources = scanned + # Sanity: the two TUs known to call ADAR1000 setters at the time + # of issue #90 must be in scope. If a future refactor renames or + # moves them this assert fires loudly rather than silently + # passing an empty round-trip. + scanned_names = {n for (n, _) in scanned} + for required in ("ADAR1000_AGC.cpp", "main.cpp", "ADAR1000_Manager.cpp"): + assert required in scanned_names, ( + f"Auto-discovery missed `{required}`; check MCU_LIB_DIR / " + f"MCU_CODE_DIR roots in contract_parser.py." + ) + + # ---------- Tier A: chip ground truth ---------------------------- + + def test_register_map_gain_stride_is_one_per_channel(self): + """Datasheet invariant: RX/TX VGA gain registers are 1 byte apart.""" + for kind in ("RX_GAIN", "TX_GAIN"): + for n in range(1, 4): + delta = ( + self.reg_map[f"REG_CH{n+1}_{kind}"] + - self.reg_map[f"REG_CH{n}_{kind}"] + ) + assert delta == 1, ( + f"ADAR1000 register map invariant broken: " + f"REG_CH{n+1}_{kind} - REG_CH{n}_{kind} = {delta}, " + f"datasheet says 1. Either the header was mis-edited " + f"or ADI released a part with a different map." + ) + + def test_register_map_phase_stride_is_two_per_channel(self): + """Datasheet invariant: phase I/Q pairs occupy 2 bytes per channel.""" + for kind in ("RX_PHS_I", "RX_PHS_Q", "TX_PHS_I", "TX_PHS_Q"): + for n in range(1, 4): + delta = ( + self.reg_map[f"REG_CH{n+1}_{kind}"] + - self.reg_map[f"REG_CH{n}_{kind}"] + ) + assert delta == 2, ( + f"ADAR1000 register map invariant broken: " + f"REG_CH{n+1}_{kind} - REG_CH{n}_{kind} = {delta}, " + f"datasheet says 2." + ) + + # ---------- Tier B: driver parses cleanly ------------------------- + + def test_helper_sites_exist_for_all_setters(self): + """Every channel-indexed setter must parse at least one register access.""" + found = {s for (s, _, _, _) in self.helper_sites} + missing = set(self._SETTERS) - found + assert not missing, ( + f"Helper parse failed for: {sorted(missing)}. " + f"Either a setter was renamed (update _SETTERS), moved out of " + f"ADAR1000_Manager.cpp (extend scan scope), or the register-" + f"access form changed beyond `REG_CHn_XXX + `. " + f"DO NOT weaken this test without reviewing issue #90." + ) + + def test_helper_parsed_stride_matches_datasheet(self): + """Parsed helper strides must match the datasheet register spacing.""" + for setter, base, offset_expr, stride in self.helper_sites: + expected = self._EXPECTED_STRIDE.get(base) + assert expected is not None, ( + f"{setter} writes to unrecognised base `{base}`. " + f"If ADI added a new channel-indexed register block, " + f"extend _EXPECTED_STRIDE with its datasheet stride." + ) + assert stride == expected, ( + f"{setter} helper uses stride {stride} for `{base}` " + f"(`{offset_expr} * {stride}`), datasheet says {expected}. " + f"Writes will overlap or skip channels." + ) + + # ---------- Tier C: round-trip to physical register --------------- + + def test_all_callers_pass_one_based_channel(self): + """ + INVARIANT: every caller's channel argument must, for ch in + {0,1,2,3}, evaluate to a 1-based ADI channel index in {1,2,3,4}. + + The bug fixed in #90 was that helpers used ``channel & 0x03`` + directly, so a caller passing bare ``ch`` (0..3) appeared to + work for ch=0..2 and silently aliased ch=3 onto CH4-then-CH1. + After the fix, helpers do ``(channel - 1) & 0x03`` and reject + ``channel < 1 || channel > 4``. A future caller written as + ``adarSetRxPhase(dev, ch, ...)`` (bare 0-based) or + ``adarSetRxPhase(dev, 0, ...)`` (literal 0) would silently be + dropped by the bounds-check at runtime; this test catches it at + CI time instead. + + The check intentionally lives one tier above the round-trip test + so the failure message points the reader at the API contract + (1-based per ADI datasheet & ADAR1000_AGC.cpp:76) rather than at + a register-arithmetic mismatch. + """ + offenders = [] + for setter in self._SETTERS: + callers = _extract_adar_caller_sites(self.sources, setter) + for filename, line_no, ch_expr in callers: + for ch in range(4): + try: + channel_val = _safe_eval_int_expr(ch_expr, ch=ch) + except (NameError, KeyError, ValueError) as e: + offenders.append( + f" - {filename}:{line_no} {setter}(" + f"…, `{ch_expr}`, …) -- ch={ch}: " + f"unparseable ({e})" + ) + continue + if channel_val not in (1, 2, 3, 4): + offenders.append( + f" - {filename}:{line_no} {setter}(" + f"…, `{ch_expr}`, …) -- ch={ch}: " + f"channel={channel_val}, expected 1..4" + ) + assert not offenders, ( + "ADAR1000 1-based channel API contract violated. The fix " + "for issue #90 requires every caller to pass channel in " + "{1,2,3,4} (CH1..CH4 per ADI datasheet). Bare 0-based ch " + "or a literal 0 will be silently dropped by the helper's " + "bounds check. Offenders:\n" + "\n".join(offenders) + ) + + @pytest.mark.parametrize( + "setter", + [ + "adarSetRxPhase", + "adarSetTxPhase", + "adarSetRxVgaGain", + "adarSetTxVgaGain", + ], + ) + def test_round_trip_lands_on_intended_physical_channel(self, setter): + """ + INVARIANT: for every caller of ```` and every logical ch + in {0,1,2,3}, the effective register address equals + REG_CH{ch+1}_*. Catches #90 regardless of fix direction. + """ + callers = _extract_adar_caller_sites(self.sources, setter) + assert callers, ( + f"No callers of `{setter}` found. Either the test scope is " + f"incomplete (extend `setup_class.sources`) or the symbol was " + f"inlined/removed. A blind test is a dangerous test — " + f"investigate before weakening." + ) + helpers = [ + (b, e, s) for (nm, b, e, s) in self.helper_sites if nm == setter + ] + assert helpers, f"helper body for `{setter}` not parseable" + + errors = [] + for filename, line_no, ch_expr in callers: + for ch in range(4): + try: + channel_val = _safe_eval_int_expr(ch_expr, ch=ch) + except (NameError, KeyError, ValueError) as e: + pytest.fail( + f"{filename}:{line_no}: caller channel expression " + f"`{ch_expr}` uses symbol outside {{ch}} or a " + f"disallowed operator ({e}). Extend " + f"_safe_eval_int_expr variables or rewrite the " + f"call site with a supported expression." + ) + for base_sym, offset_expr, stride in helpers: + try: + offset = _safe_eval_int_expr( + offset_expr, channel=channel_val, + ) + except (NameError, KeyError, ValueError) as e: + pytest.fail( + f"helper `{setter}` offset expr " + f"`{offset_expr}` uses symbol outside " + f"{{channel}} or a disallowed operator ({e}). " + f"Extend _safe_eval_int_expr variables if new " + f"driver state is introduced." + ) + final = self.reg_map[base_sym] + offset * stride + expected_sym = base_sym.replace("CH1", f"CH{ch + 1}") + expected = self.reg_map[expected_sym] + if final != expected: + errors.append( + f" - {filename}:{line_no} {setter} " + f"caller `{ch_expr}` | ch={ch} -> " + f"channel={channel_val} -> " + f"`{base_sym} + ({offset_expr})" + f"{' * ' + str(stride) if stride != 1 else ''}`" + f" = 0x{final:03X} " + f"(expected {expected_sym} = 0x{expected:03X})" + ) + assert not errors, ( + f"ADAR1000 channel round-trip FAILED for {setter} " + f"({len(errors)} mismatches) — writes routed to wrong physical " + f"channel. This is issue #90.\n" + "\n".join(errors) + ) + + class TestTier1DataPacketLayout: """Verify data packet byte layout matches between Python and Verilog."""