diff --git a/9_Firmware/9_3_GUI/test_v7.py b/9_Firmware/9_3_GUI/test_v7.py index 7994622..bb54e48 100644 --- a/9_Firmware/9_3_GUI/test_v7.py +++ b/9_Firmware/9_3_GUI/test_v7.py @@ -69,39 +69,6 @@ class TestRadarSettings(unittest.TestCase): self.assertEqual(s.max_distance, 50000) -class TestWaveformConfig(unittest.TestCase): - """WaveformConfig — range/velocity resolution from waveform params.""" - - def test_adi_phaser_range_resolution(self): - """ADI CN0566 defaults: 4MSPS, 500MHz BW, 300µs chirp, 1079 samples.""" - wf = _models().WaveformConfig() # ADI phaser defaults - r_res = wf.range_resolution(n_samples=1079) - # Expected: c * fs / (2 * N * slope) = 3e8 * 4e6 / (2 * 1079 * 1.667e12) - # ≈ 0.334 m/bin - self.assertAlmostEqual(r_res, 0.334, places=2) - - def test_adi_phaser_velocity_resolution(self): - """ADI phaser: 256 chirps, 1079 samples at 4 MSPS.""" - wf = _models().WaveformConfig() - v_res = wf.velocity_resolution(n_samples=1079, n_chirps=256) - # λ * fs / (2 * N * M) = 0.03 * 4e6 / (2 * 1079 * 256) ≈ 0.217 m/s/bin - self.assertAlmostEqual(v_res, 0.217, places=2) - - def test_max_range(self): - wf = _models().WaveformConfig() - max_r = wf.max_range(n_range_bins=64, n_samples=1079) - # 0.334 * 64 ≈ 21.4 m - self.assertAlmostEqual(max_r, 21.4, places=0) - - def test_plfm_defaults_differ(self): - """PLFM FPGA defaults (781.25 m/bin) must NOT equal ADI phaser.""" - default_settings = _models().RadarSettings() - wf = _models().WaveformConfig() - r_res = wf.range_resolution(n_samples=1079) - self.assertNotAlmostEqual(default_settings.range_resolution, r_res, - places=0) # 781 vs 0.33 - - class TestGPSData(unittest.TestCase): def test_to_dict(self): g = _models().GPSData(latitude=41.9, longitude=12.5, diff --git a/9_Firmware/9_3_GUI/v7/__init__.py b/9_Firmware/9_3_GUI/v7/__init__.py index ef053fe..175da91 100644 --- a/9_Firmware/9_3_GUI/v7/__init__.py +++ b/9_Firmware/9_3_GUI/v7/__init__.py @@ -11,7 +11,6 @@ top-level imports: from .models import ( RadarTarget, RadarSettings, - WaveformConfig, GPSData, ProcessingConfig, TileServer, @@ -67,7 +66,7 @@ except ImportError: # PyQt6 not installed (e.g. CI headless runner) __all__ = [ # noqa: RUF022 # models - "RadarTarget", "RadarSettings", "WaveformConfig", "GPSData", "ProcessingConfig", "TileServer", + "RadarTarget", "RadarSettings", "GPSData", "ProcessingConfig", "TileServer", "DARK_BG", "DARK_FG", "DARK_ACCENT", "DARK_HIGHLIGHT", "DARK_BORDER", "DARK_TEXT", "DARK_BUTTON", "DARK_BUTTON_HOVER", "DARK_TREEVIEW", "DARK_TREEVIEW_ALT", diff --git a/9_Firmware/9_3_GUI/v7/dashboard.py b/9_Firmware/9_3_GUI/v7/dashboard.py index b202d17..1b3b2a8 100644 --- a/9_Firmware/9_3_GUI/v7/dashboard.py +++ b/9_Firmware/9_3_GUI/v7/dashboard.py @@ -23,10 +23,8 @@ commands sent over FT2232H. """ import time -import re import logging from collections import deque -from pathlib import Path import numpy as np @@ -44,7 +42,7 @@ from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg from matplotlib.figure import Figure from .models import ( - RadarTarget, RadarSettings, WaveformConfig, GPSData, ProcessingConfig, + RadarTarget, RadarSettings, GPSData, ProcessingConfig, DARK_BG, DARK_FG, DARK_ACCENT, DARK_HIGHLIGHT, DARK_BORDER, DARK_TEXT, DARK_BUTTON, DARK_BUTTON_HOVER, DARK_TREEVIEW, DARK_TREEVIEW_ALT, @@ -61,10 +59,8 @@ from .hardware import ( DataRecorder, STM32USBInterface, ) -from .processing import RadarProcessor, USBPacketParser, RawIQFrameProcessor -from .workers import RadarDataWorker, RawIQReplayWorker, GPSDataWorker, TargetSimulator -from .raw_iq_replay import RawIQReplayController, PlaybackState -from .agc_sim import AGCConfig +from .processing import RadarProcessor, USBPacketParser +from .workers import RadarDataWorker, GPSDataWorker, TargetSimulator from .map_widget import RadarMapWidget logger = logging.getLogger(__name__) @@ -85,69 +81,24 @@ def _make_dspin() -> QDoubleSpinBox: return sb -def _parse_waveform_from_filename(name: str) -> WaveformConfig | None: - """Try to extract waveform params from ADI phaser filename convention. - - Expected pattern fragments (order-independent): - ``MSPS`` or ``MSps`` → sample rate in MHz - ``M`` (followed by _ or end) → bandwidth in MHz - ``u`` (followed by _ or end) → chirp duration in µs - - Returns a WaveformConfig with parsed values (defaults for un-parsed), - or None if nothing recognisable was found. - """ - cfg = WaveformConfig() # ADI phaser defaults - found = False - - # Sample rate: "4MSPS" or "4MSps" - m = re.search(r"(\d+)M[Ss][Pp][Ss]", name) - if m: - cfg.sample_rate_hz = float(m.group(1)) * 1e6 - found = True - - # Bandwidth: "500M" (must NOT be followed by S for MSPS) - m = re.search(r"(\d+)M(?![Ss])", name) - if m: - cfg.bandwidth_hz = float(m.group(1)) * 1e6 - found = True - - # Chirp duration: "300u" - m = re.search(r"(\d+)u", name) - if m: - cfg.chirp_duration_s = float(m.group(1)) * 1e-6 - found = True - - return cfg if found else None - - # ============================================================================= # Range-Doppler Canvas (matplotlib) # ============================================================================= class RangeDopplerCanvas(FigureCanvasQTAgg): - """Matplotlib canvas showing a Range-Doppler map with dark theme. - - Adapts dynamically to incoming frame dimensions (e.g. 64x32 from FPGA, - or different sizes from Raw IQ Replay). - """ + """Matplotlib canvas showing the 64x32 Range-Doppler map with dark theme.""" def __init__(self, _parent=None): fig = Figure(figsize=(10, 6), facecolor=DARK_BG) self.ax = fig.add_subplot(111, facecolor=DARK_ACCENT) - # Initial backing data — will resize on first update_map call - self._n_range = NUM_RANGE_BINS - self._n_doppler = NUM_DOPPLER_BINS - self._data = np.zeros((self._n_range, self._n_doppler)) + self._data = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS)) self.im = self.ax.imshow( self._data, aspect="auto", cmap="hot", - extent=[0, self._n_doppler, 0, self._n_range], origin="lower", + extent=[0, NUM_DOPPLER_BINS, 0, NUM_RANGE_BINS], origin="lower", ) - self.ax.set_title( - f"Range-Doppler Map ({self._n_range}x{self._n_doppler})", - color=DARK_FG, - ) + self.ax.set_title("Range-Doppler Map (64x32)", color=DARK_FG) self.ax.set_xlabel("Doppler Bin", color=DARK_FG) self.ax.set_ylabel("Range Bin", color=DARK_FG) self.ax.tick_params(colors=DARK_FG) @@ -158,20 +109,7 @@ class RangeDopplerCanvas(FigureCanvasQTAgg): super().__init__(fig) def update_map(self, magnitude: np.ndarray, _detections: np.ndarray = None): - """Update the heatmap with new magnitude data. - - Automatically resizes the canvas if the incoming shape differs from - the current backing array. - """ - nr, nd = magnitude.shape - if nr != self._n_range or nd != self._n_doppler: - self._n_range = nr - self._n_doppler = nd - self._data = np.zeros((nr, nd)) - self.im.set_extent([0, nd, 0, nr]) - self.ax.set_title( - f"Range-Doppler Map ({nr}x{nd})", color=DARK_FG) - + """Update the heatmap with new magnitude data.""" display = np.log10(magnitude + 1) self.im.set_data(display) self.im.set_clim(vmin=display.min(), vmax=max(display.max(), 0.1)) @@ -215,15 +153,9 @@ class RadarDashboard(QMainWindow): self._gps_worker: GPSDataWorker | None = None self._simulator: TargetSimulator | None = None - # Raw IQ Replay - self._replay_controller: RawIQReplayController | None = None - self._replay_worker: RawIQReplayWorker | None = None - self._iq_processor: RawIQFrameProcessor | None = None - # State self._running = False self._demo_mode = False - self._replay_status_override: str | None = None # "playing"/"paused" self._start_time = time.time() self._current_frame: RadarFrame | None = None self._last_status: StatusResponse | None = None @@ -420,8 +352,7 @@ class RadarDashboard(QMainWindow): # Row 0: connection mode + device combos + buttons ctrl_layout.addWidget(QLabel("Mode:"), 0, 0) self._mode_combo = QComboBox() - self._mode_combo.addItems([ - "Mock", "Live FT2232H", "Replay (.npy)", "Raw IQ Replay (.npy)"]) + self._mode_combo.addItems(["Mock", "Live FT2232H", "Replay (.npy)"]) self._mode_combo.setCurrentIndex(0) ctrl_layout.addWidget(self._mode_combo, 0, 1) @@ -470,104 +401,6 @@ class RadarDashboard(QMainWindow): self._status_label_main.setAlignment(Qt.AlignmentFlag.AlignRight) ctrl_layout.addWidget(self._status_label_main, 1, 5, 1, 5) - # Row 2: Raw IQ playback controls (hidden until Raw IQ mode active) - self._playback_frame = QFrame() - self._playback_frame.setStyleSheet( - f"background-color: {DARK_HIGHLIGHT}; border-radius: 4px;") - pb_layout = QHBoxLayout(self._playback_frame) - pb_layout.setContentsMargins(8, 4, 8, 4) - - self._pb_play_btn = QPushButton("Play") - self._pb_play_btn.setStyleSheet( - f"QPushButton {{ background-color: {DARK_SUCCESS}; color: white; }}") - self._pb_play_btn.clicked.connect(self._pb_play_pause) - pb_layout.addWidget(self._pb_play_btn) - - self._pb_step_btn = QPushButton("Step") - self._pb_step_btn.clicked.connect(self._pb_step) - pb_layout.addWidget(self._pb_step_btn) - - self._pb_stop_btn = QPushButton("Stop") - self._pb_stop_btn.setStyleSheet( - f"QPushButton {{ background-color: {DARK_ERROR}; color: white; }}") - self._pb_stop_btn.clicked.connect(self._stop_radar) - pb_layout.addWidget(self._pb_stop_btn) - - pb_layout.addWidget(QLabel("FPS:")) - self._pb_fps_spin = _make_dspin() - self._pb_fps_spin.setRange(0.1, 60.0) - self._pb_fps_spin.setValue(10.0) - self._pb_fps_spin.setSingleStep(1.0) - self._pb_fps_spin.valueChanged.connect(self._pb_fps_changed) - pb_layout.addWidget(self._pb_fps_spin) - - self._pb_loop_check = QCheckBox("Loop") - self._pb_loop_check.setChecked(True) - self._pb_loop_check.toggled.connect(self._pb_loop_changed) - pb_layout.addWidget(self._pb_loop_check) - - self._pb_frame_label = QLabel("Frame: 0 / 0") - self._pb_frame_label.setStyleSheet( - f"color: {DARK_INFO}; font-weight: bold;") - pb_layout.addWidget(self._pb_frame_label) - - self._pb_file_label = QLabel("") - self._pb_file_label.setStyleSheet(f"color: {DARK_TEXT}; font-size: 10px;") - pb_layout.addWidget(self._pb_file_label) - - pb_layout.addStretch() - self._playback_frame.setVisible(False) - ctrl_layout.addWidget(self._playback_frame, 2, 0, 1, 10) - - # -- Waveform config row (Raw IQ replay only) ----------------------- - self._waveform_frame = QFrame() - self._waveform_frame.setStyleSheet( - f"background-color: {DARK_ACCENT}; border-radius: 4px;") - wf_layout = QHBoxLayout(self._waveform_frame) - wf_layout.setContentsMargins(8, 4, 8, 4) - - wf_layout.addWidget(QLabel("Waveform:")) - - wf_layout.addWidget(QLabel("fs (MHz):")) - self._wf_fs_spin = _make_dspin() - self._wf_fs_spin.setRange(0.1, 100.0) - self._wf_fs_spin.setValue(4.0) - self._wf_fs_spin.setDecimals(2) - self._wf_fs_spin.setToolTip("ADC sample rate in MHz") - wf_layout.addWidget(self._wf_fs_spin) - - wf_layout.addWidget(QLabel("BW (MHz):")) - self._wf_bw_spin = _make_dspin() - self._wf_bw_spin.setRange(1.0, 5000.0) - self._wf_bw_spin.setValue(500.0) - self._wf_bw_spin.setDecimals(1) - self._wf_bw_spin.setToolTip("Chirp bandwidth in MHz") - wf_layout.addWidget(self._wf_bw_spin) - - wf_layout.addWidget(QLabel("T (us):")) - self._wf_chirp_spin = _make_dspin() - self._wf_chirp_spin.setRange(1.0, 10000.0) - self._wf_chirp_spin.setValue(300.0) - self._wf_chirp_spin.setDecimals(1) - self._wf_chirp_spin.setToolTip("Chirp duration in microseconds") - wf_layout.addWidget(self._wf_chirp_spin) - - wf_layout.addWidget(QLabel("fc (GHz):")) - self._wf_fc_spin = _make_dspin() - self._wf_fc_spin.setRange(0.1, 100.0) - self._wf_fc_spin.setValue(10.0) - self._wf_fc_spin.setDecimals(2) - self._wf_fc_spin.setToolTip("Carrier frequency in GHz") - wf_layout.addWidget(self._wf_fc_spin) - - self._wf_res_label = QLabel("") - self._wf_res_label.setStyleSheet(f"color: {DARK_INFO}; font-size: 10px;") - wf_layout.addWidget(self._wf_res_label) - - wf_layout.addStretch() - self._waveform_frame.setVisible(False) - ctrl_layout.addWidget(self._waveform_frame, 3, 0, 1, 10) - layout.addWidget(ctrl) # ---- Display area (range-doppler + targets table) ------------------ @@ -648,22 +481,12 @@ class RadarDashboard(QMainWindow): self._alt_spin.setValue(0.0) self._alt_spin.setSuffix(" m") - self._heading_spin = _make_dspin() - self._heading_spin.setRange(0, 360) - self._heading_spin.setDecimals(1) - self._heading_spin.setValue(0.0) - self._heading_spin.setSuffix("\u00b0") - self._heading_spin.setWrapping(True) - self._heading_spin.valueChanged.connect(self._on_position_changed) - pos_layout.addWidget(QLabel("Latitude:"), 0, 0) pos_layout.addWidget(self._lat_spin, 0, 1) pos_layout.addWidget(QLabel("Longitude:"), 1, 0) pos_layout.addWidget(self._lon_spin, 1, 1) pos_layout.addWidget(QLabel("Altitude:"), 2, 0) pos_layout.addWidget(self._alt_spin, 2, 1) - pos_layout.addWidget(QLabel("Heading:"), 3, 0) - pos_layout.addWidget(self._heading_spin, 3, 1) sb_layout.addWidget(pos_group) @@ -1386,10 +1209,6 @@ class RadarDashboard(QMainWindow): try: mode = self._mode_combo.currentText() - if "Raw IQ" in mode: - self._start_raw_iq_replay() - return - if "Mock" in mode: self._connection = FT2232HConnection(mock=True) if not self._connection.open(): @@ -1429,7 +1248,6 @@ class RadarDashboard(QMainWindow): self._radar_worker.targetsUpdated.connect(self._on_radar_targets) self._radar_worker.statsUpdated.connect(self._on_radar_stats) self._radar_worker.errorOccurred.connect(self._on_worker_error) - self._radar_worker.finished.connect(self._on_worker_finished) self._radar_worker.start() # Optionally start GPS worker @@ -1464,32 +1282,12 @@ class RadarDashboard(QMainWindow): def _stop_radar(self): self._running = False - self._replay_status_override = None - - # Stop demo simulator if active (prevents cross-mode interference) - if self._simulator: - self._simulator.stop() - self._simulator = None - self._demo_mode = False - self._demo_btn_main.setText("Start Demo") - self._demo_btn_map.setText("Start Demo") - self._demo_btn_map.setChecked(False) if self._radar_worker: self._radar_worker.stop() self._radar_worker.wait(2000) self._radar_worker = None - # Raw IQ Replay cleanup - if self._replay_controller is not None: - self._replay_controller.stop() - if self._replay_worker is not None: - self._replay_worker.stop() - self._replay_worker.wait(2000) - self._replay_worker = None - self._replay_controller = None - self._iq_processor = None - if self._gps_worker: self._gps_worker.stop() self._gps_worker.wait(2000) @@ -1506,237 +1304,11 @@ class RadarDashboard(QMainWindow): self._mode_combo.setEnabled(True) self._demo_btn_main.setEnabled(True) self._demo_btn_map.setEnabled(True) - self._playback_frame.setVisible(False) - self._waveform_frame.setVisible(False) - self._pb_play_btn.setText("Play") - self._pb_frame_label.setText("Frame: 0 / 0") - self._pb_file_label.setText("") self._status_label_main.setText("Status: Radar stopped") self._sb_status.setText("Radar stopped") self._sb_mode.setText("Idle") logger.info("Radar system stopped") - # ===================================================================== - # Raw IQ Replay - # ===================================================================== - - def _start_raw_iq_replay(self): - """Start raw IQ replay mode: load .npy file and begin playback.""" - from PyQt6.QtWidgets import QFileDialog - - npy_path, _ = QFileDialog.getOpenFileName( - self, "Select Raw IQ .npy file", "", - "NumPy files (*.npy);;All files (*)") - if not npy_path: - return - - try: - # Create controller and load file - self._replay_controller = RawIQReplayController() - info = self._replay_controller.load_file(npy_path) - - # -- Waveform calibration: try to parse from filename ----------- - parsed_wf = _parse_waveform_from_filename(Path(npy_path).name) - if parsed_wf is not None: - self._wf_fs_spin.setValue(parsed_wf.sample_rate_hz / 1e6) - self._wf_bw_spin.setValue(parsed_wf.bandwidth_hz / 1e6) - self._wf_chirp_spin.setValue(parsed_wf.chirp_duration_s / 1e-6) - self._wf_fc_spin.setValue(parsed_wf.center_freq_hz / 1e9) - logger.info("Waveform params parsed from filename: %s", parsed_wf) - - # Build waveform config from (possibly updated) spinboxes - wfc = self._waveform_config_from_ui() - range_res = wfc.range_resolution(info.n_samples) - vel_res = wfc.velocity_resolution(info.n_samples, info.n_chirps) - n_range_out = min(64, info.n_samples) - max_range = range_res * n_range_out - - # Create replay-specific RadarSettings with correct calibration - replay_settings = RadarSettings( - system_frequency=wfc.center_freq_hz, - range_resolution=range_res, - velocity_resolution=vel_res, - max_distance=max_range, - map_size=max_range * 1.2, - coverage_radius=max_range * 1.2, - ) - logger.info( - "Replay calibration: range_res=%.4f m/bin, vel_res=%.4f m/s/bin, " - "max_range=%.1f m", - range_res, vel_res, max_range, - ) - - # Update coverage/map spinboxes to match replay scale - self._coverage_spin.setValue(replay_settings.coverage_radius / 1000) - self._update_waveform_res_label(info.n_samples, info.n_chirps) - - # Create frame processor - self._iq_processor = RawIQFrameProcessor( - n_range_out=n_range_out, - n_doppler_out=min(32, info.n_chirps), - ) - - # Apply current AGC settings from FPGA Control tab - agc_enable = self._param_spins.get("0x28") - agc_target = self._param_spins.get("0x29") - agc_attack = self._param_spins.get("0x2A") - agc_decay = self._param_spins.get("0x2B") - agc_holdoff = self._param_spins.get("0x2C") - self._iq_processor.set_agc_config(AGCConfig( - enabled=bool(agc_enable.value()) if agc_enable else False, - target=agc_target.value() if agc_target else 200, - attack=agc_attack.value() if agc_attack else 1, - decay=agc_decay.value() if agc_decay else 1, - holdoff=agc_holdoff.value() if agc_holdoff else 4, - )) - - # Apply CFAR settings - cfar_en = self._param_spins.get("0x25") - cfar_guard = self._param_spins.get("0x21") - cfar_train = self._param_spins.get("0x22") - cfar_alpha = self._param_spins.get("0x23") - cfar_mode = self._param_spins.get("0x24") - self._iq_processor.set_cfar_params( - enabled=bool(cfar_en.value()) if cfar_en else False, - guard=cfar_guard.value() if cfar_guard else 2, - train=cfar_train.value() if cfar_train else 8, - alpha_q44=cfar_alpha.value() if cfar_alpha else 0x30, - mode=cfar_mode.value() if cfar_mode else 0, - ) - - # Apply MTI / DC notch - mti_en = self._param_spins.get("0x26") - dc_notch = self._param_spins.get("0x27") - self._iq_processor.set_mti_enabled( - bool(mti_en.value()) if mti_en else False) - self._iq_processor.set_dc_notch_width( - dc_notch.value() if dc_notch else 0) - - # Threshold - thresh = self._param_spins.get("0x03") - self._iq_processor.set_detect_threshold( - thresh.value() if thresh else 10000) - - # Create worker - self._replay_worker = RawIQReplayWorker( - controller=self._replay_controller, - processor=self._iq_processor, - host_processor=self._processor, - settings=replay_settings, - gps_data_ref=self._radar_position, - ) - self._replay_worker.frameReady.connect(self._on_frame_ready) - self._replay_worker.statusReceived.connect(self._on_status_received) - self._replay_worker.targetsUpdated.connect(self._on_radar_targets) - self._replay_worker.statsUpdated.connect(self._on_radar_stats) - self._replay_worker.errorOccurred.connect(self._on_worker_error) - self._replay_worker.finished.connect(self._on_worker_finished) - self._replay_worker.playbackStateChanged.connect( - self._on_playback_state_changed) - self._replay_worker.frameIndexChanged.connect( - self._on_frame_index_changed) - - # Start worker (paused initially) - self._replay_worker.start() - - # UI state - self._running = True - self._replay_status_override = "paused" - self._start_time = time.time() - self._frame_count = 0 - self._start_btn.setEnabled(False) - self._stop_btn.setEnabled(True) - self._mode_combo.setEnabled(False) - self._demo_btn_main.setEnabled(False) - self._demo_btn_map.setEnabled(False) - self._playback_frame.setVisible(True) - self._waveform_frame.setVisible(True) - self._pb_frame_label.setText(f"Frame: 0 / {info.n_frames}") - self._pb_file_label.setText( - f"{Path(npy_path).name} " - f"({info.n_chirps}x{info.n_samples}, " - f"{info.file_size_mb:.1f} MB)") - self._status_label_main.setText("Status: Raw IQ Replay (paused)") - self._sb_status.setText("Raw IQ Replay") - self._sb_mode.setText("Raw IQ Replay") - logger.info(f"Raw IQ Replay started: {npy_path}") - - except (ValueError, OSError) as e: - # Clean up any partially-created objects - if self._replay_worker is not None: - self._replay_worker.stop() - self._replay_worker.wait(1000) - self._replay_worker = None - self._replay_controller = None - self._iq_processor = None - QMessageBox.critical(self, "Error", - f"Failed to load raw IQ file:\n{e}") - logger.error(f"Raw IQ load error: {e}") - - # ---- Playback control slots -------------------------------------------- - - def _pb_play_pause(self): - """Toggle play/pause for raw IQ replay.""" - if self._replay_controller is None: - return - state = self._replay_controller.state - if state == PlaybackState.PLAYING: - self._replay_controller.pause() - self._pb_play_btn.setText("Play") - else: - self._replay_controller.play() - self._pb_play_btn.setText("Pause") - - def _pb_step(self): - """Step one frame forward in raw IQ replay.""" - if self._replay_controller is not None: - self._replay_controller.step_forward() - - def _pb_fps_changed(self, value: float): - if self._replay_controller is not None: - self._replay_controller.set_fps(value) - - def _pb_loop_changed(self, checked: bool): - if self._replay_controller is not None: - self._replay_controller.set_loop(checked) - - def _waveform_config_from_ui(self) -> WaveformConfig: - """Build a WaveformConfig from the waveform spinboxes.""" - return WaveformConfig( - sample_rate_hz=self._wf_fs_spin.value() * 1e6, - bandwidth_hz=self._wf_bw_spin.value() * 1e6, - chirp_duration_s=self._wf_chirp_spin.value() * 1e-6, - center_freq_hz=self._wf_fc_spin.value() * 1e9, - ) - - def _update_waveform_res_label(self, n_samples: int, n_chirps: int) -> None: - """Update the waveform resolution info label.""" - wfc = self._waveform_config_from_ui() - r_res = wfc.range_resolution(n_samples) - v_res = wfc.velocity_resolution(n_samples, n_chirps) - n_r = min(64, n_samples) - max_r = r_res * n_r - self._wf_res_label.setText( - f"Range: {r_res:.3f} m/bin | Vel: {v_res:.3f} m/s/bin | " - f"Max range: {max_r:.1f} m ({n_r} bins)" - ) - - @pyqtSlot(str) - def _on_playback_state_changed(self, state_str: str): - if state_str == "playing": - self._pb_play_btn.setText("Pause") - self._replay_status_override = "playing" - elif state_str == "paused": - self._pb_play_btn.setText("Play") - self._replay_status_override = "paused" - elif state_str == "stopped": - self._replay_status_override = None - self._stop_radar() - - @pyqtSlot(int, int) - def _on_frame_index_changed(self, current: int, total: int): - self._pb_frame_label.setText(f"Frame: {current} / {total}") - # ===================================================================== # Demo mode # ===================================================================== @@ -1752,9 +1324,8 @@ class RadarDashboard(QMainWindow): self._simulator.targetsUpdated.connect(self._on_demo_targets) self._simulator.start(500) self._demo_mode = True - if not self._running: - self._sb_mode.setText("Demo Mode") - self._sb_status.setText("Demo mode active") + self._sb_mode.setText("Demo Mode") + self._sb_status.setText("Demo mode active") self._demo_btn_main.setText("Stop Demo") self._demo_btn_map.setText("Stop Demo") self._demo_btn_map.setChecked(True) @@ -1767,18 +1338,12 @@ class RadarDashboard(QMainWindow): self._demo_mode = False if not self._running: mode = "Idle" - elif self._replay_controller is not None: - mode = "Raw IQ Replay" elif isinstance(self._connection, ReplayConnection): mode = "Replay" else: - # Use mode combo text for Mock vs Live distinction - mode = self._mode_combo.currentText() - if "Mock" not in mode and "Live" not in mode: - mode = "Live" + mode = "Live" self._sb_mode.setText(mode) - if not self._running: - self._sb_status.setText("Demo stopped") + self._sb_status.setText("Demo stopped") self._demo_btn_main.setText("Start Demo") self._demo_btn_map.setText("Start Demo") self._demo_btn_map.setChecked(False) @@ -1830,12 +1395,6 @@ class RadarDashboard(QMainWindow): def _on_worker_error(self, msg: str): logger.error(f"Worker error: {msg}") - def _on_worker_finished(self): - """Handle unexpected worker thread exit — recover UI to stopped state.""" - if self._running: - logger.warning("Worker thread exited unexpectedly, resetting UI") - self._stop_radar() - @pyqtSlot(object) def _on_gps_received(self, gps: GPSData): self._gps_packet_count += 1 @@ -2016,7 +1575,6 @@ class RadarDashboard(QMainWindow): self._radar_position.latitude = self._lat_spin.value() self._radar_position.longitude = self._lon_spin.value() self._radar_position.altitude = self._alt_spin.value() - self._radar_position.heading = self._heading_spin.value() self._map_widget.set_radar_position(self._radar_position) if self._simulator: self._simulator.set_radar_position(self._radar_position) @@ -2091,17 +1649,10 @@ class RadarDashboard(QMainWindow): if self._running: det = (self._current_frame.detection_count if self._current_frame else 0) - # Preserve replay-specific status (paused/playing) - if self._replay_status_override == "paused": - self._status_label_main.setText( - f"Status: Raw IQ Replay (paused) \u2014 " - f"Frames: {self._frame_count} \u2014 Detections: {det}" - ) - else: - self._status_label_main.setText( - f"Status: Running \u2014 Frames: {self._frame_count} " - f"\u2014 Detections: {det}" - ) + self._status_label_main.setText( + f"Status: Running \u2014 Frames: {self._frame_count} " + f"\u2014 Detections: {det}" + ) # Diagnostics values self._update_diagnostics() @@ -2186,11 +1737,6 @@ class RadarDashboard(QMainWindow): def closeEvent(self, event): if self._simulator: self._simulator.stop() - if self._replay_controller is not None: - self._replay_controller.stop() - if self._replay_worker is not None: - self._replay_worker.stop() - self._replay_worker.wait(1000) if self._radar_worker: self._radar_worker.stop() self._radar_worker.wait(1000) diff --git a/9_Firmware/9_3_GUI/v7/models.py b/9_Firmware/9_3_GUI/v7/models.py index 9695b9f..a5eb40e 100644 --- a/9_Firmware/9_3_GUI/v7/models.py +++ b/9_Firmware/9_3_GUI/v7/models.py @@ -96,47 +96,6 @@ class RadarTarget: return asdict(self) -@dataclass -class WaveformConfig: - """FMCW waveform parameters for bin-to-physical-unit conversion. - - Defaults are for the ADI CN0566 phaser (10 GHz, 500 MHz BW, 300 µs chirp, - 4 MSPS ADC). For the PLFM FPGA waveform the values differ — but the FPGA - pipeline hardcodes its own bin widths, so this config is only used for - Raw IQ Replay (host-side FFT processing). - """ - sample_rate_hz: float = 4e6 # ADC sample rate (Hz) - bandwidth_hz: float = 500e6 # Chirp bandwidth (Hz) - chirp_duration_s: float = 300e-6 # Chirp sweep time (s) - center_freq_hz: float = 10e9 # Carrier frequency (Hz) - - # --- derived quantities (need n_samples, n_chirps from file) --- - - def range_resolution(self, n_samples: int) -> float: - """Metres per range bin for an N-point range FFT. - - range_per_bin = c · fs / (2 · N · slope) - where slope = BW / T_chirp. - """ - c = 299_792_458.0 - slope = self.bandwidth_hz / self.chirp_duration_s - return c * self.sample_rate_hz / (2.0 * n_samples * slope) - - def velocity_resolution(self, n_samples: int, n_chirps: int) -> float: - """m/s per Doppler bin for an M-chirp Doppler FFT. - - vel_per_bin = λ · fs / (2 · N · M) - where λ = c / fc, N = n_samples (PRI = N/fs), M = n_chirps. - """ - c = 299_792_458.0 - wavelength = c / self.center_freq_hz - return wavelength * self.sample_rate_hz / (2.0 * n_samples * n_chirps) - - def max_range(self, n_range_bins: int, n_samples: int) -> float: - """Maximum unambiguous range for the given bin count.""" - return self.range_resolution(n_samples) * n_range_bins - - @dataclass class RadarSettings: """Radar system display/map configuration. diff --git a/9_Firmware/9_3_GUI/v7/processing.py b/9_Firmware/9_3_GUI/v7/processing.py index 9bc5752..c6ce2cd 100644 --- a/9_Firmware/9_3_GUI/v7/processing.py +++ b/9_Firmware/9_3_GUI/v7/processing.py @@ -2,12 +2,9 @@ v7.processing — Radar signal processing and GPS parsing. Classes: - - RadarProcessor — dual-CPI fusion, multi-PRF unwrap, DBSCAN clustering, - association, Kalman tracking - - RawIQFrameProcessor — full signal chain for raw IQ replay: - quantize -> AGC -> Range FFT -> Doppler FFT -> - crop -> MTI -> DC notch -> mag -> CFAR - - USBPacketParser — parse GPS text/binary frames from STM32 CDC + - RadarProcessor — dual-CPI fusion, multi-PRF unwrap, DBSCAN clustering, + association, Kalman tracking + - USBPacketParser — parse GPS text/binary frames from STM32 CDC Note: RadarPacketParser (old A5/C3 sync + CRC16 format) was removed. All packet parsing now uses production RadarProtocol (0xAA/0xBB format) @@ -25,11 +22,6 @@ from .models import ( RadarTarget, GPSData, ProcessingConfig, SCIPY_AVAILABLE, SKLEARN_AVAILABLE, FILTERPY_AVAILABLE, ) -from .agc_sim import ( - AGCConfig, AGCState, AGCFrameResult, - quantize_iq, process_agc_frame, -) -from .hardware import RadarFrame, StatusResponse if SKLEARN_AVAILABLE: from sklearn.cluster import DBSCAN @@ -56,103 +48,6 @@ def apply_pitch_correction(raw_elevation: float, pitch: float) -> float: return raw_elevation - pitch -# ============================================================================= -# Utility: bin-to-physical target extraction (shared by all workers) -# ============================================================================= - -def extract_targets_from_frame( - frame: RadarFrame, - range_resolution: float, - velocity_resolution: float, - *, - gps: GPSData | None = None, -) -> list[RadarTarget]: - """Extract RadarTargets from a RadarFrame's detection mask. - - Performs bin-to-physical conversion and optional GPS coordinate mapping. - This is the shared implementation used by both RadarDataWorker (live mode) - and RawIQReplayWorker (replay mode). - - Args: - frame: RadarFrame with populated ``detections`` and ``magnitude`` arrays. - range_resolution: Metres per range bin. - velocity_resolution: m/s per Doppler bin. - gps: Optional GPSData for pitch correction and geographic mapping. - - Returns: - List of RadarTarget with physical-unit range, velocity, SNR, and - (if GPS available) lat/lon/azimuth/elevation. - """ - det_indices = np.argwhere(frame.detections > 0) - if len(det_indices) == 0: - return [] - - n_doppler = frame.magnitude.shape[1] - center_dbin = n_doppler // 2 - targets: list[RadarTarget] = [] - - for idx in det_indices: - rbin, dbin = idx - mag = frame.magnitude[rbin, dbin] - snr = 10 * np.log10(max(mag, 1)) if mag > 0 else 0.0 - - range_m = float(rbin) * range_resolution - velocity_ms = float(dbin - center_dbin) * velocity_resolution - - # GPS-dependent fields - raw_elev = 0.0 - corr_elev = raw_elev - lat, lon, azimuth = 0.0, 0.0, 0.0 - if gps is not None: - corr_elev = apply_pitch_correction(raw_elev, gps.pitch) - azimuth = gps.heading - lat, lon = _polar_to_geographic( - gps.latitude, gps.longitude, range_m, azimuth) - - targets.append(RadarTarget( - id=len(targets), - range=range_m, - velocity=velocity_ms, - azimuth=azimuth, - elevation=corr_elev, - latitude=lat, - longitude=lon, - snr=snr, - timestamp=frame.timestamp, - )) - - return targets - - -def _polar_to_geographic( - radar_lat: float, radar_lon: float, range_m: float, bearing_deg: float, -) -> tuple[float, float]: - """Convert polar (range, bearing) to geographic (lat, lon). - - Uses the spherical-Earth approximation (adequate for <50 km ranges). - Duplicated from ``workers.polar_to_geographic`` to keep processing.py - self-contained; the workers module still exports its own copy for - backward-compat. - """ - if range_m <= 0: - return radar_lat, radar_lon - earth_r = 6_371_000.0 - lat_r = math.radians(radar_lat) - lon_r = math.radians(radar_lon) - brg_r = math.radians(bearing_deg) - d_r = range_m / earth_r - - new_lat = math.asin( - math.sin(lat_r) * math.cos(d_r) - + math.cos(lat_r) * math.sin(d_r) * math.cos(brg_r) - ) - new_lon = lon_r + math.atan2( - math.sin(brg_r) * math.sin(d_r) * math.cos(lat_r), - math.cos(d_r) - math.sin(lat_r) * math.sin(new_lat), - ) - return math.degrees(new_lat), math.degrees(new_lon) - - # ============================================================================= # Radar Processor — signal-level processing & tracking pipeline # ============================================================================= @@ -556,269 +451,3 @@ class USBPacketParser: except (ValueError, struct.error) as e: logger.error(f"Error parsing binary GPS: {e}") return None - - -# ============================================================================= -# Raw IQ Frame Processor — full signal chain for replay mode -# ============================================================================= - -class RawIQFrameProcessor: - """Process raw complex IQ frames through the full radar signal chain. - - This replicates the FPGA processing pipeline in software so that - raw ADI CN0566 captures (or similar) can be visualised in the V7 - dashboard exactly as they would appear from the FPGA. - - Pipeline per frame: - 1. Quantize raw complex → 16-bit signed I/Q - 2. AGC gain application (bit-accurate to rx_gain_control.v) - 3. Range FFT (across samples) - 4. Doppler FFT (across chirps) + fftshift + centre crop - 5. Optional MTI (2-pulse canceller using history) - 6. Optional DC notch (zero-Doppler removal) - 7. Magnitude (|I| + |Q| approximation matching FPGA, or true |.|) - 8. CFAR or simple threshold detection - 9. Build RadarFrame + synthetic StatusResponse - """ - - def __init__( - self, - n_range_out: int = 64, - n_doppler_out: int = 32, - ): - self._n_range = n_range_out - self._n_doppler = n_doppler_out - - # AGC state (persists across frames) - self._agc_config = AGCConfig() - self._agc_state = AGCState() - - # MTI history buffer (stores previous Range-Doppler maps) - self._mti_history: list[np.ndarray] = [] - self._mti_enabled: bool = False - - # DC notch - self._dc_notch_width: int = 0 - - # CFAR / threshold config - self._cfar_enabled: bool = False - self._cfar_guard: int = 2 - self._cfar_train: int = 8 - self._cfar_alpha_q44: int = 0x30 # Q4.4 → 3.0 - self._cfar_mode: int = 0 # 0=CA, 1=GO, 2=SO - self._detect_threshold: int = 10000 - - # Frame counter - self._frame_number: int = 0 - - # Host-side processing (windowing, clustering, etc.) - self._host_processor = RadarProcessor() - - # ---- Configuration setters --------------------------------------------- - - def set_agc_config(self, config: AGCConfig) -> None: - self._agc_config = config - - def reset_agc_state(self) -> None: - """Reset AGC state (e.g. on seek).""" - self._agc_state = AGCState() - self._mti_history.clear() - - def set_mti_enabled(self, enabled: bool) -> None: - if self._mti_enabled != enabled: - self._mti_history.clear() - self._mti_enabled = enabled - - def set_dc_notch_width(self, width: int) -> None: - self._dc_notch_width = max(0, min(7, width)) - - def set_cfar_params( - self, - enabled: bool, - guard: int = 2, - train: int = 8, - alpha_q44: int = 0x30, - mode: int = 0, - ) -> None: - self._cfar_enabled = enabled - self._cfar_guard = guard - self._cfar_train = train - self._cfar_alpha_q44 = alpha_q44 - self._cfar_mode = mode - - def set_detect_threshold(self, threshold: int) -> None: - self._detect_threshold = threshold - - @property - def agc_state(self) -> AGCState: - return self._agc_state - - @property - def agc_config(self) -> AGCConfig: - return self._agc_config - - @property - def frame_number(self) -> int: - return self._frame_number - - # ---- Main processing entry point --------------------------------------- - - def process_frame( - self, - raw_frame: np.ndarray, - timestamp: float = 0.0, - ) -> tuple[RadarFrame, StatusResponse, AGCFrameResult]: - """Process one raw IQ frame through the full chain. - - Parameters - ---------- - raw_frame : complex array, shape (n_chirps, n_samples) - timestamp : frame timestamp for RadarFrame - - Returns - ------- - (radar_frame, status_response, agc_result) - """ - n_chirps, _n_samples = raw_frame.shape - self._frame_number += 1 - - # 1. Quantize to 16-bit signed IQ - frame_i, frame_q = quantize_iq(raw_frame) - - # 2. AGC - agc_result = process_agc_frame( - frame_i, frame_q, self._agc_config, self._agc_state) - - # Use AGC-shifted IQ for downstream processing - iq = agc_result.shifted_i.astype(np.float64) + 1j * agc_result.shifted_q.astype(np.float64) - - # 3. Range FFT (across samples axis) - range_fft = np.fft.fft(iq, axis=1)[:, :self._n_range] - - # 4. Doppler FFT (across chirps axis) + fftshift + centre crop - doppler_fft = np.fft.fft(range_fft, axis=0) - doppler_fft = np.fft.fftshift(doppler_fft, axes=0) - # Centre-crop to n_doppler bins - center = n_chirps // 2 - half_d = self._n_doppler // 2 - start_d = max(0, center - half_d) - end_d = start_d + self._n_doppler - if end_d > n_chirps: - end_d = n_chirps - start_d = max(0, end_d - self._n_doppler) - rd_complex = doppler_fft[start_d:end_d, :] - # shape: (n_doppler, n_range) → transpose to (n_range, n_doppler) - rd_complex = rd_complex.T - - # 5. Optional MTI (2-pulse canceller) - if self._mti_enabled: - rd_complex = self._apply_mti(rd_complex) - - # 6. Optional DC notch (zero-Doppler bins) - if self._dc_notch_width > 0: - rd_complex = self._apply_dc_notch(rd_complex) - - # Extract I/Q for RadarFrame - rd_i = np.round(rd_complex.real).astype(np.int16) - rd_q = np.round(rd_complex.imag).astype(np.int16) - - # 7. Magnitude (FPGA uses |I|+|Q| approximation) - magnitude = np.abs(rd_complex.real) + np.abs(rd_complex.imag) - - # Range profile (sum across Doppler) - range_profile = np.sum(magnitude, axis=1) - - # 8. Detection (CFAR or simple threshold) - if self._cfar_enabled: - detections = self._run_cfar(magnitude) - else: - detections = self._run_threshold(magnitude) - - detection_count = int(np.sum(detections > 0)) - - # 9. Build RadarFrame - radar_frame = RadarFrame( - timestamp=timestamp, - range_doppler_i=rd_i, - range_doppler_q=rd_q, - magnitude=magnitude, - detections=detections, - range_profile=range_profile, - detection_count=detection_count, - frame_number=self._frame_number, - ) - - # 10. Build synthetic StatusResponse - status = self._build_status(agc_result) - - return radar_frame, status, agc_result - - # ---- Internal helpers -------------------------------------------------- - - def _apply_mti(self, rd: np.ndarray) -> np.ndarray: - """2-pulse MTI canceller: y[n] = x[n] - x[n-1].""" - self._mti_history.append(rd.copy()) - if len(self._mti_history) > 2: - self._mti_history = self._mti_history[-2:] - - if len(self._mti_history) < 2: - return np.zeros_like(rd) # suppress first frame - - return self._mti_history[-1] - self._mti_history[-2] - - def _apply_dc_notch(self, rd: np.ndarray) -> np.ndarray: - """Zero out centre Doppler bins (DC notch).""" - n_doppler = rd.shape[1] - center = n_doppler // 2 - w = self._dc_notch_width - lo = max(0, center - w) - hi = min(n_doppler, center + w + 1) - result = rd.copy() - result[:, lo:hi] = 0 - return result - - def _run_cfar(self, magnitude: np.ndarray) -> np.ndarray: - """Run 1-D CFAR along each range bin (Doppler direction). - - Uses the host-side CFAR from RadarProcessor with alpha converted - from Q4.4 to float. - """ - alpha_float = self._cfar_alpha_q44 / 16.0 - cfar_types = {0: "CA-CFAR", 1: "GO-CFAR", 2: "SO-CFAR"} - cfar_type = cfar_types.get(self._cfar_mode, "CA-CFAR") - - power = magnitude ** 2 - power = np.maximum(power, 1e-20) - - mask = np.zeros_like(magnitude, dtype=np.uint8) - for r in range(magnitude.shape[0]): - row = power[r, :] - if row.max() > 0: - det = RadarProcessor.cfar_1d( - row, self._cfar_guard, self._cfar_train, - alpha_float, cfar_type) - mask[r, :] = det.astype(np.uint8) - return mask - - def _run_threshold(self, magnitude: np.ndarray) -> np.ndarray: - """Simple threshold detection (matches FPGA detect_threshold).""" - return (magnitude > self._detect_threshold).astype(np.uint8) - - def _build_status(self, agc_result: AGCFrameResult) -> StatusResponse: - """Build a synthetic StatusResponse from current processor state.""" - return StatusResponse( - radar_mode=1, # active - stream_ctrl=0b111, - cfar_threshold=self._detect_threshold, - long_chirp=3000, - long_listen=13700, - guard=17540, - short_chirp=50, - short_listen=17450, - chirps_per_elev=32, - range_mode=0, - agc_current_gain=agc_result.gain_enc, - agc_peak_magnitude=agc_result.peak_mag_8bit, - agc_saturation_count=agc_result.saturation_count, - agc_enable=1 if self._agc_config.enabled else 0, - ) diff --git a/9_Firmware/9_3_GUI/v7/raw_iq_replay.py b/9_Firmware/9_3_GUI/v7/raw_iq_replay.py deleted file mode 100644 index fc04517..0000000 --- a/9_Firmware/9_3_GUI/v7/raw_iq_replay.py +++ /dev/null @@ -1,264 +0,0 @@ -""" -v7.raw_iq_replay -- Raw IQ replay controller for the V7 dashboard. - -Manages loading of raw complex IQ .npy captures, playback state -(play/pause/step/speed/loop), and delivers frames to a worker thread. - -The controller is thread-safe: the worker calls ``next_frame()`` which -blocks until a frame is available or playback is stopped. - -Supported file formats: - - 3-D .npy: (n_frames, n_chirps, n_samples) complex - - 2-D .npy: (n_chirps, n_samples) complex -> treated as single frame - -Classes: - - RawIQReplayController -- playback state machine + frame delivery - - RawIQFileInfo -- metadata about the loaded file -""" - -from __future__ import annotations - -import logging -import threading -from dataclasses import dataclass -from enum import Enum, auto -from pathlib import Path - -import numpy as np - -logger = logging.getLogger(__name__) - - -# --------------------------------------------------------------------------- -# Playback state enum -# --------------------------------------------------------------------------- - -class PlaybackState(Enum): - """Playback state for the replay controller.""" - STOPPED = auto() - PLAYING = auto() - PAUSED = auto() - - -# --------------------------------------------------------------------------- -# File metadata -# --------------------------------------------------------------------------- - -@dataclass -class RawIQFileInfo: - """Metadata about a loaded raw IQ .npy file.""" - path: str - n_frames: int - n_chirps: int - n_samples: int - dtype: str - file_size_mb: float - - -# --------------------------------------------------------------------------- -# Replay Controller -# --------------------------------------------------------------------------- - -class RawIQReplayController: - """Manages raw IQ file loading and playback state. - - Thread-safety: the controller uses a condition variable so the worker - thread can block on ``next_frame()`` waiting for play/step events, - while the GUI thread calls ``play()``, ``pause()``, ``step()``, etc. - """ - - def __init__(self) -> None: - self._data: np.ndarray | None = None - self._info: RawIQFileInfo | None = None - - # Playback state - self._state = PlaybackState.STOPPED - self._frame_index: int = 0 - self._fps: float = 10.0 # target frames per second - self._loop: bool = True - - # Thread synchronisation - self._lock = threading.Lock() - self._cond = threading.Condition(self._lock) - - # Step request flag (set by GUI, consumed by worker) - self._step_requested: bool = False - - # Stop signal - self._stop_requested: bool = False - - # ---- File loading ------------------------------------------------------ - - def load_file(self, path: str) -> RawIQFileInfo: - """Load a .npy file containing raw IQ data. - - Raises ValueError if the file is not a valid raw IQ capture. - """ - p = Path(path) - if not p.exists(): - msg = f"File not found: {path}" - raise ValueError(msg) - if p.suffix.lower() != ".npy": - msg = f"Expected .npy file, got: {p.suffix}" - raise ValueError(msg) - - # Memory-map for large files - data = np.load(str(p), mmap_mode="r") - - if not np.iscomplexobj(data): - msg = f"Expected complex data, got dtype={data.dtype}" - raise ValueError(msg) - - # Normalise shape - if data.ndim == 2: - # Single frame: (chirps, samples) -> (1, chirps, samples) - data = data[np.newaxis, :, :] - elif data.ndim == 3: - pass # (frames, chirps, samples) — expected - else: - msg = f"Expected 2-D or 3-D array, got {data.ndim}-D" - raise ValueError(msg) - - with self._lock: - self._data = data - self._frame_index = 0 - self._state = PlaybackState.PAUSED - self._stop_requested = False - - self._info = RawIQFileInfo( - path=str(p), - n_frames=data.shape[0], - n_chirps=data.shape[1], - n_samples=data.shape[2], - dtype=str(data.dtype), - file_size_mb=p.stat().st_size / (1024 * 1024), - ) - - logger.info( - f"Loaded raw IQ: {p.name} — {self._info.n_frames} frames, " - f"{self._info.n_chirps} chirps, {self._info.n_samples} samples, " - f"{self._info.file_size_mb:.1f} MB" - ) - return self._info - - def unload(self) -> None: - """Unload the current file and stop playback.""" - with self._lock: - self._state = PlaybackState.STOPPED - self._stop_requested = True - self._data = None - self._info = None - self._frame_index = 0 - self._cond.notify_all() - - # ---- Playback control (called from GUI thread) ------------------------- - - def play(self) -> None: - with self._lock: - if self._data is None: - return - self._state = PlaybackState.PLAYING - self._stop_requested = False - self._cond.notify_all() - - def pause(self) -> None: - with self._lock: - if self._state == PlaybackState.PLAYING: - self._state = PlaybackState.PAUSED - - def stop(self) -> None: - with self._lock: - self._state = PlaybackState.STOPPED - self._stop_requested = True - self._cond.notify_all() - - def step_forward(self) -> None: - """Advance one frame (works in PAUSED state).""" - with self._lock: - if self._data is None: - return - self._step_requested = True - self._cond.notify_all() - - def seek(self, frame_index: int) -> None: - """Jump to a specific frame index.""" - with self._lock: - if self._data is None: - return - self._frame_index = max(0, min(frame_index, self._data.shape[0] - 1)) - - def set_fps(self, fps: float) -> None: - with self._lock: - self._fps = max(0.1, min(60.0, fps)) - - def set_loop(self, loop: bool) -> None: - with self._lock: - self._loop = loop - - # ---- State queries (thread-safe) --------------------------------------- - - @property - def state(self) -> PlaybackState: - with self._lock: - return self._state - - @property - def frame_index(self) -> int: - with self._lock: - return self._frame_index - - @property - def info(self) -> RawIQFileInfo | None: - with self._lock: - return self._info - - @property - def fps(self) -> float: - with self._lock: - return self._fps - - @property - def is_loaded(self) -> bool: - with self._lock: - return self._data is not None - - # ---- Frame delivery (called from worker thread) ------------------------ - - def next_frame(self) -> np.ndarray | None: - """Block until the next frame is available, then return it. - - Returns None when playback is stopped or file is unloaded. - The caller (worker thread) should use this in a loop. - """ - with self._cond: - while True: - if self._stop_requested or self._data is None: - return None - - if self._state == PlaybackState.PLAYING: - return self._deliver_frame() - - if self._step_requested: - self._step_requested = False - return self._deliver_frame() - - # PAUSED or STOPPED — wait for signal - self._cond.wait(timeout=0.1) - - def _deliver_frame(self) -> np.ndarray | None: - """Return current frame and advance index. Must hold lock.""" - if self._data is None: - return None - - n_frames = self._data.shape[0] - if self._frame_index >= n_frames: - if self._loop: - self._frame_index = 0 - else: - self._state = PlaybackState.PAUSED - return None - - # Read the frame (memory-mapped, so this is cheap) - frame = np.array(self._data[self._frame_index]) # copy from mmap - self._frame_index += 1 - return frame diff --git a/9_Firmware/9_3_GUI/v7/workers.py b/9_Firmware/9_3_GUI/v7/workers.py index 281195d..c467c98 100644 --- a/9_Firmware/9_3_GUI/v7/workers.py +++ b/9_Firmware/9_3_GUI/v7/workers.py @@ -2,14 +2,11 @@ v7.workers — QThread-based workers and demo target simulator. Classes: - - RadarDataWorker — reads from FT2232H via production RadarAcquisition, - parses 0xAA/0xBB packets, assembles 64x32 frames, - runs host-side DSP, emits PyQt signals. - - RawIQReplayWorker — reads raw IQ .npy frames from RawIQReplayController, - processes through RawIQFrameProcessor, emits same - signals as RadarDataWorker + playback state. - - GPSDataWorker — reads GPS frames from STM32 CDC, emits GPSData signals. - - TargetSimulator — QTimer-based demo target generator. + - RadarDataWorker — reads from FT2232H via production RadarAcquisition, + parses 0xAA/0xBB packets, assembles 64x32 frames, + runs host-side DSP, emits PyQt signals. + - GPSDataWorker — reads GPS frames from STM32 CDC, emits GPSData signals. + - TargetSimulator — QTimer-based demo target generator. The old V6/V7 packet parsing (sync A5 C3 + type + CRC16) has been removed. All packet parsing now uses the production radar_protocol.py which matches @@ -23,6 +20,8 @@ import queue import struct import logging +import numpy as np + from PyQt6.QtCore import QThread, QObject, QTimer, pyqtSignal from .models import RadarTarget, GPSData, RadarSettings @@ -35,11 +34,9 @@ from .hardware import ( ) from .processing import ( RadarProcessor, - RawIQFrameProcessor, USBPacketParser, - extract_targets_from_frame, + apply_pitch_correction, ) -from .raw_iq_replay import RawIQReplayController, PlaybackState logger = logging.getLogger(__name__) @@ -209,16 +206,55 @@ class RadarDataWorker(QThread): Bin-to-physical conversion uses RadarSettings.range_resolution and velocity_resolution (should be calibrated to actual waveform). """ + targets: list[RadarTarget] = [] + cfg = self._processor.config if not (cfg.clustering_enabled or cfg.tracking_enabled): - return [] + return targets - targets = extract_targets_from_frame( - frame, - self._settings.range_resolution, - self._settings.velocity_resolution, - gps=self._gps, - ) + # Extract detections from FPGA CFAR flags + det_indices = np.argwhere(frame.detections > 0) + r_res = self._settings.range_resolution + v_res = self._settings.velocity_resolution + + for idx in det_indices: + rbin, dbin = idx + mag = frame.magnitude[rbin, dbin] + snr = 10 * np.log10(max(mag, 1)) if mag > 0 else 0 + + # Convert bin indices to physical units + range_m = float(rbin) * r_res + # Doppler: centre bin (16) = 0 m/s; positive bins = approaching + velocity_ms = float(dbin - 16) * v_res + + # Apply pitch correction if GPS data available + raw_elev = 0.0 # FPGA doesn't send elevation per-detection + corr_elev = raw_elev + if self._gps: + corr_elev = apply_pitch_correction(raw_elev, self._gps.pitch) + + # Compute geographic position if GPS available + lat, lon = 0.0, 0.0 + azimuth = 0.0 # No azimuth from single-beam; set to heading + if self._gps: + azimuth = self._gps.heading + lat, lon = polar_to_geographic( + self._gps.latitude, self._gps.longitude, + range_m, azimuth, + ) + + target = RadarTarget( + id=len(targets), + range=range_m, + velocity=velocity_ms, + azimuth=azimuth, + elevation=corr_elev, + latitude=lat, + longitude=lon, + snr=snr, + timestamp=frame.timestamp, + ) + targets.append(target) # DBSCAN clustering if cfg.clustering_enabled and len(targets) > 0: @@ -232,155 +268,6 @@ class RadarDataWorker(QThread): return targets -# ============================================================================= -# Raw IQ Replay Worker (QThread) — processes raw .npy captures -# ============================================================================= - -class RawIQReplayWorker(QThread): - """Background worker for raw IQ replay mode. - - Reads frames from a RawIQReplayController, processes them through - RawIQFrameProcessor (quantize -> AGC -> FFT -> CFAR -> RadarFrame), - and emits the same signals as RadarDataWorker so the dashboard can - display them identically. - - Additional signal: - playbackStateChanged(str) — "playing", "paused", "stopped" - frameIndexChanged(int, int) — (current_index, total_frames) - - Signals: - frameReady(RadarFrame) - statusReceived(object) - targetsUpdated(list) - errorOccurred(str) - statsUpdated(dict) - playbackStateChanged(str) - frameIndexChanged(int, int) - """ - - frameReady = pyqtSignal(object) - statusReceived = pyqtSignal(object) - targetsUpdated = pyqtSignal(list) - errorOccurred = pyqtSignal(str) - statsUpdated = pyqtSignal(dict) - playbackStateChanged = pyqtSignal(str) - frameIndexChanged = pyqtSignal(int, int) - - def __init__( - self, - controller: RawIQReplayController, - processor: RawIQFrameProcessor, - host_processor: RadarProcessor | None = None, - settings: RadarSettings | None = None, - gps_data_ref: GPSData | None = None, - parent=None, - ): - super().__init__(parent) - self._controller = controller - self._processor = processor - self._host_processor = host_processor - self._settings = settings or RadarSettings() - self._gps = gps_data_ref - self._running = False - self._frame_count = 0 - self._error_count = 0 - - def stop(self): - self._running = False - self._controller.stop() - - def run(self): - self._running = True - self._frame_count = 0 - self._last_emitted_state: str | None = None - logger.info("RawIQReplayWorker started") - - info = self._controller.info - total_frames = info.n_frames if info else 0 - - while self._running: - try: - # Block until next frame or stop - raw_frame = self._controller.next_frame() - if raw_frame is None: - # Stopped or end of file - if self._running: - self.playbackStateChanged.emit("stopped") - break - - # Process through full signal chain - import time as _time - ts = _time.time() - frame, status, _agc_result = self._processor.process_frame( - raw_frame, timestamp=ts) - self._frame_count += 1 - - # Emit signals - self.frameReady.emit(frame) - self.statusReceived.emit(status) - - # Emit frame index - idx = self._controller.frame_index - self.frameIndexChanged.emit(idx, total_frames) - - # Emit playback state only on transitions (avoid race - # where a stale "playing" signal overwrites a pause) - state = self._controller.state - state_str = state.name.lower() - if state_str != self._last_emitted_state: - self._last_emitted_state = state_str - self.playbackStateChanged.emit(state_str) - - # Run host-side DSP if configured - if self._host_processor is not None: - targets = self._extract_targets(frame) - if targets: - self.targetsUpdated.emit(targets) - - # Stats - self.statsUpdated.emit({ - "frames": self._frame_count, - "detection_count": frame.detection_count, - "errors": self._error_count, - "frame_index": idx, - "total_frames": total_frames, - }) - - # Rate limiting: sleep to match target FPS - fps = self._controller.fps - if fps > 0 and self._controller.state == PlaybackState.PLAYING: - self.msleep(int(1000.0 / fps)) - - except (ValueError, IndexError) as e: - self._error_count += 1 - self.errorOccurred.emit(str(e)) - logger.error(f"RawIQReplayWorker error: {e}") - - self._running = False - logger.info("RawIQReplayWorker stopped") - - def _extract_targets(self, frame: RadarFrame) -> list[RadarTarget]: - """Extract targets from detection mask using shared bin-to-physical conversion.""" - targets = extract_targets_from_frame( - frame, - self._settings.range_resolution, - self._settings.velocity_resolution, - gps=self._gps, - ) - - # Clustering + tracking - if self._host_processor is not None: - cfg = self._host_processor.config - if cfg.clustering_enabled and len(targets) > 0: - clusters = self._host_processor.clustering( - targets, cfg.clustering_eps, cfg.clustering_min_samples) - if cfg.tracking_enabled: - targets = self._host_processor.association(targets, clusters) - self._host_processor.tracking(targets) - - return targets - - # ============================================================================= # GPS Data Worker (QThread) # =============================================================================