From 0537b40dcc4fbc00729064738bf97d3c40afb85b Mon Sep 17 00:00:00 2001 From: Jason <83615043+JJassonn69@users.noreply.github.com> Date: Sun, 12 Apr 2026 16:04:59 +0545 Subject: [PATCH] feat: add cross-layer contract tests (Python/Verilog/C) with CI job Three-tier test orchestrator validates opcode maps, bit widths, packet layouts, and round-trip correctness across FPGA RTL, Python GUI, and STM32 firmware. Catches 3 real bugs: - status_words[0] 37-bit truncation in both USB interfaces - Python radar_mode readback at wrong bit position (bit 21 vs 24) - RadarSettings.cpp buffer overread (min check 74 vs required 82) 29 tests: 24 pass, 5 xfail (documenting confirmed bugs). 4th CI job added: cross-layer-tests (Python + iverilog + cc). --- .github/workflows/ci-tests.yml | 30 + 9_Firmware/tests/cross_layer/.gitignore | 12 + .../tests/cross_layer/contract_parser.py | 793 +++++++++++++++++ .../tests/cross_layer/stm32_settings_stub.cpp | 86 ++ .../cross_layer/tb_cross_layer_ft2232h.v | 667 ++++++++++++++ .../cross_layer/test_cross_layer_contract.py | 842 ++++++++++++++++++ 6 files changed, 2430 insertions(+) create mode 100644 9_Firmware/tests/cross_layer/.gitignore create mode 100644 9_Firmware/tests/cross_layer/contract_parser.py create mode 100644 9_Firmware/tests/cross_layer/stm32_settings_stub.cpp create mode 100644 9_Firmware/tests/cross_layer/tb_cross_layer_ft2232h.v create mode 100644 9_Firmware/tests/cross_layer/test_cross_layer_contract.py diff --git a/.github/workflows/ci-tests.yml b/.github/workflows/ci-tests.yml index 46d7396..7b39a9e 100644 --- a/.github/workflows/ci-tests.yml +++ b/.github/workflows/ci-tests.yml @@ -82,3 +82,33 @@ jobs: - name: Run full FPGA regression run: bash run_regression.sh working-directory: 9_Firmware/9_2_FPGA + + # =========================================================================== + # Cross-Layer Contract Tests (Python ↔ Verilog ↔ C) + # Validates opcode maps, bit widths, packet layouts, and round-trip + # correctness across FPGA RTL, Python GUI, and STM32 firmware. + # =========================================================================== + cross-layer-tests: + name: Cross-Layer Contract Tests + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - uses: astral-sh/setup-uv@v5 + + - name: Install dependencies + run: uv sync --group dev + + - name: Install Icarus Verilog + run: sudo apt-get update && sudo apt-get install -y iverilog + + - name: Run cross-layer contract tests + run: > + uv run pytest + 9_Firmware/tests/cross_layer/test_cross_layer_contract.py + -v --tb=short diff --git a/9_Firmware/tests/cross_layer/.gitignore b/9_Firmware/tests/cross_layer/.gitignore new file mode 100644 index 0000000..3dbdaec --- /dev/null +++ b/9_Firmware/tests/cross_layer/.gitignore @@ -0,0 +1,12 @@ +# Simulation outputs (generated by iverilog TB) +cmd_results.txt +data_packet.txt +status_packet.txt +*.vcd +*.vvp + +# Compiled C stub +stm32_stub + +# Python +__pycache__/ diff --git a/9_Firmware/tests/cross_layer/contract_parser.py b/9_Firmware/tests/cross_layer/contract_parser.py new file mode 100644 index 0000000..44e9bb3 --- /dev/null +++ b/9_Firmware/tests/cross_layer/contract_parser.py @@ -0,0 +1,793 @@ +""" +Cross-layer contract parsers. + +Extracts interface contracts (opcodes, bit widths, reset defaults, packet +layouts) directly from the source files of each layer: + - Python GUI: radar_protocol.py + - FPGA RTL: radar_system_top.v, usb_data_interface_ft2232h.v, + usb_data_interface.v + - STM32 MCU: RadarSettings.cpp, main.cpp + +These parsers do NOT define the expected values — they discover what each +layer actually implements, so the test can compare layers against ground +truth and find bugs where both sides are wrong (like the 0x06 phantom +opcode or the status_words[0] 37-bit truncation). +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from pathlib import Path + +# --------------------------------------------------------------------------- +# Repository layout (relative to repo root) +# --------------------------------------------------------------------------- +REPO_ROOT = Path(__file__).resolve().parents[3] +GUI_DIR = REPO_ROOT / "9_Firmware" / "9_3_GUI" +FPGA_DIR = REPO_ROOT / "9_Firmware" / "9_2_FPGA" +MCU_DIR = REPO_ROOT / "9_Firmware" / "9_1_Microcontroller" +MCU_LIB_DIR = MCU_DIR / "9_1_1_C_Cpp_Libraries" +MCU_CODE_DIR = MCU_DIR / "9_1_3_C_Cpp_Code" +XDC_DIR = FPGA_DIR / "constraints" + + +# =================================================================== +# Data structures +# =================================================================== + +@dataclass +class OpcodeEntry: + """One opcode as declared in a single layer.""" + name: str + value: int + register: str = "" # Verilog register name it writes to + bit_slice: str = "" # e.g. "[3:0]", "[15:0]", "[0]" + bit_width: int = 0 # derived from bit_slice + reset_default: int | None = None + is_pulse: bool = False # True for trigger/request opcodes + + +@dataclass +class StatusWordField: + """One field inside a status_words[] entry.""" + name: str + word_index: int + msb: int # bit position in the 32-bit word (0-indexed from LSB) + lsb: int + width: int + + +@dataclass +class DataPacketField: + """One field in the 11-byte data packet.""" + name: str + byte_start: int # first byte index (0 = header) + byte_end: int # last byte index (inclusive) + width_bits: int + + +@dataclass +class PacketConstants: + """Header/footer/size constants for a packet type.""" + header: int + footer: int + size: int + + +@dataclass +class SettingsField: + """One field in the STM32 SET...END settings packet.""" + name: str + offset: int # byte offset from start of payload (after "SET") + size: int # bytes + c_type: str # "double" or "uint32_t" + + +@dataclass +class GpioPin: + """A GPIO pin with direction.""" + name: str + pin_id: str # e.g. "PD8", "H11" + direction: str # "output" or "input" + layer: str # "stm32" or "fpga" + + +@dataclass +class ConcatWidth: + """Result of counting bits in a Verilog concatenation.""" + total_bits: int + target_bits: int # width of the register being assigned to + fragments: list[tuple[str, int]] = field(default_factory=list) + truncated: bool = False + + +# =================================================================== +# Python layer parser +# =================================================================== + +def parse_python_opcodes(filepath: Path | None = None) -> dict[int, OpcodeEntry]: + """Parse the Opcode enum from radar_protocol.py. + Returns {opcode_value: OpcodeEntry}. + """ + if filepath is None: + filepath = GUI_DIR / "radar_protocol.py" + text = filepath.read_text() + + # Find the Opcode class body + match = re.search(r'class Opcode\b.*?(?=\nclass |\Z)', text, re.DOTALL) + if not match: + raise ValueError(f"Could not find 'class Opcode' in {filepath}") + + opcodes: dict[int, OpcodeEntry] = {} + for m in re.finditer(r'(\w+)\s*=\s*(0x[0-9a-fA-F]+)', match.group()): + name = m.group(1) + value = int(m.group(2), 16) + opcodes[value] = OpcodeEntry(name=name, value=value) + return opcodes + + +def parse_python_packet_constants(filepath: Path | None = None) -> dict[str, PacketConstants]: + """Extract HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE, packet sizes.""" + if filepath is None: + filepath = GUI_DIR / "radar_protocol.py" + text = filepath.read_text() + + def _find(pattern: str) -> int: + m = re.search(pattern, text) + if not m: + raise ValueError(f"Pattern not found: {pattern}") + val = m.group(1) + return int(val, 16) if val.startswith("0x") else int(val) + + header = _find(r'HEADER_BYTE\s*=\s*(0x[0-9a-fA-F]+|\d+)') + footer = _find(r'FOOTER_BYTE\s*=\s*(0x[0-9a-fA-F]+|\d+)') + status_header = _find(r'STATUS_HEADER_BYTE\s*=\s*(0x[0-9a-fA-F]+|\d+)') + data_size = _find(r'DATA_PACKET_SIZE\s*=\s*(\d+)') + status_size = _find(r'STATUS_PACKET_SIZE\s*=\s*(\d+)') + + return { + "data": PacketConstants(header=header, footer=footer, size=data_size), + "status": PacketConstants(header=status_header, footer=footer, size=status_size), + } + + +def parse_python_data_packet_fields(filepath: Path | None = None) -> list[DataPacketField]: + """ + Extract byte offsets from parse_data_packet() by finding struct.unpack_from calls. + Returns fields in byte order. + """ + if filepath is None: + filepath = GUI_DIR / "radar_protocol.py" + text = filepath.read_text() + + # Find parse_data_packet method body + match = re.search( + r'def parse_data_packet\(.*?\).*?(?=\n @|\n def |\nclass |\Z)', + text, re.DOTALL + ) + if not match: + raise ValueError("Could not find parse_data_packet()") + + body = match.group() + fields: list[DataPacketField] = [] + + # Match patterns like: range_q = _to_signed16(struct.unpack_from(">H", raw, 1)[0]) + for m in re.finditer( + r'(\w+)\s*=\s*_to_signed16\(struct\.unpack_from\("(>[HIBhib])", raw, (\d+)\)', + body + ): + name = m.group(1) + fmt = m.group(2) + offset = int(m.group(3)) + fmt_char = fmt[-1].upper() + size = {"H": 2, "I": 4, "B": 1}[fmt_char] + fields.append(DataPacketField( + name=name, byte_start=offset, + byte_end=offset + size - 1, + width_bits=size * 8 + )) + + # Match detection = raw[9] & 0x01 + for m in re.finditer(r'(\w+)\s*=\s*raw\[(\d+)\]\s*&\s*(0x[0-9a-fA-F]+|\d+)', body): + name = m.group(1) + offset = int(m.group(2)) + fields.append(DataPacketField( + name=name, byte_start=offset, byte_end=offset, width_bits=1 + )) + + fields.sort(key=lambda f: f.byte_start) + return fields + + +def parse_python_status_fields(filepath: Path | None = None) -> list[StatusWordField]: + """ + Extract bit shift/mask operations from parse_status_packet(). + Returns the fields with word index and bit positions as Python sees them. + """ + if filepath is None: + filepath = GUI_DIR / "radar_protocol.py" + text = filepath.read_text() + + match = re.search( + r'def parse_status_packet\(.*?\).*?(?=\n @|\n def |\nclass |\Z)', + text, re.DOTALL + ) + if not match: + raise ValueError("Could not find parse_status_packet()") + + body = match.group() + fields: list[StatusWordField] = [] + + # Pattern: sr.field = (words[N] >> S) & MASK # noqa: ERA001 + for m in re.finditer( + r'sr\.(\w+)\s*=\s*\(words\[(\d+)\]\s*>>\s*(\d+)\)\s*&\s*(0x[0-9a-fA-F]+|\d+)', + body + ): + name = m.group(1) + word_idx = int(m.group(2)) + shift = int(m.group(3)) + mask_str = m.group(4) + mask = int(mask_str, 16) if mask_str.startswith("0x") else int(mask_str) + width = mask.bit_length() + fields.append(StatusWordField( + name=name, word_index=word_idx, + msb=shift + width - 1, lsb=shift, width=width + )) + + # Pattern: sr.field = words[N] & MASK (no shift) + for m in re.finditer( + r'sr\.(\w+)\s*=\s*words\[(\d+)\]\s*&\s*(0x[0-9a-fA-F]+|\d+)', + body + ): + name = m.group(1) + word_idx = int(m.group(2)) + mask_str = m.group(3) + mask = int(mask_str, 16) if mask_str.startswith("0x") else int(mask_str) + width = mask.bit_length() + # Skip if already captured by the shift pattern + if not any(f.name == name and f.word_index == word_idx for f in fields): + fields.append(StatusWordField( + name=name, word_index=word_idx, + msb=width - 1, lsb=0, width=width + )) + + return fields + + +# =================================================================== +# Verilog layer parser +# =================================================================== + +def _parse_bit_slice(s: str) -> int: + """Parse '[15:0]' -> 16, '[0]' -> 1, '' -> 16 (full cmd_value).""" + m = re.match(r'\[(\d+):(\d+)\]', s) + if m: + return int(m.group(1)) - int(m.group(2)) + 1 + m = re.match(r'\[(\d+)\]', s) + if m: + return 1 + return 16 # default: full 16-bit cmd_value + + +def parse_verilog_opcodes(filepath: Path | None = None) -> dict[int, OpcodeEntry]: + """ + Parse the opcode case statement from radar_system_top.v. + Returns {opcode_value: OpcodeEntry}. + """ + if filepath is None: + filepath = FPGA_DIR / "radar_system_top.v" + text = filepath.read_text() + + # Find the command decode case block + # Pattern: case statement with 8'hXX opcodes + opcodes: dict[int, OpcodeEntry] = {} + + # Pattern 1: Simple assignment — 8'hXX: register <= rhs; + for m in re.finditer( + r"8'h([0-9a-fA-F]{2})\s*:\s*(\w+)\s*<=\s*(.*?)(?:;|$)", + text, re.MULTILINE + ): + value = int(m.group(1), 16) + register = m.group(2) + rhs = m.group(3).strip() + + # Determine if it's a pulse (assigned literal 1) + is_pulse = rhs in ("1", "1'b1") + + # Extract bit slice from the RHS (e.g., usb_cmd_value[3:0]) + bit_slice = "" + slice_m = re.search(r'usb_cmd_value(\[\d+(?::\d+)?\])', rhs) + if slice_m: + bit_slice = slice_m.group(1) + elif "usb_cmd_value" in rhs: + bit_slice = "[15:0]" # full width + + bit_width = _parse_bit_slice(bit_slice) if bit_slice else 0 + + opcodes[value] = OpcodeEntry( + name=register, + value=value, + register=register, + bit_slice=bit_slice, + bit_width=bit_width, + is_pulse=is_pulse, + ) + + # Pattern 2: begin...end blocks — 8'hXX: begin ... register <= ... end + # These are used for opcodes with validation logic (e.g., 0x15 clamp) + for m in re.finditer( + r"8'h([0-9a-fA-F]{2})\s*:\s*begin\b(.*?)end\b", + text, re.DOTALL + ): + value = int(m.group(1), 16) + if value in opcodes: + continue # Already captured by pattern 1 + body = m.group(2) + + # Find the first register assignment (host_xxx <=) + assign_m = re.search(r'(host_\w+)\s*<=\s*(.+?);', body) + if not assign_m: + continue + + register = assign_m.group(1) + rhs = assign_m.group(2).strip() + + bit_slice = "" + slice_m = re.search(r'usb_cmd_value(\[\d+(?::\d+)?\])', body) + if slice_m: + bit_slice = slice_m.group(1) + elif "usb_cmd_value" in body: + bit_slice = "[15:0]" + + bit_width = _parse_bit_slice(bit_slice) if bit_slice else 0 + + opcodes[value] = OpcodeEntry( + name=register, + value=value, + register=register, + bit_slice=bit_slice, + bit_width=bit_width, + is_pulse=False, + ) + + return opcodes + + +def parse_verilog_reset_defaults(filepath: Path | None = None) -> dict[str, int]: + """ + Parse the reset block from radar_system_top.v. + Returns {register_name: reset_value}. + """ + if filepath is None: + filepath = FPGA_DIR / "radar_system_top.v" + text = filepath.read_text() + + defaults: dict[str, int] = {} + + # Match patterns like: host_radar_mode <= 2'b01; + # Also: host_detect_threshold <= 16'd10000; + for m in re.finditer( + r'(host_\w+)\s*<=\s*(\d+\'[bdho][0-9a-fA-F_]+|\d+)\s*;', + text + ): + reg = m.group(1) + val_str = m.group(2) + + # Parse Verilog literal + if "'" in val_str: + base_char = val_str.split("'")[1][0].lower() + digits = val_str.split("'")[1][1:].replace("_", "") + base = {"b": 2, "d": 10, "h": 16, "o": 8}[base_char] + value = int(digits, base) + else: + value = int(val_str) + + # Only keep first occurrence (the reset block comes before the + # opcode decode which also has <= assignments) + if reg not in defaults: + defaults[reg] = value + + return defaults + + +def parse_verilog_register_widths(filepath: Path | None = None) -> dict[str, int]: + """ + Parse register declarations from radar_system_top.v. + Returns {register_name: bit_width}. + """ + if filepath is None: + filepath = FPGA_DIR / "radar_system_top.v" + text = filepath.read_text() + + widths: dict[str, int] = {} + + # Match: reg [15:0] host_detect_threshold; + # Also: reg host_trigger_pulse; + for m in re.finditer( + r'reg\s+(?:\[\s*(\d+)\s*:\s*(\d+)\s*\]\s+)?(host_\w+)\s*;', + text + ): + width = int(m.group(1)) - int(m.group(2)) + 1 if m.group(1) is not None else 1 + widths[m.group(3)] = width + + return widths + + +def parse_verilog_packet_constants( + filepath: Path | None = None, +) -> dict[str, PacketConstants]: + """Extract HEADER, FOOTER, STATUS_HEADER, packet size localparams.""" + if filepath is None: + filepath = FPGA_DIR / "usb_data_interface_ft2232h.v" + text = filepath.read_text() + + def _find(pattern: str) -> int: + m = re.search(pattern, text) + if not m: + raise ValueError(f"Pattern not found in {filepath}: {pattern}") + val = m.group(1) + # Parse Verilog literals: 8'hAA → 0xAA, 5'd11 → 11 + vlog_m = re.match(r"\d+'h([0-9a-fA-F]+)", val) + if vlog_m: + return int(vlog_m.group(1), 16) + vlog_m = re.match(r"\d+'d(\d+)", val) + if vlog_m: + return int(vlog_m.group(1)) + return int(val, 16) if val.startswith("0x") else int(val) + + header_val = _find(r"localparam\s+HEADER\s*=\s*(\d+'h[0-9a-fA-F]+)") + footer_val = _find(r"localparam\s+FOOTER\s*=\s*(\d+'h[0-9a-fA-F]+)") + status_hdr = _find(r"localparam\s+STATUS_HEADER\s*=\s*(\d+'h[0-9a-fA-F]+)") + + data_size = _find(r"DATA_PKT_LEN\s*=\s*(\d+'d\d+)") + status_size = _find(r"STATUS_PKT_LEN\s*=\s*(\d+'d\d+)") + + return { + "data": PacketConstants(header=header_val, footer=footer_val, size=data_size), + "status": PacketConstants(header=status_hdr, footer=footer_val, size=status_size), + } + + +def count_concat_bits(concat_expr: str, port_widths: dict[str, int]) -> ConcatWidth: + """ + Count total bits in a Verilog concatenation expression like: + {8'hFF, 3'b000, status_radar_mode, 5'b00000, status_stream_ctrl, status_cfar_threshold} + + Uses port_widths to resolve signal widths. Returns ConcatWidth. + """ + # Remove outer braces + inner = concat_expr.strip().strip("{}") + fragments: list[tuple[str, int]] = [] + total = 0 + + for part in re.split(r',\s*', inner): + part = part.strip() + if not part: + continue + + # Literal: N'bXXX, N'dXXX, N'hXX, or just a decimal + lit_match = re.match(r"(\d+)'[bdhoBDHO]", part) + if lit_match: + w = int(lit_match.group(1)) + fragments.append((part, w)) + total += w + continue + + # Signal with bit select: sig[M:N] or sig[N] + sel_match = re.match(r'(\w+)\[(\d+):(\d+)\]', part) + if sel_match: + w = int(sel_match.group(2)) - int(sel_match.group(3)) + 1 + fragments.append((part, w)) + total += w + continue + + sel_match = re.match(r'(\w+)\[(\d+)\]', part) + if sel_match: + fragments.append((part, 1)) + total += 1 + continue + + # Bare signal: look up in port_widths + if part in port_widths: + w = port_widths[part] + fragments.append((part, w)) + total += w + else: + # Unknown width — flag it + fragments.append((part, -1)) + total = -1 # Can't compute + + return ConcatWidth( + total_bits=total, + target_bits=32, + fragments=fragments, + truncated=total > 32 if total > 0 else False, + ) + + +def parse_verilog_status_word_concats( + filepath: Path | None = None, +) -> dict[int, str]: + """ + Extract the raw concatenation expression for each status_words[N] assignment. + Returns {word_index: concat_expression_string}. + """ + if filepath is None: + filepath = FPGA_DIR / "usb_data_interface_ft2232h.v" + text = filepath.read_text() + + results: dict[int, str] = {} + + # Multi-line concat: status_words[N] <= {... }; + # We need to handle multi-line expressions + for m in re.finditer( + r'status_words\[(\d+)\]\s*<=\s*(\{[^;]+\})\s*;', + text, re.DOTALL + ): + idx = int(m.group(1)) + expr = m.group(2) + # Normalize whitespace + expr = re.sub(r'\s+', ' ', expr).strip() + results[idx] = expr + + return results + + +def get_usb_interface_port_widths(filepath: Path | None = None) -> dict[str, int]: + """ + Parse port declarations from usb_data_interface_ft2232h.v module header. + Returns {port_name: bit_width}. + """ + if filepath is None: + filepath = FPGA_DIR / "usb_data_interface_ft2232h.v" + text = filepath.read_text() + + widths: dict[str, int] = {} + + # Match: input wire [15:0] status_cfar_threshold, + # Also: input wire status_self_test_busy + for m in re.finditer( + r'(?:input|output)\s+(?:wire|reg)\s+(?:\[\s*(\d+)\s*:\s*(\d+)\s*\]\s+)?(\w+)', + text + ): + width = int(m.group(1)) - int(m.group(2)) + 1 if m.group(1) is not None else 1 + widths[m.group(3)] = width + + return widths + + +def parse_verilog_data_mux( + filepath: Path | None = None, +) -> list[DataPacketField]: + """ + Parse the data_pkt_byte mux from usb_data_interface_ft2232h.v. + Returns fields with byte positions and signal names. + """ + if filepath is None: + filepath = FPGA_DIR / "usb_data_interface_ft2232h.v" + text = filepath.read_text() + + # Find the data mux case block + match = re.search( + r'always\s+@\(\*\)\s+begin\s+case\s*\(wr_byte_idx\)(.*?)endcase', + text, re.DOTALL + ) + if not match: + raise ValueError("Could not find data_pkt_byte mux") + + mux_body = match.group(1) + entries: list[tuple[int, str]] = [] + + for m in re.finditer( + r"5'd(\d+)\s*:\s*data_pkt_byte\s*=\s*(.+?);", + mux_body + ): + idx = int(m.group(1)) + expr = m.group(2).strip() + entries.append((idx, expr)) + + # Group consecutive bytes by signal root name + fields: list[DataPacketField] = [] + i = 0 + while i < len(entries): + idx, expr = entries[i] + if expr == "HEADER" or expr == "FOOTER": + i += 1 + continue + + # Extract signal name (e.g., range_profile_cap from range_profile_cap[31:24]) + sig_match = re.match(r'(\w+?)(?:\[|$)', expr) + if not sig_match: + i += 1 + continue + + signal = sig_match.group(1) + start_byte = idx + end_byte = idx + + # Find consecutive bytes of the same signal + j = i + 1 + while j < len(entries): + next_idx, next_expr = entries[j] + if next_expr.startswith(signal): + end_byte = next_idx + j += 1 + else: + break + + n_bytes = end_byte - start_byte + 1 + fields.append(DataPacketField( + name=signal.replace("_cap", ""), + byte_start=start_byte, + byte_end=end_byte, + width_bits=n_bytes * 8, + )) + i = j + + return fields + + +# =================================================================== +# STM32 / C layer parser +# =================================================================== + +def parse_stm32_settings_fields( + filepath: Path | None = None, +) -> list[SettingsField]: + """ + Parse RadarSettings::parseFromUSB to extract field order, offsets, types. + """ + if filepath is None: + filepath = MCU_LIB_DIR / "RadarSettings.cpp" + + if not filepath.exists(): + return [] # MCU code not available (CI might not have it) + + text = filepath.read_text(encoding="latin-1") + + fields: list[SettingsField] = [] + + # Look for memcpy + shift patterns that extract doubles and uint32s + # Pattern for doubles: loop reading 8 bytes big-endian + # Pattern for uint32: 4 bytes big-endian + # We'll parse the assignment targets in order + + # Find the parseFromUSB function + match = re.search( + r'parseFromUSB\s*\(.*?\)\s*\{(.*?)^\}', + text, re.DOTALL | re.MULTILINE + ) + if not match: + return fields + + body = match.group(1) + + # The fields are extracted sequentially from the payload. + # Look for variable assignments that follow the memcpy/extraction pattern. + # Based on known code: extractDouble / extractUint32 patterns + field_names = [ + ("system_frequency", 8, "double"), + ("chirp_duration_1", 8, "double"), + ("chirp_duration_2", 8, "double"), + ("chirps_per_position", 4, "uint32_t"), + ("freq_min", 8, "double"), + ("freq_max", 8, "double"), + ("prf1", 8, "double"), + ("prf2", 8, "double"), + ("max_distance", 8, "double"), + ("map_size", 8, "double"), + ] + + offset = 0 + for name, size, ctype in field_names: + # Verify the field name appears in the function body + if name in body or name.replace("_", "") in body.lower(): + fields.append(SettingsField( + name=name, offset=offset, size=size, c_type=ctype + )) + offset += size + + return fields + + +def parse_stm32_start_flag( + filepath: Path | None = None, +) -> list[int]: + """Parse the USB start flag bytes from USBHandler.cpp.""" + if filepath is None: + filepath = MCU_LIB_DIR / "USBHandler.cpp" + + if not filepath.exists(): + return [] + + text = filepath.read_text() + + # Look for the start flag array, e.g. {23, 46, 158, 237} + match = re.search(r'start_flag.*?=\s*\{([^}]+)\}', text, re.DOTALL) + if not match: + # Try alternate patterns + match = re.search(r'\{(\s*\d+\s*,\s*\d+\s*,\s*\d+\s*,\s*\d+\s*)\}', text) + if not match: + return [] + + return [int(x.strip()) for x in match.group(1).split(",") if x.strip().isdigit()] + + +# =================================================================== +# GPIO parser +# =================================================================== + +def parse_xdc_gpio_pins(filepath: Path | None = None) -> list[GpioPin]: + """Parse XDC constraints for DIG_* pin assignments.""" + if filepath is None: + filepath = XDC_DIR / "xc7a50t_ftg256.xdc" + + if not filepath.exists(): + return [] + + text = filepath.read_text() + pins: list[GpioPin] = [] + + # Match: set_property PACKAGE_PIN XX [get_ports {signal_name}] + for m in re.finditer( + r'set_property\s+PACKAGE_PIN\s+(\w+)\s+\[get_ports\s+\{?(\w+)\}?\]', + text + ): + pin = m.group(1) + signal = m.group(2) + if any(kw in signal for kw in ("stm32_", "reset_n", "dig_")): + # Determine direction from signal name + if signal in ("stm32_new_chirp", "stm32_new_elevation", + "stm32_new_azimuth", "stm32_mixers_enable"): + direction = "input" # FPGA receives these + elif signal == "reset_n": + direction = "input" + else: + direction = "unknown" + pins.append(GpioPin( + name=signal, pin_id=pin, direction=direction, layer="fpga" + )) + + return pins + + +def parse_stm32_gpio_init(filepath: Path | None = None) -> list[GpioPin]: + """Parse STM32 GPIO initialization for PD8-PD15 directions.""" + if filepath is None: + filepath = MCU_CODE_DIR / "main.cpp" + + if not filepath.exists(): + return [] + + text = filepath.read_text() + pins: list[GpioPin] = [] + + # Look for GPIO_InitStruct.Pin and GPIO_InitStruct.Mode patterns + # This is approximate — STM32 HAL GPIO init is complex + # Look for PD8-PD15 configuration (output vs input) + + # Pattern: GPIO_PIN_8 | GPIO_PIN_9 ... with Mode = OUTPUT + # We'll find blocks that configure GPIOD pins + for m in re.finditer( + r'GPIO_InitStruct\.Pin\s*=\s*([^;]+);.*?' + r'GPIO_InitStruct\.Mode\s*=\s*(\w+)', + text, re.DOTALL + ): + pin_expr = m.group(1) + mode = m.group(2) + + direction = "output" if "OUTPUT" in mode else "input" + + # Extract individual pin numbers + for pin_m in re.finditer(r'GPIO_PIN_(\d+)', pin_expr): + pin_num = int(pin_m.group(1)) + if 8 <= pin_num <= 15: + pins.append(GpioPin( + name=f"PD{pin_num}", + pin_id=f"PD{pin_num}", + direction=direction, + layer="stm32" + )) + + return pins diff --git a/9_Firmware/tests/cross_layer/stm32_settings_stub.cpp b/9_Firmware/tests/cross_layer/stm32_settings_stub.cpp new file mode 100644 index 0000000..853b327 --- /dev/null +++ b/9_Firmware/tests/cross_layer/stm32_settings_stub.cpp @@ -0,0 +1,86 @@ +/** + * stm32_settings_stub.cpp + * + * Standalone stub that wraps the real RadarSettings class. + * Reads a binary settings packet from a file (argv[1]), + * parses it using RadarSettings::parseFromUSB(), and prints + * all parsed field=value pairs to stdout. + * + * Compile: c++ -std=c++11 -o stm32_settings_stub stm32_settings_stub.cpp \ + * ../../9_Firmware/9_1_Microcontroller/9_1_1_C_Cpp_Libraries/RadarSettings.cpp \ + * -I../../9_Firmware/9_1_Microcontroller/9_1_1_C_Cpp_Libraries/ + * + * Usage: ./stm32_settings_stub packet.bin + * Prints: field=value lines (one per field) + * Exit code: 0 if parse succeeded, 1 if failed + */ + +#include "RadarSettings.h" +#include +#include + +int main(int argc, char* argv[]) { + if (argc != 2) { + fprintf(stderr, "Usage: %s \n", argv[0]); + return 2; + } + + // Read binary packet from file + FILE* f = fopen(argv[1], "rb"); + if (!f) { + fprintf(stderr, "ERROR: Cannot open %s\n", argv[1]); + return 2; + } + + fseek(f, 0, SEEK_END); + long file_size = ftell(f); + fseek(f, 0, SEEK_SET); + + if (file_size <= 0 || file_size > 4096) { + fprintf(stderr, "ERROR: Invalid file size %ld\n", file_size); + fclose(f); + return 2; + } + + uint8_t* buf = (uint8_t*)malloc(file_size); + if (!buf) { + fprintf(stderr, "ERROR: malloc failed\n"); + fclose(f); + return 2; + } + + size_t nread = fread(buf, 1, file_size, f); + fclose(f); + + if ((long)nread != file_size) { + fprintf(stderr, "ERROR: Short read (%zu of %ld)\n", nread, file_size); + free(buf); + return 2; + } + + // Parse using the real RadarSettings class + RadarSettings settings; + bool ok = settings.parseFromUSB(buf, (uint32_t)file_size); + free(buf); + + if (!ok) { + printf("parse_ok=false\n"); + return 1; + } + + // Print all fields with full precision + // Python orchestrator will compare these against expected values + printf("parse_ok=true\n"); + printf("system_frequency=%.17g\n", settings.getSystemFrequency()); + printf("chirp_duration_1=%.17g\n", settings.getChirpDuration1()); + printf("chirp_duration_2=%.17g\n", settings.getChirpDuration2()); + printf("chirps_per_position=%u\n", settings.getChirpsPerPosition()); + printf("freq_min=%.17g\n", settings.getFreqMin()); + printf("freq_max=%.17g\n", settings.getFreqMax()); + printf("prf1=%.17g\n", settings.getPRF1()); + printf("prf2=%.17g\n", settings.getPRF2()); + printf("max_distance=%.17g\n", settings.getMaxDistance()); + printf("map_size=%.17g\n", settings.getMapSize()); + + return 0; +} diff --git a/9_Firmware/tests/cross_layer/tb_cross_layer_ft2232h.v b/9_Firmware/tests/cross_layer/tb_cross_layer_ft2232h.v new file mode 100644 index 0000000..261d78f --- /dev/null +++ b/9_Firmware/tests/cross_layer/tb_cross_layer_ft2232h.v @@ -0,0 +1,667 @@ +`timescale 1ns / 1ps + +/** + * tb_cross_layer_ft2232h.v + * + * Cross-layer contract testbench for the FT2232H USB interface. + * Exercises three packet types with known distinctive values and dumps + * captured bytes to text files that the Python orchestrator can parse. + * + * Exercise A: Command round-trip (Host -> FPGA) + * - Send every opcode through the 4-byte read FSM + * - Dump cmd_opcode, cmd_addr, cmd_value to cmd_results.txt + * + * Exercise B: Data packet generation (FPGA -> Host) + * - Inject known range/doppler/cfar values + * - Capture all 11 output bytes + * - Dump to data_packet.txt + * + * Exercise C: Status packet generation (FPGA -> Host) + * - Set all status inputs to known non-zero values + * - Trigger status request + * - Capture all 26 output bytes + * - Dump to status_packet.txt + */ + +module tb_cross_layer_ft2232h; + + // Clock periods + localparam CLK_PERIOD = 10.0; // 100 MHz system clock + localparam FT_CLK_PERIOD = 16.67; // 60 MHz FT2232H clock + + // ---- Signals ---- + reg clk; + reg reset_n; + reg ft_reset_n; + + // Radar data inputs + reg [31:0] range_profile; + reg range_valid; + reg [15:0] doppler_real; + reg [15:0] doppler_imag; + reg doppler_valid; + reg cfar_detection; + reg cfar_valid; + + // FT2232H physical interface + wire [7:0] ft_data; + reg ft_rxf_n; + reg ft_txe_n; + wire ft_rd_n; + wire ft_wr_n; + wire ft_oe_n; + wire ft_siwu; + reg ft_clk; + + // Host-side bus driver (for command injection) + reg [7:0] host_data_drive; + reg host_data_drive_en; + assign ft_data = host_data_drive_en ? host_data_drive : 8'hZZ; + + // Pulldown to avoid X during idle + pulldown pd[7:0] (ft_data); + + // DUT command outputs + wire [31:0] cmd_data; + wire cmd_valid; + wire [7:0] cmd_opcode; + wire [7:0] cmd_addr; + wire [15:0] cmd_value; + + // Stream control + reg [2:0] stream_control; + + // Status inputs + reg status_request; + reg [15:0] status_cfar_threshold; + reg [2:0] status_stream_ctrl; + reg [1:0] status_radar_mode; + reg [15:0] status_long_chirp; + reg [15:0] status_long_listen; + reg [15:0] status_guard; + reg [15:0] status_short_chirp; + reg [15:0] status_short_listen; + reg [5:0] status_chirps_per_elev; + reg [1:0] status_range_mode; + reg [4:0] status_self_test_flags; + reg [7:0] status_self_test_detail; + reg status_self_test_busy; + + // ---- Clock generators ---- + always #(CLK_PERIOD / 2) clk = ~clk; + always #(FT_CLK_PERIOD / 2) ft_clk = ~ft_clk; + + // ---- DUT instantiation ---- + usb_data_interface_ft2232h uut ( + .clk (clk), + .reset_n (reset_n), + .ft_reset_n (ft_reset_n), + .range_profile (range_profile), + .range_valid (range_valid), + .doppler_real (doppler_real), + .doppler_imag (doppler_imag), + .doppler_valid (doppler_valid), + .cfar_detection (cfar_detection), + .cfar_valid (cfar_valid), + .ft_data (ft_data), + .ft_rxf_n (ft_rxf_n), + .ft_txe_n (ft_txe_n), + .ft_rd_n (ft_rd_n), + .ft_wr_n (ft_wr_n), + .ft_oe_n (ft_oe_n), + .ft_siwu (ft_siwu), + .ft_clk (ft_clk), + .cmd_data (cmd_data), + .cmd_valid (cmd_valid), + .cmd_opcode (cmd_opcode), + .cmd_addr (cmd_addr), + .cmd_value (cmd_value), + .stream_control (stream_control), + .status_request (status_request), + .status_cfar_threshold (status_cfar_threshold), + .status_stream_ctrl (status_stream_ctrl), + .status_radar_mode (status_radar_mode), + .status_long_chirp (status_long_chirp), + .status_long_listen (status_long_listen), + .status_guard (status_guard), + .status_short_chirp (status_short_chirp), + .status_short_listen (status_short_listen), + .status_chirps_per_elev (status_chirps_per_elev), + .status_range_mode (status_range_mode), + .status_self_test_flags (status_self_test_flags), + .status_self_test_detail(status_self_test_detail), + .status_self_test_busy (status_self_test_busy) + ); + + // ---- Test bookkeeping ---- + integer pass_count; + integer fail_count; + integer test_num; + integer cmd_file; + integer data_file; + integer status_file; + + // ---- Check task ---- + task check; + input cond; + input [511:0] label; + begin + test_num = test_num + 1; + if (cond) begin + $display("[PASS] Test %0d: %0s", test_num, label); + pass_count = pass_count + 1; + end else begin + $display("[FAIL] Test %0d: %0s", test_num, label); + fail_count = fail_count + 1; + end + end + endtask + + // ---- Helper: apply reset ---- + task apply_reset; + begin + reset_n = 0; + ft_reset_n = 0; + range_profile = 32'h0; + range_valid = 0; + doppler_real = 16'h0; + doppler_imag = 16'h0; + doppler_valid = 0; + cfar_detection = 0; + cfar_valid = 0; + ft_rxf_n = 1; // No host data available + ft_txe_n = 0; // TX FIFO ready + host_data_drive = 8'h0; + host_data_drive_en = 0; + stream_control = 3'b111; + status_request = 0; + status_cfar_threshold = 16'd0; + status_stream_ctrl = 3'b000; + status_radar_mode = 2'b00; + status_long_chirp = 16'd0; + status_long_listen = 16'd0; + status_guard = 16'd0; + status_short_chirp = 16'd0; + status_short_listen = 16'd0; + status_chirps_per_elev = 6'd0; + status_range_mode = 2'b00; + status_self_test_flags = 5'b00000; + status_self_test_detail = 8'd0; + status_self_test_busy = 1'b0; + repeat (6) @(posedge ft_clk); + reset_n = 1; + ft_reset_n = 1; + // Wait for stream_control CDC to propagate + repeat (8) @(posedge ft_clk); + end + endtask + + // ---- Helper: send one 4-byte command via FT2232H read path ---- + // + // FT2232H read FSM cycle-by-cycle: + // Cycle 0 (RD_IDLE): sees !ft_rxf_n → ft_oe_n<=0, → RD_OE_ASSERT + // Cycle 1 (RD_OE_ASSERT): sees !ft_rxf_n → ft_rd_n<=0, → RD_READING + // Cycle 2 (RD_READING): samples ft_data=byte0, cnt 0→1 + // Cycle 3 (RD_READING): samples ft_data=byte1, cnt 1→2 + // Cycle 4 (RD_READING): samples ft_data=byte2, cnt 2→3 + // Cycle 5 (RD_READING): samples ft_data=byte3, cnt=3→0, → RD_DEASSERT + // Cycle 6 (RD_DEASSERT): ft_oe_n<=1, → RD_PROCESS + // Cycle 7 (RD_PROCESS): cmd_valid<=1, decode, → RD_IDLE + // + // Data must be stable BEFORE the sampling posedge. We use #1 after + // posedge to change data in the "delta after" region. + task send_command_ft2232h; + input [7:0] byte0; // opcode + input [7:0] byte1; // addr + input [7:0] byte2; // value_hi + input [7:0] byte3; // value_lo + begin + // Pre-drive byte0 and signal data available + @(posedge ft_clk); #1; + host_data_drive = byte0; + host_data_drive_en = 1; + ft_rxf_n = 0; + + // Cycle 0: RD_IDLE sees !ft_rxf_n, goes to OE_ASSERT + @(posedge ft_clk); #1; + + // Cycle 1: RD_OE_ASSERT, ft_rd_n goes low, goes to RD_READING + @(posedge ft_clk); #1; + + // Cycle 2: RD_READING, byte0 is sampled, cnt 0→1 + // Now change to byte1 for next sample + @(posedge ft_clk); #1; + host_data_drive = byte1; + + // Cycle 3: RD_READING, byte1 is sampled, cnt 1→2 + @(posedge ft_clk); #1; + host_data_drive = byte2; + + // Cycle 4: RD_READING, byte2 is sampled, cnt 2→3 + @(posedge ft_clk); #1; + host_data_drive = byte3; + + // Cycle 5: RD_READING, byte3 is sampled, cnt=3, → RD_DEASSERT + @(posedge ft_clk); #1; + + // Cycle 6: RD_DEASSERT, ft_oe_n←1, → RD_PROCESS + @(posedge ft_clk); #1; + + // Cycle 7: RD_PROCESS, cmd decoded, cmd_valid←1, → RD_IDLE + @(posedge ft_clk); #1; + + // cmd_valid was asserted at cycle 7's posedge. cmd_opcode/addr/value + // are now valid (registered outputs hold until next RD_PROCESS). + + // Release bus + host_data_drive_en = 0; + host_data_drive = 8'h0; + ft_rxf_n = 1; + + // Settle + repeat (2) @(posedge ft_clk); + end + endtask + + // ---- Helper: capture N write bytes from the DUT ---- + // Monitors ft_wr_n and ft_data_out, captures bytes into array. + // Used for data packets (11 bytes) and status packets (26 bytes). + reg [7:0] captured_bytes [0:31]; + integer capture_count; + + task capture_write_bytes; + input integer expected_count; + integer timeout; + begin + capture_count = 0; + timeout = 0; + + while (capture_count < expected_count && timeout < 2000) begin + @(posedge ft_clk); #1; + timeout = timeout + 1; + // DUT drives byte when ft_wr_n=0 and ft_data_oe=1 + // Sample AFTER posedge so registered outputs are settled + if (!ft_wr_n && uut.ft_data_oe) begin + captured_bytes[capture_count] = uut.ft_data_out; + capture_count = capture_count + 1; + end + end + end + endtask + + // ---- Helper: pulse range_valid with CDC wait ---- + // Toggle CDC needs 3 sync stages + edge detect = 4+ ft_clk cycles. + // Use 12 for safety margin. + task assert_range_valid; + input [31:0] data; + begin + @(posedge clk); #1; + range_profile = data; + range_valid = 1; + @(posedge clk); #1; + range_valid = 0; + // Wait for toggle CDC propagation + repeat (12) @(posedge ft_clk); + end + endtask + + // ---- Helper: pulse doppler_valid ---- + task pulse_doppler; + input [15:0] dr; + input [15:0] di; + begin + @(posedge clk); #1; + doppler_real = dr; + doppler_imag = di; + doppler_valid = 1; + @(posedge clk); #1; + doppler_valid = 0; + repeat (12) @(posedge ft_clk); + end + endtask + + // ---- Helper: pulse cfar_valid ---- + task pulse_cfar; + input det; + begin + @(posedge clk); #1; + cfar_detection = det; + cfar_valid = 1; + @(posedge clk); #1; + cfar_valid = 0; + repeat (12) @(posedge ft_clk); + end + endtask + + // ---- Helper: pulse status_request ---- + task pulse_status_request; + begin + @(posedge clk); #1; + status_request = 1; + @(posedge clk); #1; + status_request = 0; + // Wait for toggle CDC propagation + repeat (12) @(posedge ft_clk); + end + endtask + + // ================================================================ + // Main stimulus + // ================================================================ + integer i; + + initial begin + $dumpfile("tb_cross_layer_ft2232h.vcd"); + $dumpvars(0, tb_cross_layer_ft2232h); + + clk = 0; + ft_clk = 0; + pass_count = 0; + fail_count = 0; + test_num = 0; + + // ============================================================ + // EXERCISE A: Command Round-Trip + // Send commands with known opcode/addr/value, verify decoding. + // Dump results to cmd_results.txt for Python validation. + // ============================================================ + $display("\n=== EXERCISE A: Command Round-Trip ==="); + apply_reset; + + cmd_file = $fopen("cmd_results.txt", "w"); + $fwrite(cmd_file, "# opcode_sent addr_sent value_sent opcode_got addr_got value_got\n"); + + // Test all real opcodes from radar_system_top.v + // Format: opcode, addr=0x00, value + + // Basic control + send_command_ft2232h(8'h01, 8'h00, 8'h00, 8'h02); // RADAR_MODE=2 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h01, 8'h00, 16'h0002, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h01 && cmd_value === 16'h0002, + "Cmd 0x01: RADAR_MODE=2"); + + send_command_ft2232h(8'h02, 8'h00, 8'h00, 8'h01); // TRIGGER_PULSE + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h02, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h02 && cmd_value === 16'h0001, + "Cmd 0x02: TRIGGER_PULSE"); + + send_command_ft2232h(8'h03, 8'h00, 8'h27, 8'h10); // DETECT_THRESHOLD=10000 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h03, 8'h00, 16'h2710, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h03 && cmd_value === 16'h2710, + "Cmd 0x03: DETECT_THRESHOLD=10000"); + + send_command_ft2232h(8'h04, 8'h00, 8'h00, 8'h07); // STREAM_CONTROL=7 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h04, 8'h00, 16'h0007, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h04 && cmd_value === 16'h0007, + "Cmd 0x04: STREAM_CONTROL=7"); + + // Chirp timing + send_command_ft2232h(8'h10, 8'h00, 8'h0B, 8'hB8); // LONG_CHIRP=3000 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h10, 8'h00, 16'h0BB8, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h10 && cmd_value === 16'h0BB8, + "Cmd 0x10: LONG_CHIRP=3000"); + + send_command_ft2232h(8'h11, 8'h00, 8'h35, 8'h84); // LONG_LISTEN=13700 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h11, 8'h00, 16'h3584, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h11 && cmd_value === 16'h3584, + "Cmd 0x11: LONG_LISTEN=13700"); + + send_command_ft2232h(8'h12, 8'h00, 8'h44, 8'h84); // GUARD=17540 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h12, 8'h00, 16'h4484, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h12 && cmd_value === 16'h4484, + "Cmd 0x12: GUARD=17540"); + + send_command_ft2232h(8'h13, 8'h00, 8'h00, 8'h32); // SHORT_CHIRP=50 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h13, 8'h00, 16'h0032, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h13 && cmd_value === 16'h0032, + "Cmd 0x13: SHORT_CHIRP=50"); + + send_command_ft2232h(8'h14, 8'h00, 8'h44, 8'h2A); // SHORT_LISTEN=17450 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h14, 8'h00, 16'h442A, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h14 && cmd_value === 16'h442A, + "Cmd 0x14: SHORT_LISTEN=17450"); + + send_command_ft2232h(8'h15, 8'h00, 8'h00, 8'h20); // CHIRPS_PER_ELEV=32 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h15, 8'h00, 16'h0020, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h15 && cmd_value === 16'h0020, + "Cmd 0x15: CHIRPS_PER_ELEV=32"); + + // Digital gain + send_command_ft2232h(8'h16, 8'h00, 8'h00, 8'h05); // GAIN_SHIFT=5 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h16, 8'h00, 16'h0005, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h16 && cmd_value === 16'h0005, + "Cmd 0x16: GAIN_SHIFT=5"); + + // Signal processing + send_command_ft2232h(8'h20, 8'h00, 8'h00, 8'h01); // RANGE_MODE=1 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h20, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h20 && cmd_value === 16'h0001, + "Cmd 0x20: RANGE_MODE=1"); + + send_command_ft2232h(8'h21, 8'h00, 8'h00, 8'h03); // CFAR_GUARD=3 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h21, 8'h00, 16'h0003, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h21 && cmd_value === 16'h0003, + "Cmd 0x21: CFAR_GUARD=3"); + + send_command_ft2232h(8'h22, 8'h00, 8'h00, 8'h0C); // CFAR_TRAIN=12 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h22, 8'h00, 16'h000C, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h22 && cmd_value === 16'h000C, + "Cmd 0x22: CFAR_TRAIN=12"); + + send_command_ft2232h(8'h23, 8'h00, 8'h00, 8'h30); // CFAR_ALPHA=0x30 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h23, 8'h00, 16'h0030, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h23 && cmd_value === 16'h0030, + "Cmd 0x23: CFAR_ALPHA=0x30"); + + send_command_ft2232h(8'h24, 8'h00, 8'h00, 8'h01); // CFAR_MODE=1 (GO) + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h24, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h24 && cmd_value === 16'h0001, + "Cmd 0x24: CFAR_MODE=1"); + + send_command_ft2232h(8'h25, 8'h00, 8'h00, 8'h01); // CFAR_ENABLE=1 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h25, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h25 && cmd_value === 16'h0001, + "Cmd 0x25: CFAR_ENABLE=1"); + + send_command_ft2232h(8'h26, 8'h00, 8'h00, 8'h01); // MTI_ENABLE=1 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h26, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h26 && cmd_value === 16'h0001, + "Cmd 0x26: MTI_ENABLE=1"); + + send_command_ft2232h(8'h27, 8'h00, 8'h00, 8'h03); // DC_NOTCH_WIDTH=3 + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h27, 8'h00, 16'h0003, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h27 && cmd_value === 16'h0003, + "Cmd 0x27: DC_NOTCH_WIDTH=3"); + + // Self-test / status + send_command_ft2232h(8'h30, 8'h00, 8'h00, 8'h01); // SELF_TEST_TRIGGER + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h30, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h30 && cmd_value === 16'h0001, + "Cmd 0x30: SELF_TEST_TRIGGER"); + + send_command_ft2232h(8'h31, 8'h00, 8'h00, 8'h01); // SELF_TEST_STATUS + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h31, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h31 && cmd_value === 16'h0001, + "Cmd 0x31: SELF_TEST_STATUS"); + + send_command_ft2232h(8'hFF, 8'h00, 8'h00, 8'h00); // STATUS_REQUEST + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'hFF, 8'h00, 16'h0000, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'hFF && cmd_value === 16'h0000, + "Cmd 0xFF: STATUS_REQUEST"); + + // Non-zero addr test + send_command_ft2232h(8'h01, 8'hAB, 8'hCD, 8'hEF); // addr=0xAB, value=0xCDEF + $fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n", + 8'h01, 8'hAB, 16'hCDEF, cmd_opcode, cmd_addr, cmd_value); + check(cmd_opcode === 8'h01 && cmd_addr === 8'hAB && cmd_value === 16'hCDEF, + "Cmd 0x01 with addr=0xAB, value=0xCDEF"); + + $fclose(cmd_file); + + // ============================================================ + // EXERCISE B: Data Packet Generation + // Inject known values, capture 11-byte output. + // ============================================================ + $display("\n=== EXERCISE B: Data Packet Generation ==="); + apply_reset; + ft_txe_n = 0; // TX FIFO ready + + // Use distinctive values that make truncation/swap bugs obvious + // range_profile = {Q[15:0], I[15:0]} = {0xCAFE, 0xBEEF} + // doppler_real = 0x1234, doppler_imag = 0x5678 + // cfar_detection = 1 + + // First inject doppler and cfar so pending flags are set + pulse_doppler(16'h1234, 16'h5678); + pulse_cfar(1'b1); + + // Now inject range_valid which triggers the write FSM. + // CRITICAL: Must capture bytes IN PARALLEL with the trigger, + // because the write FSM starts sending bytes ~3-4 ft_clk cycles + // after the toggle CDC propagates. If we wait for CDC propagation + // first, capture_write_bytes misses the early bytes. + fork + assert_range_valid(32'hCAFE_BEEF); + capture_write_bytes(11); + join + + check(capture_count === 11, + "Data packet: captured 11 bytes"); + + // Dump captured bytes to file + data_file = $fopen("data_packet.txt", "w"); + $fwrite(data_file, "# byte_index hex_value\n"); + for (i = 0; i < capture_count; i = i + 1) begin + $fwrite(data_file, "%0d %02x\n", i, captured_bytes[i]); + end + $fclose(data_file); + + // Verify locally too + check(captured_bytes[0] === 8'hAA, + "Data pkt: byte 0 = 0xAA (header)"); + check(captured_bytes[1] === 8'hCA, + "Data pkt: byte 1 = 0xCA (range MSB = Q high)"); + check(captured_bytes[2] === 8'hFE, + "Data pkt: byte 2 = 0xFE (range Q low)"); + check(captured_bytes[3] === 8'hBE, + "Data pkt: byte 3 = 0xBE (range I high)"); + check(captured_bytes[4] === 8'hEF, + "Data pkt: byte 4 = 0xEF (range I low)"); + check(captured_bytes[5] === 8'h12, + "Data pkt: byte 5 = 0x12 (doppler_real MSB)"); + check(captured_bytes[6] === 8'h34, + "Data pkt: byte 6 = 0x34 (doppler_real LSB)"); + check(captured_bytes[7] === 8'h56, + "Data pkt: byte 7 = 0x56 (doppler_imag MSB)"); + check(captured_bytes[8] === 8'h78, + "Data pkt: byte 8 = 0x78 (doppler_imag LSB)"); + check(captured_bytes[9] === 8'h01, + "Data pkt: byte 9 = 0x01 (cfar_detection=1)"); + check(captured_bytes[10] === 8'h55, + "Data pkt: byte 10 = 0x55 (footer)"); + + // ============================================================ + // EXERCISE C: Status Packet Generation + // Set known status values, trigger readback, capture 26 bytes. + // Uses distinctive non-zero values to detect truncation/swap. + // ============================================================ + $display("\n=== EXERCISE C: Status Packet Generation ==="); + apply_reset; + ft_txe_n = 0; + + // Set known distinctive status values + status_cfar_threshold = 16'hABCD; + status_stream_ctrl = 3'b101; + status_radar_mode = 2'b11; // Use 0b11 to test both bits + status_long_chirp = 16'h1234; + status_long_listen = 16'h5678; + status_guard = 16'h9ABC; + status_short_chirp = 16'hDEF0; + status_short_listen = 16'hFACE; + status_chirps_per_elev = 6'd42; + status_range_mode = 2'b10; + status_self_test_flags = 5'b10101; + status_self_test_detail = 8'hA5; + status_self_test_busy = 1'b1; + + // Pulse status_request and capture bytes IN PARALLEL + // (same reason as Exercise B — write FSM starts before CDC wait ends) + fork + pulse_status_request; + capture_write_bytes(26); + join + + check(capture_count === 26, + "Status packet: captured 26 bytes"); + + // Dump captured bytes to file + status_file = $fopen("status_packet.txt", "w"); + $fwrite(status_file, "# byte_index hex_value\n"); + for (i = 0; i < capture_count; i = i + 1) begin + $fwrite(status_file, "%0d %02x\n", i, captured_bytes[i]); + end + + // Also dump the raw status_words for debugging + $fwrite(status_file, "# status_words (internal):\n"); + for (i = 0; i < 6; i = i + 1) begin + $fwrite(status_file, "# word[%0d] = %08x\n", i, uut.status_words[i]); + end + $fclose(status_file); + + // Verify header/footer locally + check(captured_bytes[0] === 8'hBB, + "Status pkt: byte 0 = 0xBB (status header)"); + check(captured_bytes[25] === 8'h55, + "Status pkt: byte 25 = 0x55 (footer)"); + + // Verify status_words[1] = {long_chirp, long_listen} = {0x1234, 0x5678} + check(captured_bytes[5] === 8'h12 && captured_bytes[6] === 8'h34 && + captured_bytes[7] === 8'h56 && captured_bytes[8] === 8'h78, + "Status pkt: word1 = {long_chirp=0x1234, long_listen=0x5678}"); + + // Verify status_words[2] = {guard, short_chirp} = {0x9ABC, 0xDEF0} + check(captured_bytes[9] === 8'h9A && captured_bytes[10] === 8'hBC && + captured_bytes[11] === 8'hDE && captured_bytes[12] === 8'hF0, + "Status pkt: word2 = {guard=0x9ABC, short_chirp=0xDEF0}"); + + // ============================================================ + // Summary + // ============================================================ + $display(""); + $display("========================================"); + $display(" CROSS-LAYER FT2232H TB RESULTS"); + $display(" PASSED: %0d / %0d", pass_count, test_num); + $display(" FAILED: %0d / %0d", fail_count, test_num); + if (fail_count == 0) + $display(" ** ALL TESTS PASSED **"); + else + $display(" ** SOME TESTS FAILED **"); + $display("========================================"); + + #100; + $finish; + end + +endmodule diff --git a/9_Firmware/tests/cross_layer/test_cross_layer_contract.py b/9_Firmware/tests/cross_layer/test_cross_layer_contract.py new file mode 100644 index 0000000..9b3a7be --- /dev/null +++ b/9_Firmware/tests/cross_layer/test_cross_layer_contract.py @@ -0,0 +1,842 @@ +""" +Cross-Layer Contract Tests +========================== +Single pytest file orchestrating three tiers of verification: + +Tier 1 — Static Contract Parsing: + Compares Python, Verilog, and C source code at parse-time to catch + opcode mismatches, bit-width errors, packet constant drift, and + layout bugs like the status_words[0] 37-bit truncation. + +Tier 2 — Verilog Cosimulation (iverilog): + Compiles and runs tb_cross_layer_ft2232h.v, then parses its output + files (cmd_results.txt, data_packet.txt, status_packet.txt) and + runs Python parsers on the captured bytes to verify round-trip + correctness. + +Tier 3 — C Stub Execution: + Compiles stm32_settings_stub.cpp, generates a binary settings + packet from Python, runs the stub, and verifies all parsed field + values match. + +The goal is to find UNKNOWN bugs by testing each layer against +independently-derived ground truth — not just checking that two +layers agree (because both could be wrong). +""" + +from __future__ import annotations + +import os +import struct +import subprocess +import tempfile +from pathlib import Path + +import pytest + +# Import the contract parsers +import sys + +THIS_DIR = Path(__file__).resolve().parent +sys.path.insert(0, str(THIS_DIR)) +import contract_parser as cp # noqa: E402 + +# Also add the GUI dir to import radar_protocol +sys.path.insert(0, str(cp.GUI_DIR)) + + +# =================================================================== +# Helpers +# =================================================================== + +IVERILOG = os.environ.get("IVERILOG", "/opt/homebrew/bin/iverilog") +VVP = os.environ.get("VVP", "/opt/homebrew/bin/vvp") +CXX = os.environ.get("CXX", "c++") + +# Check tool availability for conditional skipping +_has_iverilog = Path(IVERILOG).exists() if "/" in IVERILOG else bool( + subprocess.run(["which", IVERILOG], capture_output=True).returncode == 0 +) +_has_cxx = subprocess.run( + [CXX, "--version"], capture_output=True +).returncode == 0 + + +def _parse_hex_results(text: str) -> list[dict[str, str]]: + """Parse space-separated hex lines from TB output files.""" + rows = [] + for line in text.strip().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + rows.append(line.split()) + return rows + + +# =================================================================== +# Ground Truth: FPGA register map (independently transcribed) +# =================================================================== +# This is the SINGLE SOURCE OF TRUTH, manually transcribed from +# radar_system_top.v lines 902-945. If any layer disagrees with +# this, it's a bug in that layer. + +GROUND_TRUTH_OPCODES = { + 0x01: ("host_radar_mode", 2), + 0x02: ("host_trigger_pulse", 1), # pulse + 0x03: ("host_detect_threshold", 16), + 0x04: ("host_stream_control", 3), + 0x10: ("host_long_chirp_cycles", 16), + 0x11: ("host_long_listen_cycles", 16), + 0x12: ("host_guard_cycles", 16), + 0x13: ("host_short_chirp_cycles", 16), + 0x14: ("host_short_listen_cycles", 16), + 0x15: ("host_chirps_per_elev", 6), + 0x16: ("host_gain_shift", 4), + 0x20: ("host_range_mode", 2), + 0x21: ("host_cfar_guard", 4), + 0x22: ("host_cfar_train", 5), + 0x23: ("host_cfar_alpha", 8), + 0x24: ("host_cfar_mode", 2), + 0x25: ("host_cfar_enable", 1), + 0x26: ("host_mti_enable", 1), + 0x27: ("host_dc_notch_width", 3), + 0x30: ("host_self_test_trigger", 1), # pulse + 0x31: ("host_status_request", 1), # pulse + 0xFF: ("host_status_request", 1), # alias, pulse +} + +GROUND_TRUTH_RESET_DEFAULTS = { + "host_radar_mode": 1, # 2'b01 + "host_detect_threshold": 10000, + "host_stream_control": 7, # 3'b111 + "host_long_chirp_cycles": 3000, + "host_long_listen_cycles": 13700, + "host_guard_cycles": 17540, + "host_short_chirp_cycles": 50, + "host_short_listen_cycles": 17450, + "host_chirps_per_elev": 32, + "host_gain_shift": 0, + "host_range_mode": 0, + "host_cfar_guard": 2, + "host_cfar_train": 8, + "host_cfar_alpha": 0x30, + "host_cfar_mode": 0, + "host_cfar_enable": 0, + "host_mti_enable": 0, + "host_dc_notch_width": 0, +} + +GROUND_TRUTH_PACKET_CONSTANTS = { + "data": {"header": 0xAA, "footer": 0x55, "size": 11}, + "status": {"header": 0xBB, "footer": 0x55, "size": 26}, +} + + +# =================================================================== +# TIER 1: Static Contract Parsing +# =================================================================== + +class TestTier1OpcodeContract: + """Verify Python and Verilog opcode sets match ground truth.""" + + def test_python_opcodes_match_ground_truth(self): + """Every Python Opcode must exist in ground truth with correct value.""" + py_opcodes = cp.parse_python_opcodes() + for val, entry in py_opcodes.items(): + assert val in GROUND_TRUTH_OPCODES, ( + f"Python Opcode {entry.name}=0x{val:02X} not in ground truth! " + f"Possible phantom opcode (like the 0x06 incident)." + ) + + def test_ground_truth_opcodes_in_python(self): + """Every ground truth opcode must have a Python enum entry.""" + py_opcodes = cp.parse_python_opcodes() + for val, (reg, _width) in GROUND_TRUTH_OPCODES.items(): + assert val in py_opcodes, ( + f"Ground truth opcode 0x{val:02X} ({reg}) missing from Python Opcode enum." + ) + + def test_verilog_opcodes_match_ground_truth(self): + """Every Verilog case entry must exist in ground truth.""" + v_opcodes = cp.parse_verilog_opcodes() + for val, entry in v_opcodes.items(): + assert val in GROUND_TRUTH_OPCODES, ( + f"Verilog opcode 0x{val:02X} ({entry.register}) not in ground truth." + ) + + def test_ground_truth_opcodes_in_verilog(self): + """Every ground truth opcode must have a Verilog case entry.""" + v_opcodes = cp.parse_verilog_opcodes() + for val, (reg, _width) in GROUND_TRUTH_OPCODES.items(): + assert val in v_opcodes, ( + f"Ground truth opcode 0x{val:02X} ({reg}) missing from Verilog case statement." + ) + + def test_python_verilog_bidirectional_match(self): + """Python and Verilog must have the same set of opcode values.""" + py_set = set(cp.parse_python_opcodes().keys()) + v_set = set(cp.parse_verilog_opcodes().keys()) + py_only = py_set - v_set + v_only = v_set - py_set + assert not py_only, f"Opcodes in Python but not Verilog: {[hex(x) for x in py_only]}" + assert not v_only, f"Opcodes in Verilog but not Python: {[hex(x) for x in v_only]}" + + def test_verilog_register_names_match(self): + """Verilog case target registers must match ground truth names.""" + v_opcodes = cp.parse_verilog_opcodes() + for val, (expected_reg, _) in GROUND_TRUTH_OPCODES.items(): + if val in v_opcodes: + actual_reg = v_opcodes[val].register + assert actual_reg == expected_reg, ( + f"Opcode 0x{val:02X}: Verilog writes to '{actual_reg}' " + f"but ground truth says '{expected_reg}'" + ) + + +class TestTier1BitWidths: + """Verify register widths and opcode bit slices match ground truth.""" + + def test_verilog_register_widths(self): + """Register declarations must match ground truth bit widths.""" + v_widths = cp.parse_verilog_register_widths() + for reg, expected_width in [ + (name, w) for _, (name, w) in GROUND_TRUTH_OPCODES.items() + ]: + if reg in v_widths: + actual = v_widths[reg] + assert actual >= expected_width, ( + f"{reg}: declared {actual}-bit but ground truth says {expected_width}-bit" + ) + + def test_verilog_opcode_bit_slices(self): + """Opcode case assignments must use correct bit widths from cmd_value.""" + v_opcodes = cp.parse_verilog_opcodes() + for val, (reg, expected_width) in GROUND_TRUTH_OPCODES.items(): + if val not in v_opcodes: + continue + entry = v_opcodes[val] + if entry.is_pulse: + continue # Pulse opcodes don't use cmd_value slicing + if entry.bit_width > 0: + assert entry.bit_width >= expected_width, ( + f"Opcode 0x{val:02X} ({reg}): bit slice {entry.bit_slice} " + f"= {entry.bit_width}-bit, expected >= {expected_width}" + ) + + +class TestTier1StatusWordTruncation: + """Catch the status_words[0] 37->32 bit truncation bug.""" + + @pytest.mark.xfail( + reason="BUG: status_words[0] is 37 bits, truncated to 32 (FT2232H)", + strict=True, + ) + def test_status_words_concat_widths_ft2232h(self): + """Each status_words[] concat must be EXACTLY 32 bits.""" + port_widths = cp.get_usb_interface_port_widths( + cp.FPGA_DIR / "usb_data_interface_ft2232h.v" + ) + concats = cp.parse_verilog_status_word_concats( + cp.FPGA_DIR / "usb_data_interface_ft2232h.v" + ) + + for idx, expr in concats.items(): + result = cp.count_concat_bits(expr, port_widths) + if result.total_bits < 0: + pytest.skip(f"status_words[{idx}]: unknown signal width") + assert result.total_bits == 32, ( + f"status_words[{idx}] is {result.total_bits} bits, not 32! " + f"{'TRUNCATION' if result.total_bits > 32 else 'UNDERFLOW'} BUG. " + f"Fragments: {result.fragments}" + ) + + @pytest.mark.xfail( + reason="BUG: status_words[0] is 37 bits, truncated to 32 (FT601)", + strict=True, + ) + def test_status_words_concat_widths_ft601(self): + """Same check for the FT601 interface (same bug expected).""" + ft601_path = cp.FPGA_DIR / "usb_data_interface.v" + if not ft601_path.exists(): + pytest.skip("FT601 interface file not found") + + port_widths = cp.get_usb_interface_port_widths(ft601_path) + concats = cp.parse_verilog_status_word_concats(ft601_path) + + for idx, expr in concats.items(): + result = cp.count_concat_bits(expr, port_widths) + if result.total_bits < 0: + pytest.skip(f"status_words[{idx}]: unknown signal width") + assert result.total_bits == 32, ( + f"FT601 status_words[{idx}] is {result.total_bits} bits, not 32! " + f"{'TRUNCATION' if result.total_bits > 32 else 'UNDERFLOW'} BUG. " + f"Fragments: {result.fragments}" + ) + + +class TestTier1StatusFieldPositions: + """Verify Python status parser bit positions match Verilog layout.""" + + @pytest.mark.xfail( + reason="BUG: Python reads radar_mode at bit 21, actual is bit 24", + strict=True, + ) + def test_python_status_mode_position(self): + """ + The known status_words[0] bug: Python reads mode at bits [22:21] + but after 37→32 truncation, mode is at [25:24]. This test should + FAIL, proving the bug exists. + """ + # Get what Python thinks + py_fields = cp.parse_python_status_fields() + mode_field = next((f for f in py_fields if f.name == "radar_mode"), None) + assert mode_field is not None, "radar_mode not found in parse_status_packet" + + # The Verilog INTENDED layout (from the code comment) says: + # {0xFF, 3'b000, mode[1:0], 5'b00000, stream[2:0], threshold[15:0]} + # But after 37→32 truncation, the actual bits are: + # [31:29]=111, [28:26]=000, [25:24]=mode, [23:19]=00000, [18:16]=stream, [15:0]=threshold + # Python extracts at shift=21, which is bits [22:21] — WRONG position. + + # Ground truth: after truncation, mode is at [25:24] + expected_shift = 24 + actual_shift = mode_field.lsb + + # This assertion documents the bug. If someone fixes status_words[0] + # to be exactly 32 bits, the intended layout becomes: + # {0xFF, mode[1:0], stream[2:0], threshold[15:0]} = 8+2+3+16 = 29 bits → pad 3 bits + # The Python shift would need updating too. + assert actual_shift == expected_shift, ( + f"KNOWN BUG: Python reads radar_mode at bit {actual_shift} " + f"but after status_words[0] truncation, mode is at bit {expected_shift}. " + f"Both Verilog AND Python need fixing." + ) + + +class TestTier1PacketConstants: + """Verify packet header/footer/size constants match across layers.""" + + def test_python_packet_constants(self): + """Python constants match ground truth.""" + py = cp.parse_python_packet_constants() + for ptype, expected in GROUND_TRUTH_PACKET_CONSTANTS.items(): + assert py[ptype].header == expected["header"], ( + f"Python {ptype} header: 0x{py[ptype].header:02X} != 0x{expected['header']:02X}" + ) + assert py[ptype].footer == expected["footer"], ( + f"Python {ptype} footer: 0x{py[ptype].footer:02X} != 0x{expected['footer']:02X}" + ) + assert py[ptype].size == expected["size"], ( + f"Python {ptype} size: {py[ptype].size} != {expected['size']}" + ) + + def test_verilog_packet_constants(self): + """Verilog localparams match ground truth.""" + v = cp.parse_verilog_packet_constants() + for ptype, expected in GROUND_TRUTH_PACKET_CONSTANTS.items(): + assert v[ptype].header == expected["header"], ( + f"Verilog {ptype} header: 0x{v[ptype].header:02X} != 0x{expected['header']:02X}" + ) + assert v[ptype].footer == expected["footer"], ( + f"Verilog {ptype} footer: 0x{v[ptype].footer:02X} != 0x{expected['footer']:02X}" + ) + assert v[ptype].size == expected["size"], ( + f"Verilog {ptype} size: {v[ptype].size} != {expected['size']}" + ) + + def test_python_verilog_constants_agree(self): + """Python and Verilog packet constants must match each other.""" + py = cp.parse_python_packet_constants() + v = cp.parse_verilog_packet_constants() + for ptype in ("data", "status"): + assert py[ptype].header == v[ptype].header + assert py[ptype].footer == v[ptype].footer + assert py[ptype].size == v[ptype].size + + +class TestTier1ResetDefaults: + """Verify Verilog reset defaults match ground truth.""" + + def test_verilog_reset_defaults(self): + """Reset block values must match ground truth.""" + v_defaults = cp.parse_verilog_reset_defaults() + for reg, expected in GROUND_TRUTH_RESET_DEFAULTS.items(): + assert reg in v_defaults, f"{reg} not found in reset block" + actual = v_defaults[reg] + assert actual == expected, ( + f"{reg}: reset default {actual} != expected {expected}" + ) + + +class TestTier1DataPacketLayout: + """Verify data packet byte layout matches between Python and Verilog.""" + + def test_verilog_data_mux_field_positions(self): + """Verilog data_pkt_byte mux must have correct byte positions.""" + v_fields = cp.parse_verilog_data_mux() + # Expected: range_profile at bytes 1-4 (32-bit), doppler_real 5-6, + # doppler_imag 7-8, cfar 9 + field_map = {f.name: f for f in v_fields} + + assert "range_profile" in field_map + rp = field_map["range_profile"] + assert rp.byte_start == 1 and rp.byte_end == 4 and rp.width_bits == 32 + + assert "doppler_real" in field_map + dr = field_map["doppler_real"] + assert dr.byte_start == 5 and dr.byte_end == 6 and dr.width_bits == 16 + + assert "doppler_imag" in field_map + di = field_map["doppler_imag"] + assert di.byte_start == 7 and di.byte_end == 8 and di.width_bits == 16 + + def test_python_data_packet_byte_positions(self): + """Python parse_data_packet byte offsets must be correct.""" + py_fields = cp.parse_python_data_packet_fields() + # range_q at offset 1 (2B), range_i at offset 3 (2B), + # doppler_i at offset 5 (2B), doppler_q at offset 7 (2B), + # detection at offset 9 + field_map = {f.name: f for f in py_fields} + + assert "range_q" in field_map + assert field_map["range_q"].byte_start == 1 + assert "range_i" in field_map + assert field_map["range_i"].byte_start == 3 + assert "doppler_i" in field_map + assert field_map["doppler_i"].byte_start == 5 + assert "doppler_q" in field_map + assert field_map["doppler_q"].byte_start == 7 + assert "detection" in field_map + assert field_map["detection"].byte_start == 9 + + +class TestTier1STM32SettingsPacket: + """Verify STM32 settings packet layout.""" + + def test_field_order_and_sizes(self): + """STM32 settings fields must have correct offsets and sizes.""" + fields = cp.parse_stm32_settings_fields() + if not fields: + pytest.skip("MCU source not available") + + expected = [ + ("system_frequency", 0, 8, "double"), + ("chirp_duration_1", 8, 8, "double"), + ("chirp_duration_2", 16, 8, "double"), + ("chirps_per_position", 24, 4, "uint32_t"), + ("freq_min", 28, 8, "double"), + ("freq_max", 36, 8, "double"), + ("prf1", 44, 8, "double"), + ("prf2", 52, 8, "double"), + ("max_distance", 60, 8, "double"), + ("map_size", 68, 8, "double"), + ] + + assert len(fields) == len(expected), ( + f"Expected {len(expected)} fields, got {len(fields)}" + ) + + for f, (ename, eoff, esize, etype) in zip(fields, expected, strict=True): + assert f.name == ename, f"Field name: {f.name} != {ename}" + assert f.offset == eoff, f"{f.name}: offset {f.offset} != {eoff}" + assert f.size == esize, f"{f.name}: size {f.size} != {esize}" + assert f.c_type == etype, f"{f.name}: type {f.c_type} != {etype}" + + @pytest.mark.xfail( + reason="BUG: RadarSettings.cpp min check is 74, should be 82", + strict=True, + ) + def test_minimum_packet_size(self): + """ + RadarSettings.cpp says minimum is 74 bytes but actual payload is: + 'SET'(3) + 9*8(doubles) + 4(uint32) + 'END'(3) = 82 bytes. + This test documents the bug. + """ + fields = cp.parse_stm32_settings_fields() + if not fields: + pytest.skip("MCU source not available") + + # Calculate required payload size + total_field_bytes = sum(f.size for f in fields) + # Add markers: "SET"(3) + "END"(3) + required_size = 3 + total_field_bytes + 3 + + # Read the actual minimum check from the source + src = (cp.MCU_LIB_DIR / "RadarSettings.cpp").read_text(encoding="latin-1") + import re + m = re.search(r'length\s*<\s*(\d+)', src) + assert m, "Could not find minimum length check in parseFromUSB" + declared_min = int(m.group(1)) + + assert declared_min == required_size, ( + f"BUFFER OVERREAD BUG: parseFromUSB minimum check is {declared_min} " + f"but actual required size is {required_size}. " + f"({total_field_bytes} bytes of fields + 6 bytes of markers). " + f"If exactly {declared_min} bytes are passed, extractDouble() reads " + f"past the buffer at offset {declared_min - 3} (needs 8 bytes, " + f"only {declared_min - 3 - fields[-1].offset} available)." + ) + + def test_stm32_usb_start_flag(self): + """USB start flag must be [23, 46, 158, 237].""" + flag = cp.parse_stm32_start_flag() + if not flag: + pytest.skip("USBHandler.cpp not available") + assert flag == [23, 46, 158, 237], f"Start flag: {flag}" + + +# =================================================================== +# TIER 2: Verilog Cosimulation +# =================================================================== + +@pytest.mark.skipif(not _has_iverilog, reason="iverilog not available") +class TestTier2VerilogCosim: + """Compile and run the FT2232H TB, validate output against Python parsers.""" + + @pytest.fixture(scope="class") + def tb_results(self, tmp_path_factory): + """Compile and run TB once, return output file contents.""" + workdir = tmp_path_factory.mktemp("verilog_cosim") + + tb_path = THIS_DIR / "tb_cross_layer_ft2232h.v" + rtl_path = cp.FPGA_DIR / "usb_data_interface_ft2232h.v" + out_bin = workdir / "tb_cross_layer_ft2232h" + + # Compile + result = subprocess.run( + [IVERILOG, "-o", str(out_bin), "-I", str(cp.FPGA_DIR), + str(tb_path), str(rtl_path)], + capture_output=True, text=True, timeout=30, + ) + assert result.returncode == 0, f"iverilog compile failed:\n{result.stderr}" + + # Run + result = subprocess.run( + [VVP, str(out_bin)], + capture_output=True, text=True, timeout=60, + cwd=str(workdir), + ) + assert result.returncode == 0, f"vvp failed:\n{result.stderr}" + + # Parse output + return { + "stdout": result.stdout, + "cmd_results": (workdir / "cmd_results.txt").read_text(), + "data_packet": (workdir / "data_packet.txt").read_text(), + "status_packet": (workdir / "status_packet.txt").read_text(), + } + + def test_all_tb_tests_pass(self, tb_results): + """All Verilog TB internal checks must pass.""" + stdout = tb_results["stdout"] + assert "ALL TESTS PASSED" in stdout, f"TB had failures:\n{stdout}" + + def test_command_round_trip(self, tb_results): + """Verify every command decoded correctly by matching sent vs received.""" + rows = _parse_hex_results(tb_results["cmd_results"]) + assert len(rows) >= 20, f"Expected >= 20 command results, got {len(rows)}" + + for row in rows: + assert len(row) == 6, f"Bad row format: {row}" + sent_op, sent_addr, sent_val = row[0], row[1], row[2] + got_op, got_addr, got_val = row[3], row[4], row[5] + assert sent_op == got_op, ( + f"Opcode mismatch: sent 0x{sent_op} got 0x{got_op}" + ) + assert sent_addr == got_addr, ( + f"Addr mismatch: sent 0x{sent_addr} got 0x{got_addr}" + ) + assert sent_val == got_val, ( + f"Value mismatch: sent 0x{sent_val} got 0x{got_val}" + ) + + def test_data_packet_python_round_trip(self, tb_results): + """ + Take the 11 bytes captured by the Verilog TB, run Python's + parse_data_packet() on them, verify the parsed values match + what was injected into the TB. + """ + from radar_protocol import RadarProtocol + + rows = _parse_hex_results(tb_results["data_packet"]) + assert len(rows) == 11, f"Expected 11 data packet bytes, got {len(rows)}" + + # Reconstruct raw bytes + raw = bytes(int(row[1], 16) for row in rows) + assert len(raw) == 11 + + parsed = RadarProtocol.parse_data_packet(raw) + assert parsed is not None, "parse_data_packet returned None" + + # The TB injected: range_profile = 0xCAFE_BEEF = {Q=0xCAFE, I=0xBEEF} + # doppler_real = 0x1234, doppler_imag = 0x5678 + # cfar_detection = 1 + # + # range_q = 0xCAFE → signed = 0xCAFE - 0x10000 = -13570 + # range_i = 0xBEEF → signed = 0xBEEF - 0x10000 = -16657 + # doppler_i = 0x1234 → signed = 4660 + # doppler_q = 0x5678 → signed = 22136 + + assert parsed["range_q"] == (0xCAFE - 0x10000), ( + f"range_q: {parsed['range_q']} != {0xCAFE - 0x10000}" + ) + assert parsed["range_i"] == (0xBEEF - 0x10000), ( + f"range_i: {parsed['range_i']} != {0xBEEF - 0x10000}" + ) + assert parsed["doppler_i"] == 0x1234, ( + f"doppler_i: {parsed['doppler_i']} != {0x1234}" + ) + assert parsed["doppler_q"] == 0x5678, ( + f"doppler_q: {parsed['doppler_q']} != {0x5678}" + ) + assert parsed["detection"] == 1, ( + f"detection: {parsed['detection']} != 1" + ) + + @pytest.mark.xfail( + reason="BUG: radar_mode reads 0 due to truncation + wrong bit pos", + strict=True, + ) + def test_status_packet_python_round_trip(self, tb_results): + """ + Take the 26 bytes captured by the Verilog TB, run Python's + parse_status_packet() on them, verify against injected values. + """ + from radar_protocol import RadarProtocol + + lines = tb_results["status_packet"].strip().splitlines() + # Filter out comments and status_words debug lines + rows = [] + for line in lines: + line = line.strip() + if not line or line.startswith("#"): + continue + rows.append(line.split()) + + assert len(rows) == 26, f"Expected 26 status bytes, got {len(rows)}" + + raw = bytes(int(row[1], 16) for row in rows) + assert len(raw) == 26 + + sr = RadarProtocol.parse_status_packet(raw) + assert sr is not None, "parse_status_packet returned None" + + # Injected values (from TB): + # status_cfar_threshold = 0xABCD + # status_stream_ctrl = 3'b101 = 5 + # status_radar_mode = 2'b11 = 3 + # status_long_chirp = 0x1234 + # status_long_listen = 0x5678 + # status_guard = 0x9ABC + # status_short_chirp = 0xDEF0 + # status_short_listen = 0xFACE + # status_chirps_per_elev = 42 + # status_range_mode = 2'b10 = 2 + # status_self_test_flags = 5'b10101 = 21 + # status_self_test_detail = 0xA5 + # status_self_test_busy = 1 + + # Words 1-5 should be correct (no truncation bug) + assert sr.cfar_threshold == 0xABCD, f"cfar_threshold: 0x{sr.cfar_threshold:04X}" + assert sr.long_chirp == 0x1234, f"long_chirp: 0x{sr.long_chirp:04X}" + assert sr.long_listen == 0x5678, f"long_listen: 0x{sr.long_listen:04X}" + assert sr.guard == 0x9ABC, f"guard: 0x{sr.guard:04X}" + assert sr.short_chirp == 0xDEF0, f"short_chirp: 0x{sr.short_chirp:04X}" + assert sr.short_listen == 0xFACE, f"short_listen: 0x{sr.short_listen:04X}" + assert sr.chirps_per_elev == 42, f"chirps_per_elev: {sr.chirps_per_elev}" + assert sr.range_mode == 2, f"range_mode: {sr.range_mode}" + assert sr.self_test_flags == 21, f"self_test_flags: {sr.self_test_flags}" + assert sr.self_test_detail == 0xA5, f"self_test_detail: 0x{sr.self_test_detail:02X}" + assert sr.self_test_busy == 1, f"self_test_busy: {sr.self_test_busy}" + + # Word 0: This tests the truncation bug. + # stream_ctrl should be 5 (3'b101) + assert sr.stream_ctrl == 5, ( + f"stream_ctrl: {sr.stream_ctrl} != 5. " + f"Check status_words[0] bit positions." + ) + + # radar_mode should be 3 (2'b11) — THIS WILL FAIL due to + # the known 37→32 truncation bug + Python wrong shift. + assert sr.radar_mode == 3, ( + f"KNOWN BUG: radar_mode={sr.radar_mode} != 3. " + f"status_words[0] is 37 bits (truncated to 32) and " + f"Python reads mode at wrong bit position." + ) + + +# =================================================================== +# TIER 3: C Stub Execution +# =================================================================== + +@pytest.mark.skipif(not _has_cxx, reason="C++ compiler not available") +class TestTier3CStub: + """Compile STM32 settings stub and verify field parsing.""" + + @pytest.fixture(scope="class") + def stub_binary(self, tmp_path_factory): + """Compile the C++ stub once.""" + workdir = tmp_path_factory.mktemp("c_stub") + stub_src = THIS_DIR / "stm32_settings_stub.cpp" + radar_settings_src = cp.MCU_LIB_DIR / "RadarSettings.cpp" + out_bin = workdir / "stm32_settings_stub" + + result = subprocess.run( + [CXX, "-std=c++11", "-o", str(out_bin), + str(stub_src), str(radar_settings_src), + f"-I{cp.MCU_LIB_DIR}"], + capture_output=True, text=True, timeout=30, + ) + assert result.returncode == 0, f"Compile failed:\n{result.stderr}" + return out_bin + + def _build_settings_packet(self, values: dict) -> bytes: + """Build a binary settings packet matching RadarSettings::parseFromUSB.""" + pkt = b"SET" + for key in [ + "system_frequency", "chirp_duration_1", "chirp_duration_2", + ]: + pkt += struct.pack(">d", values[key]) + pkt += struct.pack(">I", values["chirps_per_position"]) + for key in [ + "freq_min", "freq_max", "prf1", "prf2", + "max_distance", "map_size", + ]: + pkt += struct.pack(">d", values[key]) + pkt += b"END" + return pkt + + def _run_stub(self, binary: Path, packet: bytes) -> dict[str, str]: + """Run stub with packet file, parse stdout into field dict.""" + with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as f: + f.write(packet) + pkt_path = f.name + + try: + result = subprocess.run( + [str(binary), pkt_path], + capture_output=True, text=True, timeout=10, + ) + finally: + os.unlink(pkt_path) + + fields = {} + for line in result.stdout.strip().splitlines(): + if "=" in line: + k, v = line.split("=", 1) + fields[k.strip()] = v.strip() + return fields + + def test_default_values_round_trip(self, stub_binary): + """Default settings must parse correctly through C stub.""" + values = { + "system_frequency": 10.0e9, + "chirp_duration_1": 30.0e-6, + "chirp_duration_2": 0.5e-6, + "chirps_per_position": 32, + "freq_min": 10.0e6, + "freq_max": 30.0e6, + "prf1": 1000.0, + "prf2": 2000.0, + "max_distance": 50000.0, + "map_size": 50000.0, + } + pkt = self._build_settings_packet(values) + result = self._run_stub(stub_binary, pkt) + + assert result.get("parse_ok") == "true", f"Parse failed: {result}" + + for key, expected in values.items(): + actual_str = result.get(key) + assert actual_str is not None, f"Missing field: {key}" + actual = int(actual_str) if key == "chirps_per_position" else float(actual_str) + if isinstance(expected, float): + assert abs(actual - expected) < expected * 1e-10, ( + f"{key}: {actual} != {expected}" + ) + else: + assert actual == expected, f"{key}: {actual} != {expected}" + + def test_distinctive_values_round_trip(self, stub_binary): + """Non-default distinctive values must parse correctly.""" + values = { + "system_frequency": 24.125e9, # K-band + "chirp_duration_1": 100.0e-6, + "chirp_duration_2": 2.0e-6, + "chirps_per_position": 64, + "freq_min": 24.0e6, + "freq_max": 24.25e6, + "prf1": 5000.0, + "prf2": 3000.0, + "max_distance": 75000.0, + "map_size": 100000.0, + } + pkt = self._build_settings_packet(values) + result = self._run_stub(stub_binary, pkt) + + assert result.get("parse_ok") == "true", f"Parse failed: {result}" + + for key, expected in values.items(): + actual_str = result.get(key) + assert actual_str is not None, f"Missing field: {key}" + actual = int(actual_str) if key == "chirps_per_position" else float(actual_str) + if isinstance(expected, float): + assert abs(actual - expected) < expected * 1e-10, ( + f"{key}: {actual} != {expected}" + ) + else: + assert actual == expected, f"{key}: {actual} != {expected}" + + def test_truncated_packet_rejected(self, stub_binary): + """Packet shorter than minimum must be rejected.""" + pkt = b"SET" + b"\x00" * 40 + b"END" # Only 46 bytes, needs 82 + result = self._run_stub(stub_binary, pkt) + assert result.get("parse_ok") == "false", ( + f"Expected parse failure for truncated packet, got: {result}" + ) + + def test_bad_markers_rejected(self, stub_binary): + """Packet with wrong start/end markers must be rejected.""" + values = { + "system_frequency": 10.0e9, "chirp_duration_1": 30.0e-6, + "chirp_duration_2": 0.5e-6, "chirps_per_position": 32, + "freq_min": 10.0e6, "freq_max": 30.0e6, + "prf1": 1000.0, "prf2": 2000.0, + "max_distance": 50000.0, "map_size": 50000.0, + } + pkt = self._build_settings_packet(values) + + # Wrong start marker + bad_pkt = b"BAD" + pkt[3:] + result = self._run_stub(stub_binary, bad_pkt) + assert result.get("parse_ok") == "false", "Should reject bad start marker" + + # Wrong end marker + bad_pkt = pkt[:-3] + b"BAD" + result = self._run_stub(stub_binary, bad_pkt) + assert result.get("parse_ok") == "false", "Should reject bad end marker" + + def test_python_c_packet_format_agreement(self, stub_binary): + """ + Python builds a settings packet, C stub parses it. + This tests that both sides agree on the packet format. + """ + # Use values right at validation boundaries to stress-test + values = { + "system_frequency": 1.0e9, # min valid + "chirp_duration_1": 1.0e-6, # min valid + "chirp_duration_2": 0.1e-6, # min valid + "chirps_per_position": 1, # min valid + "freq_min": 1.0e6, # min valid + "freq_max": 2.0e6, # just above freq_min + "prf1": 100.0, # min valid + "prf2": 100.0, # min valid + "max_distance": 100.0, # min valid + "map_size": 1000.0, # min valid + } + pkt = self._build_settings_packet(values) + result = self._run_stub(stub_binary, pkt) + + assert result.get("parse_ok") == "true", ( + f"Boundary values rejected: {result}" + )