Fix 5 GUI bugs: threaded connect, button toggle, live CFAR/MTI/DC replay, stable heatmap, physical axis labels

- Bug 1: Move conn.open() to background thread to prevent GUI hang
- Bug 2: Save btn_connect as instance var, toggle Connect/Disconnect text
- Bug 3: Split opcodes into hardware-only (silent) and replay-adjustable
  (CFAR/MTI/DC-notch params trigger bit-accurate pipeline re-processing)
- Bug 4: EMA-smoothed vmax (alpha=0.15), fftshift on Doppler axis
- Bug 5: Physical axis labels (range in meters, velocity in m/s)
- Add _replay_mti(), _replay_dc_notch(), _replay_cfar() standalone functions
- Expand TestReplayConnection from 6 to 11 tests (42/42 pass)
This commit is contained in:
Jason
2026-03-20 19:36:21 +02:00
parent f8d80cc96e
commit eb907de3d1
3 changed files with 499 additions and 75 deletions
+90 -18
View File
@@ -27,6 +27,7 @@ import time
import queue
import logging
import argparse
import threading
from typing import Optional, Dict
from collections import deque
@@ -77,6 +78,16 @@ class RadarDashboard:
UPDATE_INTERVAL_MS = 100 # 10 Hz display refresh
# Radar parameters for physical axis labels (ADI CN0566 defaults)
# Config: [sample_rate=4e6, IF=1e5, RF=9.9e9, chirps=256, BW=500e6,
# ramp_time=300e-6, ...]
SAMPLE_RATE = 4e6 # Hz — ADC sample rate (baseband)
BANDWIDTH = 500e6 # Hz — chirp bandwidth
RAMP_TIME = 300e-6 # s — chirp ramp time
CENTER_FREQ = 10.5e9 # Hz — X-band center frequency
NUM_CHIRPS_FRAME = 32 # chirps per Doppler frame
C = 3e8 # m/s — speed of light
def __init__(self, root: tk.Tk, connection: FT601Connection,
recorder: DataRecorder):
self.root = root
@@ -101,6 +112,10 @@ class RadarDashboard:
self._fps_ts = time.time()
self._fps = 0.0
# Stable colorscale — exponential moving average of vmax
self._vmax_ema = 1000.0
self._vmax_alpha = 0.15 # smoothing factor (lower = more stable)
self._build_ui()
self._schedule_update()
@@ -138,9 +153,10 @@ class RadarDashboard:
self.lbl_frame = ttk.Label(top, text="Frame: 0", font=("Menlo", 10))
self.lbl_frame.pack(side="left", padx=16)
btn_connect = ttk.Button(top, text="Connect", command=self._on_connect,
style="Accent.TButton")
btn_connect.pack(side="right", padx=4)
self.btn_connect = ttk.Button(top, text="Connect",
command=self._on_connect,
style="Accent.TButton")
self.btn_connect.pack(side="right", padx=4)
self.btn_record = ttk.Button(top, text="Record", command=self._on_record)
self.btn_record.pack(side="right", padx=4)
@@ -161,9 +177,30 @@ class RadarDashboard:
self._build_log_tab(tab_log)
def _build_display_tab(self, parent):
# Compute physical axis limits
# Range resolution: dR = c / (2 * BW) per range bin
# But we decimate 1024→64 bins, so each bin spans 16 FFT bins.
# Range per FFT bin = c / (2 * BW) * (Fs / FFT_SIZE) — simplified:
# max_range = c * Fs / (4 * BW) for Fs-sampled baseband
# range_per_bin = max_range / NUM_RANGE_BINS
range_res = self.C / (2.0 * self.BANDWIDTH) # ~0.3 m per FFT bin
# After decimation 1024→64, each range bin = 16 FFT bins
range_per_bin = range_res * 16
max_range = range_per_bin * NUM_RANGE_BINS
# Velocity resolution: dv = lambda / (2 * N_chirps * T_chirp)
wavelength = self.C / self.CENTER_FREQ
# Max unambiguous velocity = lambda / (4 * T_chirp)
max_vel = wavelength / (4.0 * self.RAMP_TIME)
vel_per_bin = 2.0 * max_vel / NUM_DOPPLER_BINS
# Doppler axis: bin 0 = 0 Hz (DC), wraps at Nyquist
# For display: center DC, so shift axis to [-max_vel, +max_vel)
vel_lo = -max_vel
vel_hi = max_vel
# Matplotlib figure with 3 subplots
self.fig = Figure(figsize=(14, 7), facecolor=BG)
self.fig.subplots_adjust(left=0.06, right=0.98, top=0.94, bottom=0.08,
self.fig.subplots_adjust(left=0.07, right=0.98, top=0.94, bottom=0.10,
wspace=0.30, hspace=0.35)
# Range-Doppler heatmap
@@ -172,14 +209,21 @@ class RadarDashboard:
self._rd_img = self.ax_rd.imshow(
np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS)),
aspect="auto", cmap="inferno", origin="lower",
extent=[0, NUM_DOPPLER_BINS, 0, NUM_RANGE_BINS],
extent=[vel_lo, vel_hi, 0, max_range],
vmin=0, vmax=1000,
)
self.ax_rd.set_title("Range-Doppler Map", color=FG, fontsize=12)
self.ax_rd.set_xlabel("Doppler Bin", color=FG)
self.ax_rd.set_ylabel("Range Bin", color=FG)
self.ax_rd.set_xlabel("Velocity (m/s)", color=FG)
self.ax_rd.set_ylabel("Range (m)", color=FG)
self.ax_rd.tick_params(colors=FG)
# Save axis limits for coordinate conversions
self._vel_lo = vel_lo
self._vel_hi = vel_hi
self._max_range = max_range
self._range_per_bin = range_per_bin
self._vel_per_bin = vel_per_bin
# CFAR detection overlay (scatter)
self._det_scatter = self.ax_rd.scatter([], [], s=30, c=GREEN,
marker="x", linewidths=1.5,
@@ -191,11 +235,11 @@ class RadarDashboard:
wf_init = np.zeros((WATERFALL_DEPTH, NUM_RANGE_BINS))
self._wf_img = self.ax_wf.imshow(
wf_init, aspect="auto", cmap="viridis", origin="lower",
extent=[0, NUM_RANGE_BINS, 0, WATERFALL_DEPTH],
extent=[0, max_range, 0, WATERFALL_DEPTH],
vmin=0, vmax=5000,
)
self.ax_wf.set_title("Range Waterfall", color=FG, fontsize=12)
self.ax_wf.set_xlabel("Range Bin", color=FG)
self.ax_wf.set_xlabel("Range (m)", color=FG)
self.ax_wf.set_ylabel("Frame", color=FG)
self.ax_wf.tick_params(colors=FG)
@@ -300,17 +344,35 @@ class RadarDashboard:
self._acq_thread = None
self.conn.close()
self.lbl_status.config(text="DISCONNECTED", foreground=RED)
self.btn_connect.config(text="Connect")
log.info("Disconnected")
return
if self.conn.open():
# Open connection in a background thread to avoid blocking the GUI
self.lbl_status.config(text="CONNECTING...", foreground=YELLOW)
self.btn_connect.config(state="disabled")
self.root.update_idletasks()
def _do_connect():
ok = self.conn.open()
# Schedule UI update back on the main thread
self.root.after(0, lambda: self._on_connect_done(ok))
threading.Thread(target=_do_connect, daemon=True).start()
def _on_connect_done(self, success: bool):
"""Called on main thread after connection attempt completes."""
self.btn_connect.config(state="normal")
if success:
self.lbl_status.config(text="CONNECTED", foreground=GREEN)
self.btn_connect.config(text="Disconnect")
self._acq_thread = RadarAcquisition(
self.conn, self.frame_queue, self.recorder)
self._acq_thread.start()
log.info("Connected and acquisition started")
else:
self.lbl_status.config(text="CONNECT FAILED", foreground=RED)
self.btn_connect.config(text="Connect")
def _on_record(self):
if self.recorder.recording:
@@ -375,16 +437,26 @@ class RadarDashboard:
self.lbl_frame.config(text=f"Frame: {frame.frame_number}")
# Update range-Doppler heatmap
mag = frame.magnitude
vmax = max(np.max(mag), 1.0)
self._rd_img.set_data(mag)
self._rd_img.set_clim(vmin=0, vmax=vmax)
# FFT-shift Doppler axis so DC (bin 0) is in the center
mag = np.fft.fftshift(frame.magnitude, axes=1)
det_shifted = np.fft.fftshift(frame.detections, axes=1)
# Update CFAR overlay
det_coords = np.argwhere(frame.detections > 0)
# Stable colorscale via EMA smoothing of vmax
frame_vmax = float(np.max(mag)) if np.max(mag) > 0 else 1.0
self._vmax_ema = (self._vmax_alpha * frame_vmax +
(1.0 - self._vmax_alpha) * self._vmax_ema)
stable_vmax = max(self._vmax_ema, 1.0)
self._rd_img.set_data(mag)
self._rd_img.set_clim(vmin=0, vmax=stable_vmax)
# Update CFAR overlay — convert bin indices to physical coordinates
det_coords = np.argwhere(det_shifted > 0)
if len(det_coords) > 0:
offsets = np.column_stack([det_coords[:, 1] + 0.5,
det_coords[:, 0] + 0.5])
# det_coords[:, 0] = range bin, det_coords[:, 1] = Doppler bin
range_m = (det_coords[:, 0] + 0.5) * self._range_per_bin
vel_ms = self._vel_lo + (det_coords[:, 1] + 0.5) * self._vel_per_bin
offsets = np.column_stack([vel_ms, range_m])
self._det_scatter.set_offsets(offsets)
else:
self._det_scatter.set_offsets(np.empty((0, 2)))
+311 -53
View File
@@ -409,43 +409,201 @@ class FT601Connection:
# Replay Connection — feed real .npy data through the dashboard
# ============================================================================
# Hardware-only opcodes that cannot be adjusted in replay mode
_HARDWARE_ONLY_OPCODES = {
0x01, # TRIGGER
0x02, # PRF_DIV
0x03, # NUM_CHIRPS
0x04, # CHIRP_TIMER
0x05, # STREAM_ENABLE
0x06, # GAIN_SHIFT
0x10, # THRESHOLD / LONG_CHIRP
0x11, # LONG_LISTEN
0x12, # GUARD
0x13, # SHORT_CHIRP
0x14, # SHORT_LISTEN
0x15, # CHIRPS_PER_ELEV
0x16, # DIGITAL_GAIN
0x20, # RANGE_MODE
0xFF, # STATUS_REQUEST
}
# Replay-adjustable opcodes (re-run signal processing)
_REPLAY_ADJUSTABLE_OPCODES = {
0x21, # CFAR_GUARD
0x22, # CFAR_TRAIN
0x23, # CFAR_ALPHA
0x24, # CFAR_MODE
0x25, # CFAR_ENABLE
0x26, # MTI_ENABLE
0x27, # DC_NOTCH_WIDTH
}
def _saturate(val: int, bits: int) -> int:
"""Saturate signed value to fit in 'bits' width."""
max_pos = (1 << (bits - 1)) - 1
max_neg = -(1 << (bits - 1))
return max(max_neg, min(max_pos, int(val)))
def _replay_mti(decim_i: np.ndarray, decim_q: np.ndarray,
enable: bool) -> Tuple[np.ndarray, np.ndarray]:
"""Bit-accurate 2-pulse MTI canceller (matches mti_canceller.v)."""
n_chirps, n_bins = decim_i.shape
mti_i = np.zeros_like(decim_i)
mti_q = np.zeros_like(decim_q)
if not enable:
return decim_i.copy(), decim_q.copy()
for c in range(n_chirps):
if c == 0:
pass # muted
else:
for r in range(n_bins):
mti_i[c, r] = _saturate(int(decim_i[c, r]) - int(decim_i[c - 1, r]), 16)
mti_q[c, r] = _saturate(int(decim_q[c, r]) - int(decim_q[c - 1, r]), 16)
return mti_i, mti_q
def _replay_dc_notch(doppler_i: np.ndarray, doppler_q: np.ndarray,
width: int) -> Tuple[np.ndarray, np.ndarray]:
"""Bit-accurate DC notch filter (matches radar_system_top.v inline)."""
out_i = doppler_i.copy()
out_q = doppler_q.copy()
if width == 0:
return out_i, out_q
n_doppler = doppler_i.shape[1]
for dbin in range(n_doppler):
if dbin < width or dbin > (n_doppler - 1 - width + 1):
out_i[:, dbin] = 0
out_q[:, dbin] = 0
return out_i, out_q
def _replay_cfar(doppler_i: np.ndarray, doppler_q: np.ndarray,
guard: int, train: int, alpha_q44: int,
mode: int) -> Tuple[np.ndarray, np.ndarray]:
"""
Bit-accurate CA-CFAR detector (matches cfar_ca.v).
Returns (detect_flags, magnitudes) both (64, 32).
"""
ALPHA_FRAC_BITS = 4
n_range, n_doppler = doppler_i.shape
if train == 0:
train = 1
# Compute magnitudes: |I| + |Q| (17-bit unsigned L1 norm)
magnitudes = np.zeros((n_range, n_doppler), dtype=np.int64)
for r in range(n_range):
for d in range(n_doppler):
i_val = int(doppler_i[r, d])
q_val = int(doppler_q[r, d])
abs_i = (-i_val) & 0xFFFF if i_val < 0 else i_val & 0xFFFF
abs_q = (-q_val) & 0xFFFF if q_val < 0 else q_val & 0xFFFF
magnitudes[r, d] = abs_i + abs_q
detect_flags = np.zeros((n_range, n_doppler), dtype=np.bool_)
MAX_MAG = (1 << 17) - 1
mode_names = {0: 'CA', 1: 'GO', 2: 'SO'}
mode_str = mode_names.get(mode, 'CA')
for dbin in range(n_doppler):
col = magnitudes[:, dbin]
for cut in range(n_range):
lead_sum, lead_cnt = 0, 0
for t in range(1, train + 1):
idx = cut - guard - t
if 0 <= idx < n_range:
lead_sum += int(col[idx])
lead_cnt += 1
lag_sum, lag_cnt = 0, 0
for t in range(1, train + 1):
idx = cut + guard + t
if 0 <= idx < n_range:
lag_sum += int(col[idx])
lag_cnt += 1
if mode_str == 'CA':
noise = lead_sum + lag_sum
elif mode_str == 'GO':
if lead_cnt > 0 and lag_cnt > 0:
noise = lead_sum if lead_sum * lag_cnt > lag_sum * lead_cnt else lag_sum
else:
noise = lead_sum if lead_cnt > 0 else lag_sum
elif mode_str == 'SO':
if lead_cnt > 0 and lag_cnt > 0:
noise = lead_sum if lead_sum * lag_cnt < lag_sum * lead_cnt else lag_sum
else:
noise = lead_sum if lead_cnt > 0 else lag_sum
else:
noise = lead_sum + lag_sum
thr = min((alpha_q44 * noise) >> ALPHA_FRAC_BITS, MAX_MAG)
if int(col[cut]) > thr:
detect_flags[cut, dbin] = True
return detect_flags, magnitudes
class ReplayConnection:
"""
Loads pre-computed .npy arrays (from golden_reference.py co-sim output)
and serves them as USB data packets to the dashboard, exercising the full
parsing pipeline with real ADI CN0566 radar data.
Supports multiple pipeline views (no-MTI, with-MTI) and loops the single
frame continuously so the waterfall/heatmap stay populated.
Signal processing parameters (CFAR guard/train/alpha/mode, MTI enable,
DC notch width) can be adjusted at runtime via write() — the connection
re-runs the bit-accurate processing pipeline and rebuilds packets.
Required npy directory layout (e.g. tb/cosim/real_data/hex/):
doppler_map_i.npy (64, 32) int — Doppler I (no MTI)
doppler_map_q.npy (64, 32) int — Doppler Q (no MTI)
fullchain_mti_doppler_i.npy(64, 32) int — Doppler I (with MTI)
fullchain_mti_doppler_q.npy(64, 32) int — Doppler Q (with MTI)
fullchain_cfar_flags.npy (64, 32) bool — CFAR detections
fullchain_cfar_mag.npy (64, 32) int — CFAR |I|+|Q| magnitude
decimated_range_i.npy (32, 64) int — pre-Doppler range I
decimated_range_q.npy (32, 64) int — pre-Doppler range Q
doppler_map_i.npy (64, 32) int — Doppler I (no MTI)
doppler_map_q.npy (64, 32) int — Doppler Q (no MTI)
fullchain_mti_doppler_i.npy (64, 32) int — Doppler I (with MTI)
fullchain_mti_doppler_q.npy (64, 32) int — Doppler Q (with MTI)
fullchain_cfar_flags.npy (64, 32) bool — CFAR detections
fullchain_cfar_mag.npy (64, 32) int — CFAR |I|+|Q| magnitude
"""
def __init__(self, npy_dir: str, use_mti: bool = True,
replay_fps: float = 5.0):
self._npy_dir = npy_dir
self._use_mti = use_mti
self._replay_interval = 1.0 / max(replay_fps, 0.1)
self._replay_fps = max(replay_fps, 0.1)
self._lock = threading.Lock()
self.is_open = False
self._packets: bytes = b""
self._read_offset = 0
self._frame_len = 0
# Current signal-processing parameters
self._mti_enable: bool = use_mti
self._dc_notch_width: int = 2
self._cfar_guard: int = 2
self._cfar_train: int = 8
self._cfar_alpha: int = 0x30
self._cfar_mode: int = 0 # 0=CA, 1=GO, 2=SO
self._cfar_enable: bool = True
# Raw source arrays (loaded once, reprocessed on param change)
self._dop_mti_i: Optional[np.ndarray] = None
self._dop_mti_q: Optional[np.ndarray] = None
self._dop_nomti_i: Optional[np.ndarray] = None
self._dop_nomti_q: Optional[np.ndarray] = None
self._range_i_vec: Optional[np.ndarray] = None
self._range_q_vec: Optional[np.ndarray] = None
# Rebuild flag
self._needs_rebuild = False
def open(self, device_index: int = 0) -> bool:
try:
self._load_arrays()
self._packets = self._build_packets()
self._frame_len = len(self._packets)
self._read_offset = 0
self.is_open = True
log.info(f"Replay connection opened: {self._npy_dir} "
f"(MTI={'ON' if self._use_mti else 'OFF'}, "
f"(MTI={'ON' if self._mti_enable else 'OFF'}, "
f"{self._frame_len} bytes/frame)")
return True
except Exception as e:
@@ -458,8 +616,15 @@ class ReplayConnection:
def read(self, size: int = 4096) -> Optional[bytes]:
if not self.is_open:
return None
time.sleep(self._replay_interval / (NUM_CELLS / 32))
# Pace reads to target FPS (spread across ~64 reads per frame)
time.sleep((1.0 / self._replay_fps) / (NUM_CELLS / 32))
with self._lock:
# If params changed, rebuild packets
if self._needs_rebuild:
self._packets = self._build_packets()
self._frame_len = len(self._packets)
self._read_offset = 0
self._needs_rebuild = False
end = self._read_offset + size
if end <= self._frame_len:
chunk = self._packets[self._read_offset:end]
@@ -470,65 +635,158 @@ class ReplayConnection:
return chunk
def write(self, data: bytes) -> bool:
log.info(f"Replay write (ignored): {data.hex()}")
"""
Handle host commands in replay mode.
Signal-processing params (CFAR, MTI, DC notch) trigger re-processing.
Hardware-only params are silently ignored.
"""
if len(data) < 4:
return True
word = struct.unpack(">I", data[:4])[0]
opcode = (word >> 24) & 0xFF
value = word & 0xFFFF
if opcode in _REPLAY_ADJUSTABLE_OPCODES:
changed = False
with self._lock:
if opcode == 0x21: # CFAR_GUARD
if self._cfar_guard != value:
self._cfar_guard = value
changed = True
elif opcode == 0x22: # CFAR_TRAIN
if self._cfar_train != value:
self._cfar_train = value
changed = True
elif opcode == 0x23: # CFAR_ALPHA
if self._cfar_alpha != value:
self._cfar_alpha = value
changed = True
elif opcode == 0x24: # CFAR_MODE
if self._cfar_mode != value:
self._cfar_mode = value
changed = True
elif opcode == 0x25: # CFAR_ENABLE
new_en = bool(value)
if self._cfar_enable != new_en:
self._cfar_enable = new_en
changed = True
elif opcode == 0x26: # MTI_ENABLE
new_en = bool(value)
if self._mti_enable != new_en:
self._mti_enable = new_en
changed = True
elif opcode == 0x27: # DC_NOTCH_WIDTH
if self._dc_notch_width != value:
self._dc_notch_width = value
changed = True
if changed:
self._needs_rebuild = True
if changed:
log.info(f"Replay param updated: opcode=0x{opcode:02X} "
f"value={value} — will re-process")
else:
log.debug(f"Replay param unchanged: opcode=0x{opcode:02X} "
f"value={value}")
elif opcode in _HARDWARE_ONLY_OPCODES:
log.debug(f"Replay: hardware-only opcode 0x{opcode:02X} "
f"(ignored in replay mode)")
else:
log.debug(f"Replay: unknown opcode 0x{opcode:02X} (ignored)")
return True
def _build_packets(self) -> bytes:
"""Build a full frame of USB data packets from npy arrays."""
def _load_arrays(self):
"""Load source npy arrays once."""
npy = self._npy_dir
# MTI Doppler
self._dop_mti_i = np.load(
os.path.join(npy, "fullchain_mti_doppler_i.npy")).astype(np.int64)
self._dop_mti_q = np.load(
os.path.join(npy, "fullchain_mti_doppler_q.npy")).astype(np.int64)
# Non-MTI Doppler
self._dop_nomti_i = np.load(
os.path.join(npy, "doppler_map_i.npy")).astype(np.int64)
self._dop_nomti_q = np.load(
os.path.join(npy, "doppler_map_q.npy")).astype(np.int64)
# Range data
try:
range_i_all = np.load(
os.path.join(npy, "decimated_range_i.npy")).astype(np.int64)
range_q_all = np.load(
os.path.join(npy, "decimated_range_q.npy")).astype(np.int64)
self._range_i_vec = range_i_all[-1, :] # last chirp
self._range_q_vec = range_q_all[-1, :]
except FileNotFoundError:
self._range_i_vec = np.zeros(NUM_RANGE_BINS, dtype=np.int64)
self._range_q_vec = np.zeros(NUM_RANGE_BINS, dtype=np.int64)
if self._use_mti:
dop_i = np.load(os.path.join(npy, "fullchain_mti_doppler_i.npy")).astype(np.int64)
dop_q = np.load(os.path.join(npy, "fullchain_mti_doppler_q.npy")).astype(np.int64)
det = np.load(os.path.join(npy, "fullchain_cfar_flags.npy"))
def _build_packets(self) -> bytes:
"""Build a full frame of USB data packets from current params."""
# Select Doppler data based on MTI
if self._mti_enable:
dop_i = self._dop_mti_i
dop_q = self._dop_mti_q
else:
dop_i = self._dop_nomti_i
dop_q = self._dop_nomti_q
# Apply DC notch
dop_i, dop_q = _replay_dc_notch(dop_i, dop_q, self._dc_notch_width)
# Run CFAR
if self._cfar_enable:
det, _mag = _replay_cfar(
dop_i, dop_q,
guard=self._cfar_guard,
train=self._cfar_train,
alpha_q44=self._cfar_alpha,
mode=self._cfar_mode,
)
else:
dop_i = np.load(os.path.join(npy, "doppler_map_i.npy")).astype(np.int64)
dop_q = np.load(os.path.join(npy, "doppler_map_q.npy")).astype(np.int64)
det = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=bool)
# Also load range data (use Doppler bin 0 column as range proxy,
# or load dedicated range if available)
try:
range_i_all = np.load(os.path.join(npy, "decimated_range_i.npy")).astype(np.int64)
range_q_all = np.load(os.path.join(npy, "decimated_range_q.npy")).astype(np.int64)
# Use last chirp as representative range profile
range_i_vec = range_i_all[-1, :] # (64,)
range_q_vec = range_q_all[-1, :]
except FileNotFoundError:
range_i_vec = np.zeros(NUM_RANGE_BINS, dtype=np.int64)
range_q_vec = np.zeros(NUM_RANGE_BINS, dtype=np.int64)
det_count = int(det.sum())
log.info(f"Replay: rebuilt {NUM_CELLS} packets "
f"(MTI={'ON' if self._mti_enable else 'OFF'}, "
f"DC_notch={self._dc_notch_width}, "
f"CFAR={'ON' if self._cfar_enable else 'OFF'} "
f"G={self._cfar_guard} T={self._cfar_train} "
f"a=0x{self._cfar_alpha:02X} m={self._cfar_mode}, "
f"{det_count} detections)")
buf = bytearray()
range_i = self._range_i_vec
range_q = self._range_q_vec
# Pre-allocate buffer (35 bytes per packet * 2048 cells)
buf = bytearray(NUM_CELLS * 35)
pos = 0
for rbin in range(NUM_RANGE_BINS):
ri = int(np.clip(range_i[rbin], -32768, 32767)) & 0xFFFF
rq = int(np.clip(range_q[rbin], -32768, 32767)) & 0xFFFF
rword = ((rq << 16) | ri) & 0xFFFFFFFF
rw0 = struct.pack(">I", rword)
rw1 = struct.pack(">I", (rword << 8) & 0xFFFFFFFF)
rw2 = struct.pack(">I", (rword << 16) & 0xFFFFFFFF)
rw3 = struct.pack(">I", (rword << 24) & 0xFFFFFFFF)
for dbin in range(NUM_DOPPLER_BINS):
ri = int(np.clip(range_i_vec[rbin], -32768, 32767)) & 0xFFFF
rq = int(np.clip(range_q_vec[rbin], -32768, 32767)) & 0xFFFF
di = int(np.clip(dop_i[rbin, dbin], -32768, 32767)) & 0xFFFF
dq = int(np.clip(dop_q[rbin, dbin], -32768, 32767)) & 0xFFFF
d = 1 if det[rbin, dbin] else 0
pkt = bytearray()
pkt.append(HEADER_BYTE)
rword = ((rq << 16) | ri) & 0xFFFFFFFF
pkt += struct.pack(">I", rword)
pkt += struct.pack(">I", (rword << 8) & 0xFFFFFFFF)
pkt += struct.pack(">I", (rword << 16) & 0xFFFFFFFF)
pkt += struct.pack(">I", (rword << 24) & 0xFFFFFFFF)
dword = ((di << 16) | dq) & 0xFFFFFFFF
pkt += struct.pack(">I", dword)
pkt += struct.pack(">I", (dword << 8) & 0xFFFFFFFF)
pkt += struct.pack(">I", (dword << 16) & 0xFFFFFFFF)
pkt += struct.pack(">I", (dword << 24) & 0xFFFFFFFF)
pkt.append(d)
pkt.append(FOOTER_BYTE)
buf[pos] = HEADER_BYTE
pos += 1
buf[pos:pos+4] = rw0; pos += 4
buf[pos:pos+4] = rw1; pos += 4
buf[pos:pos+4] = rw2; pos += 4
buf[pos:pos+4] = rw3; pos += 4
buf[pos:pos+4] = struct.pack(">I", dword); pos += 4
buf[pos:pos+4] = struct.pack(">I", (dword << 8) & 0xFFFFFFFF); pos += 4
buf[pos:pos+4] = struct.pack(">I", (dword << 16) & 0xFFFFFFFF); pos += 4
buf[pos:pos+4] = struct.pack(">I", (dword << 24) & 0xFFFFFFFF); pos += 4
buf[pos] = d; pos += 1
buf[pos] = FOOTER_BYTE; pos += 1
buf += pkt
log.info(f"Replay: built {NUM_CELLS} packets ({len(buf)} bytes), "
f"{int(det.sum())} detections")
return bytes(buf)
+98 -4
View File
@@ -20,6 +20,7 @@ from radar_protocol import (
RadarFrame, StatusResponse,
HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE,
NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS,
_HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES,
)
@@ -469,7 +470,7 @@ class TestReplayConnection(unittest.TestCase):
if result["detection"]:
det_count += 1
self.assertEqual(parsed_count, NUM_CELLS)
# Should have 4 CFAR detections from the golden reference
# Default: MTI=ON, DC_notch=2, CFAR CA g=2 t=8 a=0x30 → 4 detections
self.assertEqual(det_count, 4)
conn.close()
@@ -489,14 +490,14 @@ class TestReplayConnection(unittest.TestCase):
conn.close()
def test_replay_no_mti(self):
"""ReplayConnection works with use_mti=False."""
"""ReplayConnection works with use_mti=False (CFAR still runs)."""
if not self._npy_available():
self.skipTest("npy data files not found")
from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=False)
conn.open()
self.assertEqual(conn._frame_len, NUM_CELLS * 35)
# No detections in non-MTI mode (flags are all zero)
# No-MTI with DC notch=2 and default CFAR → 0 detections
raw = conn._packets
boundaries = RadarProtocol.find_packet_boundaries(raw)
det_count = sum(1 for s, e, t in boundaries
@@ -505,7 +506,7 @@ class TestReplayConnection(unittest.TestCase):
conn.close()
def test_replay_write_returns_true(self):
"""Write on replay connection returns True (no-op)."""
"""Write on replay connection returns True."""
if not self._npy_available():
self.skipTest("npy data files not found")
from radar_protocol import ReplayConnection
@@ -514,6 +515,99 @@ class TestReplayConnection(unittest.TestCase):
self.assertTrue(conn.write(b"\x01\x00\x00\x01"))
conn.close()
def test_replay_adjustable_param_cfar_guard(self):
"""Changing CFAR guard via write() triggers re-processing."""
if not self._npy_available():
self.skipTest("npy data files not found")
from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=True)
conn.open()
# Initial: guard=2 → 4 detections
self.assertFalse(conn._needs_rebuild)
# Send CFAR_GUARD=4
cmd = RadarProtocol.build_command(0x21, 4)
conn.write(cmd)
self.assertTrue(conn._needs_rebuild)
self.assertEqual(conn._cfar_guard, 4)
# Read triggers rebuild
conn.read(1024)
self.assertFalse(conn._needs_rebuild)
conn.close()
def test_replay_adjustable_param_mti_toggle(self):
"""Toggling MTI via write() triggers re-processing."""
if not self._npy_available():
self.skipTest("npy data files not found")
from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=True)
conn.open()
# Disable MTI
cmd = RadarProtocol.build_command(0x26, 0)
conn.write(cmd)
self.assertTrue(conn._needs_rebuild)
self.assertFalse(conn._mti_enable)
# Read to trigger rebuild, then count detections
# Drain all packets after rebuild
conn.read(1024) # triggers rebuild
raw = conn._packets
boundaries = RadarProtocol.find_packet_boundaries(raw)
det_count = sum(1 for s, e, t in boundaries
if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0))
# No-MTI with default CFAR → 0 detections
self.assertEqual(det_count, 0)
conn.close()
def test_replay_adjustable_param_dc_notch(self):
"""Changing DC notch width via write() triggers re-processing."""
if not self._npy_available():
self.skipTest("npy data files not found")
from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=True)
conn.open()
# Change DC notch to 0 (no notch)
cmd = RadarProtocol.build_command(0x27, 0)
conn.write(cmd)
self.assertTrue(conn._needs_rebuild)
self.assertEqual(conn._dc_notch_width, 0)
conn.read(1024) # triggers rebuild
raw = conn._packets
boundaries = RadarProtocol.find_packet_boundaries(raw)
det_count = sum(1 for s, e, t in boundaries
if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0))
# DC notch=0 with MTI → 6 detections (more noise passes through)
self.assertEqual(det_count, 6)
conn.close()
def test_replay_hardware_opcode_ignored(self):
"""Hardware-only opcodes don't trigger rebuild."""
if not self._npy_available():
self.skipTest("npy data files not found")
from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=True)
conn.open()
# Send TRIGGER (hardware-only)
cmd = RadarProtocol.build_command(0x01, 1)
conn.write(cmd)
self.assertFalse(conn._needs_rebuild)
# Send STREAM_ENABLE (hardware-only)
cmd = RadarProtocol.build_command(0x05, 7)
conn.write(cmd)
self.assertFalse(conn._needs_rebuild)
conn.close()
def test_replay_same_value_no_rebuild(self):
"""Setting same value as current doesn't trigger rebuild."""
if not self._npy_available():
self.skipTest("npy data files not found")
from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=True)
conn.open()
# CFAR guard already 2
cmd = RadarProtocol.build_command(0x21, 2)
conn.write(cmd)
self.assertFalse(conn._needs_rebuild)
conn.close()
if __name__ == "__main__":
unittest.main(verbosity=2)