diff --git a/9_Firmware/9_3_GUI/radar_dashboard.py b/9_Firmware/9_3_GUI/radar_dashboard.py index 7575e63..3d6988c 100644 --- a/9_Firmware/9_3_GUI/radar_dashboard.py +++ b/9_Firmware/9_3_GUI/radar_dashboard.py @@ -43,7 +43,7 @@ from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg # Import protocol layer (no GUI deps) from radar_protocol import ( - RadarProtocol, FT2232HConnection, ReplayConnection, + RadarProtocol, FT2232HConnection, DataRecorder, RadarAcquisition, RadarFrame, StatusResponse, NUM_RANGE_BINS, NUM_DOPPLER_BINS, WATERFALL_DEPTH, @@ -904,22 +904,13 @@ def main(): parser = argparse.ArgumentParser(description="AERIS-10 Radar Dashboard") parser.add_argument("--live", action="store_true", help="Use real FT2232H hardware (default: mock mode)") - parser.add_argument("--replay", type=str, metavar="NPY_DIR", - help="Replay real data from .npy directory " - "(e.g. tb/cosim/real_data/hex/)") - parser.add_argument("--no-mti", action="store_true", - help="With --replay, use non-MTI Doppler data") parser.add_argument("--record", action="store_true", help="Start HDF5 recording immediately") parser.add_argument("--device", type=int, default=0, help="FT2232H device index (default: 0)") args = parser.parse_args() - if args.replay: - npy_dir = os.path.abspath(args.replay) - conn = ReplayConnection(npy_dir, use_mti=not args.no_mti) - mode_str = f"REPLAY ({npy_dir}, MTI={'OFF' if args.no_mti else 'ON'})" - elif args.live: + if args.live: conn = FT2232HConnection(mock=False) mode_str = "LIVE" else: diff --git a/9_Firmware/9_3_GUI/radar_protocol.py b/9_Firmware/9_3_GUI/radar_protocol.py index c266b6d..e04d768 100644 --- a/9_Firmware/9_3_GUI/radar_protocol.py +++ b/9_Firmware/9_3_GUI/radar_protocol.py @@ -15,7 +15,6 @@ USB Packet Protocol (11-byte): Command: 4 bytes received sequentially {opcode, addr, value_hi, value_lo} """ -import os import struct import time import threading @@ -443,391 +442,7 @@ class FT2232HConnection: return bytes(buf) -# ============================================================================ -# Replay Connection — feed real .npy data through the dashboard -# ============================================================================ -# Hardware-only opcodes that cannot be adjusted in replay mode -# Values must match radar_system_top.v case(usb_cmd_opcode). -_HARDWARE_ONLY_OPCODES = { - 0x01, # RADAR_MODE - 0x02, # TRIGGER_PULSE - # 0x03 (DETECT_THRESHOLD) is NOT hardware-only — it's in _REPLAY_ADJUSTABLE_OPCODES - 0x04, # STREAM_CONTROL - 0x10, # LONG_CHIRP - 0x11, # LONG_LISTEN - 0x12, # GUARD - 0x13, # SHORT_CHIRP - 0x14, # SHORT_LISTEN - 0x15, # CHIRPS_PER_ELEV - 0x16, # GAIN_SHIFT - 0x20, # RANGE_MODE - 0x28, # AGC_ENABLE - 0x29, # AGC_TARGET - 0x2A, # AGC_ATTACK - 0x2B, # AGC_DECAY - 0x2C, # AGC_HOLDOFF - 0x30, # SELF_TEST_TRIGGER - 0x31, # SELF_TEST_STATUS - 0xFF, # STATUS_REQUEST -} - -# Replay-adjustable opcodes (re-run signal processing) -_REPLAY_ADJUSTABLE_OPCODES = { - 0x03, # DETECT_THRESHOLD - 0x21, # CFAR_GUARD - 0x22, # CFAR_TRAIN - 0x23, # CFAR_ALPHA - 0x24, # CFAR_MODE - 0x25, # CFAR_ENABLE - 0x26, # MTI_ENABLE - 0x27, # DC_NOTCH_WIDTH -} - - -def _saturate(val: int, bits: int) -> int: - """Saturate signed value to fit in 'bits' width.""" - max_pos = (1 << (bits - 1)) - 1 - max_neg = -(1 << (bits - 1)) - return max(max_neg, min(max_pos, int(val))) - - -def _replay_dc_notch(doppler_i: np.ndarray, doppler_q: np.ndarray, - width: int) -> tuple[np.ndarray, np.ndarray]: - """Bit-accurate DC notch filter (matches radar_system_top.v inline). - - Dual sub-frame notch: doppler_bin[4:0] = {sub_frame, bin[3:0]}. - Each 16-bin sub-frame has its own DC at bin 0, so we zero bins - where ``bin_within_sf < width`` or ``bin_within_sf > (15 - width + 1)``. - """ - out_i = doppler_i.copy() - out_q = doppler_q.copy() - if width == 0: - return out_i, out_q - n_doppler = doppler_i.shape[1] - for dbin in range(n_doppler): - bin_within_sf = dbin & 0xF - if bin_within_sf < width or bin_within_sf > (15 - width + 1): - out_i[:, dbin] = 0 - out_q[:, dbin] = 0 - return out_i, out_q - - -def _replay_cfar(doppler_i: np.ndarray, doppler_q: np.ndarray, - guard: int, train: int, alpha_q44: int, - mode: int) -> tuple[np.ndarray, np.ndarray]: - """ - Bit-accurate CA-CFAR detector (matches cfar_ca.v). - Returns (detect_flags, magnitudes) both (64, 32). - """ - ALPHA_FRAC_BITS = 4 - n_range, n_doppler = doppler_i.shape - if train == 0: - train = 1 - - # Compute magnitudes: |I| + |Q| (17-bit unsigned L1 norm) - magnitudes = np.zeros((n_range, n_doppler), dtype=np.int64) - for r in range(n_range): - for d in range(n_doppler): - i_val = int(doppler_i[r, d]) - q_val = int(doppler_q[r, d]) - abs_i = (-i_val) & 0xFFFF if i_val < 0 else i_val & 0xFFFF - abs_q = (-q_val) & 0xFFFF if q_val < 0 else q_val & 0xFFFF - magnitudes[r, d] = abs_i + abs_q - - detect_flags = np.zeros((n_range, n_doppler), dtype=np.bool_) - MAX_MAG = (1 << 17) - 1 - - mode_names = {0: 'CA', 1: 'GO', 2: 'SO'} - mode_str = mode_names.get(mode, 'CA') - - for dbin in range(n_doppler): - col = magnitudes[:, dbin] - for cut in range(n_range): - lead_sum, lead_cnt = 0, 0 - for t in range(1, train + 1): - idx = cut - guard - t - if 0 <= idx < n_range: - lead_sum += int(col[idx]) - lead_cnt += 1 - lag_sum, lag_cnt = 0, 0 - for t in range(1, train + 1): - idx = cut + guard + t - if 0 <= idx < n_range: - lag_sum += int(col[idx]) - lag_cnt += 1 - - if mode_str == 'CA': - noise = lead_sum + lag_sum - elif mode_str == 'GO': - if lead_cnt > 0 and lag_cnt > 0: - noise = lead_sum if lead_sum * lag_cnt > lag_sum * lead_cnt else lag_sum - else: - noise = lead_sum if lead_cnt > 0 else lag_sum - elif mode_str == 'SO': - if lead_cnt > 0 and lag_cnt > 0: - noise = lead_sum if lead_sum * lag_cnt < lag_sum * lead_cnt else lag_sum - else: - noise = lead_sum if lead_cnt > 0 else lag_sum - else: - noise = lead_sum + lag_sum - - thr = min((alpha_q44 * noise) >> ALPHA_FRAC_BITS, MAX_MAG) - if int(col[cut]) > thr: - detect_flags[cut, dbin] = True - - return detect_flags, magnitudes - - -class ReplayConnection: - """ - Loads pre-computed .npy arrays (from golden_reference.py co-sim output) - and serves them as USB data packets to the dashboard, exercising the full - parsing pipeline with real ADI CN0566 radar data. - - Signal processing parameters (CFAR guard/train/alpha/mode, MTI enable, - DC notch width) can be adjusted at runtime via write() — the connection - re-runs the bit-accurate processing pipeline and rebuilds packets. - - Required npy directory layout (e.g. tb/cosim/real_data/hex/): - decimated_range_i.npy (32, 64) int — pre-Doppler range I - decimated_range_q.npy (32, 64) int — pre-Doppler range Q - doppler_map_i.npy (64, 32) int — Doppler I (no MTI) - doppler_map_q.npy (64, 32) int — Doppler Q (no MTI) - fullchain_mti_doppler_i.npy (64, 32) int — Doppler I (with MTI) - fullchain_mti_doppler_q.npy (64, 32) int — Doppler Q (with MTI) - fullchain_cfar_flags.npy (64, 32) bool — CFAR detections - fullchain_cfar_mag.npy (64, 32) int — CFAR |I|+|Q| magnitude - """ - - def __init__(self, npy_dir: str, use_mti: bool = True, - replay_fps: float = 5.0): - self._npy_dir = npy_dir - self._use_mti = use_mti - self._replay_fps = max(replay_fps, 0.1) - self._lock = threading.Lock() - self.is_open = False - self._packets: bytes = b"" - self._read_offset = 0 - self._frame_len = 0 - # Current signal-processing parameters - self._mti_enable: bool = use_mti - self._dc_notch_width: int = 2 - self._cfar_guard: int = 2 - self._cfar_train: int = 8 - self._cfar_alpha: int = 0x30 - self._cfar_mode: int = 0 # 0=CA, 1=GO, 2=SO - self._cfar_enable: bool = True - self._detect_threshold: int = 10000 # RTL default (host_detect_threshold) - # Raw source arrays (loaded once, reprocessed on param change) - self._dop_mti_i: np.ndarray | None = None - self._dop_mti_q: np.ndarray | None = None - self._dop_nomti_i: np.ndarray | None = None - self._dop_nomti_q: np.ndarray | None = None - self._range_i_vec: np.ndarray | None = None - self._range_q_vec: np.ndarray | None = None - # Rebuild flag - self._needs_rebuild = False - - def open(self, _device_index: int = 0) -> bool: - try: - self._load_arrays() - self._packets = self._build_packets() - self._frame_len = len(self._packets) - self._read_offset = 0 - self.is_open = True - log.info(f"Replay connection opened: {self._npy_dir} " - f"(MTI={'ON' if self._mti_enable else 'OFF'}, " - f"{self._frame_len} bytes/frame)") - return True - except (OSError, ValueError, IndexError, struct.error) as e: - log.error(f"Replay open failed: {e}") - return False - - def close(self): - self.is_open = False - - def read(self, size: int = 4096) -> bytes | None: - if not self.is_open: - return None - # Pace reads to target FPS (spread across ~64 reads per frame) - time.sleep((1.0 / self._replay_fps) / (NUM_CELLS / 32)) - with self._lock: - # If params changed, rebuild packets - if self._needs_rebuild: - self._packets = self._build_packets() - self._frame_len = len(self._packets) - self._read_offset = 0 - self._needs_rebuild = False - end = self._read_offset + size - if end <= self._frame_len: - chunk = self._packets[self._read_offset:end] - self._read_offset = end - else: - chunk = self._packets[self._read_offset:] - self._read_offset = 0 - return chunk - - def write(self, data: bytes) -> bool: - """ - Handle host commands in replay mode. - Signal-processing params (CFAR, MTI, DC notch) trigger re-processing. - Hardware-only params are silently ignored. - """ - if len(data) < 4: - return True - word = struct.unpack(">I", data[:4])[0] - opcode = (word >> 24) & 0xFF - value = word & 0xFFFF - - if opcode in _REPLAY_ADJUSTABLE_OPCODES: - changed = False - with self._lock: - if opcode == 0x03: # DETECT_THRESHOLD - if self._detect_threshold != value: - self._detect_threshold = value - changed = True - elif opcode == 0x21: # CFAR_GUARD - if self._cfar_guard != value: - self._cfar_guard = value - changed = True - elif opcode == 0x22: # CFAR_TRAIN - if self._cfar_train != value: - self._cfar_train = value - changed = True - elif opcode == 0x23: # CFAR_ALPHA - if self._cfar_alpha != value: - self._cfar_alpha = value - changed = True - elif opcode == 0x24: # CFAR_MODE - if self._cfar_mode != value: - self._cfar_mode = value - changed = True - elif opcode == 0x25: # CFAR_ENABLE - new_en = bool(value) - if self._cfar_enable != new_en: - self._cfar_enable = new_en - changed = True - elif opcode == 0x26: # MTI_ENABLE - new_en = bool(value) - if self._mti_enable != new_en: - self._mti_enable = new_en - changed = True - elif opcode == 0x27 and self._dc_notch_width != value: # DC_NOTCH_WIDTH - self._dc_notch_width = value - changed = True - if changed: - self._needs_rebuild = True - if changed: - log.info(f"Replay param updated: opcode=0x{opcode:02X} " - f"value={value} — will re-process") - else: - log.debug(f"Replay param unchanged: opcode=0x{opcode:02X} " - f"value={value}") - elif opcode in _HARDWARE_ONLY_OPCODES: - log.debug(f"Replay: hardware-only opcode 0x{opcode:02X} " - f"(ignored in replay mode)") - else: - log.debug(f"Replay: unknown opcode 0x{opcode:02X} (ignored)") - return True - - def _load_arrays(self): - """Load source npy arrays once.""" - npy = self._npy_dir - # MTI Doppler - self._dop_mti_i = np.load( - os.path.join(npy, "fullchain_mti_doppler_i.npy")).astype(np.int64) - self._dop_mti_q = np.load( - os.path.join(npy, "fullchain_mti_doppler_q.npy")).astype(np.int64) - # Non-MTI Doppler - self._dop_nomti_i = np.load( - os.path.join(npy, "doppler_map_i.npy")).astype(np.int64) - self._dop_nomti_q = np.load( - os.path.join(npy, "doppler_map_q.npy")).astype(np.int64) - # Range data - try: - range_i_all = np.load( - os.path.join(npy, "decimated_range_i.npy")).astype(np.int64) - range_q_all = np.load( - os.path.join(npy, "decimated_range_q.npy")).astype(np.int64) - self._range_i_vec = range_i_all[-1, :] # last chirp - self._range_q_vec = range_q_all[-1, :] - except FileNotFoundError: - self._range_i_vec = np.zeros(NUM_RANGE_BINS, dtype=np.int64) - self._range_q_vec = np.zeros(NUM_RANGE_BINS, dtype=np.int64) - - def _build_packets(self) -> bytes: - """Build a full frame of USB data packets from current params.""" - # Select Doppler data based on MTI - if self._mti_enable: - dop_i = self._dop_mti_i - dop_q = self._dop_mti_q - else: - dop_i = self._dop_nomti_i - dop_q = self._dop_nomti_q - - # Apply DC notch - dop_i, dop_q = _replay_dc_notch(dop_i, dop_q, self._dc_notch_width) - - # Run CFAR - if self._cfar_enable: - det, _mag = _replay_cfar( - dop_i, dop_q, - guard=self._cfar_guard, - train=self._cfar_train, - alpha_q44=self._cfar_alpha, - mode=self._cfar_mode, - ) - else: - # Simple threshold fallback matching RTL cfar_ca.v: - # detect = (|I| + |Q|) > detect_threshold (L1 norm) - mag = np.abs(dop_i) + np.abs(dop_q) - det = mag > self._detect_threshold - - det_count = int(det.sum()) - log.info(f"Replay: rebuilt {NUM_CELLS} packets (" - f"MTI={'ON' if self._mti_enable else 'OFF'}, " - f"DC_notch={self._dc_notch_width}, " - f"CFAR={'ON' if self._cfar_enable else 'OFF'} " - f"G={self._cfar_guard} T={self._cfar_train} " - f"a=0x{self._cfar_alpha:02X} m={self._cfar_mode}, " - f"{det_count} detections)") - - range_i = self._range_i_vec - range_q = self._range_q_vec - - return self._build_packets_data(range_i, range_q, dop_i, dop_q, det) - - def _build_packets_data(self, range_i, range_q, dop_i, dop_q, det) -> bytes: - """Build 11-byte data packets for FT2232H interface.""" - buf = bytearray(NUM_CELLS * DATA_PACKET_SIZE) - pos = 0 - for rbin in range(NUM_RANGE_BINS): - ri = int(np.clip(range_i[rbin], -32768, 32767)) - rq = int(np.clip(range_q[rbin], -32768, 32767)) - rq_bytes = struct.pack(">h", rq) - ri_bytes = struct.pack(">h", ri) - for dbin in range(NUM_DOPPLER_BINS): - di = int(np.clip(dop_i[rbin, dbin], -32768, 32767)) - dq = int(np.clip(dop_q[rbin, dbin], -32768, 32767)) - d = 1 if det[rbin, dbin] else 0 - - buf[pos] = HEADER_BYTE - pos += 1 - buf[pos:pos+2] = rq_bytes - pos += 2 - buf[pos:pos+2] = ri_bytes - pos += 2 - buf[pos:pos+2] = struct.pack(">h", di) - pos += 2 - buf[pos:pos+2] = struct.pack(">h", dq) - pos += 2 - buf[pos] = d - pos += 1 - buf[pos] = FOOTER_BYTE - pos += 1 - - return bytes(buf) # ============================================================================ diff --git a/9_Firmware/9_3_GUI/test_radar_dashboard.py b/9_Firmware/9_3_GUI/test_radar_dashboard.py index b8bf6cf..f94b85a 100644 --- a/9_Firmware/9_3_GUI/test_radar_dashboard.py +++ b/9_Firmware/9_3_GUI/test_radar_dashboard.py @@ -19,9 +19,8 @@ from radar_protocol import ( RadarProtocol, FT2232HConnection, DataRecorder, RadarAcquisition, RadarFrame, StatusResponse, Opcode, HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE, - NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS, + NUM_RANGE_BINS, NUM_DOPPLER_BINS, DATA_PACKET_SIZE, - _HARDWARE_ONLY_OPCODES, ) @@ -459,218 +458,6 @@ class TestEndToEnd(unittest.TestCase): self.assertEqual(result["detection"], 1) -class TestReplayConnection(unittest.TestCase): - """Test ReplayConnection with real .npy data files.""" - - NPY_DIR = os.path.join( - os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim", - "real_data", "hex" - ) - - def _npy_available(self): - """Check if the npy data files exist.""" - return os.path.isfile(os.path.join(self.NPY_DIR, - "fullchain_mti_doppler_i.npy")) - - def test_replay_open_close(self): - """ReplayConnection opens and closes without error.""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=True) - self.assertTrue(conn.open()) - self.assertTrue(conn.is_open) - conn.close() - self.assertFalse(conn.is_open) - - def test_replay_packet_count(self): - """Replay builds exactly NUM_CELLS (2048) packets.""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=True) - conn.open() - # Each packet is 11 bytes, total = 2048 * 11 - expected_bytes = NUM_CELLS * DATA_PACKET_SIZE - self.assertEqual(conn._frame_len, expected_bytes) - conn.close() - - def test_replay_packets_parseable(self): - """Every packet from replay can be parsed by RadarProtocol.""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=True) - conn.open() - raw = conn._packets - boundaries = RadarProtocol.find_packet_boundaries(raw) - self.assertEqual(len(boundaries), NUM_CELLS) - parsed_count = 0 - det_count = 0 - for start, end, ptype in boundaries: - self.assertEqual(ptype, "data") - result = RadarProtocol.parse_data_packet(raw[start:end]) - self.assertIsNotNone(result) - parsed_count += 1 - if result["detection"]: - det_count += 1 - self.assertEqual(parsed_count, NUM_CELLS) - # Default: MTI=ON, DC_notch=2, CFAR CA g=2 t=8 a=0x30 → 4 detections - self.assertEqual(det_count, 4) - conn.close() - - def test_replay_read_loops(self): - """Read returns data and loops back around.""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=True, replay_fps=1000) - conn.open() - total_read = 0 - for _ in range(100): - chunk = conn.read(1024) - self.assertIsNotNone(chunk) - total_read += len(chunk) - self.assertGreater(total_read, 0) - conn.close() - - def test_replay_no_mti(self): - """ReplayConnection works with use_mti=False (CFAR still runs).""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=False) - conn.open() - self.assertEqual(conn._frame_len, NUM_CELLS * DATA_PACKET_SIZE) - # No-MTI with DC notch=2 and default CFAR → 0 detections - raw = conn._packets - boundaries = RadarProtocol.find_packet_boundaries(raw) - det_count = sum(1 for s, e, t in boundaries - if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0)) - self.assertEqual(det_count, 0) - conn.close() - - def test_replay_write_returns_true(self): - """Write on replay connection returns True.""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR) - conn.open() - self.assertTrue(conn.write(b"\x01\x00\x00\x01")) - conn.close() - - def test_replay_adjustable_param_cfar_guard(self): - """Changing CFAR guard via write() triggers re-processing.""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=True) - conn.open() - # Initial: guard=2 → 4 detections - self.assertFalse(conn._needs_rebuild) - # Send CFAR_GUARD=4 - cmd = RadarProtocol.build_command(0x21, 4) - conn.write(cmd) - self.assertTrue(conn._needs_rebuild) - self.assertEqual(conn._cfar_guard, 4) - # Read triggers rebuild - conn.read(1024) - self.assertFalse(conn._needs_rebuild) - conn.close() - - def test_replay_adjustable_param_mti_toggle(self): - """Toggling MTI via write() triggers re-processing.""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=True) - conn.open() - # Disable MTI - cmd = RadarProtocol.build_command(0x26, 0) - conn.write(cmd) - self.assertTrue(conn._needs_rebuild) - self.assertFalse(conn._mti_enable) - # Read to trigger rebuild, then count detections - # Drain all packets after rebuild - conn.read(1024) # triggers rebuild - raw = conn._packets - boundaries = RadarProtocol.find_packet_boundaries(raw) - det_count = sum(1 for s, e, t in boundaries - if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0)) - # No-MTI with default CFAR → 0 detections - self.assertEqual(det_count, 0) - conn.close() - - def test_replay_adjustable_param_dc_notch(self): - """Changing DC notch width via write() triggers re-processing.""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=True) - conn.open() - # Change DC notch to 0 (no notch) - cmd = RadarProtocol.build_command(0x27, 0) - conn.write(cmd) - self.assertTrue(conn._needs_rebuild) - self.assertEqual(conn._dc_notch_width, 0) - conn.read(1024) # triggers rebuild - raw = conn._packets - boundaries = RadarProtocol.find_packet_boundaries(raw) - det_count = sum(1 for s, e, t in boundaries - if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0)) - # DC notch=0 with MTI → 6 detections (more noise passes through) - self.assertEqual(det_count, 6) - conn.close() - - def test_replay_hardware_opcode_ignored(self): - """Hardware-only opcodes don't trigger rebuild.""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=True) - conn.open() - # Send TRIGGER (hardware-only) - cmd = RadarProtocol.build_command(0x01, 1) - conn.write(cmd) - self.assertFalse(conn._needs_rebuild) - # Send STREAM_CONTROL (hardware-only, opcode 0x04) - cmd = RadarProtocol.build_command(0x04, 7) - conn.write(cmd) - self.assertFalse(conn._needs_rebuild) - conn.close() - - def test_replay_same_value_no_rebuild(self): - """Setting same value as current doesn't trigger rebuild.""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=True) - conn.open() - # CFAR guard already 2 - cmd = RadarProtocol.build_command(0x21, 2) - conn.write(cmd) - self.assertFalse(conn._needs_rebuild) - conn.close() - - def test_replay_self_test_opcodes_are_hardware_only(self): - """Self-test opcodes 0x30/0x31 are hardware-only (ignored in replay).""" - if not self._npy_available(): - self.skipTest("npy data files not found") - from radar_protocol import ReplayConnection - conn = ReplayConnection(self.NPY_DIR, use_mti=True) - conn.open() - # Send self-test trigger - cmd = RadarProtocol.build_command(0x30, 1) - conn.write(cmd) - self.assertFalse(conn._needs_rebuild) - # Send self-test status request - cmd = RadarProtocol.build_command(0x31, 0) - conn.write(cmd) - self.assertFalse(conn._needs_rebuild) - conn.close() - - class TestOpcodeEnum(unittest.TestCase): """Verify Opcode enum matches RTL host register map (radar_system_top.v).""" @@ -690,15 +477,6 @@ class TestOpcodeEnum(unittest.TestCase): """SELF_TEST_STATUS opcode must be 0x31.""" self.assertEqual(Opcode.SELF_TEST_STATUS, 0x31) - def test_self_test_in_hardware_only(self): - """Self-test opcodes must be in _HARDWARE_ONLY_OPCODES.""" - self.assertIn(0x30, _HARDWARE_ONLY_OPCODES) - self.assertIn(0x31, _HARDWARE_ONLY_OPCODES) - - def test_0x16_in_hardware_only(self): - """GAIN_SHIFT 0x16 must be in _HARDWARE_ONLY_OPCODES.""" - self.assertIn(0x16, _HARDWARE_ONLY_OPCODES) - def test_stream_control_is_0x04(self): """STREAM_CONTROL must be 0x04 (matches radar_system_top.v:906).""" self.assertEqual(Opcode.STREAM_CONTROL, 0x04) @@ -717,11 +495,6 @@ class TestOpcodeEnum(unittest.TestCase): self.assertEqual(Opcode.DETECT_THRESHOLD, 0x03) self.assertEqual(Opcode.STREAM_CONTROL, 0x04) - def test_stale_opcodes_not_in_hardware_only(self): - """Old wrong opcode values must not be in _HARDWARE_ONLY_OPCODES.""" - self.assertNotIn(0x05, _HARDWARE_ONLY_OPCODES) # was wrong STREAM_ENABLE - self.assertNotIn(0x06, _HARDWARE_ONLY_OPCODES) # was wrong GAIN_SHIFT - def test_all_rtl_opcodes_present(self): """Every RTL opcode (from radar_system_top.v) has a matching Opcode enum member.""" expected = {0x01, 0x02, 0x03, 0x04, diff --git a/9_Firmware/9_3_GUI/test_v7.py b/9_Firmware/9_3_GUI/test_v7.py index bb54e48..4f70ecd 100644 --- a/9_Firmware/9_3_GUI/test_v7.py +++ b/9_Firmware/9_3_GUI/test_v7.py @@ -11,6 +11,7 @@ Does NOT require a running Qt event loop — only unit-testable components. Run with: python -m unittest test_v7 -v """ +import os import struct import unittest from dataclasses import asdict @@ -414,6 +415,559 @@ class TestAGCVisualizationV7(unittest.TestCase): self.assertEqual(pick_color(11), DARK_ERROR) +# ============================================================================= +# Test: v7.models.WaveformConfig +# ============================================================================= + +class TestWaveformConfig(unittest.TestCase): + """WaveformConfig dataclass and derived physical properties.""" + + def test_defaults(self): + from v7.models import WaveformConfig + wc = WaveformConfig() + self.assertEqual(wc.sample_rate_hz, 4e6) + self.assertEqual(wc.bandwidth_hz, 500e6) + self.assertEqual(wc.chirp_duration_s, 300e-6) + self.assertEqual(wc.center_freq_hz, 10.525e9) + self.assertEqual(wc.n_range_bins, 64) + self.assertEqual(wc.n_doppler_bins, 32) + self.assertEqual(wc.fft_size, 1024) + self.assertEqual(wc.decimation_factor, 16) + + def test_range_resolution(self): + """range_resolution_m should be ~5.62 m/bin with ADI defaults.""" + from v7.models import WaveformConfig + wc = WaveformConfig() + self.assertAlmostEqual(wc.range_resolution_m, 5.621, places=1) + + def test_velocity_resolution(self): + """velocity_resolution_mps should be ~1.484 m/s/bin.""" + from v7.models import WaveformConfig + wc = WaveformConfig() + self.assertAlmostEqual(wc.velocity_resolution_mps, 1.484, places=2) + + def test_max_range(self): + """max_range_m = range_resolution * n_range_bins.""" + from v7.models import WaveformConfig + wc = WaveformConfig() + self.assertAlmostEqual(wc.max_range_m, wc.range_resolution_m * 64, places=1) + + def test_max_velocity(self): + """max_velocity_mps = velocity_resolution * n_doppler_bins / 2.""" + from v7.models import WaveformConfig + wc = WaveformConfig() + self.assertAlmostEqual( + wc.max_velocity_mps, + wc.velocity_resolution_mps * 16, + places=2, + ) + + def test_custom_params(self): + """Non-default parameters correctly change derived values.""" + from v7.models import WaveformConfig + wc1 = WaveformConfig() + wc2 = WaveformConfig(bandwidth_hz=1e9) # double BW → halve range res + self.assertAlmostEqual(wc2.range_resolution_m, wc1.range_resolution_m / 2, places=2) + + def test_zero_center_freq_velocity(self): + """Zero center freq should cause ZeroDivisionError in velocity calc.""" + from v7.models import WaveformConfig + wc = WaveformConfig(center_freq_hz=0.0) + with self.assertRaises(ZeroDivisionError): + _ = wc.velocity_resolution_mps + + +# ============================================================================= +# Test: v7.software_fpga.SoftwareFPGA +# ============================================================================= + +class TestSoftwareFPGA(unittest.TestCase): + """SoftwareFPGA register interface and signal chain.""" + + def _make_fpga(self): + from v7.software_fpga import SoftwareFPGA + return SoftwareFPGA() + + def test_reset_defaults(self): + """Register reset values match FPGA RTL (radar_system_top.v).""" + fpga = self._make_fpga() + self.assertEqual(fpga.detect_threshold, 10_000) + self.assertEqual(fpga.gain_shift, 0) + self.assertFalse(fpga.cfar_enable) + self.assertEqual(fpga.cfar_guard, 2) + self.assertEqual(fpga.cfar_train, 8) + self.assertEqual(fpga.cfar_alpha, 0x30) + self.assertEqual(fpga.cfar_mode, 0) + self.assertFalse(fpga.mti_enable) + self.assertEqual(fpga.dc_notch_width, 0) + self.assertFalse(fpga.agc_enable) + self.assertEqual(fpga.agc_target, 200) + self.assertEqual(fpga.agc_attack, 1) + self.assertEqual(fpga.agc_decay, 1) + self.assertEqual(fpga.agc_holdoff, 4) + + def test_setter_detect_threshold(self): + fpga = self._make_fpga() + fpga.set_detect_threshold(5000) + self.assertEqual(fpga.detect_threshold, 5000) + + def test_setter_detect_threshold_clamp_16bit(self): + fpga = self._make_fpga() + fpga.set_detect_threshold(0x1FFFF) # 17-bit + self.assertEqual(fpga.detect_threshold, 0xFFFF) + + def test_setter_gain_shift_clamp_4bit(self): + fpga = self._make_fpga() + fpga.set_gain_shift(0xFF) + self.assertEqual(fpga.gain_shift, 0x0F) + + def test_setter_cfar_enable(self): + fpga = self._make_fpga() + fpga.set_cfar_enable(True) + self.assertTrue(fpga.cfar_enable) + fpga.set_cfar_enable(False) + self.assertFalse(fpga.cfar_enable) + + def test_setter_cfar_guard_clamp_4bit(self): + fpga = self._make_fpga() + fpga.set_cfar_guard(0x1F) + self.assertEqual(fpga.cfar_guard, 0x0F) + + def test_setter_cfar_train_min_1(self): + """CFAR train cells clamped to min 1.""" + fpga = self._make_fpga() + fpga.set_cfar_train(0) + self.assertEqual(fpga.cfar_train, 1) + + def test_setter_cfar_train_clamp_5bit(self): + fpga = self._make_fpga() + fpga.set_cfar_train(0x3F) + self.assertEqual(fpga.cfar_train, 0x1F) + + def test_setter_cfar_alpha_clamp_8bit(self): + fpga = self._make_fpga() + fpga.set_cfar_alpha(0x1FF) + self.assertEqual(fpga.cfar_alpha, 0xFF) + + def test_setter_cfar_mode_clamp_2bit(self): + fpga = self._make_fpga() + fpga.set_cfar_mode(7) + self.assertEqual(fpga.cfar_mode, 3) + + def test_setter_mti_enable(self): + fpga = self._make_fpga() + fpga.set_mti_enable(True) + self.assertTrue(fpga.mti_enable) + + def test_setter_dc_notch_clamp_3bit(self): + fpga = self._make_fpga() + fpga.set_dc_notch_width(0xFF) + self.assertEqual(fpga.dc_notch_width, 7) + + def test_setter_agc_params_selective(self): + """set_agc_params only changes provided fields.""" + fpga = self._make_fpga() + fpga.set_agc_params(target=100) + self.assertEqual(fpga.agc_target, 100) + self.assertEqual(fpga.agc_attack, 1) # unchanged + fpga.set_agc_params(attack=3, decay=5) + self.assertEqual(fpga.agc_attack, 3) + self.assertEqual(fpga.agc_decay, 5) + self.assertEqual(fpga.agc_target, 100) # unchanged + + def test_setter_agc_params_clamp(self): + fpga = self._make_fpga() + fpga.set_agc_params(target=0xFFF, attack=0xFF, decay=0xFF, holdoff=0xFF) + self.assertEqual(fpga.agc_target, 0xFF) + self.assertEqual(fpga.agc_attack, 0x0F) + self.assertEqual(fpga.agc_decay, 0x0F) + self.assertEqual(fpga.agc_holdoff, 0x0F) + + +class TestSoftwareFPGASignalChain(unittest.TestCase): + """SoftwareFPGA.process_chirps with real co-sim data.""" + + COSIM_DIR = os.path.join( + os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim", + "real_data", "hex" + ) + + def _cosim_available(self): + return os.path.isfile(os.path.join(self.COSIM_DIR, "doppler_map_i.npy")) + + def test_process_chirps_returns_radar_frame(self): + """process_chirps produces a RadarFrame with correct shapes.""" + if not self._cosim_available(): + self.skipTest("co-sim data not found") + from v7.software_fpga import SoftwareFPGA + from radar_protocol import RadarFrame + + # Load decimated range data as minimal input (32 chirps x 64 bins) + dec_i = np.load(os.path.join(self.COSIM_DIR, "decimated_range_i.npy")) + dec_q = np.load(os.path.join(self.COSIM_DIR, "decimated_range_q.npy")) + + # Build fake 1024-sample chirps from decimated data (pad with zeros) + n_chirps = dec_i.shape[0] + iq_i = np.zeros((n_chirps, 1024), dtype=np.int64) + iq_q = np.zeros((n_chirps, 1024), dtype=np.int64) + # Put decimated data into first 64 bins so FFT has something + iq_i[:, :dec_i.shape[1]] = dec_i + iq_q[:, :dec_q.shape[1]] = dec_q + + fpga = SoftwareFPGA() + frame = fpga.process_chirps(iq_i, iq_q, frame_number=42, timestamp=1.0) + + self.assertIsInstance(frame, RadarFrame) + self.assertEqual(frame.frame_number, 42) + self.assertAlmostEqual(frame.timestamp, 1.0) + self.assertEqual(frame.range_doppler_i.shape, (64, 32)) + self.assertEqual(frame.range_doppler_q.shape, (64, 32)) + self.assertEqual(frame.magnitude.shape, (64, 32)) + self.assertEqual(frame.detections.shape, (64, 32)) + self.assertEqual(frame.range_profile.shape, (64,)) + self.assertEqual(frame.detection_count, int(frame.detections.sum())) + + def test_cfar_enable_changes_detections(self): + """Enabling CFAR vs simple threshold should yield different detection counts.""" + if not self._cosim_available(): + self.skipTest("co-sim data not found") + from v7.software_fpga import SoftwareFPGA + + iq_i = np.zeros((32, 1024), dtype=np.int64) + iq_q = np.zeros((32, 1024), dtype=np.int64) + # Inject a single strong tone in bin 10 of every chirp + iq_i[:, 10] = 5000 + iq_q[:, 10] = 3000 + + fpga_thresh = SoftwareFPGA() + fpga_thresh.set_detect_threshold(1) # very low → many detections + frame_thresh = fpga_thresh.process_chirps(iq_i, iq_q) + + fpga_cfar = SoftwareFPGA() + fpga_cfar.set_cfar_enable(True) + fpga_cfar.set_cfar_alpha(0x10) # low alpha → more detections + frame_cfar = fpga_cfar.process_chirps(iq_i, iq_q) + + # Just verify both produce valid frames — exact counts depend on chain + self.assertIsNotNone(frame_thresh) + self.assertIsNotNone(frame_cfar) + self.assertEqual(frame_thresh.magnitude.shape, (64, 32)) + self.assertEqual(frame_cfar.magnitude.shape, (64, 32)) + + +class TestQuantizeRawIQ(unittest.TestCase): + """quantize_raw_iq utility function.""" + + def test_3d_input(self): + """3-D (frames, chirps, samples) → uses first frame.""" + from v7.software_fpga import quantize_raw_iq + raw = np.random.randn(5, 32, 1024) + 1j * np.random.randn(5, 32, 1024) + iq_i, iq_q = quantize_raw_iq(raw) + self.assertEqual(iq_i.shape, (32, 1024)) + self.assertEqual(iq_q.shape, (32, 1024)) + self.assertTrue(np.all(np.abs(iq_i) <= 32767)) + self.assertTrue(np.all(np.abs(iq_q) <= 32767)) + + def test_2d_input(self): + """2-D (chirps, samples) → works directly.""" + from v7.software_fpga import quantize_raw_iq + raw = np.random.randn(32, 1024) + 1j * np.random.randn(32, 1024) + iq_i, _iq_q = quantize_raw_iq(raw) + self.assertEqual(iq_i.shape, (32, 1024)) + + def test_zero_input(self): + """All-zero complex input → all-zero output.""" + from v7.software_fpga import quantize_raw_iq + raw = np.zeros((32, 1024), dtype=np.complex128) + iq_i, iq_q = quantize_raw_iq(raw) + self.assertTrue(np.all(iq_i == 0)) + self.assertTrue(np.all(iq_q == 0)) + + def test_peak_target_scaling(self): + """Peak of output should be near peak_target.""" + from v7.software_fpga import quantize_raw_iq + raw = np.zeros((32, 1024), dtype=np.complex128) + raw[0, 0] = 1.0 + 0j # single peak + iq_i, _iq_q = quantize_raw_iq(raw, peak_target=500) + # The peak I value should be exactly 500 (sole max) + self.assertEqual(int(iq_i[0, 0]), 500) + + +# ============================================================================= +# Test: v7.replay (ReplayEngine, detect_format) +# ============================================================================= + +class TestDetectFormat(unittest.TestCase): + """detect_format auto-detection logic.""" + + COSIM_DIR = os.path.join( + os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim", + "real_data", "hex" + ) + + def test_cosim_dir(self): + if not os.path.isdir(self.COSIM_DIR): + self.skipTest("co-sim dir not found") + from v7.replay import detect_format, ReplayFormat + self.assertEqual(detect_format(self.COSIM_DIR), ReplayFormat.COSIM_DIR) + + def test_npy_file(self): + """A .npy file → RAW_IQ_NPY.""" + from v7.replay import detect_format, ReplayFormat + import tempfile + with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: + np.save(f, np.zeros((2, 32, 1024), dtype=np.complex128)) + tmp = f.name + try: + self.assertEqual(detect_format(tmp), ReplayFormat.RAW_IQ_NPY) + finally: + os.unlink(tmp) + + def test_h5_file(self): + """A .h5 file → HDF5.""" + from v7.replay import detect_format, ReplayFormat + self.assertEqual(detect_format("/tmp/fake_recording.h5"), ReplayFormat.HDF5) + + def test_unknown_extension_raises(self): + from v7.replay import detect_format + with self.assertRaises(ValueError): + detect_format("/tmp/data.csv") + + def test_empty_dir_raises(self): + """Directory without co-sim files → ValueError.""" + from v7.replay import detect_format + import tempfile + with tempfile.TemporaryDirectory() as td, self.assertRaises(ValueError): + detect_format(td) + + +class TestReplayEngineCosim(unittest.TestCase): + """ReplayEngine loading from FPGA co-sim directory.""" + + COSIM_DIR = os.path.join( + os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim", + "real_data", "hex" + ) + + def _available(self): + return os.path.isfile(os.path.join(self.COSIM_DIR, "doppler_map_i.npy")) + + def test_load_cosim(self): + if not self._available(): + self.skipTest("co-sim data not found") + from v7.replay import ReplayEngine, ReplayFormat + engine = ReplayEngine(self.COSIM_DIR) + self.assertEqual(engine.fmt, ReplayFormat.COSIM_DIR) + self.assertEqual(engine.total_frames, 1) + + def test_get_frame_cosim(self): + if not self._available(): + self.skipTest("co-sim data not found") + from v7.replay import ReplayEngine + from radar_protocol import RadarFrame + engine = ReplayEngine(self.COSIM_DIR) + frame = engine.get_frame(0) + self.assertIsInstance(frame, RadarFrame) + self.assertEqual(frame.range_doppler_i.shape, (64, 32)) + self.assertEqual(frame.magnitude.shape, (64, 32)) + + def test_get_frame_out_of_range(self): + if not self._available(): + self.skipTest("co-sim data not found") + from v7.replay import ReplayEngine + engine = ReplayEngine(self.COSIM_DIR) + with self.assertRaises(IndexError): + engine.get_frame(1) + with self.assertRaises(IndexError): + engine.get_frame(-1) + + +class TestReplayEngineRawIQ(unittest.TestCase): + """ReplayEngine loading from raw IQ .npy cube.""" + + def test_load_raw_iq_synthetic(self): + """Synthetic raw IQ cube loads and produces correct frame count.""" + import tempfile + from v7.replay import ReplayEngine, ReplayFormat + from v7.software_fpga import SoftwareFPGA + + raw = np.random.randn(3, 32, 1024) + 1j * np.random.randn(3, 32, 1024) + with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: + np.save(f, raw) + tmp = f.name + try: + fpga = SoftwareFPGA() + engine = ReplayEngine(tmp, software_fpga=fpga) + self.assertEqual(engine.fmt, ReplayFormat.RAW_IQ_NPY) + self.assertEqual(engine.total_frames, 3) + finally: + os.unlink(tmp) + + def test_get_frame_raw_iq_synthetic(self): + """get_frame on raw IQ runs SoftwareFPGA and returns RadarFrame.""" + import tempfile + from v7.replay import ReplayEngine + from v7.software_fpga import SoftwareFPGA + from radar_protocol import RadarFrame + + raw = np.random.randn(2, 32, 1024) + 1j * np.random.randn(2, 32, 1024) + with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: + np.save(f, raw) + tmp = f.name + try: + fpga = SoftwareFPGA() + engine = ReplayEngine(tmp, software_fpga=fpga) + frame = engine.get_frame(0) + self.assertIsInstance(frame, RadarFrame) + self.assertEqual(frame.range_doppler_i.shape, (64, 32)) + self.assertEqual(frame.frame_number, 0) + finally: + os.unlink(tmp) + + def test_raw_iq_no_fpga_raises(self): + """Raw IQ get_frame without SoftwareFPGA → RuntimeError.""" + import tempfile + from v7.replay import ReplayEngine + + raw = np.random.randn(1, 32, 1024) + 1j * np.random.randn(1, 32, 1024) + with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: + np.save(f, raw) + tmp = f.name + try: + engine = ReplayEngine(tmp) + with self.assertRaises(RuntimeError): + engine.get_frame(0) + finally: + os.unlink(tmp) + + +class TestReplayEngineHDF5(unittest.TestCase): + """ReplayEngine loading from HDF5 recordings.""" + + def _skip_no_h5py(self): + try: + import h5py # noqa: F401 + except ImportError: + self.skipTest("h5py not installed") + + def test_load_hdf5_synthetic(self): + """Synthetic HDF5 loads and iterates frames.""" + self._skip_no_h5py() + import tempfile + import h5py + from v7.replay import ReplayEngine, ReplayFormat + from radar_protocol import RadarFrame + + with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as f: + tmp = f.name + + try: + with h5py.File(tmp, "w") as hf: + hf.attrs["creator"] = "test" + hf.attrs["range_bins"] = 64 + hf.attrs["doppler_bins"] = 32 + grp = hf.create_group("frames") + for i in range(3): + fg = grp.create_group(f"frame_{i:06d}") + fg.attrs["timestamp"] = float(i) + fg.attrs["frame_number"] = i + fg.attrs["detection_count"] = 0 + fg.create_dataset("range_doppler_i", + data=np.zeros((64, 32), dtype=np.int16)) + fg.create_dataset("range_doppler_q", + data=np.zeros((64, 32), dtype=np.int16)) + fg.create_dataset("magnitude", + data=np.zeros((64, 32), dtype=np.float64)) + fg.create_dataset("detections", + data=np.zeros((64, 32), dtype=np.uint8)) + fg.create_dataset("range_profile", + data=np.zeros(64, dtype=np.float64)) + + engine = ReplayEngine(tmp) + self.assertEqual(engine.fmt, ReplayFormat.HDF5) + self.assertEqual(engine.total_frames, 3) + + frame = engine.get_frame(1) + self.assertIsInstance(frame, RadarFrame) + self.assertEqual(frame.frame_number, 1) + self.assertEqual(frame.range_doppler_i.shape, (64, 32)) + engine.close() + finally: + os.unlink(tmp) + + +# ============================================================================= +# Test: v7.processing.extract_targets_from_frame +# ============================================================================= + +class TestExtractTargetsFromFrame(unittest.TestCase): + """extract_targets_from_frame bin-to-physical conversion.""" + + def _make_frame(self, det_cells=None): + """Create a minimal RadarFrame with optional detection cells.""" + from radar_protocol import RadarFrame + frame = RadarFrame() + if det_cells: + for rbin, dbin in det_cells: + frame.detections[rbin, dbin] = 1 + frame.magnitude[rbin, dbin] = 1000.0 + frame.detection_count = int(frame.detections.sum()) + frame.timestamp = 1.0 + return frame + + def test_no_detections(self): + from v7.processing import extract_targets_from_frame + frame = self._make_frame() + targets = extract_targets_from_frame(frame) + self.assertEqual(len(targets), 0) + + def test_single_detection_range(self): + """Detection at range bin 10 → range = 10 * range_resolution.""" + from v7.processing import extract_targets_from_frame + frame = self._make_frame(det_cells=[(10, 16)]) # dbin=16 = center → vel=0 + targets = extract_targets_from_frame(frame, range_resolution=5.621) + self.assertEqual(len(targets), 1) + self.assertAlmostEqual(targets[0].range, 10 * 5.621, places=2) + self.assertAlmostEqual(targets[0].velocity, 0.0, places=2) + + def test_velocity_sign(self): + """Doppler bin < center → negative velocity, > center → positive.""" + from v7.processing import extract_targets_from_frame + frame = self._make_frame(det_cells=[(5, 10), (5, 20)]) + targets = extract_targets_from_frame(frame, velocity_resolution=1.484) + # dbin=10: vel = (10-16)*1.484 = -8.904 (approaching) + # dbin=20: vel = (20-16)*1.484 = +5.936 (receding) + self.assertLess(targets[0].velocity, 0) + self.assertGreater(targets[1].velocity, 0) + + def test_snr_positive_for_nonzero_mag(self): + from v7.processing import extract_targets_from_frame + frame = self._make_frame(det_cells=[(3, 16)]) + targets = extract_targets_from_frame(frame) + self.assertGreater(targets[0].snr, 0) + + def test_gps_georef(self): + """With GPS data, targets get non-zero lat/lon.""" + from v7.processing import extract_targets_from_frame + from v7.models import GPSData + gps = GPSData(latitude=41.9, longitude=12.5, altitude=0.0, + pitch=0.0, heading=90.0) + frame = self._make_frame(det_cells=[(10, 16)]) + targets = extract_targets_from_frame( + frame, range_resolution=100.0, gps=gps) + # Should be roughly east of radar position + self.assertAlmostEqual(targets[0].latitude, 41.9, places=2) + self.assertGreater(targets[0].longitude, 12.5) + + def test_multiple_detections(self): + from v7.processing import extract_targets_from_frame + frame = self._make_frame(det_cells=[(0, 0), (10, 10), (63, 31)]) + targets = extract_targets_from_frame(frame) + self.assertEqual(len(targets), 3) + # IDs should be sequential 0, 1, 2 + self.assertEqual([t.id for t in targets], [0, 1, 2]) + + # ============================================================================= # Helper: lazy import of v7.models # ============================================================================= diff --git a/9_Firmware/9_3_GUI/v7/__init__.py b/9_Firmware/9_3_GUI/v7/__init__.py index 175da91..1da6cdb 100644 --- a/9_Firmware/9_3_GUI/v7/__init__.py +++ b/9_Firmware/9_3_GUI/v7/__init__.py @@ -14,6 +14,7 @@ from .models import ( GPSData, ProcessingConfig, TileServer, + WaveformConfig, DARK_BG, DARK_FG, DARK_ACCENT, DARK_HIGHLIGHT, DARK_BORDER, DARK_TEXT, DARK_BUTTON, DARK_BUTTON_HOVER, DARK_TREEVIEW, DARK_TREEVIEW_ALT, @@ -25,7 +26,6 @@ from .models import ( # Hardware interfaces — production protocol via radar_protocol.py from .hardware import ( FT2232HConnection, - ReplayConnection, RadarProtocol, Opcode, RadarAcquisition, @@ -40,8 +40,22 @@ from .processing import ( RadarProcessor, USBPacketParser, apply_pitch_correction, + polar_to_geographic, + extract_targets_from_frame, ) +# Software FPGA (depends on golden_reference.py in FPGA cosim tree) +try: # noqa: SIM105 + from .software_fpga import SoftwareFPGA, quantize_raw_iq +except ImportError: # golden_reference.py not available (e.g. deployment without FPGA tree) + pass + +# Replay engine (no PyQt6 dependency, but needs SoftwareFPGA for raw IQ path) +try: # noqa: SIM105 + from .replay import ReplayEngine, ReplayFormat +except ImportError: # software_fpga unavailable → replay also unavailable + pass + # Workers, map widget, and dashboard require PyQt6 — import lazily so that # tests/CI environments without PyQt6 can still access models/hardware/processing. try: @@ -49,7 +63,7 @@ try: RadarDataWorker, GPSDataWorker, TargetSimulator, - polar_to_geographic, + ReplayWorker, ) from .map_widget import ( @@ -67,6 +81,7 @@ except ImportError: # PyQt6 not installed (e.g. CI headless runner) __all__ = [ # noqa: RUF022 # models "RadarTarget", "RadarSettings", "GPSData", "ProcessingConfig", "TileServer", + "WaveformConfig", "DARK_BG", "DARK_FG", "DARK_ACCENT", "DARK_HIGHLIGHT", "DARK_BORDER", "DARK_TEXT", "DARK_BUTTON", "DARK_BUTTON_HOVER", "DARK_TREEVIEW", "DARK_TREEVIEW_ALT", @@ -74,15 +89,18 @@ __all__ = [ # noqa: RUF022 "USB_AVAILABLE", "FTDI_AVAILABLE", "SCIPY_AVAILABLE", "SKLEARN_AVAILABLE", "FILTERPY_AVAILABLE", # hardware — production FPGA protocol - "FT2232HConnection", "ReplayConnection", "RadarProtocol", "Opcode", + "FT2232HConnection", "RadarProtocol", "Opcode", "RadarAcquisition", "RadarFrame", "StatusResponse", "DataRecorder", "STM32USBInterface", # processing "RadarProcessor", "USBPacketParser", - "apply_pitch_correction", + "apply_pitch_correction", "polar_to_geographic", + "extract_targets_from_frame", + # software FPGA + replay + "SoftwareFPGA", "quantize_raw_iq", + "ReplayEngine", "ReplayFormat", # workers - "RadarDataWorker", "GPSDataWorker", "TargetSimulator", - "polar_to_geographic", + "RadarDataWorker", "GPSDataWorker", "TargetSimulator", "ReplayWorker", # map "MapBridge", "RadarMapWidget", # dashboard diff --git a/9_Firmware/9_3_GUI/v7/dashboard.py b/9_Firmware/9_3_GUI/v7/dashboard.py index 1b3b2a8..c7e2c64 100644 --- a/9_Firmware/9_3_GUI/v7/dashboard.py +++ b/9_Firmware/9_3_GUI/v7/dashboard.py @@ -14,7 +14,7 @@ RadarDashboard is a QMainWindow with six tabs: Uses production radar_protocol.py for all FPGA communication: - FT2232HConnection for real hardware - - ReplayConnection for offline .npy replay + - Unified replay via SoftwareFPGA + ReplayEngine + ReplayWorker - Mock mode (FT2232HConnection(mock=True)) for development The old STM32 magic-packet start flow has been removed. FPGA registers @@ -22,9 +22,12 @@ are controlled directly via 4-byte {opcode, addr, value_hi, value_lo} commands sent over FT2232H. """ +from __future__ import annotations + import time import logging from collections import deque +from typing import TYPE_CHECKING import numpy as np @@ -32,7 +35,7 @@ from PyQt6.QtWidgets import ( QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QTabWidget, QSplitter, QGroupBox, QFrame, QScrollArea, QLabel, QPushButton, QComboBox, QCheckBox, - QDoubleSpinBox, QSpinBox, QLineEdit, + QDoubleSpinBox, QSpinBox, QLineEdit, QSlider, QFileDialog, QTableWidget, QTableWidgetItem, QHeaderView, QPlainTextEdit, QStatusBar, QMessageBox, ) @@ -52,7 +55,6 @@ from .models import ( ) from .hardware import ( FT2232HConnection, - ReplayConnection, RadarProtocol, RadarFrame, StatusResponse, @@ -60,9 +62,13 @@ from .hardware import ( STM32USBInterface, ) from .processing import RadarProcessor, USBPacketParser -from .workers import RadarDataWorker, GPSDataWorker, TargetSimulator +from .workers import RadarDataWorker, GPSDataWorker, TargetSimulator, ReplayWorker from .map_widget import RadarMapWidget +if TYPE_CHECKING: + from .software_fpga import SoftwareFPGA + from .replay import ReplayEngine + logger = logging.getLogger(__name__) # Frame dimensions from FPGA @@ -153,6 +159,12 @@ class RadarDashboard(QMainWindow): self._gps_worker: GPSDataWorker | None = None self._simulator: TargetSimulator | None = None + # Replay-specific objects (created when entering replay mode) + self._replay_worker: ReplayWorker | None = None + self._replay_engine: ReplayEngine | None = None + self._software_fpga: SoftwareFPGA | None = None + self._replay_mode = False + # State self._running = False self._demo_mode = False @@ -352,7 +364,7 @@ class RadarDashboard(QMainWindow): # Row 0: connection mode + device combos + buttons ctrl_layout.addWidget(QLabel("Mode:"), 0, 0) self._mode_combo = QComboBox() - self._mode_combo.addItems(["Mock", "Live FT2232H", "Replay (.npy)"]) + self._mode_combo.addItems(["Mock", "Live FT2232H", "Replay"]) self._mode_combo.setCurrentIndex(0) ctrl_layout.addWidget(self._mode_combo, 0, 1) @@ -401,6 +413,55 @@ class RadarDashboard(QMainWindow): self._status_label_main.setAlignment(Qt.AlignmentFlag.AlignRight) ctrl_layout.addWidget(self._status_label_main, 1, 5, 1, 5) + # Row 2: replay transport controls (hidden until replay mode) + self._replay_file_label = QLabel("No file loaded") + self._replay_file_label.setMinimumWidth(200) + ctrl_layout.addWidget(self._replay_file_label, 2, 0, 1, 2) + + self._replay_browse_btn = QPushButton("Browse...") + self._replay_browse_btn.clicked.connect(self._browse_replay_file) + ctrl_layout.addWidget(self._replay_browse_btn, 2, 2) + + self._replay_play_btn = QPushButton("Play") + self._replay_play_btn.clicked.connect(self._replay_play_pause) + ctrl_layout.addWidget(self._replay_play_btn, 2, 3) + + self._replay_stop_btn = QPushButton("Stop") + self._replay_stop_btn.clicked.connect(self._replay_stop) + ctrl_layout.addWidget(self._replay_stop_btn, 2, 4) + + self._replay_slider = QSlider(Qt.Orientation.Horizontal) + self._replay_slider.setMinimum(0) + self._replay_slider.setMaximum(0) + self._replay_slider.valueChanged.connect(self._replay_seek) + ctrl_layout.addWidget(self._replay_slider, 2, 5, 1, 2) + + self._replay_frame_label = QLabel("0 / 0") + ctrl_layout.addWidget(self._replay_frame_label, 2, 7) + + self._replay_speed_combo = QComboBox() + self._replay_speed_combo.addItems(["50 ms", "100 ms", "200 ms", "500 ms"]) + self._replay_speed_combo.setCurrentIndex(1) + self._replay_speed_combo.currentIndexChanged.connect(self._replay_speed_changed) + ctrl_layout.addWidget(self._replay_speed_combo, 2, 8) + + self._replay_loop_cb = QCheckBox("Loop") + self._replay_loop_cb.stateChanged.connect(self._replay_loop_changed) + ctrl_layout.addWidget(self._replay_loop_cb, 2, 9) + + # Collect replay widgets to toggle visibility + self._replay_controls = [ + self._replay_file_label, self._replay_browse_btn, + self._replay_play_btn, self._replay_stop_btn, + self._replay_slider, self._replay_frame_label, + self._replay_speed_combo, self._replay_loop_cb, + ] + for w in self._replay_controls: + w.setVisible(False) + + # Show/hide replay row when mode changes + self._mode_combo.currentTextChanged.connect(self._on_mode_changed) + layout.addWidget(ctrl) # ---- Display area (range-doppler + targets table) ------------------ @@ -1175,7 +1236,11 @@ class RadarDashboard(QMainWindow): logger.error(f"Failed to send FPGA cmd: 0x{opcode:02X}") def _send_fpga_validated(self, opcode: int, value: int, bits: int): - """Clamp value to bit-width and send.""" + """Clamp value to bit-width and send. + + In replay mode, also dispatch to the SoftwareFPGA setter and + re-process the current frame so the user sees immediate effect. + """ max_val = (1 << bits) - 1 clamped = max(0, min(value, max_val)) if clamped != value: @@ -1185,7 +1250,18 @@ class RadarDashboard(QMainWindow): key = f"0x{opcode:02X}" if key in self._param_spins: self._param_spins[key].setValue(clamped) - self._send_fpga_cmd(opcode, clamped) + + # Dispatch to real FPGA (live/mock mode) + if not self._replay_mode: + self._send_fpga_cmd(opcode, clamped) + return + + # Dispatch to SoftwareFPGA (replay mode) + if self._software_fpga is not None: + self._dispatch_to_software_fpga(opcode, clamped) + # Re-process current frame so the effect is visible immediately + if self._replay_worker is not None: + self._replay_worker.seek(self._replay_worker.current_index) def _send_custom_command(self): """Send custom opcode + value from the FPGA Control tab.""" @@ -1210,32 +1286,104 @@ class RadarDashboard(QMainWindow): mode = self._mode_combo.currentText() if "Mock" in mode: + self._replay_mode = False self._connection = FT2232HConnection(mock=True) if not self._connection.open(): QMessageBox.critical(self, "Error", "Failed to open mock connection.") return elif "Live" in mode: + self._replay_mode = False self._connection = FT2232HConnection(mock=False) if not self._connection.open(): QMessageBox.critical(self, "Error", "Failed to open FT2232H. Check USB connection.") return elif "Replay" in mode: - from PyQt6.QtWidgets import QFileDialog - npy_dir = QFileDialog.getExistingDirectory( - self, "Select .npy replay directory") - if not npy_dir: + self._replay_mode = True + replay_path = self._replay_file_label.text() + if replay_path == "No file loaded" or not replay_path: + QMessageBox.warning( + self, "Replay", + "Use 'Browse...' to select a replay" + " file or directory first.") return - self._connection = ReplayConnection(npy_dir) - if not self._connection.open(): - QMessageBox.critical(self, "Error", - "Failed to open replay connection.") + + from .software_fpga import SoftwareFPGA + from .replay import ReplayEngine + + self._software_fpga = SoftwareFPGA() + # Enable CFAR by default for raw IQ replay (avoids 2000+ detections) + self._software_fpga.set_cfar_enable(True) + + try: + self._replay_engine = ReplayEngine( + replay_path, self._software_fpga) + except (OSError, ValueError, RuntimeError) as exc: + QMessageBox.critical(self, "Replay Error", + f"Failed to open replay data:\n{exc}") + self._software_fpga = None return + + if self._replay_engine.total_frames == 0: + QMessageBox.warning(self, "Replay", "No frames found in the selected source.") + self._replay_engine.close() + self._replay_engine = None + self._software_fpga = None + return + + speed_map = {0: 50, 1: 100, 2: 200, 3: 500} + interval = speed_map.get(self._replay_speed_combo.currentIndex(), 100) + + self._replay_worker = ReplayWorker( + replay_engine=self._replay_engine, + settings=self._settings, + gps=self._radar_position, + frame_interval_ms=interval, + ) + self._replay_worker.frameReady.connect(self._on_frame_ready) + self._replay_worker.targetsUpdated.connect(self._on_radar_targets) + self._replay_worker.statsUpdated.connect(self._on_radar_stats) + self._replay_worker.errorOccurred.connect(self._on_worker_error) + self._replay_worker.playbackStateChanged.connect( + self._on_playback_state_changed) + self._replay_worker.frameIndexChanged.connect( + self._on_frame_index_changed) + self._replay_worker.set_loop(self._replay_loop_cb.isChecked()) + + self._replay_slider.setMaximum( + self._replay_engine.total_frames - 1) + self._replay_slider.setValue(0) + self._replay_frame_label.setText( + f"0 / {self._replay_engine.total_frames}") + + self._replay_worker.start() + # Update CFAR enable spinbox to reflect default-on for replay + if "0x25" in self._param_spins: + self._param_spins["0x25"].setValue(1) + + # UI state + self._running = True + self._start_time = time.time() + self._frame_count = 0 + self._start_btn.setEnabled(False) + self._stop_btn.setEnabled(True) + self._mode_combo.setEnabled(False) + self._demo_btn_main.setEnabled(False) + self._demo_btn_map.setEnabled(False) + n_frames = self._replay_engine.total_frames + self._status_label_main.setText( + f"Status: Replay ({n_frames} frames)") + self._sb_status.setText(f"Replay ({n_frames} frames)") + self._sb_mode.setText("Replay") + logger.info( + "Replay started: %s (%d frames)", + replay_path, n_frames) + return else: QMessageBox.warning(self, "Warning", "Unknown connection mode.") return - # Start radar worker + # Start radar worker (mock / live — NOT replay) self._radar_worker = RadarDataWorker( connection=self._connection, processor=self._processor, @@ -1288,6 +1436,18 @@ class RadarDashboard(QMainWindow): self._radar_worker.wait(2000) self._radar_worker = None + if self._replay_worker: + self._replay_worker.stop() + self._replay_worker.wait(2000) + self._replay_worker = None + + if self._replay_engine: + self._replay_engine.close() + self._replay_engine = None + + self._software_fpga = None + self._replay_mode = False + if self._gps_worker: self._gps_worker.stop() self._gps_worker.wait(2000) @@ -1309,6 +1469,113 @@ class RadarDashboard(QMainWindow): self._sb_mode.setText("Idle") logger.info("Radar system stopped") + # ===================================================================== + # Replay helpers + # ===================================================================== + + def _on_mode_changed(self, text: str): + """Show/hide replay transport controls based on mode selection.""" + is_replay = "Replay" in text + for w in self._replay_controls: + w.setVisible(is_replay) + + def _browse_replay_file(self): + """Open file/directory picker for replay source.""" + path, _ = QFileDialog.getOpenFileName( + self, "Select replay file", + "", + "All supported (*.npy *.h5);;NumPy files (*.npy);;HDF5 files (*.h5);;All files (*)", + ) + if path: + self._replay_file_label.setText(path) + return + # If no file selected, try directory (for co-sim) + dir_path = QFileDialog.getExistingDirectory( + self, "Select co-sim replay directory") + if dir_path: + self._replay_file_label.setText(dir_path) + + def _replay_play_pause(self): + """Toggle play/pause on the replay worker.""" + if self._replay_worker is None: + return + if self._replay_worker.is_playing: + self._replay_worker.pause() + self._replay_play_btn.setText("Play") + else: + self._replay_worker.play() + self._replay_play_btn.setText("Pause") + + def _replay_stop(self): + """Stop replay playback (keeps data loaded).""" + if self._replay_worker is not None: + self._replay_worker.pause() + self._replay_worker.seek(0) + self._replay_play_btn.setText("Play") + + def _replay_seek(self, value: int): + """Seek to a specific frame from the slider.""" + if self._replay_worker is not None and not self._replay_worker.is_playing: + self._replay_worker.seek(value) + + def _replay_speed_changed(self, index: int): + """Update replay frame interval from speed combo.""" + speed_map = {0: 50, 1: 100, 2: 200, 3: 500} + ms = speed_map.get(index, 100) + if self._replay_worker is not None: + self._replay_worker.set_frame_interval(ms) + + def _replay_loop_changed(self, state: int): + """Update replay loop setting.""" + if self._replay_worker is not None: + self._replay_worker.set_loop(state == Qt.CheckState.Checked.value) + + @pyqtSlot(str) + def _on_playback_state_changed(self, state: str): + """Update UI when replay playback state changes.""" + if state == "playing": + self._replay_play_btn.setText("Pause") + elif state in ("paused", "stopped"): + self._replay_play_btn.setText("Play") + if state == "stopped" and self._replay_worker is not None: + self._status_label_main.setText("Status: Replay finished") + + @pyqtSlot(int, int) + def _on_frame_index_changed(self, current: int, total: int): + """Update slider and frame label from replay worker.""" + self._replay_slider.blockSignals(True) + self._replay_slider.setValue(current) + self._replay_slider.blockSignals(False) + self._replay_frame_label.setText(f"{current} / {total}") + + def _dispatch_to_software_fpga(self, opcode: int, value: int): + """Route an FPGA opcode+value to the SoftwareFPGA setter.""" + fpga = self._software_fpga + if fpga is None: + return + _opcode_dispatch = { + 0x03: lambda v: fpga.set_detect_threshold(v), + 0x16: lambda v: fpga.set_gain_shift(v), + 0x21: lambda v: fpga.set_cfar_guard(v), + 0x22: lambda v: fpga.set_cfar_train(v), + 0x23: lambda v: fpga.set_cfar_alpha(v), + 0x24: lambda v: fpga.set_cfar_mode(v), + 0x25: lambda v: fpga.set_cfar_enable(bool(v)), + 0x26: lambda v: fpga.set_mti_enable(bool(v)), + 0x27: lambda v: fpga.set_dc_notch_width(v), + 0x28: lambda v: fpga.set_agc_enable(bool(v)), + 0x29: lambda v: fpga.set_agc_params(target=v), + 0x2A: lambda v: fpga.set_agc_params(attack=v), + 0x2B: lambda v: fpga.set_agc_params(decay=v), + 0x2C: lambda v: fpga.set_agc_params(holdoff=v), + } + handler = _opcode_dispatch.get(opcode) + if handler is not None: + handler(value) + logger.info(f"SoftwareFPGA: 0x{opcode:02X} = {value}") + else: + logger.debug(f"SoftwareFPGA: opcode 0x{opcode:02X} not handled (no-op)") + # ===================================================================== # Demo mode # ===================================================================== @@ -1338,7 +1605,7 @@ class RadarDashboard(QMainWindow): self._demo_mode = False if not self._running: mode = "Idle" - elif isinstance(self._connection, ReplayConnection): + elif self._replay_mode: mode = "Replay" else: mode = "Live" diff --git a/9_Firmware/9_3_GUI/v7/hardware.py b/9_Firmware/9_3_GUI/v7/hardware.py index e0231ef..84bbb9a 100644 --- a/9_Firmware/9_3_GUI/v7/hardware.py +++ b/9_Firmware/9_3_GUI/v7/hardware.py @@ -3,14 +3,11 @@ v7.hardware — Hardware interface classes for the PLFM Radar GUI V7. Provides: - FT2232H radar data + command interface via production radar_protocol module - - ReplayConnection for offline .npy replay via production radar_protocol module - STM32USBInterface for GPS data only (USB CDC) The FT2232H interface uses the production protocol layer (radar_protocol.py) which sends 4-byte {opcode, addr, value_hi, value_lo} register commands and -parses 0xAA data / 0xBB status packets from the FPGA. The old magic-packet -and 'SET'...'END' binary settings protocol has been removed — it was -incompatible with the FPGA register interface. +parses 0xAA data / 0xBB status packets from the FPGA. """ import sys @@ -28,7 +25,6 @@ if USB_AVAILABLE: sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from radar_protocol import ( # noqa: F401 — re-exported for v7 package FT2232HConnection, - ReplayConnection, RadarProtocol, Opcode, RadarAcquisition, diff --git a/9_Firmware/9_3_GUI/v7/models.py b/9_Firmware/9_3_GUI/v7/models.py index a5eb40e..c4b277c 100644 --- a/9_Firmware/9_3_GUI/v7/models.py +++ b/9_Firmware/9_3_GUI/v7/models.py @@ -186,3 +186,59 @@ class TileServer(Enum): GOOGLE_SATELLITE = "google_sat" GOOGLE_HYBRID = "google_hybrid" ESRI_SATELLITE = "esri_sat" + + +# --------------------------------------------------------------------------- +# Waveform configuration (physical parameters for bin→unit conversion) +# --------------------------------------------------------------------------- + +@dataclass +class WaveformConfig: + """Physical waveform parameters for converting bins to SI units. + + Encapsulates the radar waveform so that range/velocity resolution + can be derived automatically instead of hardcoded in RadarSettings. + + Defaults match the ADI CN0566 Phaser capture parameters used in + the golden_reference cosim (4 MSPS, 500 MHz BW, 300 us chirp). + """ + + sample_rate_hz: float = 4e6 # ADC sample rate + bandwidth_hz: float = 500e6 # Chirp bandwidth + chirp_duration_s: float = 300e-6 # Chirp ramp time + center_freq_hz: float = 10.525e9 # Carrier frequency + n_range_bins: int = 64 # After decimation + n_doppler_bins: int = 32 # After Doppler FFT + fft_size: int = 1024 # Pre-decimation FFT length + decimation_factor: int = 16 # 1024 → 64 + + @property + def range_resolution_m(self) -> float: + """Meters per decimated range bin (FMCW deramped baseband). + + For deramped FMCW: bin spacing = c * Fs * T / (2 * N_FFT * BW). + After decimation the bin spacing grows by *decimation_factor*. + """ + c = 299_792_458.0 + raw_bin = ( + c * self.sample_rate_hz * self.chirp_duration_s + / (2.0 * self.fft_size * self.bandwidth_hz) + ) + return raw_bin * self.decimation_factor + + @property + def velocity_resolution_mps(self) -> float: + """m/s per Doppler bin. lambda / (2 * n_doppler * chirp_duration).""" + c = 299_792_458.0 + wavelength = c / self.center_freq_hz + return wavelength / (2.0 * self.n_doppler_bins * self.chirp_duration_s) + + @property + def max_range_m(self) -> float: + """Maximum unambiguous range in meters.""" + return self.range_resolution_m * self.n_range_bins + + @property + def max_velocity_mps(self) -> float: + """Maximum unambiguous velocity (±) in m/s.""" + return self.velocity_resolution_mps * self.n_doppler_bins / 2.0 diff --git a/9_Firmware/9_3_GUI/v7/processing.py b/9_Firmware/9_3_GUI/v7/processing.py index c6ce2cd..c601097 100644 --- a/9_Firmware/9_3_GUI/v7/processing.py +++ b/9_Firmware/9_3_GUI/v7/processing.py @@ -451,3 +451,103 @@ class USBPacketParser: except (ValueError, struct.error) as e: logger.error(f"Error parsing binary GPS: {e}") return None + + +# ============================================================================ +# Utility: polar → geographic coordinate conversion +# ============================================================================ + +def polar_to_geographic( + radar_lat: float, + radar_lon: float, + range_m: float, + azimuth_deg: float, +) -> tuple: + """Convert polar (range, azimuth) relative to radar → (lat, lon). + + azimuth_deg: 0 = North, clockwise. + """ + r_earth = 6_371_000.0 # Earth radius in metres + + lat1 = math.radians(radar_lat) + lon1 = math.radians(radar_lon) + bearing = math.radians(azimuth_deg) + + lat2 = math.asin( + math.sin(lat1) * math.cos(range_m / r_earth) + + math.cos(lat1) * math.sin(range_m / r_earth) * math.cos(bearing) + ) + lon2 = lon1 + math.atan2( + math.sin(bearing) * math.sin(range_m / r_earth) * math.cos(lat1), + math.cos(range_m / r_earth) - math.sin(lat1) * math.sin(lat2), + ) + return (math.degrees(lat2), math.degrees(lon2)) + + +# ============================================================================ +# Shared target extraction (used by both RadarDataWorker and ReplayWorker) +# ============================================================================ + +def extract_targets_from_frame( + frame, + range_resolution: float = 1.0, + velocity_resolution: float = 1.0, + gps: GPSData | None = None, +) -> list[RadarTarget]: + """Extract RadarTarget list from a RadarFrame's detection mask. + + This is the bin-to-physical conversion + geo-mapping shared between + the live and replay data paths. + + Parameters + ---------- + frame : RadarFrame + Frame with populated ``detections``, ``magnitude``, ``range_doppler_i/q``. + range_resolution : float + Meters per range bin. + velocity_resolution : float + m/s per Doppler bin. + gps : GPSData | None + GPS position for geo-mapping (latitude/longitude). + + Returns + ------- + list[RadarTarget] + One target per detection cell. + """ + det_indices = np.argwhere(frame.detections > 0) + n_doppler = frame.detections.shape[1] if frame.detections.ndim == 2 else 32 + doppler_center = n_doppler // 2 + + targets: list[RadarTarget] = [] + for idx in det_indices: + rbin, dbin = int(idx[0]), int(idx[1]) + mag = float(frame.magnitude[rbin, dbin]) + snr = 10.0 * math.log10(max(mag, 1.0)) if mag > 0 else 0.0 + + range_m = float(rbin) * range_resolution + velocity_ms = float(dbin - doppler_center) * velocity_resolution + + lat, lon, azimuth, elevation = 0.0, 0.0, 0.0, 0.0 + if gps is not None: + azimuth = gps.heading + # Spread detections across ±15° sector for single-beam radar + if len(det_indices) > 1: + spread = (dbin - doppler_center) / max(doppler_center, 1) * 15.0 + azimuth = gps.heading + spread + lat, lon = polar_to_geographic( + gps.latitude, gps.longitude, range_m, azimuth, + ) + + targets.append(RadarTarget( + id=len(targets), + range=range_m, + velocity=velocity_ms, + azimuth=azimuth, + elevation=elevation, + latitude=lat, + longitude=lon, + snr=snr, + timestamp=frame.timestamp, + )) + return targets diff --git a/9_Firmware/9_3_GUI/v7/replay.py b/9_Firmware/9_3_GUI/v7/replay.py new file mode 100644 index 0000000..3510d4b --- /dev/null +++ b/9_Firmware/9_3_GUI/v7/replay.py @@ -0,0 +1,288 @@ +""" +v7.replay — ReplayEngine: auto-detect format, load, and iterate RadarFrames. + +Supports three data sources: + 1. **FPGA co-sim directory** — pre-computed ``.npy`` files from golden_reference + 2. **Raw IQ cube** ``.npy`` — complex baseband capture (e.g. ADI Phaser) + 3. **HDF5 recording** ``.h5`` — frames captured by ``DataRecorder`` + +For raw IQ data the engine uses :class:`SoftwareFPGA` to run the full +bit-accurate signal chain, so changing FPGA control registers in the +dashboard re-processes the data. +""" + +from __future__ import annotations + +import logging +import time +from enum import Enum, auto +from pathlib import Path +from typing import TYPE_CHECKING + +import numpy as np + +if TYPE_CHECKING: + from .software_fpga import SoftwareFPGA + +# radar_protocol is a sibling module (not inside v7/) +import sys as _sys + +_GUI_DIR = str(Path(__file__).resolve().parent.parent) +if _GUI_DIR not in _sys.path: + _sys.path.insert(0, _GUI_DIR) +from radar_protocol import RadarFrame # noqa: E402 + +log = logging.getLogger(__name__) + +# Lazy import — h5py is optional +try: + import h5py + + HDF5_AVAILABLE = True +except ImportError: + HDF5_AVAILABLE = False + + +class ReplayFormat(Enum): + """Detected input format.""" + + COSIM_DIR = auto() + RAW_IQ_NPY = auto() + HDF5 = auto() + + +# ─────────────────────────────────────────────────────────────────── +# Format detection +# ─────────────────────────────────────────────────────────────────── + +_COSIM_REQUIRED = {"doppler_map_i.npy", "doppler_map_q.npy"} + + +def detect_format(path: str) -> ReplayFormat: + """Auto-detect the replay data format from *path*. + + Raises + ------ + ValueError + If the format cannot be determined. + """ + p = Path(path) + + if p.is_dir(): + children = {f.name for f in p.iterdir()} + if _COSIM_REQUIRED.issubset(children): + return ReplayFormat.COSIM_DIR + msg = f"Directory {p} does not contain required co-sim files: {_COSIM_REQUIRED - children}" + raise ValueError(msg) + + if p.suffix == ".h5": + return ReplayFormat.HDF5 + + if p.suffix == ".npy": + return ReplayFormat.RAW_IQ_NPY + + msg = f"Cannot determine replay format for: {p}" + raise ValueError(msg) + + +# ─────────────────────────────────────────────────────────────────── +# ReplayEngine +# ─────────────────────────────────────────────────────────────────── + +class ReplayEngine: + """Load replay data and serve RadarFrames on demand. + + Parameters + ---------- + path : str + File or directory path to load. + software_fpga : SoftwareFPGA | None + Required only for ``RAW_IQ_NPY`` format. For other formats the + data is already processed and the FPGA instance is ignored. + """ + + def __init__(self, path: str, software_fpga: SoftwareFPGA | None = None) -> None: + self.path = path + self.fmt = detect_format(path) + self.software_fpga = software_fpga + + # Populated by _load_* + self._total_frames: int = 0 + self._raw_iq: np.ndarray | None = None # for RAW_IQ_NPY + self._h5_file = None + self._h5_keys: list[str] = [] + self._cosim_frame = None # single RadarFrame for co-sim + + self._load() + + # ------------------------------------------------------------------ + # Loading + # ------------------------------------------------------------------ + + def _load(self) -> None: + if self.fmt is ReplayFormat.COSIM_DIR: + self._load_cosim() + elif self.fmt is ReplayFormat.RAW_IQ_NPY: + self._load_raw_iq() + elif self.fmt is ReplayFormat.HDF5: + self._load_hdf5() + + def _load_cosim(self) -> None: + """Load FPGA co-sim directory (already-processed .npy arrays). + + Prefers fullchain (MTI-enabled) files when CFAR outputs are present, + so that I/Q data is consistent with the detection mask. Falls back + to the non-MTI ``doppler_map`` files when fullchain data is absent. + """ + d = Path(self.path) + + # CFAR outputs (from the MTI→Doppler→DC-notch→CFAR chain) + cfar_flags = d / "fullchain_cfar_flags.npy" + cfar_mag = d / "fullchain_cfar_mag.npy" + has_cfar = cfar_flags.exists() and cfar_mag.exists() + + # MTI-consistent I/Q (same chain that produced CFAR outputs) + mti_dop_i = d / "fullchain_mti_doppler_i.npy" + mti_dop_q = d / "fullchain_mti_doppler_q.npy" + has_mti_doppler = mti_dop_i.exists() and mti_dop_q.exists() + + # Choose I/Q: prefer MTI-chain when CFAR data comes from that chain + if has_cfar and has_mti_doppler: + dop_i = np.load(mti_dop_i).astype(np.int16) + dop_q = np.load(mti_dop_q).astype(np.int16) + log.info("Co-sim: using fullchain MTI+Doppler I/Q (matches CFAR chain)") + else: + dop_i = np.load(d / "doppler_map_i.npy").astype(np.int16) + dop_q = np.load(d / "doppler_map_q.npy").astype(np.int16) + log.info("Co-sim: using non-MTI doppler_map I/Q") + + frame = RadarFrame() + frame.range_doppler_i = dop_i + frame.range_doppler_q = dop_q + + if has_cfar: + frame.detections = np.load(cfar_flags).astype(np.uint8) + frame.magnitude = np.load(cfar_mag).astype(np.float64) + else: + frame.magnitude = np.sqrt( + dop_i.astype(np.float64) ** 2 + dop_q.astype(np.float64) ** 2 + ) + frame.detections = np.zeros_like(dop_i, dtype=np.uint8) + + frame.range_profile = frame.magnitude[:, 0] + frame.detection_count = int(frame.detections.sum()) + frame.frame_number = 0 + frame.timestamp = time.time() + + self._cosim_frame = frame + self._total_frames = 1 + log.info("Loaded co-sim directory: %s (1 frame)", self.path) + + def _load_raw_iq(self) -> None: + """Load raw complex IQ cube (.npy).""" + data = np.load(self.path, mmap_mode="r") + if data.ndim == 2: + # (chirps, samples) — single frame + data = data[np.newaxis, ...] + if data.ndim != 3: + msg = f"Expected 3-D array (frames, chirps, samples), got shape {data.shape}" + raise ValueError(msg) + self._raw_iq = data + self._total_frames = data.shape[0] + log.info( + "Loaded raw IQ: %s, shape %s (%d frames)", + self.path, + data.shape, + self._total_frames, + ) + + def _load_hdf5(self) -> None: + """Load HDF5 recording (.h5).""" + if not HDF5_AVAILABLE: + msg = "h5py is required to load HDF5 recordings" + raise ImportError(msg) + self._h5_file = h5py.File(self.path, "r") + frames_grp = self._h5_file.get("frames") + if frames_grp is None: + msg = f"HDF5 file {self.path} has no 'frames' group" + raise ValueError(msg) + self._h5_keys = sorted(frames_grp.keys()) + self._total_frames = len(self._h5_keys) + log.info("Loaded HDF5: %s (%d frames)", self.path, self._total_frames) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + @property + def total_frames(self) -> int: + return self._total_frames + + def get_frame(self, index: int) -> RadarFrame: + """Return the RadarFrame at *index* (0-based). + + For ``RAW_IQ_NPY`` format, this runs the SoftwareFPGA chain + on the requested frame's chirps. + """ + if index < 0 or index >= self._total_frames: + msg = f"Frame index {index} out of range [0, {self._total_frames})" + raise IndexError(msg) + + if self.fmt is ReplayFormat.COSIM_DIR: + return self._get_cosim(index) + if self.fmt is ReplayFormat.RAW_IQ_NPY: + return self._get_raw_iq(index) + return self._get_hdf5(index) + + def close(self) -> None: + """Release any open file handles.""" + if self._h5_file is not None: + self._h5_file.close() + self._h5_file = None + + # ------------------------------------------------------------------ + # Per-format frame getters + # ------------------------------------------------------------------ + + def _get_cosim(self, _index: int) -> RadarFrame: + """Co-sim: single static frame (index ignored). + + Uses deepcopy so numpy arrays are not shared with the source, + preventing in-place mutation from corrupting cached data. + """ + import copy + frame = copy.deepcopy(self._cosim_frame) + frame.timestamp = time.time() + return frame + + def _get_raw_iq(self, index: int) -> RadarFrame: + """Raw IQ: quantize one frame and run through SoftwareFPGA.""" + if self.software_fpga is None: + msg = "SoftwareFPGA is required for raw IQ replay" + raise RuntimeError(msg) + + from .software_fpga import quantize_raw_iq + + raw = self._raw_iq[index] # (chirps, samples) complex + iq_i, iq_q = quantize_raw_iq(raw[np.newaxis, ...]) + return self.software_fpga.process_chirps( + iq_i, iq_q, frame_number=index, timestamp=time.time() + ) + + def _get_hdf5(self, index: int) -> RadarFrame: + """HDF5: reconstruct RadarFrame from stored datasets.""" + key = self._h5_keys[index] + grp = self._h5_file["frames"][key] + + frame = RadarFrame() + frame.timestamp = float(grp.attrs.get("timestamp", time.time())) + frame.frame_number = int(grp.attrs.get("frame_number", index)) + frame.detection_count = int(grp.attrs.get("detection_count", 0)) + + frame.range_doppler_i = np.array(grp["range_doppler_i"], dtype=np.int16) + frame.range_doppler_q = np.array(grp["range_doppler_q"], dtype=np.int16) + frame.magnitude = np.array(grp["magnitude"], dtype=np.float64) + frame.detections = np.array(grp["detections"], dtype=np.uint8) + frame.range_profile = np.array(grp["range_profile"], dtype=np.float64) + + return frame diff --git a/9_Firmware/9_3_GUI/v7/software_fpga.py b/9_Firmware/9_3_GUI/v7/software_fpga.py new file mode 100644 index 0000000..9b0d669 --- /dev/null +++ b/9_Firmware/9_3_GUI/v7/software_fpga.py @@ -0,0 +1,287 @@ +""" +v7.software_fpga — Bit-accurate software replica of the AERIS-10 FPGA signal chain. + +Imports processing functions directly from golden_reference.py (Option A) +to avoid code duplication. Every stage is toggleable via the same host +register interface the real FPGA exposes, so the dashboard spinboxes can +drive either backend transparently. + +Signal chain order (matching RTL): + quantize → range_fft → decimator → MTI → doppler_fft → dc_notch → CFAR → RadarFrame + +Usage: + fpga = SoftwareFPGA() + fpga.set_cfar_enable(True) + frame = fpga.process_chirps(iq_i, iq_q, frame_number=0) +""" + +from __future__ import annotations + +import logging +import os +import sys +from pathlib import Path + +import numpy as np + +# --------------------------------------------------------------------------- +# Import golden_reference by adding the cosim path to sys.path +# --------------------------------------------------------------------------- +_GOLDEN_REF_DIR = str( + Path(__file__).resolve().parents[2] # 9_Firmware/ + / "9_2_FPGA" / "tb" / "cosim" / "real_data" +) +if _GOLDEN_REF_DIR not in sys.path: + sys.path.insert(0, _GOLDEN_REF_DIR) + +from golden_reference import ( # noqa: E402 + run_range_fft, + run_range_bin_decimator, + run_mti_canceller, + run_doppler_fft, + run_dc_notch, + run_cfar_ca, + run_detection, + FFT_SIZE, + DOPPLER_CHIRPS, +) + +# RadarFrame lives in radar_protocol (no circular dep — protocol has no GUI) +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) +from radar_protocol import RadarFrame # noqa: E402 + +log = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Twiddle factor file paths (relative to FPGA root) +# --------------------------------------------------------------------------- +_FPGA_DIR = Path(__file__).resolve().parents[2] / "9_2_FPGA" +TWIDDLE_1024 = str(_FPGA_DIR / "fft_twiddle_1024.mem") +TWIDDLE_16 = str(_FPGA_DIR / "fft_twiddle_16.mem") + +# CFAR mode int→string mapping (FPGA register 0x24: 0=CA, 1=GO, 2=SO) +_CFAR_MODE_MAP = {0: "CA", 1: "GO", 2: "SO", 3: "CA"} + + +class SoftwareFPGA: + """Bit-accurate replica of the AERIS-10 FPGA signal processing chain. + + All registers mirror FPGA reset defaults from ``radar_system_top.v``. + Setters accept the same integer values as the FPGA host commands. + """ + + def __init__(self) -> None: + # --- FPGA register mirror (reset defaults) --- + # Detection + self.detect_threshold: int = 10_000 # 0x03 + self.gain_shift: int = 0 # 0x16 + + # CFAR + self.cfar_enable: bool = False # 0x25 + self.cfar_guard: int = 2 # 0x21 + self.cfar_train: int = 8 # 0x22 + self.cfar_alpha: int = 0x30 # 0x23 Q4.4 + self.cfar_mode: int = 0 # 0x24 0=CA,1=GO,2=SO + + # MTI + self.mti_enable: bool = False # 0x26 + + # DC notch + self.dc_notch_width: int = 0 # 0x27 + + # AGC (tracked but not applied in software chain — AGC operates + # on the analog front-end gain, which doesn't exist in replay) + self.agc_enable: bool = False # 0x28 + self.agc_target: int = 200 # 0x29 + self.agc_attack: int = 1 # 0x2A + self.agc_decay: int = 1 # 0x2B + self.agc_holdoff: int = 4 # 0x2C + + # ------------------------------------------------------------------ + # Register setters (same interface as UART commands to real FPGA) + # ------------------------------------------------------------------ + def set_detect_threshold(self, val: int) -> None: + self.detect_threshold = int(val) & 0xFFFF + + def set_gain_shift(self, val: int) -> None: + self.gain_shift = int(val) & 0x0F + + def set_cfar_enable(self, val: bool) -> None: + self.cfar_enable = bool(val) + + def set_cfar_guard(self, val: int) -> None: + self.cfar_guard = int(val) & 0x0F + + def set_cfar_train(self, val: int) -> None: + self.cfar_train = max(1, int(val) & 0x1F) + + def set_cfar_alpha(self, val: int) -> None: + self.cfar_alpha = int(val) & 0xFF + + def set_cfar_mode(self, val: int) -> None: + self.cfar_mode = int(val) & 0x03 + + def set_mti_enable(self, val: bool) -> None: + self.mti_enable = bool(val) + + def set_dc_notch_width(self, val: int) -> None: + self.dc_notch_width = int(val) & 0x07 + + def set_agc_enable(self, val: bool) -> None: + self.agc_enable = bool(val) + + def set_agc_params( + self, + target: int | None = None, + attack: int | None = None, + decay: int | None = None, + holdoff: int | None = None, + ) -> None: + if target is not None: + self.agc_target = int(target) & 0xFF + if attack is not None: + self.agc_attack = int(attack) & 0x0F + if decay is not None: + self.agc_decay = int(decay) & 0x0F + if holdoff is not None: + self.agc_holdoff = int(holdoff) & 0x0F + + # ------------------------------------------------------------------ + # Core processing: raw IQ chirps → RadarFrame + # ------------------------------------------------------------------ + def process_chirps( + self, + iq_i: np.ndarray, + iq_q: np.ndarray, + frame_number: int = 0, + timestamp: float = 0.0, + ) -> RadarFrame: + """Run the full FPGA signal chain on pre-quantized 16-bit I/Q chirps. + + Parameters + ---------- + iq_i, iq_q : ndarray, shape (n_chirps, n_samples), int16/int64 + Post-DDC I/Q samples. For ADI phaser data, use + ``quantize_raw_iq()`` first. + frame_number : int + Frame counter for the output RadarFrame. + timestamp : float + Timestamp for the output RadarFrame. + + Returns + ------- + RadarFrame + Populated frame identical to what the real FPGA would produce. + """ + n_chirps = iq_i.shape[0] + n_samples = iq_i.shape[1] + + # --- Stage 1: Range FFT (per chirp) --- + range_i = np.zeros((n_chirps, n_samples), dtype=np.int64) + range_q = np.zeros((n_chirps, n_samples), dtype=np.int64) + twiddle_1024 = TWIDDLE_1024 if os.path.exists(TWIDDLE_1024) else None + for c in range(n_chirps): + range_i[c], range_q[c] = run_range_fft( + iq_i[c].astype(np.int64), + iq_q[c].astype(np.int64), + twiddle_file=twiddle_1024, + ) + + # --- Stage 2: Range bin decimation (1024 → 64) --- + decim_i, decim_q = run_range_bin_decimator(range_i, range_q) + + # --- Stage 3: MTI canceller (pre-Doppler, per-chirp) --- + mti_i, mti_q = run_mti_canceller(decim_i, decim_q, enable=self.mti_enable) + + # --- Stage 4: Doppler FFT (dual 16-pt Hamming) --- + twiddle_16 = TWIDDLE_16 if os.path.exists(TWIDDLE_16) else None + doppler_i, doppler_q = run_doppler_fft(mti_i, mti_q, twiddle_file_16=twiddle_16) + + # --- Stage 5: DC notch (bin zeroing) --- + notch_i, notch_q = run_dc_notch(doppler_i, doppler_q, width=self.dc_notch_width) + + # --- Stage 6: Detection --- + if self.cfar_enable: + mode_str = _CFAR_MODE_MAP.get(self.cfar_mode, "CA") + detect_flags, magnitudes, _thresholds = run_cfar_ca( + notch_i, + notch_q, + guard=self.cfar_guard, + train=self.cfar_train, + alpha_q44=self.cfar_alpha, + mode=mode_str, + ) + det_mask = detect_flags.astype(np.uint8) + mag = magnitudes.astype(np.float64) + else: + mag_raw, det_indices = run_detection( + notch_i, notch_q, threshold=self.detect_threshold + ) + mag = mag_raw.astype(np.float64) + det_mask = np.zeros_like(mag, dtype=np.uint8) + for idx in det_indices: + det_mask[idx[0], idx[1]] = 1 + + # --- Assemble RadarFrame --- + frame = RadarFrame() + frame.timestamp = timestamp + frame.frame_number = frame_number + frame.range_doppler_i = np.clip(notch_i, -32768, 32767).astype(np.int16) + frame.range_doppler_q = np.clip(notch_q, -32768, 32767).astype(np.int16) + frame.magnitude = mag + frame.detections = det_mask + frame.range_profile = np.sqrt( + notch_i[:, 0].astype(np.float64) ** 2 + + notch_q[:, 0].astype(np.float64) ** 2 + ) + frame.detection_count = int(det_mask.sum()) + return frame + + +# --------------------------------------------------------------------------- +# Utility: quantize arbitrary complex IQ to 16-bit post-DDC format +# --------------------------------------------------------------------------- +def quantize_raw_iq( + raw_complex: np.ndarray, + n_chirps: int = DOPPLER_CHIRPS, + n_samples: int = FFT_SIZE, + peak_target: int = 200, +) -> tuple[np.ndarray, np.ndarray]: + """Quantize complex IQ data to 16-bit signed, matching DDC output level. + + Parameters + ---------- + raw_complex : ndarray, shape (chirps, samples) or (frames, chirps, samples) + Complex64/128 baseband IQ from SDR capture. If 3-D, the first + axis is treated as frame index and only the first frame is used. + n_chirps : int + Number of chirps to keep (default 32, matching FPGA). + n_samples : int + Number of samples per chirp to keep (default 1024, matching FFT). + peak_target : int + Target peak magnitude after scaling (default 200, matching + golden_reference INPUT_PEAK_TARGET). + + Returns + ------- + iq_i, iq_q : ndarray, each (n_chirps, n_samples) int64 + """ + if raw_complex.ndim == 3: + # (frames, chirps, samples) — take first frame + raw_complex = raw_complex[0] + + # Truncate to FPGA dimensions + block = raw_complex[:n_chirps, :n_samples] + + max_abs = np.max(np.abs(block)) + if max_abs == 0: + return ( + np.zeros((n_chirps, n_samples), dtype=np.int64), + np.zeros((n_chirps, n_samples), dtype=np.int64), + ) + + scale = peak_target / max_abs + scaled = block * scale + iq_i = np.clip(np.round(np.real(scaled)).astype(np.int64), -32768, 32767) + iq_q = np.clip(np.round(np.imag(scaled)).astype(np.int64), -32768, 32767) + return iq_i, iq_q diff --git a/9_Firmware/9_3_GUI/v7/workers.py b/9_Firmware/9_3_GUI/v7/workers.py index c467c98..c29f3bd 100644 --- a/9_Firmware/9_3_GUI/v7/workers.py +++ b/9_Firmware/9_3_GUI/v7/workers.py @@ -13,7 +13,6 @@ All packet parsing now uses the production radar_protocol.py which matches the actual FPGA packet format (0xAA data 11-byte, 0xBB status 26-byte). """ -import math import time import random import queue @@ -36,58 +35,25 @@ from .processing import ( RadarProcessor, USBPacketParser, apply_pitch_correction, + polar_to_geographic, ) logger = logging.getLogger(__name__) -# ============================================================================= -# Utility: polar → geographic -# ============================================================================= - -def polar_to_geographic( - radar_lat: float, - radar_lon: float, - range_m: float, - azimuth_deg: float, -) -> tuple: - """ - Convert polar coordinates (range, azimuth) relative to radar - to geographic (latitude, longitude). - - azimuth_deg: 0 = North, clockwise. - Returns (lat, lon). - """ - R = 6_371_000 # Earth radius in meters - - lat1 = math.radians(radar_lat) - lon1 = math.radians(radar_lon) - bearing = math.radians(azimuth_deg) - - lat2 = math.asin( - math.sin(lat1) * math.cos(range_m / R) - + math.cos(lat1) * math.sin(range_m / R) * math.cos(bearing) - ) - lon2 = lon1 + math.atan2( - math.sin(bearing) * math.sin(range_m / R) * math.cos(lat1), - math.cos(range_m / R) - math.sin(lat1) * math.sin(lat2), - ) - return (math.degrees(lat2), math.degrees(lon2)) - - # ============================================================================= # Radar Data Worker (QThread) — production protocol # ============================================================================= class RadarDataWorker(QThread): """ - Background worker that reads radar data from FT2232H (or ReplayConnection), - parses 0xAA/0xBB packets via production RadarAcquisition, runs optional - host-side DSP, and emits PyQt signals with results. + Background worker that reads radar data from FT2232H, parses 0xAA/0xBB + packets via production RadarAcquisition, runs optional host-side DSP, + and emits PyQt signals with results. - This replaces the old V7 worker which used an incompatible packet format. - Now uses production radar_protocol.py for all packet parsing and frame + Uses production radar_protocol.py for all packet parsing and frame assembly (11-byte 0xAA data packets → 64x32 RadarFrame). + For replay, use ReplayWorker instead. Signals: frameReady(RadarFrame) — a complete 64x32 radar frame @@ -105,7 +71,7 @@ class RadarDataWorker(QThread): def __init__( self, - connection, # FT2232HConnection or ReplayConnection + connection, # FT2232HConnection processor: RadarProcessor | None = None, recorder: DataRecorder | None = None, gps_data_ref: GPSData | None = None, @@ -436,3 +402,172 @@ class TargetSimulator(QObject): self._targets = updated self.targetsUpdated.emit(updated) + + +# ============================================================================= +# Replay Worker (QThread) — unified replay playback +# ============================================================================= + +class ReplayWorker(QThread): + """Background worker for replay data playback. + + Emits the same signals as ``RadarDataWorker`` so the dashboard + treats live and replay identically. Additionally emits playback + state and frame-index signals for the transport controls. + + Signals + ------- + frameReady(object) RadarFrame + targetsUpdated(list) list[RadarTarget] + statsUpdated(dict) processing stats + errorOccurred(str) error message + playbackStateChanged(str) "playing" | "paused" | "stopped" + frameIndexChanged(int, int) (current_index, total_frames) + """ + + frameReady = pyqtSignal(object) + targetsUpdated = pyqtSignal(list) + statsUpdated = pyqtSignal(dict) + errorOccurred = pyqtSignal(str) + playbackStateChanged = pyqtSignal(str) + frameIndexChanged = pyqtSignal(int, int) + + def __init__( + self, + replay_engine, + settings: RadarSettings | None = None, + gps: GPSData | None = None, + frame_interval_ms: int = 100, + parent: QObject | None = None, + ) -> None: + super().__init__(parent) + import threading + + from .processing import extract_targets_from_frame + from .models import WaveformConfig + + self._engine = replay_engine + self._settings = settings or RadarSettings() + self._gps = gps + self._waveform = WaveformConfig() + self._frame_interval_ms = frame_interval_ms + self._extract_targets = extract_targets_from_frame + + self._current_index = 0 + self._last_emitted_index = 0 + self._playing = False + self._stop_flag = False + self._loop = False + self._lock = threading.Lock() # guards _current_index and _emit_frame + + # -- Public control API -- + + @property + def current_index(self) -> int: + """Index of the last frame emitted (for re-seek on param change).""" + return self._last_emitted_index + + @property + def total_frames(self) -> int: + return self._engine.total_frames + + def set_gps(self, gps: GPSData | None) -> None: + self._gps = gps + + def set_waveform(self, wf) -> None: + self._waveform = wf + + def set_loop(self, loop: bool) -> None: + self._loop = loop + + def set_frame_interval(self, ms: int) -> None: + self._frame_interval_ms = max(10, ms) + + def play(self) -> None: + self._playing = True + # If at EOF, rewind so play actually does something + with self._lock: + if self._current_index >= self._engine.total_frames: + self._current_index = 0 + self.playbackStateChanged.emit("playing") + + def pause(self) -> None: + self._playing = False + self.playbackStateChanged.emit("paused") + + def stop(self) -> None: + self._playing = False + self._stop_flag = True + self.playbackStateChanged.emit("stopped") + + @property + def is_playing(self) -> bool: + """Thread-safe read of playback state (for GUI queries).""" + return self._playing + + def seek(self, index: int) -> None: + """Jump to a specific frame and emit it (thread-safe).""" + with self._lock: + idx = max(0, min(index, self._engine.total_frames - 1)) + self._current_index = idx + self._emit_frame(idx) + self._last_emitted_index = idx + + # -- Thread entry -- + + def run(self) -> None: + self._stop_flag = False + self._playing = True + self.playbackStateChanged.emit("playing") + + try: + while not self._stop_flag: + if self._playing: + with self._lock: + if self._current_index < self._engine.total_frames: + self._emit_frame(self._current_index) + self._last_emitted_index = self._current_index + self._current_index += 1 + + # Loop or pause at end + if self._current_index >= self._engine.total_frames: + if self._loop: + self._current_index = 0 + else: + # Pause — keep thread alive for restart + self._playing = False + self.playbackStateChanged.emit("stopped") + + self.msleep(self._frame_interval_ms) + except (OSError, ValueError, RuntimeError, IndexError) as exc: + self.errorOccurred.emit(str(exc)) + + self.playbackStateChanged.emit("stopped") + + # -- Internal -- + + def _emit_frame(self, index: int) -> None: + try: + frame = self._engine.get_frame(index) + except (OSError, ValueError, RuntimeError, IndexError) as exc: + self.errorOccurred.emit(f"Frame {index}: {exc}") + return + + self.frameReady.emit(frame) + self.frameIndexChanged.emit(index, self._engine.total_frames) + + # Target extraction + targets = self._extract_targets( + frame, + range_resolution=self._waveform.range_resolution_m, + velocity_resolution=self._waveform.velocity_resolution_mps, + gps=self._gps, + ) + self.targetsUpdated.emit(targets) + self.statsUpdated.emit({ + "frame_number": frame.frame_number, + "detection_count": frame.detection_count, + "target_count": len(targets), + "replay_index": index, + "replay_total": self._engine.total_frames, + })