refactor: revert replay code, preserve non-replay fixes

Revert raw IQ replay (commits 2cb56e8..6095893) to prepare
for unified SoftwareFPGA replay architecture.

Preserved: C-locale spinboxes, AGC chart label, demo/radar
mutual exclusion.

Delete v7/raw_iq_replay.py
Restore workers.py, processing.py, models.py, __init__.py, test_v7.py
This commit is contained in:
Jason
2026-04-14 09:57:25 +05:45
parent 609589349d
commit 2387f7f29f
7 changed files with 75 additions and 1352 deletions
-33
View File
@@ -69,39 +69,6 @@ class TestRadarSettings(unittest.TestCase):
self.assertEqual(s.max_distance, 50000) self.assertEqual(s.max_distance, 50000)
class TestWaveformConfig(unittest.TestCase):
"""WaveformConfig — range/velocity resolution from waveform params."""
def test_adi_phaser_range_resolution(self):
"""ADI CN0566 defaults: 4MSPS, 500MHz BW, 300µs chirp, 1079 samples."""
wf = _models().WaveformConfig() # ADI phaser defaults
r_res = wf.range_resolution(n_samples=1079)
# Expected: c * fs / (2 * N * slope) = 3e8 * 4e6 / (2 * 1079 * 1.667e12)
# ≈ 0.334 m/bin
self.assertAlmostEqual(r_res, 0.334, places=2)
def test_adi_phaser_velocity_resolution(self):
"""ADI phaser: 256 chirps, 1079 samples at 4 MSPS."""
wf = _models().WaveformConfig()
v_res = wf.velocity_resolution(n_samples=1079, n_chirps=256)
# λ * fs / (2 * N * M) = 0.03 * 4e6 / (2 * 1079 * 256) ≈ 0.217 m/s/bin
self.assertAlmostEqual(v_res, 0.217, places=2)
def test_max_range(self):
wf = _models().WaveformConfig()
max_r = wf.max_range(n_range_bins=64, n_samples=1079)
# 0.334 * 64 ≈ 21.4 m
self.assertAlmostEqual(max_r, 21.4, places=0)
def test_plfm_defaults_differ(self):
"""PLFM FPGA defaults (781.25 m/bin) must NOT equal ADI phaser."""
default_settings = _models().RadarSettings()
wf = _models().WaveformConfig()
r_res = wf.range_resolution(n_samples=1079)
self.assertNotAlmostEqual(default_settings.range_resolution, r_res,
places=0) # 781 vs 0.33
class TestGPSData(unittest.TestCase): class TestGPSData(unittest.TestCase):
def test_to_dict(self): def test_to_dict(self):
g = _models().GPSData(latitude=41.9, longitude=12.5, g = _models().GPSData(latitude=41.9, longitude=12.5,
+1 -2
View File
@@ -11,7 +11,6 @@ top-level imports:
from .models import ( from .models import (
RadarTarget, RadarTarget,
RadarSettings, RadarSettings,
WaveformConfig,
GPSData, GPSData,
ProcessingConfig, ProcessingConfig,
TileServer, TileServer,
@@ -67,7 +66,7 @@ except ImportError: # PyQt6 not installed (e.g. CI headless runner)
__all__ = [ # noqa: RUF022 __all__ = [ # noqa: RUF022
# models # models
"RadarTarget", "RadarSettings", "WaveformConfig", "GPSData", "ProcessingConfig", "TileServer", "RadarTarget", "RadarSettings", "GPSData", "ProcessingConfig", "TileServer",
"DARK_BG", "DARK_FG", "DARK_ACCENT", "DARK_HIGHLIGHT", "DARK_BORDER", "DARK_BG", "DARK_FG", "DARK_ACCENT", "DARK_HIGHLIGHT", "DARK_BORDER",
"DARK_TEXT", "DARK_BUTTON", "DARK_BUTTON_HOVER", "DARK_TEXT", "DARK_BUTTON", "DARK_BUTTON_HOVER",
"DARK_TREEVIEW", "DARK_TREEVIEW_ALT", "DARK_TREEVIEW", "DARK_TREEVIEW_ALT",
+9 -463
View File
@@ -23,10 +23,8 @@ commands sent over FT2232H.
""" """
import time import time
import re
import logging import logging
from collections import deque from collections import deque
from pathlib import Path
import numpy as np import numpy as np
@@ -44,7 +42,7 @@ from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from matplotlib.figure import Figure from matplotlib.figure import Figure
from .models import ( from .models import (
RadarTarget, RadarSettings, WaveformConfig, GPSData, ProcessingConfig, RadarTarget, RadarSettings, GPSData, ProcessingConfig,
DARK_BG, DARK_FG, DARK_ACCENT, DARK_HIGHLIGHT, DARK_BORDER, DARK_BG, DARK_FG, DARK_ACCENT, DARK_HIGHLIGHT, DARK_BORDER,
DARK_TEXT, DARK_BUTTON, DARK_BUTTON_HOVER, DARK_TEXT, DARK_BUTTON, DARK_BUTTON_HOVER,
DARK_TREEVIEW, DARK_TREEVIEW_ALT, DARK_TREEVIEW, DARK_TREEVIEW_ALT,
@@ -61,10 +59,8 @@ from .hardware import (
DataRecorder, DataRecorder,
STM32USBInterface, STM32USBInterface,
) )
from .processing import RadarProcessor, USBPacketParser, RawIQFrameProcessor from .processing import RadarProcessor, USBPacketParser
from .workers import RadarDataWorker, RawIQReplayWorker, GPSDataWorker, TargetSimulator from .workers import RadarDataWorker, GPSDataWorker, TargetSimulator
from .raw_iq_replay import RawIQReplayController, PlaybackState
from .agc_sim import AGCConfig
from .map_widget import RadarMapWidget from .map_widget import RadarMapWidget
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -85,69 +81,24 @@ def _make_dspin() -> QDoubleSpinBox:
return sb return sb
def _parse_waveform_from_filename(name: str) -> WaveformConfig | None:
"""Try to extract waveform params from ADI phaser filename convention.
Expected pattern fragments (order-independent):
``<N>MSPS`` or ``<N>MSps`` → sample rate in MHz
``<N>M`` (followed by _ or end) → bandwidth in MHz
``<N>u`` (followed by _ or end) → chirp duration in µs
Returns a WaveformConfig with parsed values (defaults for un-parsed),
or None if nothing recognisable was found.
"""
cfg = WaveformConfig() # ADI phaser defaults
found = False
# Sample rate: "4MSPS" or "4MSps"
m = re.search(r"(\d+)M[Ss][Pp][Ss]", name)
if m:
cfg.sample_rate_hz = float(m.group(1)) * 1e6
found = True
# Bandwidth: "500M" (must NOT be followed by S for MSPS)
m = re.search(r"(\d+)M(?![Ss])", name)
if m:
cfg.bandwidth_hz = float(m.group(1)) * 1e6
found = True
# Chirp duration: "300u"
m = re.search(r"(\d+)u", name)
if m:
cfg.chirp_duration_s = float(m.group(1)) * 1e-6
found = True
return cfg if found else None
# ============================================================================= # =============================================================================
# Range-Doppler Canvas (matplotlib) # Range-Doppler Canvas (matplotlib)
# ============================================================================= # =============================================================================
class RangeDopplerCanvas(FigureCanvasQTAgg): class RangeDopplerCanvas(FigureCanvasQTAgg):
"""Matplotlib canvas showing a Range-Doppler map with dark theme. """Matplotlib canvas showing the 64x32 Range-Doppler map with dark theme."""
Adapts dynamically to incoming frame dimensions (e.g. 64x32 from FPGA,
or different sizes from Raw IQ Replay).
"""
def __init__(self, _parent=None): def __init__(self, _parent=None):
fig = Figure(figsize=(10, 6), facecolor=DARK_BG) fig = Figure(figsize=(10, 6), facecolor=DARK_BG)
self.ax = fig.add_subplot(111, facecolor=DARK_ACCENT) self.ax = fig.add_subplot(111, facecolor=DARK_ACCENT)
# Initial backing data — will resize on first update_map call self._data = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS))
self._n_range = NUM_RANGE_BINS
self._n_doppler = NUM_DOPPLER_BINS
self._data = np.zeros((self._n_range, self._n_doppler))
self.im = self.ax.imshow( self.im = self.ax.imshow(
self._data, aspect="auto", cmap="hot", self._data, aspect="auto", cmap="hot",
extent=[0, self._n_doppler, 0, self._n_range], origin="lower", extent=[0, NUM_DOPPLER_BINS, 0, NUM_RANGE_BINS], origin="lower",
) )
self.ax.set_title( self.ax.set_title("Range-Doppler Map (64x32)", color=DARK_FG)
f"Range-Doppler Map ({self._n_range}x{self._n_doppler})",
color=DARK_FG,
)
self.ax.set_xlabel("Doppler Bin", color=DARK_FG) self.ax.set_xlabel("Doppler Bin", color=DARK_FG)
self.ax.set_ylabel("Range Bin", color=DARK_FG) self.ax.set_ylabel("Range Bin", color=DARK_FG)
self.ax.tick_params(colors=DARK_FG) self.ax.tick_params(colors=DARK_FG)
@@ -158,20 +109,7 @@ class RangeDopplerCanvas(FigureCanvasQTAgg):
super().__init__(fig) super().__init__(fig)
def update_map(self, magnitude: np.ndarray, _detections: np.ndarray = None): def update_map(self, magnitude: np.ndarray, _detections: np.ndarray = None):
"""Update the heatmap with new magnitude data. """Update the heatmap with new magnitude data."""
Automatically resizes the canvas if the incoming shape differs from
the current backing array.
"""
nr, nd = magnitude.shape
if nr != self._n_range or nd != self._n_doppler:
self._n_range = nr
self._n_doppler = nd
self._data = np.zeros((nr, nd))
self.im.set_extent([0, nd, 0, nr])
self.ax.set_title(
f"Range-Doppler Map ({nr}x{nd})", color=DARK_FG)
display = np.log10(magnitude + 1) display = np.log10(magnitude + 1)
self.im.set_data(display) self.im.set_data(display)
self.im.set_clim(vmin=display.min(), vmax=max(display.max(), 0.1)) self.im.set_clim(vmin=display.min(), vmax=max(display.max(), 0.1))
@@ -215,15 +153,9 @@ class RadarDashboard(QMainWindow):
self._gps_worker: GPSDataWorker | None = None self._gps_worker: GPSDataWorker | None = None
self._simulator: TargetSimulator | None = None self._simulator: TargetSimulator | None = None
# Raw IQ Replay
self._replay_controller: RawIQReplayController | None = None
self._replay_worker: RawIQReplayWorker | None = None
self._iq_processor: RawIQFrameProcessor | None = None
# State # State
self._running = False self._running = False
self._demo_mode = False self._demo_mode = False
self._replay_status_override: str | None = None # "playing"/"paused"
self._start_time = time.time() self._start_time = time.time()
self._current_frame: RadarFrame | None = None self._current_frame: RadarFrame | None = None
self._last_status: StatusResponse | None = None self._last_status: StatusResponse | None = None
@@ -420,8 +352,7 @@ class RadarDashboard(QMainWindow):
# Row 0: connection mode + device combos + buttons # Row 0: connection mode + device combos + buttons
ctrl_layout.addWidget(QLabel("Mode:"), 0, 0) ctrl_layout.addWidget(QLabel("Mode:"), 0, 0)
self._mode_combo = QComboBox() self._mode_combo = QComboBox()
self._mode_combo.addItems([ self._mode_combo.addItems(["Mock", "Live FT2232H", "Replay (.npy)"])
"Mock", "Live FT2232H", "Replay (.npy)", "Raw IQ Replay (.npy)"])
self._mode_combo.setCurrentIndex(0) self._mode_combo.setCurrentIndex(0)
ctrl_layout.addWidget(self._mode_combo, 0, 1) ctrl_layout.addWidget(self._mode_combo, 0, 1)
@@ -470,104 +401,6 @@ class RadarDashboard(QMainWindow):
self._status_label_main.setAlignment(Qt.AlignmentFlag.AlignRight) self._status_label_main.setAlignment(Qt.AlignmentFlag.AlignRight)
ctrl_layout.addWidget(self._status_label_main, 1, 5, 1, 5) ctrl_layout.addWidget(self._status_label_main, 1, 5, 1, 5)
# Row 2: Raw IQ playback controls (hidden until Raw IQ mode active)
self._playback_frame = QFrame()
self._playback_frame.setStyleSheet(
f"background-color: {DARK_HIGHLIGHT}; border-radius: 4px;")
pb_layout = QHBoxLayout(self._playback_frame)
pb_layout.setContentsMargins(8, 4, 8, 4)
self._pb_play_btn = QPushButton("Play")
self._pb_play_btn.setStyleSheet(
f"QPushButton {{ background-color: {DARK_SUCCESS}; color: white; }}")
self._pb_play_btn.clicked.connect(self._pb_play_pause)
pb_layout.addWidget(self._pb_play_btn)
self._pb_step_btn = QPushButton("Step")
self._pb_step_btn.clicked.connect(self._pb_step)
pb_layout.addWidget(self._pb_step_btn)
self._pb_stop_btn = QPushButton("Stop")
self._pb_stop_btn.setStyleSheet(
f"QPushButton {{ background-color: {DARK_ERROR}; color: white; }}")
self._pb_stop_btn.clicked.connect(self._stop_radar)
pb_layout.addWidget(self._pb_stop_btn)
pb_layout.addWidget(QLabel("FPS:"))
self._pb_fps_spin = _make_dspin()
self._pb_fps_spin.setRange(0.1, 60.0)
self._pb_fps_spin.setValue(10.0)
self._pb_fps_spin.setSingleStep(1.0)
self._pb_fps_spin.valueChanged.connect(self._pb_fps_changed)
pb_layout.addWidget(self._pb_fps_spin)
self._pb_loop_check = QCheckBox("Loop")
self._pb_loop_check.setChecked(True)
self._pb_loop_check.toggled.connect(self._pb_loop_changed)
pb_layout.addWidget(self._pb_loop_check)
self._pb_frame_label = QLabel("Frame: 0 / 0")
self._pb_frame_label.setStyleSheet(
f"color: {DARK_INFO}; font-weight: bold;")
pb_layout.addWidget(self._pb_frame_label)
self._pb_file_label = QLabel("")
self._pb_file_label.setStyleSheet(f"color: {DARK_TEXT}; font-size: 10px;")
pb_layout.addWidget(self._pb_file_label)
pb_layout.addStretch()
self._playback_frame.setVisible(False)
ctrl_layout.addWidget(self._playback_frame, 2, 0, 1, 10)
# -- Waveform config row (Raw IQ replay only) -----------------------
self._waveform_frame = QFrame()
self._waveform_frame.setStyleSheet(
f"background-color: {DARK_ACCENT}; border-radius: 4px;")
wf_layout = QHBoxLayout(self._waveform_frame)
wf_layout.setContentsMargins(8, 4, 8, 4)
wf_layout.addWidget(QLabel("Waveform:"))
wf_layout.addWidget(QLabel("fs (MHz):"))
self._wf_fs_spin = _make_dspin()
self._wf_fs_spin.setRange(0.1, 100.0)
self._wf_fs_spin.setValue(4.0)
self._wf_fs_spin.setDecimals(2)
self._wf_fs_spin.setToolTip("ADC sample rate in MHz")
wf_layout.addWidget(self._wf_fs_spin)
wf_layout.addWidget(QLabel("BW (MHz):"))
self._wf_bw_spin = _make_dspin()
self._wf_bw_spin.setRange(1.0, 5000.0)
self._wf_bw_spin.setValue(500.0)
self._wf_bw_spin.setDecimals(1)
self._wf_bw_spin.setToolTip("Chirp bandwidth in MHz")
wf_layout.addWidget(self._wf_bw_spin)
wf_layout.addWidget(QLabel("T (us):"))
self._wf_chirp_spin = _make_dspin()
self._wf_chirp_spin.setRange(1.0, 10000.0)
self._wf_chirp_spin.setValue(300.0)
self._wf_chirp_spin.setDecimals(1)
self._wf_chirp_spin.setToolTip("Chirp duration in microseconds")
wf_layout.addWidget(self._wf_chirp_spin)
wf_layout.addWidget(QLabel("fc (GHz):"))
self._wf_fc_spin = _make_dspin()
self._wf_fc_spin.setRange(0.1, 100.0)
self._wf_fc_spin.setValue(10.0)
self._wf_fc_spin.setDecimals(2)
self._wf_fc_spin.setToolTip("Carrier frequency in GHz")
wf_layout.addWidget(self._wf_fc_spin)
self._wf_res_label = QLabel("")
self._wf_res_label.setStyleSheet(f"color: {DARK_INFO}; font-size: 10px;")
wf_layout.addWidget(self._wf_res_label)
wf_layout.addStretch()
self._waveform_frame.setVisible(False)
ctrl_layout.addWidget(self._waveform_frame, 3, 0, 1, 10)
layout.addWidget(ctrl) layout.addWidget(ctrl)
# ---- Display area (range-doppler + targets table) ------------------ # ---- Display area (range-doppler + targets table) ------------------
@@ -648,22 +481,12 @@ class RadarDashboard(QMainWindow):
self._alt_spin.setValue(0.0) self._alt_spin.setValue(0.0)
self._alt_spin.setSuffix(" m") self._alt_spin.setSuffix(" m")
self._heading_spin = _make_dspin()
self._heading_spin.setRange(0, 360)
self._heading_spin.setDecimals(1)
self._heading_spin.setValue(0.0)
self._heading_spin.setSuffix("\u00b0")
self._heading_spin.setWrapping(True)
self._heading_spin.valueChanged.connect(self._on_position_changed)
pos_layout.addWidget(QLabel("Latitude:"), 0, 0) pos_layout.addWidget(QLabel("Latitude:"), 0, 0)
pos_layout.addWidget(self._lat_spin, 0, 1) pos_layout.addWidget(self._lat_spin, 0, 1)
pos_layout.addWidget(QLabel("Longitude:"), 1, 0) pos_layout.addWidget(QLabel("Longitude:"), 1, 0)
pos_layout.addWidget(self._lon_spin, 1, 1) pos_layout.addWidget(self._lon_spin, 1, 1)
pos_layout.addWidget(QLabel("Altitude:"), 2, 0) pos_layout.addWidget(QLabel("Altitude:"), 2, 0)
pos_layout.addWidget(self._alt_spin, 2, 1) pos_layout.addWidget(self._alt_spin, 2, 1)
pos_layout.addWidget(QLabel("Heading:"), 3, 0)
pos_layout.addWidget(self._heading_spin, 3, 1)
sb_layout.addWidget(pos_group) sb_layout.addWidget(pos_group)
@@ -1386,10 +1209,6 @@ class RadarDashboard(QMainWindow):
try: try:
mode = self._mode_combo.currentText() mode = self._mode_combo.currentText()
if "Raw IQ" in mode:
self._start_raw_iq_replay()
return
if "Mock" in mode: if "Mock" in mode:
self._connection = FT2232HConnection(mock=True) self._connection = FT2232HConnection(mock=True)
if not self._connection.open(): if not self._connection.open():
@@ -1429,7 +1248,6 @@ class RadarDashboard(QMainWindow):
self._radar_worker.targetsUpdated.connect(self._on_radar_targets) self._radar_worker.targetsUpdated.connect(self._on_radar_targets)
self._radar_worker.statsUpdated.connect(self._on_radar_stats) self._radar_worker.statsUpdated.connect(self._on_radar_stats)
self._radar_worker.errorOccurred.connect(self._on_worker_error) self._radar_worker.errorOccurred.connect(self._on_worker_error)
self._radar_worker.finished.connect(self._on_worker_finished)
self._radar_worker.start() self._radar_worker.start()
# Optionally start GPS worker # Optionally start GPS worker
@@ -1464,32 +1282,12 @@ class RadarDashboard(QMainWindow):
def _stop_radar(self): def _stop_radar(self):
self._running = False self._running = False
self._replay_status_override = None
# Stop demo simulator if active (prevents cross-mode interference)
if self._simulator:
self._simulator.stop()
self._simulator = None
self._demo_mode = False
self._demo_btn_main.setText("Start Demo")
self._demo_btn_map.setText("Start Demo")
self._demo_btn_map.setChecked(False)
if self._radar_worker: if self._radar_worker:
self._radar_worker.stop() self._radar_worker.stop()
self._radar_worker.wait(2000) self._radar_worker.wait(2000)
self._radar_worker = None self._radar_worker = None
# Raw IQ Replay cleanup
if self._replay_controller is not None:
self._replay_controller.stop()
if self._replay_worker is not None:
self._replay_worker.stop()
self._replay_worker.wait(2000)
self._replay_worker = None
self._replay_controller = None
self._iq_processor = None
if self._gps_worker: if self._gps_worker:
self._gps_worker.stop() self._gps_worker.stop()
self._gps_worker.wait(2000) self._gps_worker.wait(2000)
@@ -1506,237 +1304,11 @@ class RadarDashboard(QMainWindow):
self._mode_combo.setEnabled(True) self._mode_combo.setEnabled(True)
self._demo_btn_main.setEnabled(True) self._demo_btn_main.setEnabled(True)
self._demo_btn_map.setEnabled(True) self._demo_btn_map.setEnabled(True)
self._playback_frame.setVisible(False)
self._waveform_frame.setVisible(False)
self._pb_play_btn.setText("Play")
self._pb_frame_label.setText("Frame: 0 / 0")
self._pb_file_label.setText("")
self._status_label_main.setText("Status: Radar stopped") self._status_label_main.setText("Status: Radar stopped")
self._sb_status.setText("Radar stopped") self._sb_status.setText("Radar stopped")
self._sb_mode.setText("Idle") self._sb_mode.setText("Idle")
logger.info("Radar system stopped") logger.info("Radar system stopped")
# =====================================================================
# Raw IQ Replay
# =====================================================================
def _start_raw_iq_replay(self):
"""Start raw IQ replay mode: load .npy file and begin playback."""
from PyQt6.QtWidgets import QFileDialog
npy_path, _ = QFileDialog.getOpenFileName(
self, "Select Raw IQ .npy file", "",
"NumPy files (*.npy);;All files (*)")
if not npy_path:
return
try:
# Create controller and load file
self._replay_controller = RawIQReplayController()
info = self._replay_controller.load_file(npy_path)
# -- Waveform calibration: try to parse from filename -----------
parsed_wf = _parse_waveform_from_filename(Path(npy_path).name)
if parsed_wf is not None:
self._wf_fs_spin.setValue(parsed_wf.sample_rate_hz / 1e6)
self._wf_bw_spin.setValue(parsed_wf.bandwidth_hz / 1e6)
self._wf_chirp_spin.setValue(parsed_wf.chirp_duration_s / 1e-6)
self._wf_fc_spin.setValue(parsed_wf.center_freq_hz / 1e9)
logger.info("Waveform params parsed from filename: %s", parsed_wf)
# Build waveform config from (possibly updated) spinboxes
wfc = self._waveform_config_from_ui()
range_res = wfc.range_resolution(info.n_samples)
vel_res = wfc.velocity_resolution(info.n_samples, info.n_chirps)
n_range_out = min(64, info.n_samples)
max_range = range_res * n_range_out
# Create replay-specific RadarSettings with correct calibration
replay_settings = RadarSettings(
system_frequency=wfc.center_freq_hz,
range_resolution=range_res,
velocity_resolution=vel_res,
max_distance=max_range,
map_size=max_range * 1.2,
coverage_radius=max_range * 1.2,
)
logger.info(
"Replay calibration: range_res=%.4f m/bin, vel_res=%.4f m/s/bin, "
"max_range=%.1f m",
range_res, vel_res, max_range,
)
# Update coverage/map spinboxes to match replay scale
self._coverage_spin.setValue(replay_settings.coverage_radius / 1000)
self._update_waveform_res_label(info.n_samples, info.n_chirps)
# Create frame processor
self._iq_processor = RawIQFrameProcessor(
n_range_out=n_range_out,
n_doppler_out=min(32, info.n_chirps),
)
# Apply current AGC settings from FPGA Control tab
agc_enable = self._param_spins.get("0x28")
agc_target = self._param_spins.get("0x29")
agc_attack = self._param_spins.get("0x2A")
agc_decay = self._param_spins.get("0x2B")
agc_holdoff = self._param_spins.get("0x2C")
self._iq_processor.set_agc_config(AGCConfig(
enabled=bool(agc_enable.value()) if agc_enable else False,
target=agc_target.value() if agc_target else 200,
attack=agc_attack.value() if agc_attack else 1,
decay=agc_decay.value() if agc_decay else 1,
holdoff=agc_holdoff.value() if agc_holdoff else 4,
))
# Apply CFAR settings
cfar_en = self._param_spins.get("0x25")
cfar_guard = self._param_spins.get("0x21")
cfar_train = self._param_spins.get("0x22")
cfar_alpha = self._param_spins.get("0x23")
cfar_mode = self._param_spins.get("0x24")
self._iq_processor.set_cfar_params(
enabled=bool(cfar_en.value()) if cfar_en else False,
guard=cfar_guard.value() if cfar_guard else 2,
train=cfar_train.value() if cfar_train else 8,
alpha_q44=cfar_alpha.value() if cfar_alpha else 0x30,
mode=cfar_mode.value() if cfar_mode else 0,
)
# Apply MTI / DC notch
mti_en = self._param_spins.get("0x26")
dc_notch = self._param_spins.get("0x27")
self._iq_processor.set_mti_enabled(
bool(mti_en.value()) if mti_en else False)
self._iq_processor.set_dc_notch_width(
dc_notch.value() if dc_notch else 0)
# Threshold
thresh = self._param_spins.get("0x03")
self._iq_processor.set_detect_threshold(
thresh.value() if thresh else 10000)
# Create worker
self._replay_worker = RawIQReplayWorker(
controller=self._replay_controller,
processor=self._iq_processor,
host_processor=self._processor,
settings=replay_settings,
gps_data_ref=self._radar_position,
)
self._replay_worker.frameReady.connect(self._on_frame_ready)
self._replay_worker.statusReceived.connect(self._on_status_received)
self._replay_worker.targetsUpdated.connect(self._on_radar_targets)
self._replay_worker.statsUpdated.connect(self._on_radar_stats)
self._replay_worker.errorOccurred.connect(self._on_worker_error)
self._replay_worker.finished.connect(self._on_worker_finished)
self._replay_worker.playbackStateChanged.connect(
self._on_playback_state_changed)
self._replay_worker.frameIndexChanged.connect(
self._on_frame_index_changed)
# Start worker (paused initially)
self._replay_worker.start()
# UI state
self._running = True
self._replay_status_override = "paused"
self._start_time = time.time()
self._frame_count = 0
self._start_btn.setEnabled(False)
self._stop_btn.setEnabled(True)
self._mode_combo.setEnabled(False)
self._demo_btn_main.setEnabled(False)
self._demo_btn_map.setEnabled(False)
self._playback_frame.setVisible(True)
self._waveform_frame.setVisible(True)
self._pb_frame_label.setText(f"Frame: 0 / {info.n_frames}")
self._pb_file_label.setText(
f"{Path(npy_path).name} "
f"({info.n_chirps}x{info.n_samples}, "
f"{info.file_size_mb:.1f} MB)")
self._status_label_main.setText("Status: Raw IQ Replay (paused)")
self._sb_status.setText("Raw IQ Replay")
self._sb_mode.setText("Raw IQ Replay")
logger.info(f"Raw IQ Replay started: {npy_path}")
except (ValueError, OSError) as e:
# Clean up any partially-created objects
if self._replay_worker is not None:
self._replay_worker.stop()
self._replay_worker.wait(1000)
self._replay_worker = None
self._replay_controller = None
self._iq_processor = None
QMessageBox.critical(self, "Error",
f"Failed to load raw IQ file:\n{e}")
logger.error(f"Raw IQ load error: {e}")
# ---- Playback control slots --------------------------------------------
def _pb_play_pause(self):
"""Toggle play/pause for raw IQ replay."""
if self._replay_controller is None:
return
state = self._replay_controller.state
if state == PlaybackState.PLAYING:
self._replay_controller.pause()
self._pb_play_btn.setText("Play")
else:
self._replay_controller.play()
self._pb_play_btn.setText("Pause")
def _pb_step(self):
"""Step one frame forward in raw IQ replay."""
if self._replay_controller is not None:
self._replay_controller.step_forward()
def _pb_fps_changed(self, value: float):
if self._replay_controller is not None:
self._replay_controller.set_fps(value)
def _pb_loop_changed(self, checked: bool):
if self._replay_controller is not None:
self._replay_controller.set_loop(checked)
def _waveform_config_from_ui(self) -> WaveformConfig:
"""Build a WaveformConfig from the waveform spinboxes."""
return WaveformConfig(
sample_rate_hz=self._wf_fs_spin.value() * 1e6,
bandwidth_hz=self._wf_bw_spin.value() * 1e6,
chirp_duration_s=self._wf_chirp_spin.value() * 1e-6,
center_freq_hz=self._wf_fc_spin.value() * 1e9,
)
def _update_waveform_res_label(self, n_samples: int, n_chirps: int) -> None:
"""Update the waveform resolution info label."""
wfc = self._waveform_config_from_ui()
r_res = wfc.range_resolution(n_samples)
v_res = wfc.velocity_resolution(n_samples, n_chirps)
n_r = min(64, n_samples)
max_r = r_res * n_r
self._wf_res_label.setText(
f"Range: {r_res:.3f} m/bin | Vel: {v_res:.3f} m/s/bin | "
f"Max range: {max_r:.1f} m ({n_r} bins)"
)
@pyqtSlot(str)
def _on_playback_state_changed(self, state_str: str):
if state_str == "playing":
self._pb_play_btn.setText("Pause")
self._replay_status_override = "playing"
elif state_str == "paused":
self._pb_play_btn.setText("Play")
self._replay_status_override = "paused"
elif state_str == "stopped":
self._replay_status_override = None
self._stop_radar()
@pyqtSlot(int, int)
def _on_frame_index_changed(self, current: int, total: int):
self._pb_frame_label.setText(f"Frame: {current} / {total}")
# ===================================================================== # =====================================================================
# Demo mode # Demo mode
# ===================================================================== # =====================================================================
@@ -1752,7 +1324,6 @@ class RadarDashboard(QMainWindow):
self._simulator.targetsUpdated.connect(self._on_demo_targets) self._simulator.targetsUpdated.connect(self._on_demo_targets)
self._simulator.start(500) self._simulator.start(500)
self._demo_mode = True self._demo_mode = True
if not self._running:
self._sb_mode.setText("Demo Mode") self._sb_mode.setText("Demo Mode")
self._sb_status.setText("Demo mode active") self._sb_status.setText("Demo mode active")
self._demo_btn_main.setText("Stop Demo") self._demo_btn_main.setText("Stop Demo")
@@ -1767,17 +1338,11 @@ class RadarDashboard(QMainWindow):
self._demo_mode = False self._demo_mode = False
if not self._running: if not self._running:
mode = "Idle" mode = "Idle"
elif self._replay_controller is not None:
mode = "Raw IQ Replay"
elif isinstance(self._connection, ReplayConnection): elif isinstance(self._connection, ReplayConnection):
mode = "Replay" mode = "Replay"
else: else:
# Use mode combo text for Mock vs Live distinction
mode = self._mode_combo.currentText()
if "Mock" not in mode and "Live" not in mode:
mode = "Live" mode = "Live"
self._sb_mode.setText(mode) self._sb_mode.setText(mode)
if not self._running:
self._sb_status.setText("Demo stopped") self._sb_status.setText("Demo stopped")
self._demo_btn_main.setText("Start Demo") self._demo_btn_main.setText("Start Demo")
self._demo_btn_map.setText("Start Demo") self._demo_btn_map.setText("Start Demo")
@@ -1830,12 +1395,6 @@ class RadarDashboard(QMainWindow):
def _on_worker_error(self, msg: str): def _on_worker_error(self, msg: str):
logger.error(f"Worker error: {msg}") logger.error(f"Worker error: {msg}")
def _on_worker_finished(self):
"""Handle unexpected worker thread exit — recover UI to stopped state."""
if self._running:
logger.warning("Worker thread exited unexpectedly, resetting UI")
self._stop_radar()
@pyqtSlot(object) @pyqtSlot(object)
def _on_gps_received(self, gps: GPSData): def _on_gps_received(self, gps: GPSData):
self._gps_packet_count += 1 self._gps_packet_count += 1
@@ -2016,7 +1575,6 @@ class RadarDashboard(QMainWindow):
self._radar_position.latitude = self._lat_spin.value() self._radar_position.latitude = self._lat_spin.value()
self._radar_position.longitude = self._lon_spin.value() self._radar_position.longitude = self._lon_spin.value()
self._radar_position.altitude = self._alt_spin.value() self._radar_position.altitude = self._alt_spin.value()
self._radar_position.heading = self._heading_spin.value()
self._map_widget.set_radar_position(self._radar_position) self._map_widget.set_radar_position(self._radar_position)
if self._simulator: if self._simulator:
self._simulator.set_radar_position(self._radar_position) self._simulator.set_radar_position(self._radar_position)
@@ -2091,13 +1649,6 @@ class RadarDashboard(QMainWindow):
if self._running: if self._running:
det = (self._current_frame.detection_count det = (self._current_frame.detection_count
if self._current_frame else 0) if self._current_frame else 0)
# Preserve replay-specific status (paused/playing)
if self._replay_status_override == "paused":
self._status_label_main.setText(
f"Status: Raw IQ Replay (paused) \u2014 "
f"Frames: {self._frame_count} \u2014 Detections: {det}"
)
else:
self._status_label_main.setText( self._status_label_main.setText(
f"Status: Running \u2014 Frames: {self._frame_count} " f"Status: Running \u2014 Frames: {self._frame_count} "
f"\u2014 Detections: {det}" f"\u2014 Detections: {det}"
@@ -2186,11 +1737,6 @@ class RadarDashboard(QMainWindow):
def closeEvent(self, event): def closeEvent(self, event):
if self._simulator: if self._simulator:
self._simulator.stop() self._simulator.stop()
if self._replay_controller is not None:
self._replay_controller.stop()
if self._replay_worker is not None:
self._replay_worker.stop()
self._replay_worker.wait(1000)
if self._radar_worker: if self._radar_worker:
self._radar_worker.stop() self._radar_worker.stop()
self._radar_worker.wait(1000) self._radar_worker.wait(1000)
-41
View File
@@ -96,47 +96,6 @@ class RadarTarget:
return asdict(self) return asdict(self)
@dataclass
class WaveformConfig:
"""FMCW waveform parameters for bin-to-physical-unit conversion.
Defaults are for the ADI CN0566 phaser (10 GHz, 500 MHz BW, 300 µs chirp,
4 MSPS ADC). For the PLFM FPGA waveform the values differ — but the FPGA
pipeline hardcodes its own bin widths, so this config is only used for
Raw IQ Replay (host-side FFT processing).
"""
sample_rate_hz: float = 4e6 # ADC sample rate (Hz)
bandwidth_hz: float = 500e6 # Chirp bandwidth (Hz)
chirp_duration_s: float = 300e-6 # Chirp sweep time (s)
center_freq_hz: float = 10e9 # Carrier frequency (Hz)
# --- derived quantities (need n_samples, n_chirps from file) ---
def range_resolution(self, n_samples: int) -> float:
"""Metres per range bin for an N-point range FFT.
range_per_bin = c · fs / (2 · N · slope)
where slope = BW / T_chirp.
"""
c = 299_792_458.0
slope = self.bandwidth_hz / self.chirp_duration_s
return c * self.sample_rate_hz / (2.0 * n_samples * slope)
def velocity_resolution(self, n_samples: int, n_chirps: int) -> float:
"""m/s per Doppler bin for an M-chirp Doppler FFT.
vel_per_bin = λ · fs / (2 · N · M)
where λ = c / fc, N = n_samples (PRI = N/fs), M = n_chirps.
"""
c = 299_792_458.0
wavelength = c / self.center_freq_hz
return wavelength * self.sample_rate_hz / (2.0 * n_samples * n_chirps)
def max_range(self, n_range_bins: int, n_samples: int) -> float:
"""Maximum unambiguous range for the given bin count."""
return self.range_resolution(n_samples) * n_range_bins
@dataclass @dataclass
class RadarSettings: class RadarSettings:
"""Radar system display/map configuration. """Radar system display/map configuration.
-371
View File
@@ -4,9 +4,6 @@ v7.processing — Radar signal processing and GPS parsing.
Classes: Classes:
- RadarProcessor — dual-CPI fusion, multi-PRF unwrap, DBSCAN clustering, - RadarProcessor — dual-CPI fusion, multi-PRF unwrap, DBSCAN clustering,
association, Kalman tracking association, Kalman tracking
- RawIQFrameProcessor — full signal chain for raw IQ replay:
quantize -> AGC -> Range FFT -> Doppler FFT ->
crop -> MTI -> DC notch -> mag -> CFAR
- USBPacketParser — parse GPS text/binary frames from STM32 CDC - USBPacketParser — parse GPS text/binary frames from STM32 CDC
Note: RadarPacketParser (old A5/C3 sync + CRC16 format) was removed. Note: RadarPacketParser (old A5/C3 sync + CRC16 format) was removed.
@@ -25,11 +22,6 @@ from .models import (
RadarTarget, GPSData, ProcessingConfig, RadarTarget, GPSData, ProcessingConfig,
SCIPY_AVAILABLE, SKLEARN_AVAILABLE, FILTERPY_AVAILABLE, SCIPY_AVAILABLE, SKLEARN_AVAILABLE, FILTERPY_AVAILABLE,
) )
from .agc_sim import (
AGCConfig, AGCState, AGCFrameResult,
quantize_iq, process_agc_frame,
)
from .hardware import RadarFrame, StatusResponse
if SKLEARN_AVAILABLE: if SKLEARN_AVAILABLE:
from sklearn.cluster import DBSCAN from sklearn.cluster import DBSCAN
@@ -56,103 +48,6 @@ def apply_pitch_correction(raw_elevation: float, pitch: float) -> float:
return raw_elevation - pitch return raw_elevation - pitch
# =============================================================================
# Utility: bin-to-physical target extraction (shared by all workers)
# =============================================================================
def extract_targets_from_frame(
frame: RadarFrame,
range_resolution: float,
velocity_resolution: float,
*,
gps: GPSData | None = None,
) -> list[RadarTarget]:
"""Extract RadarTargets from a RadarFrame's detection mask.
Performs bin-to-physical conversion and optional GPS coordinate mapping.
This is the shared implementation used by both RadarDataWorker (live mode)
and RawIQReplayWorker (replay mode).
Args:
frame: RadarFrame with populated ``detections`` and ``magnitude`` arrays.
range_resolution: Metres per range bin.
velocity_resolution: m/s per Doppler bin.
gps: Optional GPSData for pitch correction and geographic mapping.
Returns:
List of RadarTarget with physical-unit range, velocity, SNR, and
(if GPS available) lat/lon/azimuth/elevation.
"""
det_indices = np.argwhere(frame.detections > 0)
if len(det_indices) == 0:
return []
n_doppler = frame.magnitude.shape[1]
center_dbin = n_doppler // 2
targets: list[RadarTarget] = []
for idx in det_indices:
rbin, dbin = idx
mag = frame.magnitude[rbin, dbin]
snr = 10 * np.log10(max(mag, 1)) if mag > 0 else 0.0
range_m = float(rbin) * range_resolution
velocity_ms = float(dbin - center_dbin) * velocity_resolution
# GPS-dependent fields
raw_elev = 0.0
corr_elev = raw_elev
lat, lon, azimuth = 0.0, 0.0, 0.0
if gps is not None:
corr_elev = apply_pitch_correction(raw_elev, gps.pitch)
azimuth = gps.heading
lat, lon = _polar_to_geographic(
gps.latitude, gps.longitude, range_m, azimuth)
targets.append(RadarTarget(
id=len(targets),
range=range_m,
velocity=velocity_ms,
azimuth=azimuth,
elevation=corr_elev,
latitude=lat,
longitude=lon,
snr=snr,
timestamp=frame.timestamp,
))
return targets
def _polar_to_geographic(
radar_lat: float, radar_lon: float, range_m: float, bearing_deg: float,
) -> tuple[float, float]:
"""Convert polar (range, bearing) to geographic (lat, lon).
Uses the spherical-Earth approximation (adequate for <50 km ranges).
Duplicated from ``workers.polar_to_geographic`` to keep processing.py
self-contained; the workers module still exports its own copy for
backward-compat.
"""
if range_m <= 0:
return radar_lat, radar_lon
earth_r = 6_371_000.0
lat_r = math.radians(radar_lat)
lon_r = math.radians(radar_lon)
brg_r = math.radians(bearing_deg)
d_r = range_m / earth_r
new_lat = math.asin(
math.sin(lat_r) * math.cos(d_r)
+ math.cos(lat_r) * math.sin(d_r) * math.cos(brg_r)
)
new_lon = lon_r + math.atan2(
math.sin(brg_r) * math.sin(d_r) * math.cos(lat_r),
math.cos(d_r) - math.sin(lat_r) * math.sin(new_lat),
)
return math.degrees(new_lat), math.degrees(new_lon)
# ============================================================================= # =============================================================================
# Radar Processor — signal-level processing & tracking pipeline # Radar Processor — signal-level processing & tracking pipeline
# ============================================================================= # =============================================================================
@@ -556,269 +451,3 @@ class USBPacketParser:
except (ValueError, struct.error) as e: except (ValueError, struct.error) as e:
logger.error(f"Error parsing binary GPS: {e}") logger.error(f"Error parsing binary GPS: {e}")
return None return None
# =============================================================================
# Raw IQ Frame Processor — full signal chain for replay mode
# =============================================================================
class RawIQFrameProcessor:
"""Process raw complex IQ frames through the full radar signal chain.
This replicates the FPGA processing pipeline in software so that
raw ADI CN0566 captures (or similar) can be visualised in the V7
dashboard exactly as they would appear from the FPGA.
Pipeline per frame:
1. Quantize raw complex → 16-bit signed I/Q
2. AGC gain application (bit-accurate to rx_gain_control.v)
3. Range FFT (across samples)
4. Doppler FFT (across chirps) + fftshift + centre crop
5. Optional MTI (2-pulse canceller using history)
6. Optional DC notch (zero-Doppler removal)
7. Magnitude (|I| + |Q| approximation matching FPGA, or true |.|)
8. CFAR or simple threshold detection
9. Build RadarFrame + synthetic StatusResponse
"""
def __init__(
self,
n_range_out: int = 64,
n_doppler_out: int = 32,
):
self._n_range = n_range_out
self._n_doppler = n_doppler_out
# AGC state (persists across frames)
self._agc_config = AGCConfig()
self._agc_state = AGCState()
# MTI history buffer (stores previous Range-Doppler maps)
self._mti_history: list[np.ndarray] = []
self._mti_enabled: bool = False
# DC notch
self._dc_notch_width: int = 0
# CFAR / threshold config
self._cfar_enabled: bool = False
self._cfar_guard: int = 2
self._cfar_train: int = 8
self._cfar_alpha_q44: int = 0x30 # Q4.4 → 3.0
self._cfar_mode: int = 0 # 0=CA, 1=GO, 2=SO
self._detect_threshold: int = 10000
# Frame counter
self._frame_number: int = 0
# Host-side processing (windowing, clustering, etc.)
self._host_processor = RadarProcessor()
# ---- Configuration setters ---------------------------------------------
def set_agc_config(self, config: AGCConfig) -> None:
self._agc_config = config
def reset_agc_state(self) -> None:
"""Reset AGC state (e.g. on seek)."""
self._agc_state = AGCState()
self._mti_history.clear()
def set_mti_enabled(self, enabled: bool) -> None:
if self._mti_enabled != enabled:
self._mti_history.clear()
self._mti_enabled = enabled
def set_dc_notch_width(self, width: int) -> None:
self._dc_notch_width = max(0, min(7, width))
def set_cfar_params(
self,
enabled: bool,
guard: int = 2,
train: int = 8,
alpha_q44: int = 0x30,
mode: int = 0,
) -> None:
self._cfar_enabled = enabled
self._cfar_guard = guard
self._cfar_train = train
self._cfar_alpha_q44 = alpha_q44
self._cfar_mode = mode
def set_detect_threshold(self, threshold: int) -> None:
self._detect_threshold = threshold
@property
def agc_state(self) -> AGCState:
return self._agc_state
@property
def agc_config(self) -> AGCConfig:
return self._agc_config
@property
def frame_number(self) -> int:
return self._frame_number
# ---- Main processing entry point ---------------------------------------
def process_frame(
self,
raw_frame: np.ndarray,
timestamp: float = 0.0,
) -> tuple[RadarFrame, StatusResponse, AGCFrameResult]:
"""Process one raw IQ frame through the full chain.
Parameters
----------
raw_frame : complex array, shape (n_chirps, n_samples)
timestamp : frame timestamp for RadarFrame
Returns
-------
(radar_frame, status_response, agc_result)
"""
n_chirps, _n_samples = raw_frame.shape
self._frame_number += 1
# 1. Quantize to 16-bit signed IQ
frame_i, frame_q = quantize_iq(raw_frame)
# 2. AGC
agc_result = process_agc_frame(
frame_i, frame_q, self._agc_config, self._agc_state)
# Use AGC-shifted IQ for downstream processing
iq = agc_result.shifted_i.astype(np.float64) + 1j * agc_result.shifted_q.astype(np.float64)
# 3. Range FFT (across samples axis)
range_fft = np.fft.fft(iq, axis=1)[:, :self._n_range]
# 4. Doppler FFT (across chirps axis) + fftshift + centre crop
doppler_fft = np.fft.fft(range_fft, axis=0)
doppler_fft = np.fft.fftshift(doppler_fft, axes=0)
# Centre-crop to n_doppler bins
center = n_chirps // 2
half_d = self._n_doppler // 2
start_d = max(0, center - half_d)
end_d = start_d + self._n_doppler
if end_d > n_chirps:
end_d = n_chirps
start_d = max(0, end_d - self._n_doppler)
rd_complex = doppler_fft[start_d:end_d, :]
# shape: (n_doppler, n_range) → transpose to (n_range, n_doppler)
rd_complex = rd_complex.T
# 5. Optional MTI (2-pulse canceller)
if self._mti_enabled:
rd_complex = self._apply_mti(rd_complex)
# 6. Optional DC notch (zero-Doppler bins)
if self._dc_notch_width > 0:
rd_complex = self._apply_dc_notch(rd_complex)
# Extract I/Q for RadarFrame
rd_i = np.round(rd_complex.real).astype(np.int16)
rd_q = np.round(rd_complex.imag).astype(np.int16)
# 7. Magnitude (FPGA uses |I|+|Q| approximation)
magnitude = np.abs(rd_complex.real) + np.abs(rd_complex.imag)
# Range profile (sum across Doppler)
range_profile = np.sum(magnitude, axis=1)
# 8. Detection (CFAR or simple threshold)
if self._cfar_enabled:
detections = self._run_cfar(magnitude)
else:
detections = self._run_threshold(magnitude)
detection_count = int(np.sum(detections > 0))
# 9. Build RadarFrame
radar_frame = RadarFrame(
timestamp=timestamp,
range_doppler_i=rd_i,
range_doppler_q=rd_q,
magnitude=magnitude,
detections=detections,
range_profile=range_profile,
detection_count=detection_count,
frame_number=self._frame_number,
)
# 10. Build synthetic StatusResponse
status = self._build_status(agc_result)
return radar_frame, status, agc_result
# ---- Internal helpers --------------------------------------------------
def _apply_mti(self, rd: np.ndarray) -> np.ndarray:
"""2-pulse MTI canceller: y[n] = x[n] - x[n-1]."""
self._mti_history.append(rd.copy())
if len(self._mti_history) > 2:
self._mti_history = self._mti_history[-2:]
if len(self._mti_history) < 2:
return np.zeros_like(rd) # suppress first frame
return self._mti_history[-1] - self._mti_history[-2]
def _apply_dc_notch(self, rd: np.ndarray) -> np.ndarray:
"""Zero out centre Doppler bins (DC notch)."""
n_doppler = rd.shape[1]
center = n_doppler // 2
w = self._dc_notch_width
lo = max(0, center - w)
hi = min(n_doppler, center + w + 1)
result = rd.copy()
result[:, lo:hi] = 0
return result
def _run_cfar(self, magnitude: np.ndarray) -> np.ndarray:
"""Run 1-D CFAR along each range bin (Doppler direction).
Uses the host-side CFAR from RadarProcessor with alpha converted
from Q4.4 to float.
"""
alpha_float = self._cfar_alpha_q44 / 16.0
cfar_types = {0: "CA-CFAR", 1: "GO-CFAR", 2: "SO-CFAR"}
cfar_type = cfar_types.get(self._cfar_mode, "CA-CFAR")
power = magnitude ** 2
power = np.maximum(power, 1e-20)
mask = np.zeros_like(magnitude, dtype=np.uint8)
for r in range(magnitude.shape[0]):
row = power[r, :]
if row.max() > 0:
det = RadarProcessor.cfar_1d(
row, self._cfar_guard, self._cfar_train,
alpha_float, cfar_type)
mask[r, :] = det.astype(np.uint8)
return mask
def _run_threshold(self, magnitude: np.ndarray) -> np.ndarray:
"""Simple threshold detection (matches FPGA detect_threshold)."""
return (magnitude > self._detect_threshold).astype(np.uint8)
def _build_status(self, agc_result: AGCFrameResult) -> StatusResponse:
"""Build a synthetic StatusResponse from current processor state."""
return StatusResponse(
radar_mode=1, # active
stream_ctrl=0b111,
cfar_threshold=self._detect_threshold,
long_chirp=3000,
long_listen=13700,
guard=17540,
short_chirp=50,
short_listen=17450,
chirps_per_elev=32,
range_mode=0,
agc_current_gain=agc_result.gain_enc,
agc_peak_magnitude=agc_result.peak_mag_8bit,
agc_saturation_count=agc_result.saturation_count,
agc_enable=1 if self._agc_config.enabled else 0,
)
-264
View File
@@ -1,264 +0,0 @@
"""
v7.raw_iq_replay -- Raw IQ replay controller for the V7 dashboard.
Manages loading of raw complex IQ .npy captures, playback state
(play/pause/step/speed/loop), and delivers frames to a worker thread.
The controller is thread-safe: the worker calls ``next_frame()`` which
blocks until a frame is available or playback is stopped.
Supported file formats:
- 3-D .npy: (n_frames, n_chirps, n_samples) complex
- 2-D .npy: (n_chirps, n_samples) complex -> treated as single frame
Classes:
- RawIQReplayController -- playback state machine + frame delivery
- RawIQFileInfo -- metadata about the loaded file
"""
from __future__ import annotations
import logging
import threading
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path
import numpy as np
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Playback state enum
# ---------------------------------------------------------------------------
class PlaybackState(Enum):
"""Playback state for the replay controller."""
STOPPED = auto()
PLAYING = auto()
PAUSED = auto()
# ---------------------------------------------------------------------------
# File metadata
# ---------------------------------------------------------------------------
@dataclass
class RawIQFileInfo:
"""Metadata about a loaded raw IQ .npy file."""
path: str
n_frames: int
n_chirps: int
n_samples: int
dtype: str
file_size_mb: float
# ---------------------------------------------------------------------------
# Replay Controller
# ---------------------------------------------------------------------------
class RawIQReplayController:
"""Manages raw IQ file loading and playback state.
Thread-safety: the controller uses a condition variable so the worker
thread can block on ``next_frame()`` waiting for play/step events,
while the GUI thread calls ``play()``, ``pause()``, ``step()``, etc.
"""
def __init__(self) -> None:
self._data: np.ndarray | None = None
self._info: RawIQFileInfo | None = None
# Playback state
self._state = PlaybackState.STOPPED
self._frame_index: int = 0
self._fps: float = 10.0 # target frames per second
self._loop: bool = True
# Thread synchronisation
self._lock = threading.Lock()
self._cond = threading.Condition(self._lock)
# Step request flag (set by GUI, consumed by worker)
self._step_requested: bool = False
# Stop signal
self._stop_requested: bool = False
# ---- File loading ------------------------------------------------------
def load_file(self, path: str) -> RawIQFileInfo:
"""Load a .npy file containing raw IQ data.
Raises ValueError if the file is not a valid raw IQ capture.
"""
p = Path(path)
if not p.exists():
msg = f"File not found: {path}"
raise ValueError(msg)
if p.suffix.lower() != ".npy":
msg = f"Expected .npy file, got: {p.suffix}"
raise ValueError(msg)
# Memory-map for large files
data = np.load(str(p), mmap_mode="r")
if not np.iscomplexobj(data):
msg = f"Expected complex data, got dtype={data.dtype}"
raise ValueError(msg)
# Normalise shape
if data.ndim == 2:
# Single frame: (chirps, samples) -> (1, chirps, samples)
data = data[np.newaxis, :, :]
elif data.ndim == 3:
pass # (frames, chirps, samples) — expected
else:
msg = f"Expected 2-D or 3-D array, got {data.ndim}-D"
raise ValueError(msg)
with self._lock:
self._data = data
self._frame_index = 0
self._state = PlaybackState.PAUSED
self._stop_requested = False
self._info = RawIQFileInfo(
path=str(p),
n_frames=data.shape[0],
n_chirps=data.shape[1],
n_samples=data.shape[2],
dtype=str(data.dtype),
file_size_mb=p.stat().st_size / (1024 * 1024),
)
logger.info(
f"Loaded raw IQ: {p.name}{self._info.n_frames} frames, "
f"{self._info.n_chirps} chirps, {self._info.n_samples} samples, "
f"{self._info.file_size_mb:.1f} MB"
)
return self._info
def unload(self) -> None:
"""Unload the current file and stop playback."""
with self._lock:
self._state = PlaybackState.STOPPED
self._stop_requested = True
self._data = None
self._info = None
self._frame_index = 0
self._cond.notify_all()
# ---- Playback control (called from GUI thread) -------------------------
def play(self) -> None:
with self._lock:
if self._data is None:
return
self._state = PlaybackState.PLAYING
self._stop_requested = False
self._cond.notify_all()
def pause(self) -> None:
with self._lock:
if self._state == PlaybackState.PLAYING:
self._state = PlaybackState.PAUSED
def stop(self) -> None:
with self._lock:
self._state = PlaybackState.STOPPED
self._stop_requested = True
self._cond.notify_all()
def step_forward(self) -> None:
"""Advance one frame (works in PAUSED state)."""
with self._lock:
if self._data is None:
return
self._step_requested = True
self._cond.notify_all()
def seek(self, frame_index: int) -> None:
"""Jump to a specific frame index."""
with self._lock:
if self._data is None:
return
self._frame_index = max(0, min(frame_index, self._data.shape[0] - 1))
def set_fps(self, fps: float) -> None:
with self._lock:
self._fps = max(0.1, min(60.0, fps))
def set_loop(self, loop: bool) -> None:
with self._lock:
self._loop = loop
# ---- State queries (thread-safe) ---------------------------------------
@property
def state(self) -> PlaybackState:
with self._lock:
return self._state
@property
def frame_index(self) -> int:
with self._lock:
return self._frame_index
@property
def info(self) -> RawIQFileInfo | None:
with self._lock:
return self._info
@property
def fps(self) -> float:
with self._lock:
return self._fps
@property
def is_loaded(self) -> bool:
with self._lock:
return self._data is not None
# ---- Frame delivery (called from worker thread) ------------------------
def next_frame(self) -> np.ndarray | None:
"""Block until the next frame is available, then return it.
Returns None when playback is stopped or file is unloaded.
The caller (worker thread) should use this in a loop.
"""
with self._cond:
while True:
if self._stop_requested or self._data is None:
return None
if self._state == PlaybackState.PLAYING:
return self._deliver_frame()
if self._step_requested:
self._step_requested = False
return self._deliver_frame()
# PAUSED or STOPPED — wait for signal
self._cond.wait(timeout=0.1)
def _deliver_frame(self) -> np.ndarray | None:
"""Return current frame and advance index. Must hold lock."""
if self._data is None:
return None
n_frames = self._data.shape[0]
if self._frame_index >= n_frames:
if self._loop:
self._frame_index = 0
else:
self._state = PlaybackState.PAUSED
return None
# Read the frame (memory-mapped, so this is cheap)
frame = np.array(self._data[self._frame_index]) # copy from mmap
self._frame_index += 1
return frame
+48 -161
View File
@@ -5,9 +5,6 @@ Classes:
- RadarDataWorker — reads from FT2232H via production RadarAcquisition, - RadarDataWorker — reads from FT2232H via production RadarAcquisition,
parses 0xAA/0xBB packets, assembles 64x32 frames, parses 0xAA/0xBB packets, assembles 64x32 frames,
runs host-side DSP, emits PyQt signals. runs host-side DSP, emits PyQt signals.
- RawIQReplayWorker — reads raw IQ .npy frames from RawIQReplayController,
processes through RawIQFrameProcessor, emits same
signals as RadarDataWorker + playback state.
- GPSDataWorker — reads GPS frames from STM32 CDC, emits GPSData signals. - GPSDataWorker — reads GPS frames from STM32 CDC, emits GPSData signals.
- TargetSimulator — QTimer-based demo target generator. - TargetSimulator — QTimer-based demo target generator.
@@ -23,6 +20,8 @@ import queue
import struct import struct
import logging import logging
import numpy as np
from PyQt6.QtCore import QThread, QObject, QTimer, pyqtSignal from PyQt6.QtCore import QThread, QObject, QTimer, pyqtSignal
from .models import RadarTarget, GPSData, RadarSettings from .models import RadarTarget, GPSData, RadarSettings
@@ -35,11 +34,9 @@ from .hardware import (
) )
from .processing import ( from .processing import (
RadarProcessor, RadarProcessor,
RawIQFrameProcessor,
USBPacketParser, USBPacketParser,
extract_targets_from_frame, apply_pitch_correction,
) )
from .raw_iq_replay import RawIQReplayController, PlaybackState
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -209,17 +206,56 @@ class RadarDataWorker(QThread):
Bin-to-physical conversion uses RadarSettings.range_resolution Bin-to-physical conversion uses RadarSettings.range_resolution
and velocity_resolution (should be calibrated to actual waveform). and velocity_resolution (should be calibrated to actual waveform).
""" """
targets: list[RadarTarget] = []
cfg = self._processor.config cfg = self._processor.config
if not (cfg.clustering_enabled or cfg.tracking_enabled): if not (cfg.clustering_enabled or cfg.tracking_enabled):
return [] return targets
targets = extract_targets_from_frame( # Extract detections from FPGA CFAR flags
frame, det_indices = np.argwhere(frame.detections > 0)
self._settings.range_resolution, r_res = self._settings.range_resolution
self._settings.velocity_resolution, v_res = self._settings.velocity_resolution
gps=self._gps,
for idx in det_indices:
rbin, dbin = idx
mag = frame.magnitude[rbin, dbin]
snr = 10 * np.log10(max(mag, 1)) if mag > 0 else 0
# Convert bin indices to physical units
range_m = float(rbin) * r_res
# Doppler: centre bin (16) = 0 m/s; positive bins = approaching
velocity_ms = float(dbin - 16) * v_res
# Apply pitch correction if GPS data available
raw_elev = 0.0 # FPGA doesn't send elevation per-detection
corr_elev = raw_elev
if self._gps:
corr_elev = apply_pitch_correction(raw_elev, self._gps.pitch)
# Compute geographic position if GPS available
lat, lon = 0.0, 0.0
azimuth = 0.0 # No azimuth from single-beam; set to heading
if self._gps:
azimuth = self._gps.heading
lat, lon = polar_to_geographic(
self._gps.latitude, self._gps.longitude,
range_m, azimuth,
) )
target = RadarTarget(
id=len(targets),
range=range_m,
velocity=velocity_ms,
azimuth=azimuth,
elevation=corr_elev,
latitude=lat,
longitude=lon,
snr=snr,
timestamp=frame.timestamp,
)
targets.append(target)
# DBSCAN clustering # DBSCAN clustering
if cfg.clustering_enabled and len(targets) > 0: if cfg.clustering_enabled and len(targets) > 0:
clusters = self._processor.clustering( clusters = self._processor.clustering(
@@ -232,155 +268,6 @@ class RadarDataWorker(QThread):
return targets return targets
# =============================================================================
# Raw IQ Replay Worker (QThread) — processes raw .npy captures
# =============================================================================
class RawIQReplayWorker(QThread):
"""Background worker for raw IQ replay mode.
Reads frames from a RawIQReplayController, processes them through
RawIQFrameProcessor (quantize -> AGC -> FFT -> CFAR -> RadarFrame),
and emits the same signals as RadarDataWorker so the dashboard can
display them identically.
Additional signal:
playbackStateChanged(str) — "playing", "paused", "stopped"
frameIndexChanged(int, int) — (current_index, total_frames)
Signals:
frameReady(RadarFrame)
statusReceived(object)
targetsUpdated(list)
errorOccurred(str)
statsUpdated(dict)
playbackStateChanged(str)
frameIndexChanged(int, int)
"""
frameReady = pyqtSignal(object)
statusReceived = pyqtSignal(object)
targetsUpdated = pyqtSignal(list)
errorOccurred = pyqtSignal(str)
statsUpdated = pyqtSignal(dict)
playbackStateChanged = pyqtSignal(str)
frameIndexChanged = pyqtSignal(int, int)
def __init__(
self,
controller: RawIQReplayController,
processor: RawIQFrameProcessor,
host_processor: RadarProcessor | None = None,
settings: RadarSettings | None = None,
gps_data_ref: GPSData | None = None,
parent=None,
):
super().__init__(parent)
self._controller = controller
self._processor = processor
self._host_processor = host_processor
self._settings = settings or RadarSettings()
self._gps = gps_data_ref
self._running = False
self._frame_count = 0
self._error_count = 0
def stop(self):
self._running = False
self._controller.stop()
def run(self):
self._running = True
self._frame_count = 0
self._last_emitted_state: str | None = None
logger.info("RawIQReplayWorker started")
info = self._controller.info
total_frames = info.n_frames if info else 0
while self._running:
try:
# Block until next frame or stop
raw_frame = self._controller.next_frame()
if raw_frame is None:
# Stopped or end of file
if self._running:
self.playbackStateChanged.emit("stopped")
break
# Process through full signal chain
import time as _time
ts = _time.time()
frame, status, _agc_result = self._processor.process_frame(
raw_frame, timestamp=ts)
self._frame_count += 1
# Emit signals
self.frameReady.emit(frame)
self.statusReceived.emit(status)
# Emit frame index
idx = self._controller.frame_index
self.frameIndexChanged.emit(idx, total_frames)
# Emit playback state only on transitions (avoid race
# where a stale "playing" signal overwrites a pause)
state = self._controller.state
state_str = state.name.lower()
if state_str != self._last_emitted_state:
self._last_emitted_state = state_str
self.playbackStateChanged.emit(state_str)
# Run host-side DSP if configured
if self._host_processor is not None:
targets = self._extract_targets(frame)
if targets:
self.targetsUpdated.emit(targets)
# Stats
self.statsUpdated.emit({
"frames": self._frame_count,
"detection_count": frame.detection_count,
"errors": self._error_count,
"frame_index": idx,
"total_frames": total_frames,
})
# Rate limiting: sleep to match target FPS
fps = self._controller.fps
if fps > 0 and self._controller.state == PlaybackState.PLAYING:
self.msleep(int(1000.0 / fps))
except (ValueError, IndexError) as e:
self._error_count += 1
self.errorOccurred.emit(str(e))
logger.error(f"RawIQReplayWorker error: {e}")
self._running = False
logger.info("RawIQReplayWorker stopped")
def _extract_targets(self, frame: RadarFrame) -> list[RadarTarget]:
"""Extract targets from detection mask using shared bin-to-physical conversion."""
targets = extract_targets_from_frame(
frame,
self._settings.range_resolution,
self._settings.velocity_resolution,
gps=self._gps,
)
# Clustering + tracking
if self._host_processor is not None:
cfg = self._host_processor.config
if cfg.clustering_enabled and len(targets) > 0:
clusters = self._host_processor.clustering(
targets, cfg.clustering_eps, cfg.clustering_min_samples)
if cfg.tracking_enabled:
targets = self._host_processor.association(targets, clusters)
self._host_processor.tracking(targets)
return targets
# ============================================================================= # =============================================================================
# GPS Data Worker (QThread) # GPS Data Worker (QThread)
# ============================================================================= # =============================================================================