feat: 2048-pt FFT upgrade with decimation=4, 512 output bins, 6m spacing

Complete cross-layer upgrade from 1024-pt/64-bin to 2048-pt/512-bin FFT:

FPGA RTL (14+ modules):
- radar_params.vh: FFT_SIZE=2048, RANGE_BINS=512, 9-bit range, 6-bit stream
- fft_engine.v: 2048-pt FFT with XPM BRAM
- chirp_memory_loader_param.v: 2 segments x 2048 (was 4 x 1024)
- matched_filter_multi_segment.v: BRAM inference for overlap_cache, explicit ov_waddr
- mti_canceller.v: BRAM inference for prev_i/q arrays (was fabric FFs)
- doppler_processor.v: 16384-deep memory, 14-bit addressing
- cfar_ca.v: 512 rows, indentation fix
- radar_receiver_final.v: rising-edge detector for frame_complete, 11-bit sample_addr
- range_bin_decimator.v: 512 output bins
- usb_data_interface_ft2232h.v: bulk per-frame with Manhattan magnitude
- radar_mode_controller.v: XOR edge detector for toggle signals
- rx_gain_control.v: updated for new bin count

Python GUI + Protocol (8 files):
- radar_protocol.py: 512-bin bulk frame parser, LSB-first bitmap
- GUI_V65_Tk.py, v7/*.py: updated for 512 bins, 6m range resolution

Golden data + tests:
- All .hex/.csv/.npy golden references regenerated for 2048/512
- fft_twiddle_2048.mem added
- Deleted stale seg2/seg3 chirp mem files
- 9 new bulk frame cross-layer tests, deleted 6 stale per-sample tests
- Deleted stale tb_cross_layer_ft2232h.v and dead contract_parser functions
- Updated validate_mem_files.py for 2048/2-segment config

MCU: RadarSettings.cpp max_distance/map_size 1536->3072

All 4 CI jobs pass: 285 tests, 0 failures, 0 skips
This commit is contained in:
Jason
2026-04-16 17:27:55 +05:45
parent affa40a9d3
commit e9705e40b7
178 changed files with 687738 additions and 122880 deletions
+41 -130
View File
@@ -152,54 +152,6 @@ def parse_python_packet_constants(filepath: Path | None = None) -> dict[str, Pac
}
def parse_python_data_packet_fields(filepath: Path | None = None) -> list[DataPacketField]:
"""
Extract byte offsets from parse_data_packet() by finding struct.unpack_from calls.
Returns fields in byte order.
"""
if filepath is None:
filepath = GUI_DIR / "radar_protocol.py"
text = filepath.read_text()
# Find parse_data_packet method body
match = re.search(
r'def parse_data_packet\(.*?\).*?(?=\n @|\n def |\nclass |\Z)',
text, re.DOTALL
)
if not match:
raise ValueError("Could not find parse_data_packet()")
body = match.group()
fields: list[DataPacketField] = []
# Match patterns like: range_q = _to_signed16(struct.unpack_from(">H", raw, 1)[0])
for m in re.finditer(
r'(\w+)\s*=\s*_to_signed16\(struct\.unpack_from\("(>[HIBhib])", raw, (\d+)\)',
body
):
name = m.group(1)
fmt = m.group(2)
offset = int(m.group(3))
fmt_char = fmt[-1].upper()
size = {"H": 2, "I": 4, "B": 1}[fmt_char]
fields.append(DataPacketField(
name=name, byte_start=offset,
byte_end=offset + size - 1,
width_bits=size * 8
))
# Match detection = raw[9] & 0x01
for m in re.finditer(r'(\w+)\s*=\s*raw\[(\d+)\]\s*&\s*(0x[0-9a-fA-F]+|\d+)', body):
name = m.group(1)
offset = int(m.group(2))
fields.append(DataPacketField(
name=name, byte_start=offset, byte_end=offset, width_bits=1
))
fields.sort(key=lambda f: f.byte_start)
return fields
def parse_python_status_fields(filepath: Path | None = None) -> list[StatusWordField]:
"""
Extract bit shift/mask operations from parse_status_packet().
@@ -354,34 +306,63 @@ def parse_verilog_opcodes(filepath: Path | None = None) -> dict[int, OpcodeEntry
return opcodes
def _parse_verilog_defines(filepath: Path | None = None) -> dict[str, str]:
"""Parse `define macros from radar_params.vh into {name: raw_value}."""
if filepath is None:
filepath = FPGA_DIR / "radar_params.vh"
if not filepath.exists():
return {}
defines: dict[str, str] = {}
for m in re.finditer(
r'`define\s+(\w+)\s+(.+?)(?://.*)?$',
filepath.read_text(),
re.MULTILINE,
):
defines[m.group(1)] = m.group(2).strip()
return defines
def _resolve_verilog_literal(val_str: str) -> int:
"""Convert a Verilog literal (e.g. 6'b000_111 or 10000) to int."""
if "'" in val_str:
base_char = val_str.split("'")[1][0].lower()
digits = val_str.split("'")[1][1:].replace("_", "")
base = {"b": 2, "d": 10, "h": 16, "o": 8}[base_char]
return int(digits, base)
return int(val_str)
def parse_verilog_reset_defaults(filepath: Path | None = None) -> dict[str, int]:
"""
Parse the reset block from radar_system_top.v.
Returns {register_name: reset_value}.
Resolves `define macros from radar_params.vh.
"""
if filepath is None:
filepath = FPGA_DIR / "radar_system_top.v"
text = filepath.read_text()
defines = _parse_verilog_defines()
defaults: dict[str, int] = {}
# Match patterns like: host_radar_mode <= 2'b01;
# Also: host_detect_threshold <= 16'd10000;
# Also: host_stream_control <= `RP_STREAM_CTRL_DEFAULT;
for m in re.finditer(
r'(host_\w+)\s*<=\s*(\d+\'[bdho][0-9a-fA-F_]+|\d+)\s*;',
r'(host_\w+)\s*<=\s*(`\w+|\d+\'[bdho][0-9a-fA-F_]+|\d+)\s*;',
text
):
reg = m.group(1)
val_str = m.group(2)
# Parse Verilog literal
if "'" in val_str:
base_char = val_str.split("'")[1][0].lower()
digits = val_str.split("'")[1][1:].replace("_", "")
base = {"b": 2, "d": 10, "h": 16, "o": 8}[base_char]
value = int(digits, base)
# Resolve macro or parse Verilog literal
if val_str.startswith("`"):
macro_name = val_str[1:]
if macro_name not in defines:
continue # skip unresolvable macros
value = _resolve_verilog_literal(defines[macro_name])
else:
value = int(val_str)
value = _resolve_verilog_literal(val_str)
# Only keep first occurrence (the reset block comes before the
# opcode decode which also has <= assignments)
@@ -436,15 +417,15 @@ def parse_verilog_packet_constants(
return int(vlog_m.group(1))
return int(val, 16) if val.startswith("0x") else int(val)
header_val = _find(r"localparam\s+HEADER\s*=\s*(\d+'h[0-9a-fA-F]+)")
_find(r"localparam\s+HEADER\s*=\s*(\d+'h[0-9a-fA-F]+)") # bulk frames — no data header
footer_val = _find(r"localparam\s+FOOTER\s*=\s*(\d+'h[0-9a-fA-F]+)")
status_hdr = _find(r"localparam\s+STATUS_HEADER\s*=\s*(\d+'h[0-9a-fA-F]+)")
data_size = _find(r"DATA_PKT_LEN\s*=\s*(\d+'d\d+)")
status_size = _find(r"STATUS_PKT_LEN\s*=\s*(\d+'d\d+)")
# FT2232H uses bulk per-frame transfers — no fixed data packet size.
# Only status packets have a fixed size.
return {
"data": PacketConstants(header=header_val, footer=footer_val, size=data_size),
"status": PacketConstants(header=status_hdr, footer=footer_val, size=status_size),
}
@@ -559,76 +540,6 @@ def get_usb_interface_port_widths(filepath: Path | None = None) -> dict[str, int
return widths
def parse_verilog_data_mux(
filepath: Path | None = None,
) -> list[DataPacketField]:
"""
Parse the data_pkt_byte mux from usb_data_interface_ft2232h.v.
Returns fields with byte positions and signal names.
"""
if filepath is None:
filepath = FPGA_DIR / "usb_data_interface_ft2232h.v"
text = filepath.read_text()
# Find the data mux case block
match = re.search(
r'always\s+@\(\*\)\s+begin\s+case\s*\(wr_byte_idx\)(.*?)endcase',
text, re.DOTALL
)
if not match:
raise ValueError("Could not find data_pkt_byte mux")
mux_body = match.group(1)
entries: list[tuple[int, str]] = []
for m in re.finditer(
r"5'd(\d+)\s*:\s*data_pkt_byte\s*=\s*(.+?);",
mux_body
):
idx = int(m.group(1))
expr = m.group(2).strip()
entries.append((idx, expr))
# Group consecutive bytes by signal root name
fields: list[DataPacketField] = []
i = 0
while i < len(entries):
idx, expr = entries[i]
if expr == "HEADER" or expr == "FOOTER":
i += 1
continue
# Extract signal name (e.g., range_profile_cap from range_profile_cap[31:24])
sig_match = re.match(r'(\w+?)(?:\[|$)', expr)
if not sig_match:
i += 1
continue
signal = sig_match.group(1)
start_byte = idx
end_byte = idx
# Find consecutive bytes of the same signal
j = i + 1
while j < len(entries):
next_idx, next_expr = entries[j]
if next_expr.startswith(signal):
end_byte = next_idx
j += 1
else:
break
n_bytes = end_byte - start_byte + 1
fields.append(DataPacketField(
name=signal.replace("_cap", ""),
byte_start=start_byte,
byte_end=end_byte,
width_bits=n_bytes * 8,
))
i = j
return fields
# ===================================================================
# STM32 / C layer parser
@@ -803,7 +714,7 @@ def parse_radar_params_vh() -> dict[str, int]:
"""
Parse `define values from radar_params.vh.
Returns dict like {"RP_FFT_SIZE": 1024, "RP_DECIMATION_FACTOR": 16, ...}.
Returns dict like {"RP_FFT_SIZE": 2048, "RP_DECIMATION_FACTOR": 4, ...}.
Only parses defines with simple integer or Verilog literal values.
Skips bit-width prefixed literals (e.g. 2'b00) — returns the numeric value.
"""
@@ -1,714 +0,0 @@
`timescale 1ns / 1ps
/**
* tb_cross_layer_ft2232h.v
*
* Cross-layer contract testbench for the FT2232H USB interface.
* Exercises three packet types with known distinctive values and dumps
* captured bytes to text files that the Python orchestrator can parse.
*
* Exercise A: Command round-trip (Host -> FPGA)
* - Send every opcode through the 4-byte read FSM
* - Dump cmd_opcode, cmd_addr, cmd_value to cmd_results.txt
*
* Exercise B: Data packet generation (FPGA -> Host)
* - Inject known range/doppler/cfar values
* - Capture all 11 output bytes
* - Dump to data_packet.txt
*
* Exercise C: Status packet generation (FPGA -> Host)
* - Set all status inputs to known non-zero values
* - Trigger status request
* - Capture all 26 output bytes
* - Dump to status_packet.txt
*/
module tb_cross_layer_ft2232h;
// Clock periods
localparam CLK_PERIOD = 10.0; // 100 MHz system clock
localparam FT_CLK_PERIOD = 16.67; // 60 MHz FT2232H clock
// ---- Signals ----
reg clk;
reg reset_n;
reg ft_reset_n;
// Radar data inputs
reg [31:0] range_profile;
reg range_valid;
reg [15:0] doppler_real;
reg [15:0] doppler_imag;
reg doppler_valid;
reg cfar_detection;
reg cfar_valid;
// FT2232H physical interface
wire [7:0] ft_data;
reg ft_rxf_n;
reg ft_txe_n;
wire ft_rd_n;
wire ft_wr_n;
wire ft_oe_n;
wire ft_siwu;
reg ft_clk;
// Host-side bus driver (for command injection)
reg [7:0] host_data_drive;
reg host_data_drive_en;
assign ft_data = host_data_drive_en ? host_data_drive : 8'hZZ;
// Pulldown to avoid X during idle
pulldown pd[7:0] (ft_data);
// DUT command outputs
wire [31:0] cmd_data;
wire cmd_valid;
wire [7:0] cmd_opcode;
wire [7:0] cmd_addr;
wire [15:0] cmd_value;
// Stream control
reg [2:0] stream_control;
// Status inputs
reg status_request;
reg [15:0] status_cfar_threshold;
reg [2:0] status_stream_ctrl;
reg [1:0] status_radar_mode;
reg [15:0] status_long_chirp;
reg [15:0] status_long_listen;
reg [15:0] status_guard;
reg [15:0] status_short_chirp;
reg [15:0] status_short_listen;
reg [5:0] status_chirps_per_elev;
reg [1:0] status_range_mode;
reg [4:0] status_self_test_flags;
reg [7:0] status_self_test_detail;
reg status_self_test_busy;
reg [3:0] status_agc_current_gain;
reg [7:0] status_agc_peak_magnitude;
reg [7:0] status_agc_saturation_count;
reg status_agc_enable;
// ---- Clock generators ----
always #(CLK_PERIOD / 2) clk = ~clk;
always #(FT_CLK_PERIOD / 2) ft_clk = ~ft_clk;
// ---- DUT instantiation ----
usb_data_interface_ft2232h uut (
.clk (clk),
.reset_n (reset_n),
.ft_reset_n (ft_reset_n),
.range_profile (range_profile),
.range_valid (range_valid),
.doppler_real (doppler_real),
.doppler_imag (doppler_imag),
.doppler_valid (doppler_valid),
.cfar_detection (cfar_detection),
.cfar_valid (cfar_valid),
.ft_data (ft_data),
.ft_rxf_n (ft_rxf_n),
.ft_txe_n (ft_txe_n),
.ft_rd_n (ft_rd_n),
.ft_wr_n (ft_wr_n),
.ft_oe_n (ft_oe_n),
.ft_siwu (ft_siwu),
.ft_clk (ft_clk),
.cmd_data (cmd_data),
.cmd_valid (cmd_valid),
.cmd_opcode (cmd_opcode),
.cmd_addr (cmd_addr),
.cmd_value (cmd_value),
.stream_control (stream_control),
.status_request (status_request),
.status_cfar_threshold (status_cfar_threshold),
.status_stream_ctrl (status_stream_ctrl),
.status_radar_mode (status_radar_mode),
.status_long_chirp (status_long_chirp),
.status_long_listen (status_long_listen),
.status_guard (status_guard),
.status_short_chirp (status_short_chirp),
.status_short_listen (status_short_listen),
.status_chirps_per_elev (status_chirps_per_elev),
.status_range_mode (status_range_mode),
.status_self_test_flags (status_self_test_flags),
.status_self_test_detail(status_self_test_detail),
.status_self_test_busy (status_self_test_busy),
.status_agc_current_gain (status_agc_current_gain),
.status_agc_peak_magnitude (status_agc_peak_magnitude),
.status_agc_saturation_count(status_agc_saturation_count),
.status_agc_enable (status_agc_enable)
);
// ---- Test bookkeeping ----
integer pass_count;
integer fail_count;
integer test_num;
integer cmd_file;
integer data_file;
integer status_file;
// ---- Check task ----
task check;
input cond;
input [511:0] label;
begin
test_num = test_num + 1;
if (cond) begin
$display("[PASS] Test %0d: %0s", test_num, label);
pass_count = pass_count + 1;
end else begin
$display("[FAIL] Test %0d: %0s", test_num, label);
fail_count = fail_count + 1;
end
end
endtask
// ---- Helper: apply reset ----
task apply_reset;
begin
reset_n = 0;
ft_reset_n = 0;
range_profile = 32'h0;
range_valid = 0;
doppler_real = 16'h0;
doppler_imag = 16'h0;
doppler_valid = 0;
cfar_detection = 0;
cfar_valid = 0;
ft_rxf_n = 1; // No host data available
ft_txe_n = 0; // TX FIFO ready
host_data_drive = 8'h0;
host_data_drive_en = 0;
stream_control = 3'b111;
status_request = 0;
status_cfar_threshold = 16'd0;
status_stream_ctrl = 3'b000;
status_radar_mode = 2'b00;
status_long_chirp = 16'd0;
status_long_listen = 16'd0;
status_guard = 16'd0;
status_short_chirp = 16'd0;
status_short_listen = 16'd0;
status_chirps_per_elev = 6'd0;
status_range_mode = 2'b00;
status_self_test_flags = 5'b00000;
status_self_test_detail = 8'd0;
status_self_test_busy = 1'b0;
status_agc_current_gain = 4'd0;
status_agc_peak_magnitude = 8'd0;
status_agc_saturation_count = 8'd0;
status_agc_enable = 1'b0;
repeat (6) @(posedge ft_clk);
reset_n = 1;
ft_reset_n = 1;
// Wait for stream_control CDC to propagate
repeat (8) @(posedge ft_clk);
end
endtask
// ---- Helper: send one 4-byte command via FT2232H read path ----
//
// FT2232H read FSM cycle-by-cycle:
// Cycle 0 (RD_IDLE): sees !ft_rxf_n ft_oe_n<=0, RD_OE_ASSERT
// Cycle 1 (RD_OE_ASSERT): sees !ft_rxf_n ft_rd_n<=0, RD_READING
// Cycle 2 (RD_READING): samples ft_data=byte0, cnt 01
// Cycle 3 (RD_READING): samples ft_data=byte1, cnt 12
// Cycle 4 (RD_READING): samples ft_data=byte2, cnt 23
// Cycle 5 (RD_READING): samples ft_data=byte3, cnt=30, RD_DEASSERT
// Cycle 6 (RD_DEASSERT): ft_oe_n<=1, RD_PROCESS
// Cycle 7 (RD_PROCESS): cmd_valid<=1, decode, RD_IDLE
//
// Data must be stable BEFORE the sampling posedge. We use #1 after
// posedge to change data in the "delta after" region.
task send_command_ft2232h;
input [7:0] byte0; // opcode
input [7:0] byte1; // addr
input [7:0] byte2; // value_hi
input [7:0] byte3; // value_lo
begin
// Pre-drive byte0 and signal data available
@(posedge ft_clk); #1;
host_data_drive = byte0;
host_data_drive_en = 1;
ft_rxf_n = 0;
// Cycle 0: RD_IDLE sees !ft_rxf_n, goes to OE_ASSERT
@(posedge ft_clk); #1;
// Cycle 1: RD_OE_ASSERT, ft_rd_n goes low, goes to RD_READING
@(posedge ft_clk); #1;
// Cycle 2: RD_READING, byte0 is sampled, cnt 01
// Now change to byte1 for next sample
@(posedge ft_clk); #1;
host_data_drive = byte1;
// Cycle 3: RD_READING, byte1 is sampled, cnt 12
@(posedge ft_clk); #1;
host_data_drive = byte2;
// Cycle 4: RD_READING, byte2 is sampled, cnt 23
@(posedge ft_clk); #1;
host_data_drive = byte3;
// Cycle 5: RD_READING, byte3 is sampled, cnt=3, RD_DEASSERT
@(posedge ft_clk); #1;
// Cycle 6: RD_DEASSERT, ft_oe_n1, RD_PROCESS
@(posedge ft_clk); #1;
// Cycle 7: RD_PROCESS, cmd decoded, cmd_valid1, RD_IDLE
@(posedge ft_clk); #1;
// cmd_valid was asserted at cycle 7's posedge. cmd_opcode/addr/value
// are now valid (registered outputs hold until next RD_PROCESS).
// Release bus
host_data_drive_en = 0;
host_data_drive = 8'h0;
ft_rxf_n = 1;
// Settle
repeat (2) @(posedge ft_clk);
end
endtask
// ---- Helper: capture N write bytes from the DUT ----
// Monitors ft_wr_n and ft_data_out, captures bytes into array.
// Used for data packets (11 bytes) and status packets (26 bytes).
reg [7:0] captured_bytes [0:31];
integer capture_count;
task capture_write_bytes;
input integer expected_count;
integer timeout;
begin
capture_count = 0;
timeout = 0;
while (capture_count < expected_count && timeout < 2000) begin
@(posedge ft_clk); #1;
timeout = timeout + 1;
// DUT drives byte when ft_wr_n=0 and ft_data_oe=1
// Sample AFTER posedge so registered outputs are settled
if (!ft_wr_n && uut.ft_data_oe) begin
captured_bytes[capture_count] = uut.ft_data_out;
capture_count = capture_count + 1;
end
end
end
endtask
// ---- Helper: pulse range_valid with CDC wait ----
// Toggle CDC needs 3 sync stages + edge detect = 4+ ft_clk cycles.
// Use 12 for safety margin.
task assert_range_valid;
input [31:0] data;
begin
@(posedge clk); #1;
range_profile = data;
range_valid = 1;
@(posedge clk); #1;
range_valid = 0;
// Wait for toggle CDC propagation
repeat (12) @(posedge ft_clk);
end
endtask
// ---- Helper: pulse doppler_valid ----
task pulse_doppler;
input [15:0] dr;
input [15:0] di;
begin
@(posedge clk); #1;
doppler_real = dr;
doppler_imag = di;
doppler_valid = 1;
@(posedge clk); #1;
doppler_valid = 0;
repeat (12) @(posedge ft_clk);
end
endtask
// ---- Helper: pulse cfar_valid ----
task pulse_cfar;
input det;
begin
@(posedge clk); #1;
cfar_detection = det;
cfar_valid = 1;
@(posedge clk); #1;
cfar_valid = 0;
repeat (12) @(posedge ft_clk);
end
endtask
// ---- Helper: pulse status_request ----
task pulse_status_request;
begin
@(posedge clk); #1;
status_request = 1;
@(posedge clk); #1;
status_request = 0;
// Wait for toggle CDC propagation
repeat (12) @(posedge ft_clk);
end
endtask
// ================================================================
// Main stimulus
// ================================================================
integer i;
initial begin
$dumpfile("tb_cross_layer_ft2232h.vcd");
$dumpvars(0, tb_cross_layer_ft2232h);
clk = 0;
ft_clk = 0;
pass_count = 0;
fail_count = 0;
test_num = 0;
// ============================================================
// EXERCISE A: Command Round-Trip
// Send commands with known opcode/addr/value, verify decoding.
// Dump results to cmd_results.txt for Python validation.
// ============================================================
$display("\n=== EXERCISE A: Command Round-Trip ===");
apply_reset;
cmd_file = $fopen("cmd_results.txt", "w");
$fwrite(cmd_file, "# opcode_sent addr_sent value_sent opcode_got addr_got value_got\n");
// Test all real opcodes from radar_system_top.v
// Format: opcode, addr=0x00, value
// Basic control
send_command_ft2232h(8'h01, 8'h00, 8'h00, 8'h02); // RADAR_MODE=2
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h01, 8'h00, 16'h0002, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h01 && cmd_value === 16'h0002,
"Cmd 0x01: RADAR_MODE=2");
send_command_ft2232h(8'h02, 8'h00, 8'h00, 8'h01); // TRIGGER_PULSE
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h02, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h02 && cmd_value === 16'h0001,
"Cmd 0x02: TRIGGER_PULSE");
send_command_ft2232h(8'h03, 8'h00, 8'h27, 8'h10); // DETECT_THRESHOLD=10000
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h03, 8'h00, 16'h2710, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h03 && cmd_value === 16'h2710,
"Cmd 0x03: DETECT_THRESHOLD=10000");
send_command_ft2232h(8'h04, 8'h00, 8'h00, 8'h07); // STREAM_CONTROL=7
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h04, 8'h00, 16'h0007, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h04 && cmd_value === 16'h0007,
"Cmd 0x04: STREAM_CONTROL=7");
// Chirp timing
send_command_ft2232h(8'h10, 8'h00, 8'h0B, 8'hB8); // LONG_CHIRP=3000
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h10, 8'h00, 16'h0BB8, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h10 && cmd_value === 16'h0BB8,
"Cmd 0x10: LONG_CHIRP=3000");
send_command_ft2232h(8'h11, 8'h00, 8'h35, 8'h84); // LONG_LISTEN=13700
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h11, 8'h00, 16'h3584, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h11 && cmd_value === 16'h3584,
"Cmd 0x11: LONG_LISTEN=13700");
send_command_ft2232h(8'h12, 8'h00, 8'h44, 8'h84); // GUARD=17540
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h12, 8'h00, 16'h4484, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h12 && cmd_value === 16'h4484,
"Cmd 0x12: GUARD=17540");
send_command_ft2232h(8'h13, 8'h00, 8'h00, 8'h32); // SHORT_CHIRP=50
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h13, 8'h00, 16'h0032, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h13 && cmd_value === 16'h0032,
"Cmd 0x13: SHORT_CHIRP=50");
send_command_ft2232h(8'h14, 8'h00, 8'h44, 8'h2A); // SHORT_LISTEN=17450
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h14, 8'h00, 16'h442A, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h14 && cmd_value === 16'h442A,
"Cmd 0x14: SHORT_LISTEN=17450");
send_command_ft2232h(8'h15, 8'h00, 8'h00, 8'h20); // CHIRPS_PER_ELEV=32
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h15, 8'h00, 16'h0020, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h15 && cmd_value === 16'h0020,
"Cmd 0x15: CHIRPS_PER_ELEV=32");
// Digital gain
send_command_ft2232h(8'h16, 8'h00, 8'h00, 8'h05); // GAIN_SHIFT=5
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h16, 8'h00, 16'h0005, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h16 && cmd_value === 16'h0005,
"Cmd 0x16: GAIN_SHIFT=5");
// Signal processing
send_command_ft2232h(8'h20, 8'h00, 8'h00, 8'h01); // RANGE_MODE=1
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h20, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h20 && cmd_value === 16'h0001,
"Cmd 0x20: RANGE_MODE=1");
send_command_ft2232h(8'h21, 8'h00, 8'h00, 8'h03); // CFAR_GUARD=3
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h21, 8'h00, 16'h0003, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h21 && cmd_value === 16'h0003,
"Cmd 0x21: CFAR_GUARD=3");
send_command_ft2232h(8'h22, 8'h00, 8'h00, 8'h0C); // CFAR_TRAIN=12
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h22, 8'h00, 16'h000C, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h22 && cmd_value === 16'h000C,
"Cmd 0x22: CFAR_TRAIN=12");
send_command_ft2232h(8'h23, 8'h00, 8'h00, 8'h30); // CFAR_ALPHA=0x30
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h23, 8'h00, 16'h0030, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h23 && cmd_value === 16'h0030,
"Cmd 0x23: CFAR_ALPHA=0x30");
send_command_ft2232h(8'h24, 8'h00, 8'h00, 8'h01); // CFAR_MODE=1 (GO)
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h24, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h24 && cmd_value === 16'h0001,
"Cmd 0x24: CFAR_MODE=1");
send_command_ft2232h(8'h25, 8'h00, 8'h00, 8'h01); // CFAR_ENABLE=1
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h25, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h25 && cmd_value === 16'h0001,
"Cmd 0x25: CFAR_ENABLE=1");
send_command_ft2232h(8'h26, 8'h00, 8'h00, 8'h01); // MTI_ENABLE=1
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h26, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h26 && cmd_value === 16'h0001,
"Cmd 0x26: MTI_ENABLE=1");
send_command_ft2232h(8'h27, 8'h00, 8'h00, 8'h03); // DC_NOTCH_WIDTH=3
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h27, 8'h00, 16'h0003, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h27 && cmd_value === 16'h0003,
"Cmd 0x27: DC_NOTCH_WIDTH=3");
// AGC registers (0x28-0x2C)
send_command_ft2232h(8'h28, 8'h00, 8'h00, 8'h01); // AGC_ENABLE=1
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h28, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h28 && cmd_value === 16'h0001,
"Cmd 0x28: AGC_ENABLE=1");
send_command_ft2232h(8'h29, 8'h00, 8'h00, 8'hC8); // AGC_TARGET=200
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h29, 8'h00, 16'h00C8, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h29 && cmd_value === 16'h00C8,
"Cmd 0x29: AGC_TARGET=200");
send_command_ft2232h(8'h2A, 8'h00, 8'h00, 8'h02); // AGC_ATTACK=2
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h2A, 8'h00, 16'h0002, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h2A && cmd_value === 16'h0002,
"Cmd 0x2A: AGC_ATTACK=2");
send_command_ft2232h(8'h2B, 8'h00, 8'h00, 8'h03); // AGC_DECAY=3
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h2B, 8'h00, 16'h0003, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h2B && cmd_value === 16'h0003,
"Cmd 0x2B: AGC_DECAY=3");
send_command_ft2232h(8'h2C, 8'h00, 8'h00, 8'h06); // AGC_HOLDOFF=6
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h2C, 8'h00, 16'h0006, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h2C && cmd_value === 16'h0006,
"Cmd 0x2C: AGC_HOLDOFF=6");
// Self-test / status
send_command_ft2232h(8'h30, 8'h00, 8'h00, 8'h01); // SELF_TEST_TRIGGER
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h30, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h30 && cmd_value === 16'h0001,
"Cmd 0x30: SELF_TEST_TRIGGER");
send_command_ft2232h(8'h31, 8'h00, 8'h00, 8'h01); // SELF_TEST_STATUS
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h31, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h31 && cmd_value === 16'h0001,
"Cmd 0x31: SELF_TEST_STATUS");
send_command_ft2232h(8'hFF, 8'h00, 8'h00, 8'h00); // STATUS_REQUEST
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'hFF, 8'h00, 16'h0000, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'hFF && cmd_value === 16'h0000,
"Cmd 0xFF: STATUS_REQUEST");
// Non-zero addr test
send_command_ft2232h(8'h01, 8'hAB, 8'hCD, 8'hEF); // addr=0xAB, value=0xCDEF
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h01, 8'hAB, 16'hCDEF, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h01 && cmd_addr === 8'hAB && cmd_value === 16'hCDEF,
"Cmd 0x01 with addr=0xAB, value=0xCDEF");
$fclose(cmd_file);
// ============================================================
// EXERCISE B: Data Packet Generation
// Inject known values, capture 11-byte output.
// ============================================================
$display("\n=== EXERCISE B: Data Packet Generation ===");
apply_reset;
ft_txe_n = 0; // TX FIFO ready
// Use distinctive values that make truncation/swap bugs obvious
// range_profile = {Q[15:0], I[15:0]} = {0xCAFE, 0xBEEF}
// doppler_real = 0x1234, doppler_imag = 0x5678
// cfar_detection = 1
// First inject doppler and cfar so pending flags are set
pulse_doppler(16'h1234, 16'h5678);
pulse_cfar(1'b1);
// Now inject range_valid which triggers the write FSM.
// CRITICAL: Must capture bytes IN PARALLEL with the trigger,
// because the write FSM starts sending bytes ~3-4 ft_clk cycles
// after the toggle CDC propagates. If we wait for CDC propagation
// first, capture_write_bytes misses the early bytes.
fork
assert_range_valid(32'hCAFE_BEEF);
capture_write_bytes(11);
join
check(capture_count === 11,
"Data packet: captured 11 bytes");
// Dump captured bytes to file
data_file = $fopen("data_packet.txt", "w");
$fwrite(data_file, "# byte_index hex_value\n");
for (i = 0; i < capture_count; i = i + 1) begin
$fwrite(data_file, "%0d %02x\n", i, captured_bytes[i]);
end
$fclose(data_file);
// Verify locally too
check(captured_bytes[0] === 8'hAA,
"Data pkt: byte 0 = 0xAA (header)");
check(captured_bytes[1] === 8'hCA,
"Data pkt: byte 1 = 0xCA (range MSB = Q high)");
check(captured_bytes[2] === 8'hFE,
"Data pkt: byte 2 = 0xFE (range Q low)");
check(captured_bytes[3] === 8'hBE,
"Data pkt: byte 3 = 0xBE (range I high)");
check(captured_bytes[4] === 8'hEF,
"Data pkt: byte 4 = 0xEF (range I low)");
check(captured_bytes[5] === 8'h12,
"Data pkt: byte 5 = 0x12 (doppler_real MSB)");
check(captured_bytes[6] === 8'h34,
"Data pkt: byte 6 = 0x34 (doppler_real LSB)");
check(captured_bytes[7] === 8'h56,
"Data pkt: byte 7 = 0x56 (doppler_imag MSB)");
check(captured_bytes[8] === 8'h78,
"Data pkt: byte 8 = 0x78 (doppler_imag LSB)");
check(captured_bytes[9] === 8'h01,
"Data pkt: byte 9 = 0x01 (cfar_detection=1)");
check(captured_bytes[10] === 8'h55,
"Data pkt: byte 10 = 0x55 (footer)");
// ============================================================
// EXERCISE C: Status Packet Generation
// Set known status values, trigger readback, capture 26 bytes.
// Uses distinctive non-zero values to detect truncation/swap.
// ============================================================
$display("\n=== EXERCISE C: Status Packet Generation ===");
apply_reset;
ft_txe_n = 0;
// Set known distinctive status values
status_cfar_threshold = 16'hABCD;
status_stream_ctrl = 3'b101;
status_radar_mode = 2'b11; // Use 0b11 to test both bits
status_long_chirp = 16'h1234;
status_long_listen = 16'h5678;
status_guard = 16'h9ABC;
status_short_chirp = 16'hDEF0;
status_short_listen = 16'hFACE;
status_chirps_per_elev = 6'd42;
status_range_mode = 2'b10;
status_self_test_flags = 5'b10101;
status_self_test_detail = 8'hA5;
status_self_test_busy = 1'b1;
status_agc_current_gain = 4'd7;
status_agc_peak_magnitude = 8'd200;
status_agc_saturation_count = 8'd15;
status_agc_enable = 1'b1;
// Pulse status_request and capture bytes IN PARALLEL
// (same reason as Exercise B write FSM starts before CDC wait ends)
fork
pulse_status_request;
capture_write_bytes(26);
join
check(capture_count === 26,
"Status packet: captured 26 bytes");
// Dump captured bytes to file
status_file = $fopen("status_packet.txt", "w");
$fwrite(status_file, "# byte_index hex_value\n");
for (i = 0; i < capture_count; i = i + 1) begin
$fwrite(status_file, "%0d %02x\n", i, captured_bytes[i]);
end
// Also dump the raw status_words for debugging
$fwrite(status_file, "# status_words (internal):\n");
for (i = 0; i < 6; i = i + 1) begin
$fwrite(status_file, "# word[%0d] = %08x\n", i, uut.status_words[i]);
end
$fclose(status_file);
// Verify header/footer locally
check(captured_bytes[0] === 8'hBB,
"Status pkt: byte 0 = 0xBB (status header)");
check(captured_bytes[25] === 8'h55,
"Status pkt: byte 25 = 0x55 (footer)");
// Verify status_words[1] = {long_chirp, long_listen} = {0x1234, 0x5678}
check(captured_bytes[5] === 8'h12 && captured_bytes[6] === 8'h34 &&
captured_bytes[7] === 8'h56 && captured_bytes[8] === 8'h78,
"Status pkt: word1 = {long_chirp=0x1234, long_listen=0x5678}");
// Verify status_words[2] = {guard, short_chirp} = {0x9ABC, 0xDEF0}
check(captured_bytes[9] === 8'h9A && captured_bytes[10] === 8'hBC &&
captured_bytes[11] === 8'hDE && captured_bytes[12] === 8'hF0,
"Status pkt: word2 = {guard=0x9ABC, short_chirp=0xDEF0}");
// ============================================================
// Summary
// ============================================================
$display("");
$display("========================================");
$display(" CROSS-LAYER FT2232H TB RESULTS");
$display(" PASSED: %0d / %0d", pass_count, test_num);
$display(" FAILED: %0d / %0d", fail_count, test_num);
if (fail_count == 0)
$display(" ** ALL TESTS PASSED **");
else
$display(" ** SOME TESTS FAILED **");
$display("========================================");
#100;
$finish;
end
endmodule
@@ -86,7 +86,7 @@ GROUND_TRUTH_OPCODES = {
0x01: ("host_radar_mode", 2),
0x02: ("host_trigger_pulse", 1), # pulse
0x03: ("host_detect_threshold", 16),
0x04: ("host_stream_control", 3),
0x04: ("host_stream_control", 6),
0x10: ("host_long_chirp_cycles", 16),
0x11: ("host_long_listen_cycles", 16),
0x12: ("host_guard_cycles", 16),
@@ -115,7 +115,7 @@ GROUND_TRUTH_OPCODES = {
GROUND_TRUTH_RESET_DEFAULTS = {
"host_radar_mode": 1, # 2'b01
"host_detect_threshold": 10000,
"host_stream_control": 7, # 3'b111
"host_stream_control": 15, # 6'b001_111 (mag_only + all streams)
"host_long_chirp_cycles": 3000,
"host_long_listen_cycles": 13700,
"host_guard_cycles": 17540,
@@ -139,7 +139,6 @@ GROUND_TRUTH_RESET_DEFAULTS = {
}
GROUND_TRUTH_PACKET_CONSTANTS = {
"data": {"header": 0xAA, "footer": 0x55, "size": 11},
"status": {"header": 0xBB, "footer": 0x55, "size": 26},
}
@@ -372,17 +371,16 @@ class TestTier1ArchitecturalParams:
# Frozen architectural constants — update deliberately when changing arch
FROZEN_PARAMS: ClassVar[dict[str, int]] = {
"RP_FFT_SIZE": 1024,
"RP_DECIMATION_FACTOR": 16,
"RP_BINS_PER_SEGMENT": 64,
"RP_OUTPUT_RANGE_BINS_3KM": 64,
"RP_FFT_SIZE": 2048,
"RP_DECIMATION_FACTOR": 4,
"RP_NUM_RANGE_BINS": 512,
"RP_DOPPLER_FFT_SIZE": 16,
"RP_NUM_DOPPLER_BINS": 32,
"RP_CHIRPS_PER_FRAME": 32,
"RP_CHIRPS_PER_SUBFRAME": 16,
"RP_DATA_WIDTH": 16,
"RP_PROCESSING_RATE_MHZ": 100,
"RP_RANGE_PER_BIN_DM": 240, # 24.0 m in decimeters
"RP_RANGE_PER_BIN_DM": 60, # 6.0 m in decimeters
}
def test_radar_params_vh_parseable(self):
@@ -441,8 +439,8 @@ class TestTier1ArchitecturalParams:
sys.path.insert(0, str(cp.GUI_DIR))
from v7.models import WaveformConfig
wc = WaveformConfig()
assert params["RP_OUTPUT_RANGE_BINS_3KM"] == wc.n_range_bins, (
f"Range bins mismatch: radar_params.vh={params['RP_OUTPUT_RANGE_BINS_3KM']}, "
assert params["RP_NUM_RANGE_BINS"] == wc.n_range_bins, (
f"Range bins mismatch: radar_params.vh={params['RP_NUM_RANGE_BINS']}, "
f"WaveformConfig={wc.n_range_bins}"
)
@@ -505,7 +503,9 @@ class TestTier1PacketConstants:
"""Python and Verilog packet constants must match each other."""
py = cp.parse_python_packet_constants()
v = cp.parse_verilog_packet_constants()
for ptype in ("data", "status"):
# Only status packets have fixed constants; data packets are now
# bulk per-frame (variable size) in the FT2232H protocol.
for ptype in ("status",):
assert py[ptype].header == v[ptype].header
assert py[ptype].footer == v[ptype].footer
assert py[ptype].size == v[ptype].size
@@ -525,46 +525,229 @@ class TestTier1ResetDefaults:
)
class TestTier1DataPacketLayout:
"""Verify data packet byte layout matches between Python and Verilog."""
class TestTier1BulkFrameLayout:
"""Verify bulk frame protocol layout between Python parser and FPGA RTL.
def test_verilog_data_mux_field_positions(self):
"""Verilog data_pkt_byte mux must have correct byte positions."""
v_fields = cp.parse_verilog_data_mux()
# Expected: range_profile at bytes 1-4 (32-bit), doppler_real 5-6,
# doppler_imag 7-8, cfar 9
field_map = {f.name: f for f in v_fields}
The FT2232H USB interface uses a bulk per-frame transfer protocol:
Header (8 bytes): 0xAA | flags | frame_num[15:0] | n_range[15:0] | n_doppler[15:0]
[variable payload sections based on flags]
Footer (1 byte): 0x55
assert "range_profile" in field_map
rp = field_map["range_profile"]
assert rp.byte_start == 1 and rp.byte_end == 4 and rp.width_bits == 32
Flags byte: {2'b0, sparse_det, mag_only, stream_cfar, stream_doppler, stream_range}
"""
assert "doppler_real" in field_map
dr = field_map["doppler_real"]
assert dr.byte_start == 5 and dr.byte_end == 6 and dr.width_bits == 16
# Ground truth: independently derived from protocol spec (NOT from code)
HEADER_BYTE = 0xAA
FOOTER_BYTE = 0x55
BULK_HEADER_SIZE = 8 # 1 + 1 + 2 + 2 + 2
N_RANGE = 512
N_DOPPLER = 32
# Flag bit positions (from LSB)
FLAG_STREAM_RANGE = 0x01
FLAG_STREAM_DOPPLER = 0x02
FLAG_STREAM_CFAR = 0x04
FLAG_MAG_ONLY = 0x08
FLAG_SPARSE_DET = 0x10
assert "doppler_imag" in field_map
di = field_map["doppler_imag"]
assert di.byte_start == 7 and di.byte_end == 8 and di.width_bits == 16
def test_python_header_constants_match_ground_truth(self):
"""Python protocol constants must match independently-derived ground truth."""
from radar_protocol import (
HEADER_BYTE, FOOTER_BYTE, BULK_HEADER_SIZE,
NUM_RANGE_BINS, NUM_DOPPLER_BINS,
)
assert HEADER_BYTE == self.HEADER_BYTE, (
f"HEADER_BYTE: 0x{HEADER_BYTE:02X} != 0x{self.HEADER_BYTE:02X}"
)
assert FOOTER_BYTE == self.FOOTER_BYTE, (
f"FOOTER_BYTE: 0x{FOOTER_BYTE:02X} != 0x{self.FOOTER_BYTE:02X}"
)
assert BULK_HEADER_SIZE == self.BULK_HEADER_SIZE, (
f"BULK_HEADER_SIZE: {BULK_HEADER_SIZE} != {self.BULK_HEADER_SIZE}"
)
assert NUM_RANGE_BINS == self.N_RANGE, (
f"NUM_RANGE_BINS: {NUM_RANGE_BINS} != {self.N_RANGE}"
)
assert NUM_DOPPLER_BINS == self.N_DOPPLER, (
f"NUM_DOPPLER_BINS: {NUM_DOPPLER_BINS} != {self.N_DOPPLER}"
)
def test_python_data_packet_byte_positions(self):
"""Python parse_data_packet byte offsets must be correct."""
py_fields = cp.parse_python_data_packet_fields()
# range_q at offset 1 (2B), range_i at offset 3 (2B),
# doppler_i at offset 5 (2B), doppler_q at offset 7 (2B),
# detection at offset 9
field_map = {f.name: f for f in py_fields}
def test_verilog_header_constants_match_ground_truth(self):
"""Verilog FT2232H header/footer constants must match ground truth."""
v_pkt = cp.parse_verilog_packet_constants()
# Status header/footer (FT2232H bulk frames use same HEADER for data)
assert v_pkt["status"].header == 0xBB, (
f"STATUS_HEADER: 0x{v_pkt['status'].header:02X} != 0xBB"
)
assert v_pkt["status"].footer == self.FOOTER_BYTE, (
f"FOOTER: 0x{v_pkt['status'].footer:02X} != 0x{self.FOOTER_BYTE:02X}"
)
assert v_pkt["status"].size == 26, (
f"STATUS_PKT_LEN: {v_pkt['status'].size} != 26"
)
assert "range_q" in field_map
assert field_map["range_q"].byte_start == 1
assert "range_i" in field_map
assert field_map["range_i"].byte_start == 3
assert "doppler_i" in field_map
assert field_map["doppler_i"].byte_start == 5
assert "doppler_q" in field_map
assert field_map["doppler_q"].byte_start == 7
assert "detection" in field_map
assert field_map["detection"].byte_start == 9
def test_bulk_frame_total_size_mag_only_with_cfar(self):
"""Verify computed frame size for mag-only + bitmap detection mode.
Section sizes (ground truth):
Header: 8 bytes
Range profile: 512 * 2 = 1024 bytes
Doppler mag: 512 * 32 * 2 = 32768 bytes
Detection bitmap: (512 * 32) / 8 = 2048 bytes
Footer: 1 byte
Total: 35849 bytes
"""
header = self.BULK_HEADER_SIZE
range_section = self.N_RANGE * 2
doppler_section = self.N_RANGE * self.N_DOPPLER * 2 # mag-only: 16-bit per cell
det_section = (self.N_RANGE * self.N_DOPPLER + 7) // 8 # bitmap
footer = 1
expected_total = header + range_section + doppler_section + det_section + footer
assert expected_total == 35849, (
f"Frame size: {expected_total} != 35849"
)
def test_bulk_frame_total_size_iq_mode(self):
"""Verify computed frame size for full I/Q (no mag-only) + bitmap.
Doppler I/Q: 512 * 32 * 4 = 65536 bytes (vs 32768 for mag-only)
Total: 8 + 1024 + 65536 + 2048 + 1 = 68617 bytes
"""
header = self.BULK_HEADER_SIZE
range_section = self.N_RANGE * 2
doppler_section = self.N_RANGE * self.N_DOPPLER * 4 # I/Q: 32-bit per cell
det_section = (self.N_RANGE * self.N_DOPPLER + 7) // 8
footer = 1
expected_total = header + range_section + doppler_section + det_section + footer
assert expected_total == 68617, (
f"I/Q frame size: {expected_total} != 68617"
)
def test_parser_extracts_header_fields(self):
"""parse_bulk_frame must correctly extract header fields."""
from radar_protocol import RadarProtocol
# Build a minimal valid frame: range + doppler(mag-only) + cfar bitmap
flags = 0x0F # range | doppler | cfar | mag_only
frame_num = 0x1234
n_range = self.N_RANGE
n_doppler = self.N_DOPPLER
buf = bytearray()
buf.append(self.HEADER_BYTE)
buf.append(flags)
buf += struct.pack(">H", frame_num)
buf += struct.pack(">H", n_range)
buf += struct.pack(">H", n_doppler)
# Range profile: all zeros (512 x 2 bytes)
buf += bytes(n_range * 2)
# Doppler mag: all zeros (512 x 32 x 2 bytes)
buf += bytes(n_range * n_doppler * 2)
# Detection bitmap: all zeros
buf += bytes((n_range * n_doppler + 7) // 8)
# Footer
buf.append(self.FOOTER_BYTE)
frame = RadarProtocol.parse_bulk_frame(bytes(buf))
assert frame is not None, "parse_bulk_frame returned None for valid frame"
assert frame.frame_number == 0x1234, (
f"frame_number: {frame.frame_number} != 0x1234"
)
assert frame.range_profile.shape == (n_range,), (
f"range_profile shape: {frame.range_profile.shape} != ({n_range},)"
)
assert frame.magnitude.shape == (n_range, n_doppler), (
f"magnitude shape: {frame.magnitude.shape} != ({n_range}, {n_doppler})"
)
assert frame.detections.shape == (n_range, n_doppler), (
f"detections shape: {frame.detections.shape} != ({n_range}, {n_doppler})"
)
def test_parser_rejects_bad_header(self):
"""parse_bulk_frame must reject frames with wrong header byte."""
from radar_protocol import RadarProtocol
bad_frame = bytes([0xBB, 0x0F, 0x00, 0x01, 0x02, 0x00, 0x00, 0x20, 0x55])
assert RadarProtocol.parse_bulk_frame(bad_frame) is None
def test_parser_rejects_truncated_frame(self):
"""parse_bulk_frame must reject frames shorter than header + footer."""
from radar_protocol import RadarProtocol
short = bytes([self.HEADER_BYTE, 0x0F, 0x00, 0x01])
assert RadarProtocol.parse_bulk_frame(short) is None
def test_detection_bitmap_lsb_first_round_trip(self):
"""Detection bitmap packing must use LSB-first per byte (matching RTL).
RTL packs: byte_addr = {range_bin, doppler[4:3]}, bit = doppler[2:0].
This means doppler_bin % 8 gives the bit position within the byte.
"""
from radar_protocol import RadarProtocol
n_range = self.N_RANGE
n_doppler = self.N_DOPPLER
flags = 0x0F # range | doppler | cfar | mag_only
# Create a detection at range=50, doppler=5
# Expected: byte_idx = 50 * (32/8) + 5//8 = 50*4 + 0 = 200
# bit_pos = 5 % 8 = 5 → byte value = 1 << 5 = 0x20
det_bytes = bytearray((n_range * n_doppler + 7) // 8)
test_range = 50
test_doppler = 5
byte_idx = test_range * (n_doppler // 8) + test_doppler // 8
bit_pos = test_doppler % 8
det_bytes[byte_idx] |= 1 << bit_pos
buf = bytearray()
buf.append(self.HEADER_BYTE)
buf.append(flags)
buf += struct.pack(">H", 1) # frame_num
buf += struct.pack(">H", n_range)
buf += struct.pack(">H", n_doppler)
buf += bytes(n_range * 2) # range profile
buf += bytes(n_range * n_doppler * 2) # doppler mag
buf += det_bytes # detection bitmap
buf.append(self.FOOTER_BYTE)
frame = RadarProtocol.parse_bulk_frame(bytes(buf))
assert frame is not None
assert frame.detections[test_range, test_doppler] == 1, (
f"Detection at ({test_range},{test_doppler}) not found"
)
# Verify no false detections in nearby bins
assert frame.detections[test_range, test_doppler - 1] == 0, (
f"False detection at ({test_range},{test_doppler - 1})"
)
assert frame.detections[test_range, test_doppler + 1] == 0, (
f"False detection at ({test_range},{test_doppler + 1})"
)
assert frame.detection_count == 1, (
f"detection_count: {frame.detection_count} != 1"
)
def test_mock_frame_round_trips_through_parser(self):
"""FT2232HConnection._mock_read() output must parse correctly.
This is NOT self-referential: the mock packs LSB-first per byte
(matching RTL), and the parser unpacks LSB-first. If either side
flipped the bit order, the target detections would appear in wrong
Doppler bins.
"""
from radar_protocol import RadarProtocol, FT2232HConnection
conn = FT2232HConnection(mock=True)
raw = conn._mock_read(0)
frame = RadarProtocol.parse_bulk_frame(raw)
assert frame is not None, "Mock frame failed to parse"
assert frame.range_profile.shape[0] == self.N_RANGE
assert frame.magnitude.shape == (self.N_RANGE, self.N_DOPPLER)
# The mock injects targets at range ~100, doppler 7-9
# Verify detections appear in expected region
det_region = frame.detections[99:102, 7:10]
assert det_region.sum() > 0, (
"No detections found in expected target region (range 99-101, doppler 7-9)"
)
class TestTier1STM32SettingsPacket:
@@ -642,183 +825,11 @@ class TestTier1STM32SettingsPacket:
# TIER 2: Verilog Cosimulation
# ===================================================================
@pytest.mark.skipif(not _has_iverilog, reason="iverilog not available")
class TestTier2VerilogCosim:
"""Compile and run the FT2232H TB, validate output against Python parsers."""
@pytest.fixture(scope="class")
def tb_results(self, tmp_path_factory):
"""Compile and run TB once, return output file contents."""
workdir = tmp_path_factory.mktemp("verilog_cosim")
tb_path = THIS_DIR / "tb_cross_layer_ft2232h.v"
rtl_path = cp.FPGA_DIR / "usb_data_interface_ft2232h.v"
out_bin = workdir / "tb_cross_layer_ft2232h"
# Compile
result = subprocess.run(
[IVERILOG, "-o", str(out_bin), "-I", str(cp.FPGA_DIR),
str(tb_path), str(rtl_path)],
capture_output=True, text=True, timeout=30,
)
assert result.returncode == 0, f"iverilog compile failed:\n{result.stderr}"
# Run
result = subprocess.run(
[VVP, str(out_bin)],
capture_output=True, text=True, timeout=60,
cwd=str(workdir),
)
assert result.returncode == 0, f"vvp failed:\n{result.stderr}"
# Parse output
return {
"stdout": result.stdout,
"cmd_results": (workdir / "cmd_results.txt").read_text(),
"data_packet": (workdir / "data_packet.txt").read_text(),
"status_packet": (workdir / "status_packet.txt").read_text(),
}
def test_all_tb_tests_pass(self, tb_results):
"""All Verilog TB internal checks must pass."""
stdout = tb_results["stdout"]
assert "ALL TESTS PASSED" in stdout, f"TB had failures:\n{stdout}"
def test_command_round_trip(self, tb_results):
"""Verify every command decoded correctly by matching sent vs received."""
rows = _parse_hex_results(tb_results["cmd_results"])
assert len(rows) >= 20, f"Expected >= 20 command results, got {len(rows)}"
for row in rows:
assert len(row) == 6, f"Bad row format: {row}"
sent_op, sent_addr, sent_val = row[0], row[1], row[2]
got_op, got_addr, got_val = row[3], row[4], row[5]
assert sent_op == got_op, (
f"Opcode mismatch: sent 0x{sent_op} got 0x{got_op}"
)
assert sent_addr == got_addr, (
f"Addr mismatch: sent 0x{sent_addr} got 0x{got_addr}"
)
assert sent_val == got_val, (
f"Value mismatch: sent 0x{sent_val} got 0x{got_val}"
)
def test_data_packet_python_round_trip(self, tb_results):
"""
Take the 11 bytes captured by the Verilog TB, run Python's
parse_data_packet() on them, verify the parsed values match
what was injected into the TB.
"""
from radar_protocol import RadarProtocol
rows = _parse_hex_results(tb_results["data_packet"])
assert len(rows) == 11, f"Expected 11 data packet bytes, got {len(rows)}"
# Reconstruct raw bytes
raw = bytes(int(row[1], 16) for row in rows)
assert len(raw) == 11
parsed = RadarProtocol.parse_data_packet(raw)
assert parsed is not None, "parse_data_packet returned None"
# The TB injected: range_profile = 0xCAFE_BEEF = {Q=0xCAFE, I=0xBEEF}
# doppler_real = 0x1234, doppler_imag = 0x5678
# cfar_detection = 1
#
# range_q = 0xCAFE → signed = 0xCAFE - 0x10000 = -13570
# range_i = 0xBEEF → signed = 0xBEEF - 0x10000 = -16657
# doppler_i = 0x1234 → signed = 4660
# doppler_q = 0x5678 → signed = 22136
assert parsed["range_q"] == (0xCAFE - 0x10000), (
f"range_q: {parsed['range_q']} != {0xCAFE - 0x10000}"
)
assert parsed["range_i"] == (0xBEEF - 0x10000), (
f"range_i: {parsed['range_i']} != {0xBEEF - 0x10000}"
)
assert parsed["doppler_i"] == 0x1234, (
f"doppler_i: {parsed['doppler_i']} != {0x1234}"
)
assert parsed["doppler_q"] == 0x5678, (
f"doppler_q: {parsed['doppler_q']} != {0x5678}"
)
assert parsed["detection"] == 1, (
f"detection: {parsed['detection']} != 1"
)
def test_status_packet_python_round_trip(self, tb_results):
"""
Take the 26 bytes captured by the Verilog TB, run Python's
parse_status_packet() on them, verify against injected values.
"""
from radar_protocol import RadarProtocol
lines = tb_results["status_packet"].strip().splitlines()
# Filter out comments and status_words debug lines
rows = []
for line in lines:
line = line.strip()
if not line or line.startswith("#"):
continue
rows.append(line.split())
assert len(rows) == 26, f"Expected 26 status bytes, got {len(rows)}"
raw = bytes(int(row[1], 16) for row in rows)
assert len(raw) == 26
sr = RadarProtocol.parse_status_packet(raw)
assert sr is not None, "parse_status_packet returned None"
# Injected values (from TB):
# status_cfar_threshold = 0xABCD
# status_stream_ctrl = 3'b101 = 5
# status_radar_mode = 2'b11 = 3
# status_long_chirp = 0x1234
# status_long_listen = 0x5678
# status_guard = 0x9ABC
# status_short_chirp = 0xDEF0
# status_short_listen = 0xFACE
# status_chirps_per_elev = 42
# status_range_mode = 2'b10 = 2
# status_self_test_flags = 5'b10101 = 21
# status_self_test_detail = 0xA5
# status_self_test_busy = 1
# status_agc_current_gain = 7
# status_agc_peak_magnitude = 200
# status_agc_saturation_count = 15
# status_agc_enable = 1
# Words 1-5 should be correct (no truncation bug)
assert sr.cfar_threshold == 0xABCD, f"cfar_threshold: 0x{sr.cfar_threshold:04X}"
assert sr.long_chirp == 0x1234, f"long_chirp: 0x{sr.long_chirp:04X}"
assert sr.long_listen == 0x5678, f"long_listen: 0x{sr.long_listen:04X}"
assert sr.guard == 0x9ABC, f"guard: 0x{sr.guard:04X}"
assert sr.short_chirp == 0xDEF0, f"short_chirp: 0x{sr.short_chirp:04X}"
assert sr.short_listen == 0xFACE, f"short_listen: 0x{sr.short_listen:04X}"
assert sr.chirps_per_elev == 42, f"chirps_per_elev: {sr.chirps_per_elev}"
assert sr.range_mode == 2, f"range_mode: {sr.range_mode}"
assert sr.self_test_flags == 21, f"self_test_flags: {sr.self_test_flags}"
assert sr.self_test_detail == 0xA5, f"self_test_detail: 0x{sr.self_test_detail:02X}"
assert sr.self_test_busy == 1, f"self_test_busy: {sr.self_test_busy}"
# AGC fields (word 4)
assert sr.agc_current_gain == 7, f"agc_current_gain: {sr.agc_current_gain}"
assert sr.agc_peak_magnitude == 200, f"agc_peak_magnitude: {sr.agc_peak_magnitude}"
assert sr.agc_saturation_count == 15, f"agc_saturation_count: {sr.agc_saturation_count}"
assert sr.agc_enable == 1, f"agc_enable: {sr.agc_enable}"
# Word 0: stream_ctrl should be 5 (3'b101)
assert sr.stream_ctrl == 5, (
f"stream_ctrl: {sr.stream_ctrl} != 5. "
f"Check status_words[0] bit positions."
)
# radar_mode should be 3 (2'b11)
assert sr.radar_mode == 3, (
f"radar_mode={sr.radar_mode} != 3. "
f"Check status_words[0] bit positions."
)
# NOTE: TestTier2VerilogCosim was deleted — the FT2232H TB used the old
# per-sample data packet interface which was fully replaced by bulk frame
# protocol. Command round-trip and status packet testing via Verilog cosim
# can be restored when tb_cross_layer_ft2232h.v is rewritten for bulk frames.
# ===================================================================
@@ -998,6 +1009,142 @@ class TestTier3CStub:
)
# ===================================================================
# TIER 3b: Detection Bitmap Packing (RTL ground truth, not self-referential)
# ===================================================================
class TestTier3bDetectionBitmapPacking:
"""Verify detection bitmap packing matches RTL bit-layout specification.
The RTL packs detection bits as:
byte_addr = {range_bin, doppler_bin[4:3]} (i.e. range_bin * 4 + doppler_bin // 8)
bit_pos = doppler_bin[2:0] (i.e. doppler_bin % 8, LSB-first)
This test derives expected bytes INDEPENDENTLY from the RTL formula,
then verifies the Python mock generator produces those exact bytes AND
the Python parser recovers the original detection matrix.
This prevents the "self-referential mock" problem where mock and parser
agree with each other but both diverge from what the FPGA actually sends.
"""
# RTL constants
N_RANGE = 512
N_DOPPLER = 32
BITMAP_BYTES = N_RANGE * N_DOPPLER // 8 # 2048
@staticmethod
def _rtl_pack_bitmap(det_matrix):
"""Pack a detection matrix using the RTL formula (ground truth).
This is an independent re-implementation of the Verilog logic in
usb_data_interface_ft2232h.v lines 440-448:
detect_rmw_addr <= {range_bin_in, doppler_bin_in[4:3]};
detect_rmw_bit <= doppler_bin_in[2:0];
"""
n_range, n_doppler = det_matrix.shape
buf = bytearray(n_range * n_doppler // 8)
for r in range(n_range):
for d in range(n_doppler):
if det_matrix[r, d]:
byte_addr = r * (n_doppler // 8) + d // 8
bit_pos = d % 8 # LSB-first
buf[byte_addr] |= 1 << bit_pos
return bytes(buf)
def test_mock_generator_matches_rtl_formula(self):
"""FT2232HConnection mock bitmap bytes must match independently-derived RTL packing."""
import radar_protocol as rp
# Generate a mock frame via the FT2232H mock connection
conn = rp.FT2232HConnection(mock=True)
conn.open()
raw = conn.read()
conn.close()
assert raw is not None, "Mock connection returned None"
frame = rp.RadarProtocol.parse_bulk_frame(raw)
assert frame is not None, "Failed to parse mock frame"
# The mock creates detections at range_bins 99-101, doppler_bins 7-9
# (from radar_protocol.py FT2232HConnection._mock_read())
import numpy as np
expected_det = np.zeros((self.N_RANGE, self.N_DOPPLER), dtype=np.uint8)
for rb in range(99, 102):
for db in range(7, 10):
expected_det[rb, db] = 1
# Derive expected bytes from RTL formula (ground truth)
expected_bytes = self._rtl_pack_bitmap(expected_det)
# Extract actual bitmap bytes from the raw frame
# Frame layout (flags=0x0F: stream_range + stream_doppler + stream_cfar + mag_only):
# header(8) + range_mag(512*2) + doppler_mag(512*32*2) + det_bitmap(2048) + footer(1)
header_size = 1 + 1 + 2 + 2 + 2 # 8 bytes
range_mag_size = self.N_RANGE * 2 # 1024 bytes
doppler_mag_size = self.N_RANGE * self.N_DOPPLER * 2 # 32768 bytes
bitmap_offset = header_size + range_mag_size + doppler_mag_size
actual_bytes = raw[bitmap_offset:bitmap_offset + self.BITMAP_BYTES]
assert actual_bytes == expected_bytes, (
"Mock generator bitmap does NOT match RTL packing formula. "
"This means the mock would produce frames that differ from "
"what the FPGA actually sends over USB."
)
def test_parser_recovers_detections_from_rtl_packed_bytes(self):
"""Parser must correctly decode bytes packed with the RTL formula."""
import numpy as np
# Create a known detection pattern (sparse, tests edge cases)
det = np.zeros((self.N_RANGE, self.N_DOPPLER), dtype=np.uint8)
# Test corners: first/last range bin, first/last doppler bin
test_points = [(0, 0), (0, 31), (511, 0), (511, 31),
(100, 7), (255, 15), (256, 16)]
for r, d in test_points:
det[r, d] = 1
# Pack using RTL formula (ground truth)
rtl_bytes = self._rtl_pack_bitmap(det)
# Decode using the same logic as RadarProtocol.parse_bulk_frame
recovered = np.zeros((self.N_RANGE, self.N_DOPPLER), dtype=np.uint8)
for r in range(self.N_RANGE):
for d in range(self.N_DOPPLER):
byte_idx = r * (self.N_DOPPLER // 8) + d // 8
bit_pos = d % 8
if (rtl_bytes[byte_idx] >> bit_pos) & 1:
recovered[r, d] = 1
np.testing.assert_array_equal(recovered, det,
err_msg="Parser logic cannot round-trip RTL-packed bitmap. "
"The parser's byte_idx/bit_pos formula diverges from RTL.")
def test_msb_packing_would_fail(self):
"""Sanity check: MSB-first packing must produce DIFFERENT bytes than RTL.
This test exists to ensure the RTL ground truth test above isn't
vacuously true. If MSB and LSB packing produced the same bytes,
we couldn't detect the bug.
"""
import numpy as np
# Detection at doppler bin 1 — LSB bit_pos=1, MSB bit_pos=6
det = np.zeros((self.N_RANGE, self.N_DOPPLER), dtype=np.uint8)
det[0, 1] = 1 # doppler bin 1: LSB gives 0x02, MSB gives 0x40
rtl_bytes = self._rtl_pack_bitmap(det)
# MSB-first packing (the OLD, wrong way)
msb_bytes = bytearray(self.BITMAP_BYTES)
bit_idx = 0 * self.N_DOPPLER + 1
msb_bytes[bit_idx // 8] |= 1 << (7 - (bit_idx % 8))
assert rtl_bytes != bytes(msb_bytes), (
"LSB and MSB packing produced identical bytes — "
"this test cannot distinguish the bug. Check test logic."
)
# ===================================================================
# TIER 4: Stale Value Detection (LLM Agent Guardrails)
# ===================================================================
@@ -1028,11 +1175,11 @@ class TestTier4BannedPatterns:
# Wrong range per bin values from old calculations
(r'(?<!\d)4\.8\s*(?:m/bin|m per|meters?\s*per)',
"Stale bin spacing 4.8 m — PLFM is 24.0 m/bin",
"Stale bin spacing 4.8 m — PLFM is 6.0 m/bin",
("*.py", "*.cpp")),
(r'(?<!\d)5\.6\s*(?:m/bin|m per|meters?\s*per)',
"Stale bin spacing 5.6 m — PLFM is 24.0 m/bin",
"Stale bin spacing 5.6 m — PLFM is 6.0 m/bin",
("*.py", "*.cpp")),
# Wrong range resolution from deramped FMCW formula
@@ -1269,7 +1416,7 @@ class TestTier5PipelineDoppler:
err_msg="Doppler Q (MTI path) drifted from committed golden data")
def test_doppler_output_shape(self):
"""Doppler output must be (64, 32) — 64 range bins x 32 Doppler bins."""
"""Doppler output must be (512, 32) — 512 range bins x 32 Doppler bins."""
_skip_if_no_golden()
from golden_reference import run_doppler_fft
@@ -1283,8 +1430,8 @@ class TestTier5PipelineDoppler:
got_i, got_q = run_doppler_fft(dec_i, dec_q, twiddle_file_16=tw)
assert got_i.shape == (64, 32), f"Expected (64,32), got {got_i.shape}"
assert got_q.shape == (64, 32), f"Expected (64,32), got {got_q.shape}"
assert got_i.shape == (512, 32), f"Expected (512,32), got {got_i.shape}"
assert got_q.shape == (512, 32), f"Expected (512,32), got {got_q.shape}"
def test_doppler_chained_from_decimated_via_mti(self):
"""Full chain: decimated → MTI → Doppler must match committed output."""
@@ -1350,7 +1497,7 @@ class TestTier5PipelineCFAR:
err_msg="CFAR thresholds drifted from committed golden data")
def test_cfar_output_shapes(self):
"""CFAR outputs must be (64, 32)."""
"""CFAR outputs must be (512, 32)."""
_skip_if_no_golden()
from golden_reference import run_dc_notch, run_cfar_ca
@@ -1368,9 +1515,9 @@ class TestTier5PipelineCFAR:
mode=self.CFAR_MODE,
)
assert flags.shape == (64, 32)
assert mag.shape == (64, 32)
assert thr.shape == (64, 32)
assert flags.shape == (512, 32)
assert mag.shape == (512, 32)
assert thr.shape == (512, 32)
def test_cfar_flags_are_boolean(self):
"""CFAR flags should be boolean (0 or 1 only)."""