From eb907de3d1d2051e0ed63bb558c1c8703adf7b31 Mon Sep 17 00:00:00 2001 From: Jason <83615043+JJassonn69@users.noreply.github.com> Date: Fri, 20 Mar 2026 19:36:21 +0200 Subject: [PATCH] Fix 5 GUI bugs: threaded connect, button toggle, live CFAR/MTI/DC replay, stable heatmap, physical axis labels - Bug 1: Move conn.open() to background thread to prevent GUI hang - Bug 2: Save btn_connect as instance var, toggle Connect/Disconnect text - Bug 3: Split opcodes into hardware-only (silent) and replay-adjustable (CFAR/MTI/DC-notch params trigger bit-accurate pipeline re-processing) - Bug 4: EMA-smoothed vmax (alpha=0.15), fftshift on Doppler axis - Bug 5: Physical axis labels (range in meters, velocity in m/s) - Add _replay_mti(), _replay_dc_notch(), _replay_cfar() standalone functions - Expand TestReplayConnection from 6 to 11 tests (42/42 pass) --- 9_Firmware/9_3_GUI/radar_dashboard.py | 108 +++++- 9_Firmware/9_3_GUI/radar_protocol.py | 364 ++++++++++++++++++--- 9_Firmware/9_3_GUI/test_radar_dashboard.py | 102 +++++- 3 files changed, 499 insertions(+), 75 deletions(-) diff --git a/9_Firmware/9_3_GUI/radar_dashboard.py b/9_Firmware/9_3_GUI/radar_dashboard.py index 6abd476..2973316 100644 --- a/9_Firmware/9_3_GUI/radar_dashboard.py +++ b/9_Firmware/9_3_GUI/radar_dashboard.py @@ -27,6 +27,7 @@ import time import queue import logging import argparse +import threading from typing import Optional, Dict from collections import deque @@ -77,6 +78,16 @@ class RadarDashboard: UPDATE_INTERVAL_MS = 100 # 10 Hz display refresh + # Radar parameters for physical axis labels (ADI CN0566 defaults) + # Config: [sample_rate=4e6, IF=1e5, RF=9.9e9, chirps=256, BW=500e6, + # ramp_time=300e-6, ...] + SAMPLE_RATE = 4e6 # Hz — ADC sample rate (baseband) + BANDWIDTH = 500e6 # Hz — chirp bandwidth + RAMP_TIME = 300e-6 # s — chirp ramp time + CENTER_FREQ = 10.5e9 # Hz — X-band center frequency + NUM_CHIRPS_FRAME = 32 # chirps per Doppler frame + C = 3e8 # m/s — speed of light + def __init__(self, root: tk.Tk, connection: FT601Connection, recorder: DataRecorder): self.root = root @@ -101,6 +112,10 @@ class RadarDashboard: self._fps_ts = time.time() self._fps = 0.0 + # Stable colorscale — exponential moving average of vmax + self._vmax_ema = 1000.0 + self._vmax_alpha = 0.15 # smoothing factor (lower = more stable) + self._build_ui() self._schedule_update() @@ -138,9 +153,10 @@ class RadarDashboard: self.lbl_frame = ttk.Label(top, text="Frame: 0", font=("Menlo", 10)) self.lbl_frame.pack(side="left", padx=16) - btn_connect = ttk.Button(top, text="Connect", command=self._on_connect, - style="Accent.TButton") - btn_connect.pack(side="right", padx=4) + self.btn_connect = ttk.Button(top, text="Connect", + command=self._on_connect, + style="Accent.TButton") + self.btn_connect.pack(side="right", padx=4) self.btn_record = ttk.Button(top, text="Record", command=self._on_record) self.btn_record.pack(side="right", padx=4) @@ -161,9 +177,30 @@ class RadarDashboard: self._build_log_tab(tab_log) def _build_display_tab(self, parent): + # Compute physical axis limits + # Range resolution: dR = c / (2 * BW) per range bin + # But we decimate 1024→64 bins, so each bin spans 16 FFT bins. + # Range per FFT bin = c / (2 * BW) * (Fs / FFT_SIZE) — simplified: + # max_range = c * Fs / (4 * BW) for Fs-sampled baseband + # range_per_bin = max_range / NUM_RANGE_BINS + range_res = self.C / (2.0 * self.BANDWIDTH) # ~0.3 m per FFT bin + # After decimation 1024→64, each range bin = 16 FFT bins + range_per_bin = range_res * 16 + max_range = range_per_bin * NUM_RANGE_BINS + + # Velocity resolution: dv = lambda / (2 * N_chirps * T_chirp) + wavelength = self.C / self.CENTER_FREQ + # Max unambiguous velocity = lambda / (4 * T_chirp) + max_vel = wavelength / (4.0 * self.RAMP_TIME) + vel_per_bin = 2.0 * max_vel / NUM_DOPPLER_BINS + # Doppler axis: bin 0 = 0 Hz (DC), wraps at Nyquist + # For display: center DC, so shift axis to [-max_vel, +max_vel) + vel_lo = -max_vel + vel_hi = max_vel + # Matplotlib figure with 3 subplots self.fig = Figure(figsize=(14, 7), facecolor=BG) - self.fig.subplots_adjust(left=0.06, right=0.98, top=0.94, bottom=0.08, + self.fig.subplots_adjust(left=0.07, right=0.98, top=0.94, bottom=0.10, wspace=0.30, hspace=0.35) # Range-Doppler heatmap @@ -172,14 +209,21 @@ class RadarDashboard: self._rd_img = self.ax_rd.imshow( np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS)), aspect="auto", cmap="inferno", origin="lower", - extent=[0, NUM_DOPPLER_BINS, 0, NUM_RANGE_BINS], + extent=[vel_lo, vel_hi, 0, max_range], vmin=0, vmax=1000, ) self.ax_rd.set_title("Range-Doppler Map", color=FG, fontsize=12) - self.ax_rd.set_xlabel("Doppler Bin", color=FG) - self.ax_rd.set_ylabel("Range Bin", color=FG) + self.ax_rd.set_xlabel("Velocity (m/s)", color=FG) + self.ax_rd.set_ylabel("Range (m)", color=FG) self.ax_rd.tick_params(colors=FG) + # Save axis limits for coordinate conversions + self._vel_lo = vel_lo + self._vel_hi = vel_hi + self._max_range = max_range + self._range_per_bin = range_per_bin + self._vel_per_bin = vel_per_bin + # CFAR detection overlay (scatter) self._det_scatter = self.ax_rd.scatter([], [], s=30, c=GREEN, marker="x", linewidths=1.5, @@ -191,11 +235,11 @@ class RadarDashboard: wf_init = np.zeros((WATERFALL_DEPTH, NUM_RANGE_BINS)) self._wf_img = self.ax_wf.imshow( wf_init, aspect="auto", cmap="viridis", origin="lower", - extent=[0, NUM_RANGE_BINS, 0, WATERFALL_DEPTH], + extent=[0, max_range, 0, WATERFALL_DEPTH], vmin=0, vmax=5000, ) self.ax_wf.set_title("Range Waterfall", color=FG, fontsize=12) - self.ax_wf.set_xlabel("Range Bin", color=FG) + self.ax_wf.set_xlabel("Range (m)", color=FG) self.ax_wf.set_ylabel("Frame", color=FG) self.ax_wf.tick_params(colors=FG) @@ -300,17 +344,35 @@ class RadarDashboard: self._acq_thread = None self.conn.close() self.lbl_status.config(text="DISCONNECTED", foreground=RED) + self.btn_connect.config(text="Connect") log.info("Disconnected") return - if self.conn.open(): + # Open connection in a background thread to avoid blocking the GUI + self.lbl_status.config(text="CONNECTING...", foreground=YELLOW) + self.btn_connect.config(state="disabled") + self.root.update_idletasks() + + def _do_connect(): + ok = self.conn.open() + # Schedule UI update back on the main thread + self.root.after(0, lambda: self._on_connect_done(ok)) + + threading.Thread(target=_do_connect, daemon=True).start() + + def _on_connect_done(self, success: bool): + """Called on main thread after connection attempt completes.""" + self.btn_connect.config(state="normal") + if success: self.lbl_status.config(text="CONNECTED", foreground=GREEN) + self.btn_connect.config(text="Disconnect") self._acq_thread = RadarAcquisition( self.conn, self.frame_queue, self.recorder) self._acq_thread.start() log.info("Connected and acquisition started") else: self.lbl_status.config(text="CONNECT FAILED", foreground=RED) + self.btn_connect.config(text="Connect") def _on_record(self): if self.recorder.recording: @@ -375,16 +437,26 @@ class RadarDashboard: self.lbl_frame.config(text=f"Frame: {frame.frame_number}") # Update range-Doppler heatmap - mag = frame.magnitude - vmax = max(np.max(mag), 1.0) - self._rd_img.set_data(mag) - self._rd_img.set_clim(vmin=0, vmax=vmax) + # FFT-shift Doppler axis so DC (bin 0) is in the center + mag = np.fft.fftshift(frame.magnitude, axes=1) + det_shifted = np.fft.fftshift(frame.detections, axes=1) - # Update CFAR overlay - det_coords = np.argwhere(frame.detections > 0) + # Stable colorscale via EMA smoothing of vmax + frame_vmax = float(np.max(mag)) if np.max(mag) > 0 else 1.0 + self._vmax_ema = (self._vmax_alpha * frame_vmax + + (1.0 - self._vmax_alpha) * self._vmax_ema) + stable_vmax = max(self._vmax_ema, 1.0) + + self._rd_img.set_data(mag) + self._rd_img.set_clim(vmin=0, vmax=stable_vmax) + + # Update CFAR overlay — convert bin indices to physical coordinates + det_coords = np.argwhere(det_shifted > 0) if len(det_coords) > 0: - offsets = np.column_stack([det_coords[:, 1] + 0.5, - det_coords[:, 0] + 0.5]) + # det_coords[:, 0] = range bin, det_coords[:, 1] = Doppler bin + range_m = (det_coords[:, 0] + 0.5) * self._range_per_bin + vel_ms = self._vel_lo + (det_coords[:, 1] + 0.5) * self._vel_per_bin + offsets = np.column_stack([vel_ms, range_m]) self._det_scatter.set_offsets(offsets) else: self._det_scatter.set_offsets(np.empty((0, 2))) diff --git a/9_Firmware/9_3_GUI/radar_protocol.py b/9_Firmware/9_3_GUI/radar_protocol.py index cc4370b..7249aaf 100644 --- a/9_Firmware/9_3_GUI/radar_protocol.py +++ b/9_Firmware/9_3_GUI/radar_protocol.py @@ -409,43 +409,201 @@ class FT601Connection: # Replay Connection — feed real .npy data through the dashboard # ============================================================================ +# Hardware-only opcodes that cannot be adjusted in replay mode +_HARDWARE_ONLY_OPCODES = { + 0x01, # TRIGGER + 0x02, # PRF_DIV + 0x03, # NUM_CHIRPS + 0x04, # CHIRP_TIMER + 0x05, # STREAM_ENABLE + 0x06, # GAIN_SHIFT + 0x10, # THRESHOLD / LONG_CHIRP + 0x11, # LONG_LISTEN + 0x12, # GUARD + 0x13, # SHORT_CHIRP + 0x14, # SHORT_LISTEN + 0x15, # CHIRPS_PER_ELEV + 0x16, # DIGITAL_GAIN + 0x20, # RANGE_MODE + 0xFF, # STATUS_REQUEST +} + +# Replay-adjustable opcodes (re-run signal processing) +_REPLAY_ADJUSTABLE_OPCODES = { + 0x21, # CFAR_GUARD + 0x22, # CFAR_TRAIN + 0x23, # CFAR_ALPHA + 0x24, # CFAR_MODE + 0x25, # CFAR_ENABLE + 0x26, # MTI_ENABLE + 0x27, # DC_NOTCH_WIDTH +} + + +def _saturate(val: int, bits: int) -> int: + """Saturate signed value to fit in 'bits' width.""" + max_pos = (1 << (bits - 1)) - 1 + max_neg = -(1 << (bits - 1)) + return max(max_neg, min(max_pos, int(val))) + + +def _replay_mti(decim_i: np.ndarray, decim_q: np.ndarray, + enable: bool) -> Tuple[np.ndarray, np.ndarray]: + """Bit-accurate 2-pulse MTI canceller (matches mti_canceller.v).""" + n_chirps, n_bins = decim_i.shape + mti_i = np.zeros_like(decim_i) + mti_q = np.zeros_like(decim_q) + if not enable: + return decim_i.copy(), decim_q.copy() + for c in range(n_chirps): + if c == 0: + pass # muted + else: + for r in range(n_bins): + mti_i[c, r] = _saturate(int(decim_i[c, r]) - int(decim_i[c - 1, r]), 16) + mti_q[c, r] = _saturate(int(decim_q[c, r]) - int(decim_q[c - 1, r]), 16) + return mti_i, mti_q + + +def _replay_dc_notch(doppler_i: np.ndarray, doppler_q: np.ndarray, + width: int) -> Tuple[np.ndarray, np.ndarray]: + """Bit-accurate DC notch filter (matches radar_system_top.v inline).""" + out_i = doppler_i.copy() + out_q = doppler_q.copy() + if width == 0: + return out_i, out_q + n_doppler = doppler_i.shape[1] + for dbin in range(n_doppler): + if dbin < width or dbin > (n_doppler - 1 - width + 1): + out_i[:, dbin] = 0 + out_q[:, dbin] = 0 + return out_i, out_q + + +def _replay_cfar(doppler_i: np.ndarray, doppler_q: np.ndarray, + guard: int, train: int, alpha_q44: int, + mode: int) -> Tuple[np.ndarray, np.ndarray]: + """ + Bit-accurate CA-CFAR detector (matches cfar_ca.v). + Returns (detect_flags, magnitudes) both (64, 32). + """ + ALPHA_FRAC_BITS = 4 + n_range, n_doppler = doppler_i.shape + if train == 0: + train = 1 + + # Compute magnitudes: |I| + |Q| (17-bit unsigned L1 norm) + magnitudes = np.zeros((n_range, n_doppler), dtype=np.int64) + for r in range(n_range): + for d in range(n_doppler): + i_val = int(doppler_i[r, d]) + q_val = int(doppler_q[r, d]) + abs_i = (-i_val) & 0xFFFF if i_val < 0 else i_val & 0xFFFF + abs_q = (-q_val) & 0xFFFF if q_val < 0 else q_val & 0xFFFF + magnitudes[r, d] = abs_i + abs_q + + detect_flags = np.zeros((n_range, n_doppler), dtype=np.bool_) + MAX_MAG = (1 << 17) - 1 + + mode_names = {0: 'CA', 1: 'GO', 2: 'SO'} + mode_str = mode_names.get(mode, 'CA') + + for dbin in range(n_doppler): + col = magnitudes[:, dbin] + for cut in range(n_range): + lead_sum, lead_cnt = 0, 0 + for t in range(1, train + 1): + idx = cut - guard - t + if 0 <= idx < n_range: + lead_sum += int(col[idx]) + lead_cnt += 1 + lag_sum, lag_cnt = 0, 0 + for t in range(1, train + 1): + idx = cut + guard + t + if 0 <= idx < n_range: + lag_sum += int(col[idx]) + lag_cnt += 1 + + if mode_str == 'CA': + noise = lead_sum + lag_sum + elif mode_str == 'GO': + if lead_cnt > 0 and lag_cnt > 0: + noise = lead_sum if lead_sum * lag_cnt > lag_sum * lead_cnt else lag_sum + else: + noise = lead_sum if lead_cnt > 0 else lag_sum + elif mode_str == 'SO': + if lead_cnt > 0 and lag_cnt > 0: + noise = lead_sum if lead_sum * lag_cnt < lag_sum * lead_cnt else lag_sum + else: + noise = lead_sum if lead_cnt > 0 else lag_sum + else: + noise = lead_sum + lag_sum + + thr = min((alpha_q44 * noise) >> ALPHA_FRAC_BITS, MAX_MAG) + if int(col[cut]) > thr: + detect_flags[cut, dbin] = True + + return detect_flags, magnitudes + + class ReplayConnection: """ Loads pre-computed .npy arrays (from golden_reference.py co-sim output) and serves them as USB data packets to the dashboard, exercising the full parsing pipeline with real ADI CN0566 radar data. - Supports multiple pipeline views (no-MTI, with-MTI) and loops the single - frame continuously so the waterfall/heatmap stay populated. + Signal processing parameters (CFAR guard/train/alpha/mode, MTI enable, + DC notch width) can be adjusted at runtime via write() — the connection + re-runs the bit-accurate processing pipeline and rebuilds packets. Required npy directory layout (e.g. tb/cosim/real_data/hex/): - doppler_map_i.npy (64, 32) int — Doppler I (no MTI) - doppler_map_q.npy (64, 32) int — Doppler Q (no MTI) - fullchain_mti_doppler_i.npy(64, 32) int — Doppler I (with MTI) - fullchain_mti_doppler_q.npy(64, 32) int — Doppler Q (with MTI) - fullchain_cfar_flags.npy (64, 32) bool — CFAR detections - fullchain_cfar_mag.npy (64, 32) int — CFAR |I|+|Q| magnitude + decimated_range_i.npy (32, 64) int — pre-Doppler range I + decimated_range_q.npy (32, 64) int — pre-Doppler range Q + doppler_map_i.npy (64, 32) int — Doppler I (no MTI) + doppler_map_q.npy (64, 32) int — Doppler Q (no MTI) + fullchain_mti_doppler_i.npy (64, 32) int — Doppler I (with MTI) + fullchain_mti_doppler_q.npy (64, 32) int — Doppler Q (with MTI) + fullchain_cfar_flags.npy (64, 32) bool — CFAR detections + fullchain_cfar_mag.npy (64, 32) int — CFAR |I|+|Q| magnitude """ def __init__(self, npy_dir: str, use_mti: bool = True, replay_fps: float = 5.0): self._npy_dir = npy_dir self._use_mti = use_mti - self._replay_interval = 1.0 / max(replay_fps, 0.1) + self._replay_fps = max(replay_fps, 0.1) self._lock = threading.Lock() self.is_open = False self._packets: bytes = b"" self._read_offset = 0 self._frame_len = 0 + # Current signal-processing parameters + self._mti_enable: bool = use_mti + self._dc_notch_width: int = 2 + self._cfar_guard: int = 2 + self._cfar_train: int = 8 + self._cfar_alpha: int = 0x30 + self._cfar_mode: int = 0 # 0=CA, 1=GO, 2=SO + self._cfar_enable: bool = True + # Raw source arrays (loaded once, reprocessed on param change) + self._dop_mti_i: Optional[np.ndarray] = None + self._dop_mti_q: Optional[np.ndarray] = None + self._dop_nomti_i: Optional[np.ndarray] = None + self._dop_nomti_q: Optional[np.ndarray] = None + self._range_i_vec: Optional[np.ndarray] = None + self._range_q_vec: Optional[np.ndarray] = None + # Rebuild flag + self._needs_rebuild = False def open(self, device_index: int = 0) -> bool: try: + self._load_arrays() self._packets = self._build_packets() self._frame_len = len(self._packets) self._read_offset = 0 self.is_open = True log.info(f"Replay connection opened: {self._npy_dir} " - f"(MTI={'ON' if self._use_mti else 'OFF'}, " + f"(MTI={'ON' if self._mti_enable else 'OFF'}, " f"{self._frame_len} bytes/frame)") return True except Exception as e: @@ -458,8 +616,15 @@ class ReplayConnection: def read(self, size: int = 4096) -> Optional[bytes]: if not self.is_open: return None - time.sleep(self._replay_interval / (NUM_CELLS / 32)) + # Pace reads to target FPS (spread across ~64 reads per frame) + time.sleep((1.0 / self._replay_fps) / (NUM_CELLS / 32)) with self._lock: + # If params changed, rebuild packets + if self._needs_rebuild: + self._packets = self._build_packets() + self._frame_len = len(self._packets) + self._read_offset = 0 + self._needs_rebuild = False end = self._read_offset + size if end <= self._frame_len: chunk = self._packets[self._read_offset:end] @@ -470,65 +635,158 @@ class ReplayConnection: return chunk def write(self, data: bytes) -> bool: - log.info(f"Replay write (ignored): {data.hex()}") + """ + Handle host commands in replay mode. + Signal-processing params (CFAR, MTI, DC notch) trigger re-processing. + Hardware-only params are silently ignored. + """ + if len(data) < 4: + return True + word = struct.unpack(">I", data[:4])[0] + opcode = (word >> 24) & 0xFF + value = word & 0xFFFF + + if opcode in _REPLAY_ADJUSTABLE_OPCODES: + changed = False + with self._lock: + if opcode == 0x21: # CFAR_GUARD + if self._cfar_guard != value: + self._cfar_guard = value + changed = True + elif opcode == 0x22: # CFAR_TRAIN + if self._cfar_train != value: + self._cfar_train = value + changed = True + elif opcode == 0x23: # CFAR_ALPHA + if self._cfar_alpha != value: + self._cfar_alpha = value + changed = True + elif opcode == 0x24: # CFAR_MODE + if self._cfar_mode != value: + self._cfar_mode = value + changed = True + elif opcode == 0x25: # CFAR_ENABLE + new_en = bool(value) + if self._cfar_enable != new_en: + self._cfar_enable = new_en + changed = True + elif opcode == 0x26: # MTI_ENABLE + new_en = bool(value) + if self._mti_enable != new_en: + self._mti_enable = new_en + changed = True + elif opcode == 0x27: # DC_NOTCH_WIDTH + if self._dc_notch_width != value: + self._dc_notch_width = value + changed = True + if changed: + self._needs_rebuild = True + if changed: + log.info(f"Replay param updated: opcode=0x{opcode:02X} " + f"value={value} — will re-process") + else: + log.debug(f"Replay param unchanged: opcode=0x{opcode:02X} " + f"value={value}") + elif opcode in _HARDWARE_ONLY_OPCODES: + log.debug(f"Replay: hardware-only opcode 0x{opcode:02X} " + f"(ignored in replay mode)") + else: + log.debug(f"Replay: unknown opcode 0x{opcode:02X} (ignored)") return True - def _build_packets(self) -> bytes: - """Build a full frame of USB data packets from npy arrays.""" + def _load_arrays(self): + """Load source npy arrays once.""" npy = self._npy_dir + # MTI Doppler + self._dop_mti_i = np.load( + os.path.join(npy, "fullchain_mti_doppler_i.npy")).astype(np.int64) + self._dop_mti_q = np.load( + os.path.join(npy, "fullchain_mti_doppler_q.npy")).astype(np.int64) + # Non-MTI Doppler + self._dop_nomti_i = np.load( + os.path.join(npy, "doppler_map_i.npy")).astype(np.int64) + self._dop_nomti_q = np.load( + os.path.join(npy, "doppler_map_q.npy")).astype(np.int64) + # Range data + try: + range_i_all = np.load( + os.path.join(npy, "decimated_range_i.npy")).astype(np.int64) + range_q_all = np.load( + os.path.join(npy, "decimated_range_q.npy")).astype(np.int64) + self._range_i_vec = range_i_all[-1, :] # last chirp + self._range_q_vec = range_q_all[-1, :] + except FileNotFoundError: + self._range_i_vec = np.zeros(NUM_RANGE_BINS, dtype=np.int64) + self._range_q_vec = np.zeros(NUM_RANGE_BINS, dtype=np.int64) - if self._use_mti: - dop_i = np.load(os.path.join(npy, "fullchain_mti_doppler_i.npy")).astype(np.int64) - dop_q = np.load(os.path.join(npy, "fullchain_mti_doppler_q.npy")).astype(np.int64) - det = np.load(os.path.join(npy, "fullchain_cfar_flags.npy")) + def _build_packets(self) -> bytes: + """Build a full frame of USB data packets from current params.""" + # Select Doppler data based on MTI + if self._mti_enable: + dop_i = self._dop_mti_i + dop_q = self._dop_mti_q + else: + dop_i = self._dop_nomti_i + dop_q = self._dop_nomti_q + + # Apply DC notch + dop_i, dop_q = _replay_dc_notch(dop_i, dop_q, self._dc_notch_width) + + # Run CFAR + if self._cfar_enable: + det, _mag = _replay_cfar( + dop_i, dop_q, + guard=self._cfar_guard, + train=self._cfar_train, + alpha_q44=self._cfar_alpha, + mode=self._cfar_mode, + ) else: - dop_i = np.load(os.path.join(npy, "doppler_map_i.npy")).astype(np.int64) - dop_q = np.load(os.path.join(npy, "doppler_map_q.npy")).astype(np.int64) det = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=bool) - # Also load range data (use Doppler bin 0 column as range proxy, - # or load dedicated range if available) - try: - range_i_all = np.load(os.path.join(npy, "decimated_range_i.npy")).astype(np.int64) - range_q_all = np.load(os.path.join(npy, "decimated_range_q.npy")).astype(np.int64) - # Use last chirp as representative range profile - range_i_vec = range_i_all[-1, :] # (64,) - range_q_vec = range_q_all[-1, :] - except FileNotFoundError: - range_i_vec = np.zeros(NUM_RANGE_BINS, dtype=np.int64) - range_q_vec = np.zeros(NUM_RANGE_BINS, dtype=np.int64) + det_count = int(det.sum()) + log.info(f"Replay: rebuilt {NUM_CELLS} packets " + f"(MTI={'ON' if self._mti_enable else 'OFF'}, " + f"DC_notch={self._dc_notch_width}, " + f"CFAR={'ON' if self._cfar_enable else 'OFF'} " + f"G={self._cfar_guard} T={self._cfar_train} " + f"a=0x{self._cfar_alpha:02X} m={self._cfar_mode}, " + f"{det_count} detections)") - buf = bytearray() + range_i = self._range_i_vec + range_q = self._range_q_vec + + # Pre-allocate buffer (35 bytes per packet * 2048 cells) + buf = bytearray(NUM_CELLS * 35) + pos = 0 for rbin in range(NUM_RANGE_BINS): + ri = int(np.clip(range_i[rbin], -32768, 32767)) & 0xFFFF + rq = int(np.clip(range_q[rbin], -32768, 32767)) & 0xFFFF + rword = ((rq << 16) | ri) & 0xFFFFFFFF + rw0 = struct.pack(">I", rword) + rw1 = struct.pack(">I", (rword << 8) & 0xFFFFFFFF) + rw2 = struct.pack(">I", (rword << 16) & 0xFFFFFFFF) + rw3 = struct.pack(">I", (rword << 24) & 0xFFFFFFFF) for dbin in range(NUM_DOPPLER_BINS): - ri = int(np.clip(range_i_vec[rbin], -32768, 32767)) & 0xFFFF - rq = int(np.clip(range_q_vec[rbin], -32768, 32767)) & 0xFFFF di = int(np.clip(dop_i[rbin, dbin], -32768, 32767)) & 0xFFFF dq = int(np.clip(dop_q[rbin, dbin], -32768, 32767)) & 0xFFFF d = 1 if det[rbin, dbin] else 0 - pkt = bytearray() - pkt.append(HEADER_BYTE) - - rword = ((rq << 16) | ri) & 0xFFFFFFFF - pkt += struct.pack(">I", rword) - pkt += struct.pack(">I", (rword << 8) & 0xFFFFFFFF) - pkt += struct.pack(">I", (rword << 16) & 0xFFFFFFFF) - pkt += struct.pack(">I", (rword << 24) & 0xFFFFFFFF) - dword = ((di << 16) | dq) & 0xFFFFFFFF - pkt += struct.pack(">I", dword) - pkt += struct.pack(">I", (dword << 8) & 0xFFFFFFFF) - pkt += struct.pack(">I", (dword << 16) & 0xFFFFFFFF) - pkt += struct.pack(">I", (dword << 24) & 0xFFFFFFFF) - pkt.append(d) - pkt.append(FOOTER_BYTE) + buf[pos] = HEADER_BYTE + pos += 1 + buf[pos:pos+4] = rw0; pos += 4 + buf[pos:pos+4] = rw1; pos += 4 + buf[pos:pos+4] = rw2; pos += 4 + buf[pos:pos+4] = rw3; pos += 4 + buf[pos:pos+4] = struct.pack(">I", dword); pos += 4 + buf[pos:pos+4] = struct.pack(">I", (dword << 8) & 0xFFFFFFFF); pos += 4 + buf[pos:pos+4] = struct.pack(">I", (dword << 16) & 0xFFFFFFFF); pos += 4 + buf[pos:pos+4] = struct.pack(">I", (dword << 24) & 0xFFFFFFFF); pos += 4 + buf[pos] = d; pos += 1 + buf[pos] = FOOTER_BYTE; pos += 1 - buf += pkt - - log.info(f"Replay: built {NUM_CELLS} packets ({len(buf)} bytes), " - f"{int(det.sum())} detections") return bytes(buf) diff --git a/9_Firmware/9_3_GUI/test_radar_dashboard.py b/9_Firmware/9_3_GUI/test_radar_dashboard.py index be9745c..556cd87 100644 --- a/9_Firmware/9_3_GUI/test_radar_dashboard.py +++ b/9_Firmware/9_3_GUI/test_radar_dashboard.py @@ -20,6 +20,7 @@ from radar_protocol import ( RadarFrame, StatusResponse, HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE, NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS, + _HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES, ) @@ -469,7 +470,7 @@ class TestReplayConnection(unittest.TestCase): if result["detection"]: det_count += 1 self.assertEqual(parsed_count, NUM_CELLS) - # Should have 4 CFAR detections from the golden reference + # Default: MTI=ON, DC_notch=2, CFAR CA g=2 t=8 a=0x30 → 4 detections self.assertEqual(det_count, 4) conn.close() @@ -489,14 +490,14 @@ class TestReplayConnection(unittest.TestCase): conn.close() def test_replay_no_mti(self): - """ReplayConnection works with use_mti=False.""" + """ReplayConnection works with use_mti=False (CFAR still runs).""" if not self._npy_available(): self.skipTest("npy data files not found") from radar_protocol import ReplayConnection conn = ReplayConnection(self.NPY_DIR, use_mti=False) conn.open() self.assertEqual(conn._frame_len, NUM_CELLS * 35) - # No detections in non-MTI mode (flags are all zero) + # No-MTI with DC notch=2 and default CFAR → 0 detections raw = conn._packets boundaries = RadarProtocol.find_packet_boundaries(raw) det_count = sum(1 for s, e, t in boundaries @@ -505,7 +506,7 @@ class TestReplayConnection(unittest.TestCase): conn.close() def test_replay_write_returns_true(self): - """Write on replay connection returns True (no-op).""" + """Write on replay connection returns True.""" if not self._npy_available(): self.skipTest("npy data files not found") from radar_protocol import ReplayConnection @@ -514,6 +515,99 @@ class TestReplayConnection(unittest.TestCase): self.assertTrue(conn.write(b"\x01\x00\x00\x01")) conn.close() + def test_replay_adjustable_param_cfar_guard(self): + """Changing CFAR guard via write() triggers re-processing.""" + if not self._npy_available(): + self.skipTest("npy data files not found") + from radar_protocol import ReplayConnection + conn = ReplayConnection(self.NPY_DIR, use_mti=True) + conn.open() + # Initial: guard=2 → 4 detections + self.assertFalse(conn._needs_rebuild) + # Send CFAR_GUARD=4 + cmd = RadarProtocol.build_command(0x21, 4) + conn.write(cmd) + self.assertTrue(conn._needs_rebuild) + self.assertEqual(conn._cfar_guard, 4) + # Read triggers rebuild + conn.read(1024) + self.assertFalse(conn._needs_rebuild) + conn.close() + + def test_replay_adjustable_param_mti_toggle(self): + """Toggling MTI via write() triggers re-processing.""" + if not self._npy_available(): + self.skipTest("npy data files not found") + from radar_protocol import ReplayConnection + conn = ReplayConnection(self.NPY_DIR, use_mti=True) + conn.open() + # Disable MTI + cmd = RadarProtocol.build_command(0x26, 0) + conn.write(cmd) + self.assertTrue(conn._needs_rebuild) + self.assertFalse(conn._mti_enable) + # Read to trigger rebuild, then count detections + # Drain all packets after rebuild + conn.read(1024) # triggers rebuild + raw = conn._packets + boundaries = RadarProtocol.find_packet_boundaries(raw) + det_count = sum(1 for s, e, t in boundaries + if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0)) + # No-MTI with default CFAR → 0 detections + self.assertEqual(det_count, 0) + conn.close() + + def test_replay_adjustable_param_dc_notch(self): + """Changing DC notch width via write() triggers re-processing.""" + if not self._npy_available(): + self.skipTest("npy data files not found") + from radar_protocol import ReplayConnection + conn = ReplayConnection(self.NPY_DIR, use_mti=True) + conn.open() + # Change DC notch to 0 (no notch) + cmd = RadarProtocol.build_command(0x27, 0) + conn.write(cmd) + self.assertTrue(conn._needs_rebuild) + self.assertEqual(conn._dc_notch_width, 0) + conn.read(1024) # triggers rebuild + raw = conn._packets + boundaries = RadarProtocol.find_packet_boundaries(raw) + det_count = sum(1 for s, e, t in boundaries + if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0)) + # DC notch=0 with MTI → 6 detections (more noise passes through) + self.assertEqual(det_count, 6) + conn.close() + + def test_replay_hardware_opcode_ignored(self): + """Hardware-only opcodes don't trigger rebuild.""" + if not self._npy_available(): + self.skipTest("npy data files not found") + from radar_protocol import ReplayConnection + conn = ReplayConnection(self.NPY_DIR, use_mti=True) + conn.open() + # Send TRIGGER (hardware-only) + cmd = RadarProtocol.build_command(0x01, 1) + conn.write(cmd) + self.assertFalse(conn._needs_rebuild) + # Send STREAM_ENABLE (hardware-only) + cmd = RadarProtocol.build_command(0x05, 7) + conn.write(cmd) + self.assertFalse(conn._needs_rebuild) + conn.close() + + def test_replay_same_value_no_rebuild(self): + """Setting same value as current doesn't trigger rebuild.""" + if not self._npy_available(): + self.skipTest("npy data files not found") + from radar_protocol import ReplayConnection + conn = ReplayConnection(self.NPY_DIR, use_mti=True) + conn.open() + # CFAR guard already 2 + cmd = RadarProtocol.build_command(0x21, 2) + conn.write(cmd) + self.assertFalse(conn._needs_rebuild) + conn.close() + if __name__ == "__main__": unittest.main(verbosity=2)