fix: range calibration, demo/radar mutual exclusion, AGC analysis refactor

Bug #1 — Range calibration for Raw IQ Replay:
- Add WaveformConfig dataclass (models.py) with FMCW waveform params
  (fs, BW, T_chirp, fc) and methods to compute range/velocity resolution
- Add waveform parameter spinboxes to playback controls (dashboard.py)
- Auto-parse waveform params from ADI phaser filename convention
- Create replay-specific RadarSettings with correct calibration instead
  of using FPGA defaults (781.25 m/bin → 0.334 m/bin for ADI phaser)
- Add 4 unit tests validating WaveformConfig math

Bug #2 — Demo + radar mutual exclusion:
- _start_demo() now refuses if radar is running (_running=True)
- _start_radar() stops demo first if _demo_mode is active
- Demo buttons disabled while radar/replay is running, re-enabled on stop

Bug #3 — Refactor adi_agc_analysis.py:
- Remove 60+ lines of duplicated AGC functions (signed_to_encoding,
  encoding_to_signed, clamp_gain, apply_gain_shift)
- Import from v7.agc_sim canonical implementation
- Rewrite simulate_agc() to use process_agc_frame() in a loop
- Rewrite process_frame_rd() to use quantize_iq() from agc_sim
This commit is contained in:
Jason
2026-04-14 03:19:58 +05:45
parent a16472480a
commit 609589349d
5 changed files with 270 additions and 131 deletions
+34 -127
View File
@@ -32,83 +32,24 @@ from pathlib import Path
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from v7.agc_sim import (
encoding_to_signed,
apply_gain_shift,
quantize_iq,
AGCConfig,
AGCState,
process_agc_frame,
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# FPGA AGC parameters (rx_gain_control.v reset defaults) # FPGA AGC parameters (rx_gain_control.v reset defaults)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
AGC_TARGET = 200 # host_agc_target (8-bit, default 200) AGC_TARGET = 200 # host_agc_target (8-bit, default 200)
AGC_ATTACK = 1 # host_agc_attack (4-bit, default 1)
AGC_DECAY = 1 # host_agc_decay (4-bit, default 1)
AGC_HOLDOFF = 4 # host_agc_holdoff (4-bit, default 4)
ADC_RAIL = 4095 # 12-bit ADC max absolute value ADC_RAIL = 4095 # 12-bit ADC max absolute value
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Gain encoding helpers (match RTL signed_to_encoding / encoding_to_signed) # Per-frame AGC simulation using v7.agc_sim (bit-accurate to RTL)
# ---------------------------------------------------------------------------
def signed_to_encoding(g: int) -> int:
"""Convert signed gain (-7..+7) to gain_shift[3:0] encoding.
[3]=0, [2:0]=N → amplify (left shift) by N
[3]=1, [2:0]=N → attenuate (right shift) by N
"""
if g >= 0:
return g & 0x07
return 0x08 | ((-g) & 0x07)
def encoding_to_signed(enc: int) -> int:
"""Convert gain_shift[3:0] encoding to signed gain."""
if (enc & 0x08) == 0:
return enc & 0x07
return -(enc & 0x07)
def clamp_gain(val: int) -> int:
"""Clamp to [-7, +7] (matches RTL clamp_gain function)."""
return max(-7, min(7, val))
# ---------------------------------------------------------------------------
# Apply gain shift to IQ data (matches RTL combinational logic)
# ---------------------------------------------------------------------------
def apply_gain_shift(frame_i: np.ndarray, frame_q: np.ndarray,
gain_enc: int) -> tuple[np.ndarray, np.ndarray, int]:
"""Apply gain_shift encoding to 16-bit signed IQ arrays.
Returns (shifted_i, shifted_q, overflow_count).
Matches the RTL: left shift = amplify, right shift = attenuate,
saturate to ±32767 on overflow.
"""
direction = (gain_enc >> 3) & 1 # 0=amplify, 1=attenuate
amount = gain_enc & 0x07
if amount == 0:
return frame_i.copy(), frame_q.copy(), 0
if direction == 0:
# Left shift (amplify)
si = frame_i.astype(np.int64) * (1 << amount)
sq = frame_q.astype(np.int64) * (1 << amount)
else:
# Arithmetic right shift (attenuate)
si = frame_i.astype(np.int64) >> amount
sq = frame_q.astype(np.int64) >> amount
# Count overflows (post-shift values outside 16-bit signed range)
overflow_i = (si > 32767) | (si < -32768)
overflow_q = (sq > 32767) | (sq < -32768)
overflow_count = int((overflow_i | overflow_q).sum())
# Saturate to ±32767
si = np.clip(si, -32768, 32767).astype(np.int16)
sq = np.clip(sq, -32768, 32767).astype(np.int16)
return si, sq, overflow_count
# ---------------------------------------------------------------------------
# Per-frame AGC simulation (bit-accurate to rx_gain_control.v)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def simulate_agc(frames: np.ndarray, agc_enabled: bool = True, def simulate_agc(frames: np.ndarray, agc_enabled: bool = True,
@@ -126,79 +67,46 @@ def simulate_agc(frames: np.ndarray, agc_enabled: bool = True,
n_frames = frames.shape[0] n_frames = frames.shape[0]
# Output arrays # Output arrays
out_gain_enc = np.zeros(n_frames, dtype=int) # gain_shift encoding [3:0] out_gain_enc = np.zeros(n_frames, dtype=int)
out_gain_signed = np.zeros(n_frames, dtype=int) # signed gain for plotting out_gain_signed = np.zeros(n_frames, dtype=int)
out_peak_mag = np.zeros(n_frames, dtype=int) # peak_magnitude[7:0] out_peak_mag = np.zeros(n_frames, dtype=int)
out_sat_count = np.zeros(n_frames, dtype=int) # saturation_count[7:0] out_sat_count = np.zeros(n_frames, dtype=int)
out_sat_rate = np.zeros(n_frames, dtype=float) out_sat_rate = np.zeros(n_frames, dtype=float)
out_rms_post = np.zeros(n_frames, dtype=float) # RMS after gain shift out_rms_post = np.zeros(n_frames, dtype=float)
# AGC internal state # AGC state — managed by process_agc_frame()
agc_gain = 0 # signed, -7..+7 state = AGCState(
holdoff_counter = 0 gain=encoding_to_signed(initial_gain_enc),
agc_was_enabled = False holdoff_counter=0,
was_enabled=False,
)
for i in range(n_frames): for i in range(n_frames):
frame = frames[i] frame_i, frame_q = quantize_iq(frames[i])
# Quantize to 16-bit signed (ADC is 12-bit, sign-extended to 16)
frame_i = np.clip(np.round(frame.real), -32768, 32767).astype(np.int16)
frame_q = np.clip(np.round(frame.imag), -32768, 32767).astype(np.int16)
# --- PRE-gain peak measurement (RTL lines 133-135, 211-213) ---
abs_i = np.abs(frame_i.astype(np.int32))
abs_q = np.abs(frame_q.astype(np.int32))
max_iq = np.maximum(abs_i, abs_q)
frame_peak_15bit = int(max_iq.max()) # 15-bit unsigned
peak_8bit = (frame_peak_15bit >> 7) & 0xFF # Upper 8 bits
# --- Determine effective gain ---
agc_active = agc_enabled and (i >= enable_at_frame) agc_active = agc_enabled and (i >= enable_at_frame)
# AGC enable transition (RTL lines 250-253) # Build per-frame config (enable toggles at enable_at_frame)
if agc_active and not agc_was_enabled: config = AGCConfig(enabled=agc_active)
agc_gain = encoding_to_signed(initial_gain_enc)
holdoff_counter = AGC_HOLDOFF
effective_enc = signed_to_encoding(agc_gain) if agc_active else initial_gain_enc result = process_agc_frame(frame_i, frame_q, config, state)
agc_was_enabled = agc_active
# --- Apply gain shift + count POST-gain overflow (RTL lines 114-126, 207-209) ---
shifted_i, shifted_q, frame_overflow = apply_gain_shift(
frame_i, frame_q, effective_enc)
frame_sat = min(255, frame_overflow)
# RMS of shifted signal # RMS of shifted signal
rms = float(np.sqrt(np.mean( rms = float(np.sqrt(np.mean(
shifted_i.astype(np.float64)**2 + shifted_q.astype(np.float64)**2))) result.shifted_i.astype(np.float64)**2
+ result.shifted_q.astype(np.float64)**2)))
total_samples = frame_i.size + frame_q.size total_samples = frame_i.size + frame_q.size
sat_rate = frame_overflow / total_samples if total_samples > 0 else 0.0 sat_rate = result.overflow_raw / total_samples if total_samples > 0 else 0.0
# --- Record outputs --- # Record outputs
out_gain_enc[i] = effective_enc out_gain_enc[i] = result.gain_enc
out_gain_signed[i] = agc_gain if agc_active else encoding_to_signed(initial_gain_enc) out_gain_signed[i] = result.gain_signed
out_peak_mag[i] = peak_8bit out_peak_mag[i] = result.peak_mag_8bit
out_sat_count[i] = frame_sat out_sat_count[i] = result.saturation_count
out_sat_rate[i] = sat_rate out_sat_rate[i] = sat_rate
out_rms_post[i] = rms out_rms_post[i] = rms
# --- AGC update at frame boundary (RTL lines 226-246) ---
if agc_active:
if frame_sat > 0:
# Clipping: reduce gain immediately (attack)
agc_gain = clamp_gain(agc_gain - AGC_ATTACK)
holdoff_counter = AGC_HOLDOFF
elif peak_8bit < AGC_TARGET:
# Signal too weak: increase gain after holdoff
if holdoff_counter == 0:
agc_gain = clamp_gain(agc_gain + AGC_DECAY)
else:
holdoff_counter -= 1
else:
# Good range (peak >= target, no sat): hold, reset holdoff
holdoff_counter = AGC_HOLDOFF
return { return {
"gain_enc": out_gain_enc, "gain_enc": out_gain_enc,
"gain_signed": out_gain_signed, "gain_signed": out_gain_signed,
@@ -217,8 +125,7 @@ def process_frame_rd(frame: np.ndarray, gain_enc: int,
n_range: int = 64, n_range: int = 64,
n_doppler: int = 32) -> np.ndarray: n_doppler: int = 32) -> np.ndarray:
"""Range-Doppler magnitude for one frame with gain applied.""" """Range-Doppler magnitude for one frame with gain applied."""
frame_i = np.clip(np.round(frame.real), -32768, 32767).astype(np.int16) frame_i, frame_q = quantize_iq(frame)
frame_q = np.clip(np.round(frame.imag), -32768, 32767).astype(np.int16)
si, sq, _ = apply_gain_shift(frame_i, frame_q, gain_enc) si, sq, _ = apply_gain_shift(frame_i, frame_q, gain_enc)
iq = si.astype(np.float64) + 1j * sq.astype(np.float64) iq = si.astype(np.float64) + 1j * sq.astype(np.float64)
+33
View File
@@ -69,6 +69,39 @@ class TestRadarSettings(unittest.TestCase):
self.assertEqual(s.max_distance, 50000) self.assertEqual(s.max_distance, 50000)
class TestWaveformConfig(unittest.TestCase):
"""WaveformConfig — range/velocity resolution from waveform params."""
def test_adi_phaser_range_resolution(self):
"""ADI CN0566 defaults: 4MSPS, 500MHz BW, 300µs chirp, 1079 samples."""
wf = _models().WaveformConfig() # ADI phaser defaults
r_res = wf.range_resolution(n_samples=1079)
# Expected: c * fs / (2 * N * slope) = 3e8 * 4e6 / (2 * 1079 * 1.667e12)
# ≈ 0.334 m/bin
self.assertAlmostEqual(r_res, 0.334, places=2)
def test_adi_phaser_velocity_resolution(self):
"""ADI phaser: 256 chirps, 1079 samples at 4 MSPS."""
wf = _models().WaveformConfig()
v_res = wf.velocity_resolution(n_samples=1079, n_chirps=256)
# λ * fs / (2 * N * M) = 0.03 * 4e6 / (2 * 1079 * 256) ≈ 0.217 m/s/bin
self.assertAlmostEqual(v_res, 0.217, places=2)
def test_max_range(self):
wf = _models().WaveformConfig()
max_r = wf.max_range(n_range_bins=64, n_samples=1079)
# 0.334 * 64 ≈ 21.4 m
self.assertAlmostEqual(max_r, 21.4, places=0)
def test_plfm_defaults_differ(self):
"""PLFM FPGA defaults (781.25 m/bin) must NOT equal ADI phaser."""
default_settings = _models().RadarSettings()
wf = _models().WaveformConfig()
r_res = wf.range_resolution(n_samples=1079)
self.assertNotAlmostEqual(default_settings.range_resolution, r_res,
places=0) # 781 vs 0.33
class TestGPSData(unittest.TestCase): class TestGPSData(unittest.TestCase):
def test_to_dict(self): def test_to_dict(self):
g = _models().GPSData(latitude=41.9, longitude=12.5, g = _models().GPSData(latitude=41.9, longitude=12.5,
+2 -1
View File
@@ -11,6 +11,7 @@ top-level imports:
from .models import ( from .models import (
RadarTarget, RadarTarget,
RadarSettings, RadarSettings,
WaveformConfig,
GPSData, GPSData,
ProcessingConfig, ProcessingConfig,
TileServer, TileServer,
@@ -66,7 +67,7 @@ except ImportError: # PyQt6 not installed (e.g. CI headless runner)
__all__ = [ # noqa: RUF022 __all__ = [ # noqa: RUF022
# models # models
"RadarTarget", "RadarSettings", "GPSData", "ProcessingConfig", "TileServer", "RadarTarget", "RadarSettings", "WaveformConfig", "GPSData", "ProcessingConfig", "TileServer",
"DARK_BG", "DARK_FG", "DARK_ACCENT", "DARK_HIGHLIGHT", "DARK_BORDER", "DARK_BG", "DARK_FG", "DARK_ACCENT", "DARK_HIGHLIGHT", "DARK_BORDER",
"DARK_TEXT", "DARK_BUTTON", "DARK_BUTTON_HOVER", "DARK_TEXT", "DARK_BUTTON", "DARK_BUTTON_HOVER",
"DARK_TREEVIEW", "DARK_TREEVIEW_ALT", "DARK_TREEVIEW", "DARK_TREEVIEW_ALT",
+160 -3
View File
@@ -23,6 +23,7 @@ commands sent over FT2232H.
""" """
import time import time
import re
import logging import logging
from collections import deque from collections import deque
from pathlib import Path from pathlib import Path
@@ -43,7 +44,7 @@ from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from matplotlib.figure import Figure from matplotlib.figure import Figure
from .models import ( from .models import (
RadarTarget, RadarSettings, GPSData, ProcessingConfig, RadarTarget, RadarSettings, WaveformConfig, GPSData, ProcessingConfig,
DARK_BG, DARK_FG, DARK_ACCENT, DARK_HIGHLIGHT, DARK_BORDER, DARK_BG, DARK_FG, DARK_ACCENT, DARK_HIGHLIGHT, DARK_BORDER,
DARK_TEXT, DARK_BUTTON, DARK_BUTTON_HOVER, DARK_TEXT, DARK_BUTTON, DARK_BUTTON_HOVER,
DARK_TREEVIEW, DARK_TREEVIEW_ALT, DARK_TREEVIEW, DARK_TREEVIEW_ALT,
@@ -84,6 +85,41 @@ def _make_dspin() -> QDoubleSpinBox:
return sb return sb
def _parse_waveform_from_filename(name: str) -> WaveformConfig | None:
"""Try to extract waveform params from ADI phaser filename convention.
Expected pattern fragments (order-independent):
``<N>MSPS`` or ``<N>MSps`` → sample rate in MHz
``<N>M`` (followed by _ or end) → bandwidth in MHz
``<N>u`` (followed by _ or end) → chirp duration in µs
Returns a WaveformConfig with parsed values (defaults for un-parsed),
or None if nothing recognisable was found.
"""
cfg = WaveformConfig() # ADI phaser defaults
found = False
# Sample rate: "4MSPS" or "4MSps"
m = re.search(r"(\d+)M[Ss][Pp][Ss]", name)
if m:
cfg.sample_rate_hz = float(m.group(1)) * 1e6
found = True
# Bandwidth: "500M" (must NOT be followed by S for MSPS)
m = re.search(r"(\d+)M(?![Ss])", name)
if m:
cfg.bandwidth_hz = float(m.group(1)) * 1e6
found = True
# Chirp duration: "300u"
m = re.search(r"(\d+)u", name)
if m:
cfg.chirp_duration_s = float(m.group(1)) * 1e-6
found = True
return cfg if found else None
# ============================================================================= # =============================================================================
# Range-Doppler Canvas (matplotlib) # Range-Doppler Canvas (matplotlib)
# ============================================================================= # =============================================================================
@@ -483,6 +519,55 @@ class RadarDashboard(QMainWindow):
self._playback_frame.setVisible(False) self._playback_frame.setVisible(False)
ctrl_layout.addWidget(self._playback_frame, 2, 0, 1, 10) ctrl_layout.addWidget(self._playback_frame, 2, 0, 1, 10)
# -- Waveform config row (Raw IQ replay only) -----------------------
self._waveform_frame = QFrame()
self._waveform_frame.setStyleSheet(
f"background-color: {DARK_ACCENT}; border-radius: 4px;")
wf_layout = QHBoxLayout(self._waveform_frame)
wf_layout.setContentsMargins(8, 4, 8, 4)
wf_layout.addWidget(QLabel("Waveform:"))
wf_layout.addWidget(QLabel("fs (MHz):"))
self._wf_fs_spin = _make_dspin()
self._wf_fs_spin.setRange(0.1, 100.0)
self._wf_fs_spin.setValue(4.0)
self._wf_fs_spin.setDecimals(2)
self._wf_fs_spin.setToolTip("ADC sample rate in MHz")
wf_layout.addWidget(self._wf_fs_spin)
wf_layout.addWidget(QLabel("BW (MHz):"))
self._wf_bw_spin = _make_dspin()
self._wf_bw_spin.setRange(1.0, 5000.0)
self._wf_bw_spin.setValue(500.0)
self._wf_bw_spin.setDecimals(1)
self._wf_bw_spin.setToolTip("Chirp bandwidth in MHz")
wf_layout.addWidget(self._wf_bw_spin)
wf_layout.addWidget(QLabel("T (us):"))
self._wf_chirp_spin = _make_dspin()
self._wf_chirp_spin.setRange(1.0, 10000.0)
self._wf_chirp_spin.setValue(300.0)
self._wf_chirp_spin.setDecimals(1)
self._wf_chirp_spin.setToolTip("Chirp duration in microseconds")
wf_layout.addWidget(self._wf_chirp_spin)
wf_layout.addWidget(QLabel("fc (GHz):"))
self._wf_fc_spin = _make_dspin()
self._wf_fc_spin.setRange(0.1, 100.0)
self._wf_fc_spin.setValue(10.0)
self._wf_fc_spin.setDecimals(2)
self._wf_fc_spin.setToolTip("Carrier frequency in GHz")
wf_layout.addWidget(self._wf_fc_spin)
self._wf_res_label = QLabel("")
self._wf_res_label.setStyleSheet(f"color: {DARK_INFO}; font-size: 10px;")
wf_layout.addWidget(self._wf_res_label)
wf_layout.addStretch()
self._waveform_frame.setVisible(False)
ctrl_layout.addWidget(self._waveform_frame, 3, 0, 1, 10)
layout.addWidget(ctrl) layout.addWidget(ctrl)
# ---- Display area (range-doppler + targets table) ------------------ # ---- Display area (range-doppler + targets table) ------------------
@@ -1294,6 +1379,10 @@ class RadarDashboard(QMainWindow):
def _start_radar(self): def _start_radar(self):
"""Start radar data acquisition using production protocol.""" """Start radar data acquisition using production protocol."""
# Mutual exclusion: stop demo if running
if self._demo_mode:
self._stop_demo()
try: try:
mode = self._mode_combo.currentText() mode = self._mode_combo.currentText()
@@ -1362,6 +1451,8 @@ class RadarDashboard(QMainWindow):
self._start_btn.setEnabled(False) self._start_btn.setEnabled(False)
self._stop_btn.setEnabled(True) self._stop_btn.setEnabled(True)
self._mode_combo.setEnabled(False) self._mode_combo.setEnabled(False)
self._demo_btn_main.setEnabled(False)
self._demo_btn_map.setEnabled(False)
self._status_label_main.setText(f"Status: Running ({mode})") self._status_label_main.setText(f"Status: Running ({mode})")
self._sb_status.setText(f"Running ({mode})") self._sb_status.setText(f"Running ({mode})")
self._sb_mode.setText(mode) self._sb_mode.setText(mode)
@@ -1413,7 +1504,10 @@ class RadarDashboard(QMainWindow):
self._start_btn.setEnabled(True) self._start_btn.setEnabled(True)
self._stop_btn.setEnabled(False) self._stop_btn.setEnabled(False)
self._mode_combo.setEnabled(True) self._mode_combo.setEnabled(True)
self._demo_btn_main.setEnabled(True)
self._demo_btn_map.setEnabled(True)
self._playback_frame.setVisible(False) self._playback_frame.setVisible(False)
self._waveform_frame.setVisible(False)
self._pb_play_btn.setText("Play") self._pb_play_btn.setText("Play")
self._pb_frame_label.setText("Frame: 0 / 0") self._pb_frame_label.setText("Frame: 0 / 0")
self._pb_file_label.setText("") self._pb_file_label.setText("")
@@ -1441,9 +1535,44 @@ class RadarDashboard(QMainWindow):
self._replay_controller = RawIQReplayController() self._replay_controller = RawIQReplayController()
info = self._replay_controller.load_file(npy_path) info = self._replay_controller.load_file(npy_path)
# -- Waveform calibration: try to parse from filename -----------
parsed_wf = _parse_waveform_from_filename(Path(npy_path).name)
if parsed_wf is not None:
self._wf_fs_spin.setValue(parsed_wf.sample_rate_hz / 1e6)
self._wf_bw_spin.setValue(parsed_wf.bandwidth_hz / 1e6)
self._wf_chirp_spin.setValue(parsed_wf.chirp_duration_s / 1e-6)
self._wf_fc_spin.setValue(parsed_wf.center_freq_hz / 1e9)
logger.info("Waveform params parsed from filename: %s", parsed_wf)
# Build waveform config from (possibly updated) spinboxes
wfc = self._waveform_config_from_ui()
range_res = wfc.range_resolution(info.n_samples)
vel_res = wfc.velocity_resolution(info.n_samples, info.n_chirps)
n_range_out = min(64, info.n_samples)
max_range = range_res * n_range_out
# Create replay-specific RadarSettings with correct calibration
replay_settings = RadarSettings(
system_frequency=wfc.center_freq_hz,
range_resolution=range_res,
velocity_resolution=vel_res,
max_distance=max_range,
map_size=max_range * 1.2,
coverage_radius=max_range * 1.2,
)
logger.info(
"Replay calibration: range_res=%.4f m/bin, vel_res=%.4f m/s/bin, "
"max_range=%.1f m",
range_res, vel_res, max_range,
)
# Update coverage/map spinboxes to match replay scale
self._coverage_spin.setValue(replay_settings.coverage_radius / 1000)
self._update_waveform_res_label(info.n_samples, info.n_chirps)
# Create frame processor # Create frame processor
self._iq_processor = RawIQFrameProcessor( self._iq_processor = RawIQFrameProcessor(
n_range_out=min(64, info.n_samples), n_range_out=n_range_out,
n_doppler_out=min(32, info.n_chirps), n_doppler_out=min(32, info.n_chirps),
) )
@@ -1493,7 +1622,7 @@ class RadarDashboard(QMainWindow):
controller=self._replay_controller, controller=self._replay_controller,
processor=self._iq_processor, processor=self._iq_processor,
host_processor=self._processor, host_processor=self._processor,
settings=self._settings, settings=replay_settings,
gps_data_ref=self._radar_position, gps_data_ref=self._radar_position,
) )
self._replay_worker.frameReady.connect(self._on_frame_ready) self._replay_worker.frameReady.connect(self._on_frame_ready)
@@ -1518,7 +1647,10 @@ class RadarDashboard(QMainWindow):
self._start_btn.setEnabled(False) self._start_btn.setEnabled(False)
self._stop_btn.setEnabled(True) self._stop_btn.setEnabled(True)
self._mode_combo.setEnabled(False) self._mode_combo.setEnabled(False)
self._demo_btn_main.setEnabled(False)
self._demo_btn_map.setEnabled(False)
self._playback_frame.setVisible(True) self._playback_frame.setVisible(True)
self._waveform_frame.setVisible(True)
self._pb_frame_label.setText(f"Frame: 0 / {info.n_frames}") self._pb_frame_label.setText(f"Frame: 0 / {info.n_frames}")
self._pb_file_label.setText( self._pb_file_label.setText(
f"{Path(npy_path).name} " f"{Path(npy_path).name} "
@@ -1568,6 +1700,27 @@ class RadarDashboard(QMainWindow):
if self._replay_controller is not None: if self._replay_controller is not None:
self._replay_controller.set_loop(checked) self._replay_controller.set_loop(checked)
def _waveform_config_from_ui(self) -> WaveformConfig:
"""Build a WaveformConfig from the waveform spinboxes."""
return WaveformConfig(
sample_rate_hz=self._wf_fs_spin.value() * 1e6,
bandwidth_hz=self._wf_bw_spin.value() * 1e6,
chirp_duration_s=self._wf_chirp_spin.value() * 1e-6,
center_freq_hz=self._wf_fc_spin.value() * 1e9,
)
def _update_waveform_res_label(self, n_samples: int, n_chirps: int) -> None:
"""Update the waveform resolution info label."""
wfc = self._waveform_config_from_ui()
r_res = wfc.range_resolution(n_samples)
v_res = wfc.velocity_resolution(n_samples, n_chirps)
n_r = min(64, n_samples)
max_r = r_res * n_r
self._wf_res_label.setText(
f"Range: {r_res:.3f} m/bin | Vel: {v_res:.3f} m/s/bin | "
f"Max range: {max_r:.1f} m ({n_r} bins)"
)
@pyqtSlot(str) @pyqtSlot(str)
def _on_playback_state_changed(self, state_str: str): def _on_playback_state_changed(self, state_str: str):
if state_str == "playing": if state_str == "playing":
@@ -1591,6 +1744,10 @@ class RadarDashboard(QMainWindow):
def _start_demo(self): def _start_demo(self):
if self._simulator: if self._simulator:
return return
# Mutual exclusion: do not start demo while radar/replay is running
if self._running:
logger.warning("Cannot start demo while radar is running")
return
self._simulator = TargetSimulator(self._radar_position, self) self._simulator = TargetSimulator(self._radar_position, self)
self._simulator.targetsUpdated.connect(self._on_demo_targets) self._simulator.targetsUpdated.connect(self._on_demo_targets)
self._simulator.start(500) self._simulator.start(500)
+41
View File
@@ -96,6 +96,47 @@ class RadarTarget:
return asdict(self) return asdict(self)
@dataclass
class WaveformConfig:
"""FMCW waveform parameters for bin-to-physical-unit conversion.
Defaults are for the ADI CN0566 phaser (10 GHz, 500 MHz BW, 300 µs chirp,
4 MSPS ADC). For the PLFM FPGA waveform the values differ — but the FPGA
pipeline hardcodes its own bin widths, so this config is only used for
Raw IQ Replay (host-side FFT processing).
"""
sample_rate_hz: float = 4e6 # ADC sample rate (Hz)
bandwidth_hz: float = 500e6 # Chirp bandwidth (Hz)
chirp_duration_s: float = 300e-6 # Chirp sweep time (s)
center_freq_hz: float = 10e9 # Carrier frequency (Hz)
# --- derived quantities (need n_samples, n_chirps from file) ---
def range_resolution(self, n_samples: int) -> float:
"""Metres per range bin for an N-point range FFT.
range_per_bin = c · fs / (2 · N · slope)
where slope = BW / T_chirp.
"""
c = 299_792_458.0
slope = self.bandwidth_hz / self.chirp_duration_s
return c * self.sample_rate_hz / (2.0 * n_samples * slope)
def velocity_resolution(self, n_samples: int, n_chirps: int) -> float:
"""m/s per Doppler bin for an M-chirp Doppler FFT.
vel_per_bin = λ · fs / (2 · N · M)
where λ = c / fc, N = n_samples (PRI = N/fs), M = n_chirps.
"""
c = 299_792_458.0
wavelength = c / self.center_freq_hz
return wavelength * self.sample_rate_hz / (2.0 * n_samples * n_chirps)
def max_range(self, n_range_bins: int, n_samples: int) -> float:
"""Maximum unambiguous range for the given bin count."""
return self.range_resolution(n_samples) * n_range_bins
@dataclass @dataclass
class RadarSettings: class RadarSettings:
"""Radar system display/map configuration. """Radar system display/map configuration.