GUI: add self-test UI, fix opcode mismatches (0x16->0x06, 0x04->0x05), update status parsing to 6-word/26-byte format

This commit is contained in:
Jason
2026-03-20 20:54:42 +02:00
parent 4985eccbae
commit f9ad30e737
3 changed files with 266 additions and 45 deletions
+71 -3
View File
@@ -270,6 +270,33 @@ class RadarDashboard:
ttk.Button(left, text="Request Status (0xFF)", ttk.Button(left, text="Request Status (0xFF)",
command=lambda: self._send_cmd(0xFF, 0)).pack(fill="x", pady=3) command=lambda: self._send_cmd(0xFF, 0)).pack(fill="x", pady=3)
ttk.Separator(left, orient="horizontal").pack(fill="x", pady=6)
ttk.Label(left, text="FPGA Self-Test", font=("Menlo", 10, "bold")).pack(
anchor="w", pady=(2, 0))
ttk.Button(left, text="Run Self-Test (0x30)",
command=lambda: self._send_cmd(0x30, 1)).pack(fill="x", pady=3)
ttk.Button(left, text="Read Self-Test Result (0x31)",
command=lambda: self._send_cmd(0x31, 0)).pack(fill="x", pady=3)
# Self-test result display
st_frame = ttk.LabelFrame(left, text="Self-Test Results", padding=6)
st_frame.pack(fill="x", pady=(6, 0))
self._st_labels = {}
for name, default_text in [
("busy", "Busy: --"),
("flags", "Flags: -----"),
("detail", "Detail: 0x--"),
("t0", "T0 BRAM: --"),
("t1", "T1 CIC: --"),
("t2", "T2 FFT: --"),
("t3", "T3 Arith: --"),
("t4", "T4 ADC: --"),
]:
lbl = ttk.Label(st_frame, text=default_text, font=("Menlo", 9))
lbl.pack(anchor="w")
self._st_labels[name] = lbl
# Right column: Parameter configuration # Right column: Parameter configuration
right = ttk.LabelFrame(outer, text="Parameter Configuration", padding=12) right = ttk.LabelFrame(outer, text="Parameter Configuration", padding=12)
right.grid(row=0, column=1, sticky="nsew", padx=(8, 0)) right.grid(row=0, column=1, sticky="nsew", padx=(8, 0))
@@ -281,10 +308,10 @@ class RadarDashboard:
("CFAR Alpha Q4.4 (0x23)", 0x23, "48"), ("CFAR Alpha Q4.4 (0x23)", 0x23, "48"),
("CFAR Mode (0x24)", 0x24, "0"), ("CFAR Mode (0x24)", 0x24, "0"),
("Threshold (0x10)", 0x10, "500"), ("Threshold (0x10)", 0x10, "500"),
("Gain Shift (0x16)", 0x16, "0"), ("Gain Shift (0x06)", 0x06, "0"),
("DC Notch Width (0x27)", 0x27, "0"), ("DC Notch Width (0x27)", 0x27, "0"),
("Range Mode (0x20)", 0x20, "0"), ("Range Mode (0x20)", 0x20, "0"),
("Stream Enable (0x04)", 0x04, "7"), ("Stream Enable (0x05)", 0x05, "7"),
] ]
for row_idx, (label, opcode, default) in enumerate(params): for row_idx, (label, opcode, default) in enumerate(params):
@@ -367,7 +394,8 @@ class RadarDashboard:
self.lbl_status.config(text="CONNECTED", foreground=GREEN) self.lbl_status.config(text="CONNECTED", foreground=GREEN)
self.btn_connect.config(text="Disconnect") self.btn_connect.config(text="Disconnect")
self._acq_thread = RadarAcquisition( self._acq_thread = RadarAcquisition(
self.conn, self.frame_queue, self.recorder) self.conn, self.frame_queue, self.recorder,
status_callback=self._on_status_received)
self._acq_thread.start() self._acq_thread.start()
log.info("Connected and acquisition started") log.info("Connected and acquisition started")
else: else:
@@ -402,6 +430,46 @@ class RadarDashboard:
except ValueError: except ValueError:
log.error("Invalid custom command values") log.error("Invalid custom command values")
def _on_status_received(self, status: StatusResponse):
"""Called from acquisition thread — schedule UI update on main thread."""
self.root.after(0, self._update_self_test_labels, status)
def _update_self_test_labels(self, status: StatusResponse):
"""Update the self-test result labels from a StatusResponse."""
if not hasattr(self, '_st_labels'):
return
flags = status.self_test_flags
detail = status.self_test_detail
busy = status.self_test_busy
busy_str = "RUNNING" if busy else "IDLE"
busy_color = YELLOW if busy else FG
self._st_labels["busy"].config(text=f"Busy: {busy_str}",
foreground=busy_color)
self._st_labels["flags"].config(text=f"Flags: {flags:05b}")
self._st_labels["detail"].config(text=f"Detail: 0x{detail:02X}")
# Individual test results (bit = 1 means PASS)
test_names = [
("t0", "T0 BRAM"),
("t1", "T1 CIC"),
("t2", "T2 FFT"),
("t3", "T3 Arith"),
("t4", "T4 ADC"),
]
for i, (key, name) in enumerate(test_names):
if busy:
result_str = "..."
color = YELLOW
elif flags & (1 << i):
result_str = "PASS"
color = GREEN
else:
result_str = "FAIL"
color = RED
self._st_labels[key].config(
text=f"{name}: {result_str}", foreground=color)
# --------------------------------------------------------- Display loop # --------------------------------------------------------- Display loop
def _schedule_update(self): def _schedule_update(self):
self._update_display() self._update_display()
+57 -34
View File
@@ -10,7 +10,7 @@ Matches usb_data_interface.v packet format exactly.
USB Packet Protocol: USB Packet Protocol:
TX (FPGA→Host): TX (FPGA→Host):
Data packet: [0xAA] [range 4×32b] [doppler 4×32b] [det 1B] [0x55] Data packet: [0xAA] [range 4×32b] [doppler 4×32b] [det 1B] [0x55]
Status packet: [0xBB] [status 5×32b] [0x55] Status packet: [0xBB] [status 6×32b] [0x55]
RX (Host→FPGA): RX (Host→FPGA):
Command word: {opcode[31:24], addr[23:16], value[15:0]} Command word: {opcode[31:24], addr[23:16], value[15:0]}
""" """
@@ -47,29 +47,30 @@ WATERFALL_DEPTH = 64
class Opcode(IntEnum): class Opcode(IntEnum):
"""Host register opcodes (matches radar_system_top.v command decode).""" """Host register opcodes (matches radar_system_top.v command decode)."""
TRIGGER = 0x01 TRIGGER = 0x01
PRF_DIV = 0x02 PRF_DIV = 0x02
NUM_CHIRPS = 0x03 NUM_CHIRPS = 0x03
CHIRP_TIMER = 0x04 CHIRP_TIMER = 0x04
STREAM_ENABLE = 0x05 STREAM_ENABLE = 0x05
GAIN_SHIFT = 0x06 GAIN_SHIFT = 0x06
THRESHOLD = 0x10 THRESHOLD = 0x10
LONG_CHIRP = 0x10 LONG_CHIRP = 0x10
LONG_LISTEN = 0x11 LONG_LISTEN = 0x11
GUARD = 0x12 GUARD = 0x12
SHORT_CHIRP = 0x13 SHORT_CHIRP = 0x13
SHORT_LISTEN = 0x14 SHORT_LISTEN = 0x14
CHIRPS_PER_ELEV = 0x15 CHIRPS_PER_ELEV = 0x15
DIGITAL_GAIN = 0x16 RANGE_MODE = 0x20
RANGE_MODE = 0x20 CFAR_GUARD = 0x21
CFAR_GUARD = 0x21 CFAR_TRAIN = 0x22
CFAR_TRAIN = 0x22 CFAR_ALPHA = 0x23
CFAR_ALPHA = 0x23 CFAR_MODE = 0x24
CFAR_MODE = 0x24 CFAR_ENABLE = 0x25
CFAR_ENABLE = 0x25 MTI_ENABLE = 0x26
MTI_ENABLE = 0x26 DC_NOTCH_WIDTH = 0x27
DC_NOTCH_WIDTH = 0x27 SELF_TEST_TRIGGER = 0x30
STATUS_REQUEST = 0xFF SELF_TEST_STATUS = 0x31
STATUS_REQUEST = 0xFF
# ============================================================================ # ============================================================================
@@ -96,7 +97,7 @@ class RadarFrame:
@dataclass @dataclass
class StatusResponse: class StatusResponse:
"""Parsed status response from FPGA.""" """Parsed status response from FPGA (8-word packet as of Build 26)."""
radar_mode: int = 0 radar_mode: int = 0
stream_ctrl: int = 0 stream_ctrl: int = 0
cfar_threshold: int = 0 cfar_threshold: int = 0
@@ -107,6 +108,10 @@ class StatusResponse:
short_listen: int = 0 short_listen: int = 0
chirps_per_elev: int = 0 chirps_per_elev: int = 0
range_mode: int = 0 range_mode: int = 0
# Self-test results (word 5, added in Build 26)
self_test_flags: int = 0 # 5-bit result flags [4:0]
self_test_detail: int = 0 # 8-bit detail code [7:0]
self_test_busy: int = 0 # 1-bit busy flag
# ============================================================================ # ============================================================================
@@ -197,19 +202,19 @@ class RadarProtocol:
def parse_status_packet(raw: bytes) -> Optional[StatusResponse]: def parse_status_packet(raw: bytes) -> Optional[StatusResponse]:
""" """
Parse a status response packet. Parse a status response packet.
Format: [0xBB] [5×4B status words] [0x55] = 1 + 20 + 1 = 22 bytes Format: [0xBB] [6×4B status words] [0x55] = 1 + 24 + 1 = 26 bytes
""" """
if len(raw) < 22: if len(raw) < 26:
return None return None
if raw[0] != STATUS_HEADER_BYTE: if raw[0] != STATUS_HEADER_BYTE:
return None return None
words = [] words = []
for i in range(5): for i in range(6):
w = struct.unpack_from(">I", raw, 1 + i * 4)[0] w = struct.unpack_from(">I", raw, 1 + i * 4)[0]
words.append(w) words.append(w)
if raw[21] != FOOTER_BYTE: if raw[25] != FOOTER_BYTE:
return None return None
sr = StatusResponse() sr = StatusResponse()
@@ -228,6 +233,11 @@ class RadarProtocol:
sr.short_listen = (words[3] >> 16) & 0xFFFF sr.short_listen = (words[3] >> 16) & 0xFFFF
# Word 4: {30'd0, range_mode[1:0]} # Word 4: {30'd0, range_mode[1:0]}
sr.range_mode = words[4] & 0x03 sr.range_mode = words[4] & 0x03
# Word 5: {7'd0, self_test_busy, 8'd0, self_test_detail[7:0],
# 3'd0, self_test_flags[4:0]}
sr.self_test_flags = words[5] & 0x1F
sr.self_test_detail = (words[5] >> 8) & 0xFF
sr.self_test_busy = (words[5] >> 24) & 0x01
return sr return sr
@staticmethod @staticmethod
@@ -248,8 +258,8 @@ class RadarProtocol:
else: else:
break break
elif buf[i] == STATUS_HEADER_BYTE: elif buf[i] == STATUS_HEADER_BYTE:
# Status packet: 22 bytes # Status packet: 26 bytes (6 words + header + footer)
end = i + 22 end = i + 26
if end <= len(buf): if end <= len(buf):
packets.append((i, end, "status")) packets.append((i, end, "status"))
i = end i = end
@@ -423,8 +433,9 @@ _HARDWARE_ONLY_OPCODES = {
0x13, # SHORT_CHIRP 0x13, # SHORT_CHIRP
0x14, # SHORT_LISTEN 0x14, # SHORT_LISTEN
0x15, # CHIRPS_PER_ELEV 0x15, # CHIRPS_PER_ELEV
0x16, # DIGITAL_GAIN
0x20, # RANGE_MODE 0x20, # RANGE_MODE
0x30, # SELF_TEST_TRIGGER
0x31, # SELF_TEST_STATUS
0xFF, # STATUS_REQUEST 0xFF, # STATUS_REQUEST
} }
@@ -873,11 +884,13 @@ class RadarAcquisition(threading.Thread):
""" """
def __init__(self, connection: FT601Connection, frame_queue: queue.Queue, def __init__(self, connection: FT601Connection, frame_queue: queue.Queue,
recorder: Optional[DataRecorder] = None): recorder: Optional[DataRecorder] = None,
status_callback=None):
super().__init__(daemon=True) super().__init__(daemon=True)
self.conn = connection self.conn = connection
self.frame_queue = frame_queue self.frame_queue = frame_queue
self.recorder = recorder self.recorder = recorder
self._status_callback = status_callback
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._frame = RadarFrame() self._frame = RadarFrame()
self._sample_idx = 0 self._sample_idx = 0
@@ -903,7 +916,17 @@ class RadarAcquisition(threading.Thread):
elif ptype == "status": elif ptype == "status":
status = RadarProtocol.parse_status_packet(raw[start:end]) status = RadarProtocol.parse_status_packet(raw[start:end])
if status is not None: if status is not None:
log.info(f"Status: mode={status.radar_mode} stream={status.stream_ctrl}") log.info(f"Status: mode={status.radar_mode} "
f"stream={status.stream_ctrl}")
if status.self_test_busy or status.self_test_flags:
log.info(f"Self-test: busy={status.self_test_busy} "
f"flags=0b{status.self_test_flags:05b} "
f"detail=0x{status.self_test_detail:02X}")
if self._status_callback is not None:
try:
self._status_callback(status)
except Exception as e:
log.error(f"Status callback error: {e}")
log.info("Acquisition thread stopped") log.info("Acquisition thread stopped")
+138 -8
View File
@@ -17,7 +17,7 @@ import numpy as np
from radar_protocol import ( from radar_protocol import (
RadarProtocol, FT601Connection, DataRecorder, RadarAcquisition, RadarProtocol, FT601Connection, DataRecorder, RadarAcquisition,
RadarFrame, StatusResponse, RadarFrame, StatusResponse, Opcode,
HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE, HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE,
NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS, NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS,
_HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES, _HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES,
@@ -133,8 +133,9 @@ class TestRadarProtocol(unittest.TestCase):
def _make_status_packet(self, mode=1, stream=7, threshold=10000, def _make_status_packet(self, mode=1, stream=7, threshold=10000,
long_chirp=3000, long_listen=13700, long_chirp=3000, long_listen=13700,
guard=17540, short_chirp=50, guard=17540, short_chirp=50,
short_listen=17450, chirps=32, range_mode=0): short_listen=17450, chirps=32, range_mode=0,
"""Build a 22-byte status response matching FPGA format.""" st_flags=0, st_detail=0, st_busy=0):
"""Build a 26-byte status response matching FPGA format (Build 26)."""
pkt = bytearray() pkt = bytearray()
pkt.append(STATUS_HEADER_BYTE) pkt.append(STATUS_HEADER_BYTE)
@@ -158,6 +159,11 @@ class TestRadarProtocol(unittest.TestCase):
w4 = range_mode & 0x03 w4 = range_mode & 0x03
pkt += struct.pack(">I", w4) pkt += struct.pack(">I", w4)
# Word 5: {7'd0, self_test_busy, 8'd0, self_test_detail[7:0],
# 3'd0, self_test_flags[4:0]}
w5 = ((st_busy & 0x01) << 24) | ((st_detail & 0xFF) << 8) | (st_flags & 0x1F)
pkt += struct.pack(">I", w5)
pkt.append(FOOTER_BYTE) pkt.append(FOOTER_BYTE)
return bytes(pkt) return bytes(pkt)
@@ -182,7 +188,7 @@ class TestRadarProtocol(unittest.TestCase):
self.assertEqual(sr.range_mode, 2) self.assertEqual(sr.range_mode, 2)
def test_parse_status_too_short(self): def test_parse_status_too_short(self):
self.assertIsNone(RadarProtocol.parse_status_packet(b"\xBB" + b"\x00" * 10)) self.assertIsNone(RadarProtocol.parse_status_packet(b"\xBB" + b"\x00" * 20))
def test_parse_status_wrong_header(self): def test_parse_status_wrong_header(self):
raw = self._make_status_packet() raw = self._make_status_packet()
@@ -191,9 +197,55 @@ class TestRadarProtocol(unittest.TestCase):
def test_parse_status_wrong_footer(self): def test_parse_status_wrong_footer(self):
raw = bytearray(self._make_status_packet()) raw = bytearray(self._make_status_packet())
raw[-1] = 0x00 # corrupt footer raw[25] = 0x00 # corrupt footer (was at index 21 in old 5-word format)
self.assertIsNone(RadarProtocol.parse_status_packet(bytes(raw))) self.assertIsNone(RadarProtocol.parse_status_packet(bytes(raw)))
def test_parse_status_self_test_all_pass(self):
"""Status with all self-test flags set (all tests pass)."""
raw = self._make_status_packet(st_flags=0x1F, st_detail=0xA5, st_busy=0)
sr = RadarProtocol.parse_status_packet(raw)
self.assertIsNotNone(sr)
self.assertEqual(sr.self_test_flags, 0x1F)
self.assertEqual(sr.self_test_detail, 0xA5)
self.assertEqual(sr.self_test_busy, 0)
def test_parse_status_self_test_busy(self):
"""Status with self-test busy flag set."""
raw = self._make_status_packet(st_flags=0x00, st_detail=0x00, st_busy=1)
sr = RadarProtocol.parse_status_packet(raw)
self.assertIsNotNone(sr)
self.assertEqual(sr.self_test_busy, 1)
self.assertEqual(sr.self_test_flags, 0)
self.assertEqual(sr.self_test_detail, 0)
def test_parse_status_self_test_partial_fail(self):
"""Status with partial self-test failures (flags=0b10110)."""
raw = self._make_status_packet(st_flags=0b10110, st_detail=0x42, st_busy=0)
sr = RadarProtocol.parse_status_packet(raw)
self.assertIsNotNone(sr)
self.assertEqual(sr.self_test_flags, 0b10110)
self.assertEqual(sr.self_test_detail, 0x42)
self.assertEqual(sr.self_test_busy, 0)
# T0 (BRAM) failed, T1 (CIC) passed, T2 (FFT) passed, T3 (arith) failed, T4 (ADC) passed
self.assertFalse(sr.self_test_flags & 0x01) # T0 fail
self.assertTrue(sr.self_test_flags & 0x02) # T1 pass
self.assertTrue(sr.self_test_flags & 0x04) # T2 pass
self.assertFalse(sr.self_test_flags & 0x08) # T3 fail
self.assertTrue(sr.self_test_flags & 0x10) # T4 pass
def test_parse_status_self_test_zero_word5(self):
"""Status with zero word 5 (self-test never run)."""
raw = self._make_status_packet()
sr = RadarProtocol.parse_status_packet(raw)
self.assertEqual(sr.self_test_flags, 0)
self.assertEqual(sr.self_test_detail, 0)
self.assertEqual(sr.self_test_busy, 0)
def test_status_packet_is_26_bytes(self):
"""Verify status packet is exactly 26 bytes."""
raw = self._make_status_packet()
self.assertEqual(len(raw), 26)
# ---------------------------------------------------------------- # ----------------------------------------------------------------
# Boundary detection # Boundary detection
# ---------------------------------------------------------------- # ----------------------------------------------------------------
@@ -375,9 +427,9 @@ class TestEndToEnd(unittest.TestCase):
def test_command_roundtrip_all_opcodes(self): def test_command_roundtrip_all_opcodes(self):
"""Verify all opcodes produce valid 4-byte commands.""" """Verify all opcodes produce valid 4-byte commands."""
opcodes = [0x01, 0x02, 0x03, 0x04, 0x10, 0x11, 0x12, 0x13, 0x14, opcodes = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x10, 0x11, 0x12,
0x15, 0x16, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x13, 0x14, 0x15, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25,
0x27, 0xFF] 0x26, 0x27, 0x30, 0x31, 0xFF]
for op in opcodes: for op in opcodes:
cmd = RadarProtocol.build_command(op, 42) cmd = RadarProtocol.build_command(op, 42)
self.assertEqual(len(cmd), 4, f"opcode 0x{op:02X}") self.assertEqual(len(cmd), 4, f"opcode 0x{op:02X}")
@@ -608,6 +660,84 @@ class TestReplayConnection(unittest.TestCase):
self.assertFalse(conn._needs_rebuild) self.assertFalse(conn._needs_rebuild)
conn.close() conn.close()
def test_replay_self_test_opcodes_are_hardware_only(self):
"""Self-test opcodes 0x30/0x31 are hardware-only (ignored in replay)."""
if not self._npy_available():
self.skipTest("npy data files not found")
from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=True)
conn.open()
# Send self-test trigger
cmd = RadarProtocol.build_command(0x30, 1)
conn.write(cmd)
self.assertFalse(conn._needs_rebuild)
# Send self-test status request
cmd = RadarProtocol.build_command(0x31, 0)
conn.write(cmd)
self.assertFalse(conn._needs_rebuild)
conn.close()
class TestOpcodeEnum(unittest.TestCase):
"""Verify Opcode enum matches RTL host register map."""
def test_gain_shift_is_0x06(self):
"""GAIN_SHIFT opcode must be 0x06 (not 0x16)."""
self.assertEqual(Opcode.GAIN_SHIFT, 0x06)
def test_no_digital_gain_alias(self):
"""DIGITAL_GAIN should NOT exist (was bogus 0x16 alias)."""
self.assertFalse(hasattr(Opcode, 'DIGITAL_GAIN'))
def test_self_test_trigger(self):
"""SELF_TEST_TRIGGER opcode must be 0x30."""
self.assertEqual(Opcode.SELF_TEST_TRIGGER, 0x30)
def test_self_test_status(self):
"""SELF_TEST_STATUS opcode must be 0x31."""
self.assertEqual(Opcode.SELF_TEST_STATUS, 0x31)
def test_self_test_in_hardware_only(self):
"""Self-test opcodes must be in _HARDWARE_ONLY_OPCODES."""
self.assertIn(0x30, _HARDWARE_ONLY_OPCODES)
self.assertIn(0x31, _HARDWARE_ONLY_OPCODES)
def test_0x16_not_in_hardware_only(self):
"""Bogus 0x16 must not be in _HARDWARE_ONLY_OPCODES."""
self.assertNotIn(0x16, _HARDWARE_ONLY_OPCODES)
def test_stream_enable_is_0x05(self):
"""STREAM_ENABLE must be 0x05 (not 0x04)."""
self.assertEqual(Opcode.STREAM_ENABLE, 0x05)
def test_all_rtl_opcodes_present(self):
"""Every RTL opcode has a matching Opcode enum member."""
expected = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
0x10, 0x11, 0x12, 0x13, 0x14, 0x15,
0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27,
0x30, 0x31, 0xFF}
enum_values = set(int(m) for m in Opcode)
for op in expected:
self.assertIn(op, enum_values, f"0x{op:02X} missing from Opcode enum")
class TestStatusResponseDefaults(unittest.TestCase):
"""Verify StatusResponse dataclass has self-test fields."""
def test_default_self_test_fields(self):
sr = StatusResponse()
self.assertEqual(sr.self_test_flags, 0)
self.assertEqual(sr.self_test_detail, 0)
self.assertEqual(sr.self_test_busy, 0)
def test_self_test_fields_set(self):
sr = StatusResponse(self_test_flags=0x1F,
self_test_detail=0xAB,
self_test_busy=1)
self.assertEqual(sr.self_test_flags, 0x1F)
self.assertEqual(sr.self_test_detail, 0xAB)
self.assertEqual(sr.self_test_busy, 1)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main(verbosity=2) unittest.main(verbosity=2)