From f9ad30e737c8726b801d76805acf000660602781 Mon Sep 17 00:00:00 2001 From: Jason <83615043+JJassonn69@users.noreply.github.com> Date: Fri, 20 Mar 2026 20:54:42 +0200 Subject: [PATCH] GUI: add self-test UI, fix opcode mismatches (0x16->0x06, 0x04->0x05), update status parsing to 6-word/26-byte format --- 9_Firmware/9_3_GUI/radar_dashboard.py | 74 ++++++++++- 9_Firmware/9_3_GUI/radar_protocol.py | 91 ++++++++----- 9_Firmware/9_3_GUI/test_radar_dashboard.py | 146 +++++++++++++++++++-- 3 files changed, 266 insertions(+), 45 deletions(-) diff --git a/9_Firmware/9_3_GUI/radar_dashboard.py b/9_Firmware/9_3_GUI/radar_dashboard.py index 2973316..191ebff 100644 --- a/9_Firmware/9_3_GUI/radar_dashboard.py +++ b/9_Firmware/9_3_GUI/radar_dashboard.py @@ -270,6 +270,33 @@ class RadarDashboard: ttk.Button(left, text="Request Status (0xFF)", command=lambda: self._send_cmd(0xFF, 0)).pack(fill="x", pady=3) + ttk.Separator(left, orient="horizontal").pack(fill="x", pady=6) + + ttk.Label(left, text="FPGA Self-Test", font=("Menlo", 10, "bold")).pack( + anchor="w", pady=(2, 0)) + ttk.Button(left, text="Run Self-Test (0x30)", + command=lambda: self._send_cmd(0x30, 1)).pack(fill="x", pady=3) + ttk.Button(left, text="Read Self-Test Result (0x31)", + command=lambda: self._send_cmd(0x31, 0)).pack(fill="x", pady=3) + + # Self-test result display + st_frame = ttk.LabelFrame(left, text="Self-Test Results", padding=6) + st_frame.pack(fill="x", pady=(6, 0)) + self._st_labels = {} + for name, default_text in [ + ("busy", "Busy: --"), + ("flags", "Flags: -----"), + ("detail", "Detail: 0x--"), + ("t0", "T0 BRAM: --"), + ("t1", "T1 CIC: --"), + ("t2", "T2 FFT: --"), + ("t3", "T3 Arith: --"), + ("t4", "T4 ADC: --"), + ]: + lbl = ttk.Label(st_frame, text=default_text, font=("Menlo", 9)) + lbl.pack(anchor="w") + self._st_labels[name] = lbl + # Right column: Parameter configuration right = ttk.LabelFrame(outer, text="Parameter Configuration", padding=12) right.grid(row=0, column=1, sticky="nsew", padx=(8, 0)) @@ -281,10 +308,10 @@ class RadarDashboard: ("CFAR Alpha Q4.4 (0x23)", 0x23, "48"), ("CFAR Mode (0x24)", 0x24, "0"), ("Threshold (0x10)", 0x10, "500"), - ("Gain Shift (0x16)", 0x16, "0"), + ("Gain Shift (0x06)", 0x06, "0"), ("DC Notch Width (0x27)", 0x27, "0"), ("Range Mode (0x20)", 0x20, "0"), - ("Stream Enable (0x04)", 0x04, "7"), + ("Stream Enable (0x05)", 0x05, "7"), ] for row_idx, (label, opcode, default) in enumerate(params): @@ -367,7 +394,8 @@ class RadarDashboard: self.lbl_status.config(text="CONNECTED", foreground=GREEN) self.btn_connect.config(text="Disconnect") self._acq_thread = RadarAcquisition( - self.conn, self.frame_queue, self.recorder) + self.conn, self.frame_queue, self.recorder, + status_callback=self._on_status_received) self._acq_thread.start() log.info("Connected and acquisition started") else: @@ -402,6 +430,46 @@ class RadarDashboard: except ValueError: log.error("Invalid custom command values") + def _on_status_received(self, status: StatusResponse): + """Called from acquisition thread — schedule UI update on main thread.""" + self.root.after(0, self._update_self_test_labels, status) + + def _update_self_test_labels(self, status: StatusResponse): + """Update the self-test result labels from a StatusResponse.""" + if not hasattr(self, '_st_labels'): + return + flags = status.self_test_flags + detail = status.self_test_detail + busy = status.self_test_busy + + busy_str = "RUNNING" if busy else "IDLE" + busy_color = YELLOW if busy else FG + self._st_labels["busy"].config(text=f"Busy: {busy_str}", + foreground=busy_color) + self._st_labels["flags"].config(text=f"Flags: {flags:05b}") + self._st_labels["detail"].config(text=f"Detail: 0x{detail:02X}") + + # Individual test results (bit = 1 means PASS) + test_names = [ + ("t0", "T0 BRAM"), + ("t1", "T1 CIC"), + ("t2", "T2 FFT"), + ("t3", "T3 Arith"), + ("t4", "T4 ADC"), + ] + for i, (key, name) in enumerate(test_names): + if busy: + result_str = "..." + color = YELLOW + elif flags & (1 << i): + result_str = "PASS" + color = GREEN + else: + result_str = "FAIL" + color = RED + self._st_labels[key].config( + text=f"{name}: {result_str}", foreground=color) + # --------------------------------------------------------- Display loop def _schedule_update(self): self._update_display() diff --git a/9_Firmware/9_3_GUI/radar_protocol.py b/9_Firmware/9_3_GUI/radar_protocol.py index 7249aaf..415be09 100644 --- a/9_Firmware/9_3_GUI/radar_protocol.py +++ b/9_Firmware/9_3_GUI/radar_protocol.py @@ -10,7 +10,7 @@ Matches usb_data_interface.v packet format exactly. USB Packet Protocol: TX (FPGA→Host): Data packet: [0xAA] [range 4×32b] [doppler 4×32b] [det 1B] [0x55] - Status packet: [0xBB] [status 5×32b] [0x55] + Status packet: [0xBB] [status 6×32b] [0x55] RX (Host→FPGA): Command word: {opcode[31:24], addr[23:16], value[15:0]} """ @@ -47,29 +47,30 @@ WATERFALL_DEPTH = 64 class Opcode(IntEnum): """Host register opcodes (matches radar_system_top.v command decode).""" - TRIGGER = 0x01 - PRF_DIV = 0x02 - NUM_CHIRPS = 0x03 - CHIRP_TIMER = 0x04 - STREAM_ENABLE = 0x05 - GAIN_SHIFT = 0x06 - THRESHOLD = 0x10 - LONG_CHIRP = 0x10 - LONG_LISTEN = 0x11 - GUARD = 0x12 - SHORT_CHIRP = 0x13 - SHORT_LISTEN = 0x14 - CHIRPS_PER_ELEV = 0x15 - DIGITAL_GAIN = 0x16 - RANGE_MODE = 0x20 - CFAR_GUARD = 0x21 - CFAR_TRAIN = 0x22 - CFAR_ALPHA = 0x23 - CFAR_MODE = 0x24 - CFAR_ENABLE = 0x25 - MTI_ENABLE = 0x26 - DC_NOTCH_WIDTH = 0x27 - STATUS_REQUEST = 0xFF + TRIGGER = 0x01 + PRF_DIV = 0x02 + NUM_CHIRPS = 0x03 + CHIRP_TIMER = 0x04 + STREAM_ENABLE = 0x05 + GAIN_SHIFT = 0x06 + THRESHOLD = 0x10 + LONG_CHIRP = 0x10 + LONG_LISTEN = 0x11 + GUARD = 0x12 + SHORT_CHIRP = 0x13 + SHORT_LISTEN = 0x14 + CHIRPS_PER_ELEV = 0x15 + RANGE_MODE = 0x20 + CFAR_GUARD = 0x21 + CFAR_TRAIN = 0x22 + CFAR_ALPHA = 0x23 + CFAR_MODE = 0x24 + CFAR_ENABLE = 0x25 + MTI_ENABLE = 0x26 + DC_NOTCH_WIDTH = 0x27 + SELF_TEST_TRIGGER = 0x30 + SELF_TEST_STATUS = 0x31 + STATUS_REQUEST = 0xFF # ============================================================================ @@ -96,7 +97,7 @@ class RadarFrame: @dataclass class StatusResponse: - """Parsed status response from FPGA.""" + """Parsed status response from FPGA (8-word packet as of Build 26).""" radar_mode: int = 0 stream_ctrl: int = 0 cfar_threshold: int = 0 @@ -107,6 +108,10 @@ class StatusResponse: short_listen: int = 0 chirps_per_elev: int = 0 range_mode: int = 0 + # Self-test results (word 5, added in Build 26) + self_test_flags: int = 0 # 5-bit result flags [4:0] + self_test_detail: int = 0 # 8-bit detail code [7:0] + self_test_busy: int = 0 # 1-bit busy flag # ============================================================================ @@ -197,19 +202,19 @@ class RadarProtocol: def parse_status_packet(raw: bytes) -> Optional[StatusResponse]: """ Parse a status response packet. - Format: [0xBB] [5×4B status words] [0x55] = 1 + 20 + 1 = 22 bytes + Format: [0xBB] [6×4B status words] [0x55] = 1 + 24 + 1 = 26 bytes """ - if len(raw) < 22: + if len(raw) < 26: return None if raw[0] != STATUS_HEADER_BYTE: return None words = [] - for i in range(5): + for i in range(6): w = struct.unpack_from(">I", raw, 1 + i * 4)[0] words.append(w) - if raw[21] != FOOTER_BYTE: + if raw[25] != FOOTER_BYTE: return None sr = StatusResponse() @@ -228,6 +233,11 @@ class RadarProtocol: sr.short_listen = (words[3] >> 16) & 0xFFFF # Word 4: {30'd0, range_mode[1:0]} sr.range_mode = words[4] & 0x03 + # Word 5: {7'd0, self_test_busy, 8'd0, self_test_detail[7:0], + # 3'd0, self_test_flags[4:0]} + sr.self_test_flags = words[5] & 0x1F + sr.self_test_detail = (words[5] >> 8) & 0xFF + sr.self_test_busy = (words[5] >> 24) & 0x01 return sr @staticmethod @@ -248,8 +258,8 @@ class RadarProtocol: else: break elif buf[i] == STATUS_HEADER_BYTE: - # Status packet: 22 bytes - end = i + 22 + # Status packet: 26 bytes (6 words + header + footer) + end = i + 26 if end <= len(buf): packets.append((i, end, "status")) i = end @@ -423,8 +433,9 @@ _HARDWARE_ONLY_OPCODES = { 0x13, # SHORT_CHIRP 0x14, # SHORT_LISTEN 0x15, # CHIRPS_PER_ELEV - 0x16, # DIGITAL_GAIN 0x20, # RANGE_MODE + 0x30, # SELF_TEST_TRIGGER + 0x31, # SELF_TEST_STATUS 0xFF, # STATUS_REQUEST } @@ -873,11 +884,13 @@ class RadarAcquisition(threading.Thread): """ def __init__(self, connection: FT601Connection, frame_queue: queue.Queue, - recorder: Optional[DataRecorder] = None): + recorder: Optional[DataRecorder] = None, + status_callback=None): super().__init__(daemon=True) self.conn = connection self.frame_queue = frame_queue self.recorder = recorder + self._status_callback = status_callback self._stop_event = threading.Event() self._frame = RadarFrame() self._sample_idx = 0 @@ -903,7 +916,17 @@ class RadarAcquisition(threading.Thread): elif ptype == "status": status = RadarProtocol.parse_status_packet(raw[start:end]) if status is not None: - log.info(f"Status: mode={status.radar_mode} stream={status.stream_ctrl}") + log.info(f"Status: mode={status.radar_mode} " + f"stream={status.stream_ctrl}") + if status.self_test_busy or status.self_test_flags: + log.info(f"Self-test: busy={status.self_test_busy} " + f"flags=0b{status.self_test_flags:05b} " + f"detail=0x{status.self_test_detail:02X}") + if self._status_callback is not None: + try: + self._status_callback(status) + except Exception as e: + log.error(f"Status callback error: {e}") log.info("Acquisition thread stopped") diff --git a/9_Firmware/9_3_GUI/test_radar_dashboard.py b/9_Firmware/9_3_GUI/test_radar_dashboard.py index 556cd87..00d15cc 100644 --- a/9_Firmware/9_3_GUI/test_radar_dashboard.py +++ b/9_Firmware/9_3_GUI/test_radar_dashboard.py @@ -17,7 +17,7 @@ import numpy as np from radar_protocol import ( RadarProtocol, FT601Connection, DataRecorder, RadarAcquisition, - RadarFrame, StatusResponse, + RadarFrame, StatusResponse, Opcode, HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE, NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS, _HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES, @@ -133,8 +133,9 @@ class TestRadarProtocol(unittest.TestCase): def _make_status_packet(self, mode=1, stream=7, threshold=10000, long_chirp=3000, long_listen=13700, guard=17540, short_chirp=50, - short_listen=17450, chirps=32, range_mode=0): - """Build a 22-byte status response matching FPGA format.""" + short_listen=17450, chirps=32, range_mode=0, + st_flags=0, st_detail=0, st_busy=0): + """Build a 26-byte status response matching FPGA format (Build 26).""" pkt = bytearray() pkt.append(STATUS_HEADER_BYTE) @@ -158,6 +159,11 @@ class TestRadarProtocol(unittest.TestCase): w4 = range_mode & 0x03 pkt += struct.pack(">I", w4) + # Word 5: {7'd0, self_test_busy, 8'd0, self_test_detail[7:0], + # 3'd0, self_test_flags[4:0]} + w5 = ((st_busy & 0x01) << 24) | ((st_detail & 0xFF) << 8) | (st_flags & 0x1F) + pkt += struct.pack(">I", w5) + pkt.append(FOOTER_BYTE) return bytes(pkt) @@ -182,7 +188,7 @@ class TestRadarProtocol(unittest.TestCase): self.assertEqual(sr.range_mode, 2) def test_parse_status_too_short(self): - self.assertIsNone(RadarProtocol.parse_status_packet(b"\xBB" + b"\x00" * 10)) + self.assertIsNone(RadarProtocol.parse_status_packet(b"\xBB" + b"\x00" * 20)) def test_parse_status_wrong_header(self): raw = self._make_status_packet() @@ -191,9 +197,55 @@ class TestRadarProtocol(unittest.TestCase): def test_parse_status_wrong_footer(self): raw = bytearray(self._make_status_packet()) - raw[-1] = 0x00 # corrupt footer + raw[25] = 0x00 # corrupt footer (was at index 21 in old 5-word format) self.assertIsNone(RadarProtocol.parse_status_packet(bytes(raw))) + def test_parse_status_self_test_all_pass(self): + """Status with all self-test flags set (all tests pass).""" + raw = self._make_status_packet(st_flags=0x1F, st_detail=0xA5, st_busy=0) + sr = RadarProtocol.parse_status_packet(raw) + self.assertIsNotNone(sr) + self.assertEqual(sr.self_test_flags, 0x1F) + self.assertEqual(sr.self_test_detail, 0xA5) + self.assertEqual(sr.self_test_busy, 0) + + def test_parse_status_self_test_busy(self): + """Status with self-test busy flag set.""" + raw = self._make_status_packet(st_flags=0x00, st_detail=0x00, st_busy=1) + sr = RadarProtocol.parse_status_packet(raw) + self.assertIsNotNone(sr) + self.assertEqual(sr.self_test_busy, 1) + self.assertEqual(sr.self_test_flags, 0) + self.assertEqual(sr.self_test_detail, 0) + + def test_parse_status_self_test_partial_fail(self): + """Status with partial self-test failures (flags=0b10110).""" + raw = self._make_status_packet(st_flags=0b10110, st_detail=0x42, st_busy=0) + sr = RadarProtocol.parse_status_packet(raw) + self.assertIsNotNone(sr) + self.assertEqual(sr.self_test_flags, 0b10110) + self.assertEqual(sr.self_test_detail, 0x42) + self.assertEqual(sr.self_test_busy, 0) + # T0 (BRAM) failed, T1 (CIC) passed, T2 (FFT) passed, T3 (arith) failed, T4 (ADC) passed + self.assertFalse(sr.self_test_flags & 0x01) # T0 fail + self.assertTrue(sr.self_test_flags & 0x02) # T1 pass + self.assertTrue(sr.self_test_flags & 0x04) # T2 pass + self.assertFalse(sr.self_test_flags & 0x08) # T3 fail + self.assertTrue(sr.self_test_flags & 0x10) # T4 pass + + def test_parse_status_self_test_zero_word5(self): + """Status with zero word 5 (self-test never run).""" + raw = self._make_status_packet() + sr = RadarProtocol.parse_status_packet(raw) + self.assertEqual(sr.self_test_flags, 0) + self.assertEqual(sr.self_test_detail, 0) + self.assertEqual(sr.self_test_busy, 0) + + def test_status_packet_is_26_bytes(self): + """Verify status packet is exactly 26 bytes.""" + raw = self._make_status_packet() + self.assertEqual(len(raw), 26) + # ---------------------------------------------------------------- # Boundary detection # ---------------------------------------------------------------- @@ -375,9 +427,9 @@ class TestEndToEnd(unittest.TestCase): def test_command_roundtrip_all_opcodes(self): """Verify all opcodes produce valid 4-byte commands.""" - opcodes = [0x01, 0x02, 0x03, 0x04, 0x10, 0x11, 0x12, 0x13, 0x14, - 0x15, 0x16, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, - 0x27, 0xFF] + opcodes = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x10, 0x11, 0x12, + 0x13, 0x14, 0x15, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, + 0x26, 0x27, 0x30, 0x31, 0xFF] for op in opcodes: cmd = RadarProtocol.build_command(op, 42) self.assertEqual(len(cmd), 4, f"opcode 0x{op:02X}") @@ -608,6 +660,84 @@ class TestReplayConnection(unittest.TestCase): self.assertFalse(conn._needs_rebuild) conn.close() + def test_replay_self_test_opcodes_are_hardware_only(self): + """Self-test opcodes 0x30/0x31 are hardware-only (ignored in replay).""" + if not self._npy_available(): + self.skipTest("npy data files not found") + from radar_protocol import ReplayConnection + conn = ReplayConnection(self.NPY_DIR, use_mti=True) + conn.open() + # Send self-test trigger + cmd = RadarProtocol.build_command(0x30, 1) + conn.write(cmd) + self.assertFalse(conn._needs_rebuild) + # Send self-test status request + cmd = RadarProtocol.build_command(0x31, 0) + conn.write(cmd) + self.assertFalse(conn._needs_rebuild) + conn.close() + + +class TestOpcodeEnum(unittest.TestCase): + """Verify Opcode enum matches RTL host register map.""" + + def test_gain_shift_is_0x06(self): + """GAIN_SHIFT opcode must be 0x06 (not 0x16).""" + self.assertEqual(Opcode.GAIN_SHIFT, 0x06) + + def test_no_digital_gain_alias(self): + """DIGITAL_GAIN should NOT exist (was bogus 0x16 alias).""" + self.assertFalse(hasattr(Opcode, 'DIGITAL_GAIN')) + + def test_self_test_trigger(self): + """SELF_TEST_TRIGGER opcode must be 0x30.""" + self.assertEqual(Opcode.SELF_TEST_TRIGGER, 0x30) + + def test_self_test_status(self): + """SELF_TEST_STATUS opcode must be 0x31.""" + self.assertEqual(Opcode.SELF_TEST_STATUS, 0x31) + + def test_self_test_in_hardware_only(self): + """Self-test opcodes must be in _HARDWARE_ONLY_OPCODES.""" + self.assertIn(0x30, _HARDWARE_ONLY_OPCODES) + self.assertIn(0x31, _HARDWARE_ONLY_OPCODES) + + def test_0x16_not_in_hardware_only(self): + """Bogus 0x16 must not be in _HARDWARE_ONLY_OPCODES.""" + self.assertNotIn(0x16, _HARDWARE_ONLY_OPCODES) + + def test_stream_enable_is_0x05(self): + """STREAM_ENABLE must be 0x05 (not 0x04).""" + self.assertEqual(Opcode.STREAM_ENABLE, 0x05) + + def test_all_rtl_opcodes_present(self): + """Every RTL opcode has a matching Opcode enum member.""" + expected = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, + 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, + 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, + 0x30, 0x31, 0xFF} + enum_values = set(int(m) for m in Opcode) + for op in expected: + self.assertIn(op, enum_values, f"0x{op:02X} missing from Opcode enum") + + +class TestStatusResponseDefaults(unittest.TestCase): + """Verify StatusResponse dataclass has self-test fields.""" + + def test_default_self_test_fields(self): + sr = StatusResponse() + self.assertEqual(sr.self_test_flags, 0) + self.assertEqual(sr.self_test_detail, 0) + self.assertEqual(sr.self_test_busy, 0) + + def test_self_test_fields_set(self): + sr = StatusResponse(self_test_flags=0x1F, + self_test_detail=0xAB, + self_test_busy=1) + self.assertEqual(sr.self_test_flags, 0x1F) + self.assertEqual(sr.self_test_detail, 0xAB) + self.assertEqual(sr.self_test_busy, 1) + if __name__ == "__main__": unittest.main(verbosity=2)