fix(v7): align live host-DSP units with replay path

Use WaveformConfig for live range/velocity conversion in RadarDataWorker
and add headless AST-based regression checks in test_v7.py.

Before: RadarDataWorker._run_host_dsp used RadarSettings.velocity_resolution
(default 1.0 in models.py:113), while ReplayWorker used WaveformConfig
(~5.343 m/s/bin). Live GUI under-reported velocity by factor ~5.34x.

Fix: local WaveformConfig() in _run_host_dsp, mirroring ReplayWorker
pattern. Doppler center derived from frame shape, matching processing.py:520.

Test: TestLiveReplayPhysicalUnitsParity in test_v7.py uses ast.parse on
workers.py (no v7.workers import, headless-CI-safe despite PyQt6 dep)
and asserts AST Call/Attribute/BinOp nodes for both RadarDataWorker
and ReplayWorker paths.
This commit is contained in:
Serhii
2026-04-19 19:28:03 +03:00
parent c82b25f7a0
commit f895c0244c
2 changed files with 137 additions and 7 deletions
+126
View File
@@ -586,6 +586,132 @@ class TestSoftwareFPGA(unittest.TestCase):
self.assertEqual(fpga.agc_holdoff, 0x0F) self.assertEqual(fpga.agc_holdoff, 0x0F)
# ============================================================================
# Test: live vs replay physical-unit parity — regression guard for unit drift
#
# Uses AST parse of workers.py (not inspect.getsource / import) so the test
# runs in headless CI without PyQt6 — v7.workers imports PyQt6 unconditionally
# at workers.py:24, and other worker tests here already use skipUnless(
# _pyqt6_available()). Contract enforcement must not be gated on GUI deps.
#
# Asserts on AST nodes (Call / Attribute / BinOp), not source substrings, so
# false-pass on comments or docstring wording is impossible.
# ============================================================================
class TestLiveReplayPhysicalUnitsParity(unittest.TestCase):
"""Contract: live path (RadarDataWorker._run_host_dsp) and replay path
(ReplayWorker._emit_frame) both derive bin-to-physical conversion from
WaveformConfig — same source of truth, identical (range_m, velocity_ms)
for identical detections.
Regression context: before the fix, live path used
RadarSettings.velocity_resolution (default 1.0 in models.py:113) while
replay used WaveformConfig.velocity_resolution_mps (~5.343). Live GUI
therefore under-reported velocity by factor ~5.34x vs replay for
identical frames. See test_v7.py:449 for the WaveformConfig pin.
"""
@staticmethod
def _parse_method(class_name: str, method_name: str):
"""Return AST FunctionDef for class_name.method_name from workers.py,
without importing v7.workers (PyQt6-independent)."""
import ast
from pathlib import Path
path = Path(__file__).parent / "v7" / "workers.py"
tree = ast.parse(path.read_text(encoding="utf-8"))
for node in tree.body:
if isinstance(node, ast.ClassDef) and node.name == class_name:
for item in node.body:
if isinstance(item, ast.FunctionDef) and item.name == method_name:
return item
raise RuntimeError(f"{class_name}.{method_name} not found in workers.py")
@staticmethod
def _has_attribute_chain(tree, chain):
"""True if AST tree contains a dotted attribute access matching chain.
Chain ('self', '_settings', 'range_resolution') matches
``self._settings.range_resolution`` exactly.
"""
import ast
for n in ast.walk(tree):
if isinstance(n, ast.Attribute):
parts = [n.attr]
cur = n.value
while isinstance(cur, ast.Attribute):
parts.append(cur.attr)
cur = cur.value
if isinstance(cur, ast.Name):
parts.append(cur.id)
parts.reverse()
if tuple(parts) == tuple(chain):
return True
return False
@staticmethod
def _has_call_to(tree, func_name):
"""True if AST tree contains a call to a bare name (func_name())."""
import ast
for n in ast.walk(tree):
if (isinstance(n, ast.Call) and isinstance(n.func, ast.Name)
and n.func.id == func_name):
return True
return False
@staticmethod
def _has_dbin_minus(tree, literal):
"""True if AST tree contains ``dbin - <literal>`` binary op."""
import ast
for n in ast.walk(tree):
if (isinstance(n, ast.BinOp) and isinstance(n.op, ast.Sub)
and isinstance(n.left, ast.Name) and n.left.id == "dbin"
and isinstance(n.right, ast.Constant)
and n.right.value == literal):
return True
return False
def test_live_path_uses_waveform_config(self):
"""_run_host_dsp must call WaveformConfig() and read
wf.range_resolution_m / wf.velocity_resolution_mps — not
self._settings.range_resolution / velocity_resolution."""
method = self._parse_method("RadarDataWorker", "_run_host_dsp")
self.assertTrue(self._has_call_to(method, "WaveformConfig"),
"Live path must instantiate WaveformConfig().")
self.assertTrue(self._has_attribute_chain(method, ("wf", "range_resolution_m")),
"Live path must read wf.range_resolution_m.")
self.assertTrue(self._has_attribute_chain(method, ("wf", "velocity_resolution_mps")),
"Live path must read wf.velocity_resolution_mps. "
"RadarSettings.velocity_resolution default 1.0 caused ~5.34x "
"underreport vs replay (test_v7.py:449 pins ~5.343).")
self.assertFalse(self._has_attribute_chain(
method, ("self", "_settings", "range_resolution")),
"Live path still reads stale RadarSettings.range_resolution.")
self.assertFalse(self._has_attribute_chain(
method, ("self", "_settings", "velocity_resolution")),
"Live path still reads stale RadarSettings.velocity_resolution.")
def test_live_path_doppler_center_not_hardcoded(self):
"""_run_host_dsp must derive doppler_center from frame shape, not
use hardcoded ``dbin - 16`` — mirrors processing.py:520."""
method = self._parse_method("RadarDataWorker", "_run_host_dsp")
self.assertFalse(self._has_dbin_minus(method, 16),
"Hardcoded doppler_center=16 breaks if frame shape changes. "
"Use frame.detections.shape[1] // 2 like processing.py:520.")
def test_replay_path_still_uses_waveform_config(self):
"""Parity half: replay path (ReplayWorker._emit_frame) must keep
reading self._waveform.range_resolution_m / velocity_resolution_mps —
guards against someone breaking the replay side of the invariant."""
method = self._parse_method("ReplayWorker", "_emit_frame")
self.assertTrue(self._has_attribute_chain(
method, ("self", "_waveform", "range_resolution_m")),
"Replay path lost WaveformConfig range source of truth.")
self.assertTrue(self._has_attribute_chain(
method, ("self", "_waveform", "velocity_resolution_mps")),
"Replay path lost WaveformConfig velocity source of truth.")
class TestSoftwareFPGASignalChain(unittest.TestCase): class TestSoftwareFPGASignalChain(unittest.TestCase):
"""SoftwareFPGA.process_chirps with real co-sim data.""" """SoftwareFPGA.process_chirps with real co-sim data."""
+11 -7
View File
@@ -23,7 +23,7 @@ import numpy as np
from PyQt6.QtCore import QThread, QObject, QTimer, pyqtSignal from PyQt6.QtCore import QThread, QObject, QTimer, pyqtSignal
from .models import RadarTarget, GPSData, RadarSettings from .models import RadarTarget, GPSData, RadarSettings, WaveformConfig
from .hardware import ( from .hardware import (
RadarAcquisition, RadarAcquisition,
RadarFrame, RadarFrame,
@@ -169,8 +169,8 @@ class RadarDataWorker(QThread):
The FPGA already does: FFT, MTI, CFAR, DC notch. The FPGA already does: FFT, MTI, CFAR, DC notch.
Host-side DSP adds: clustering, tracking, geo-coordinate mapping. Host-side DSP adds: clustering, tracking, geo-coordinate mapping.
Bin-to-physical conversion uses RadarSettings.range_resolution Bin-to-physical conversion uses WaveformConfig defaults to keep
and velocity_resolution (should be calibrated to actual waveform). live and replay units aligned (same source of truth as ReplayWorker).
""" """
targets: list[RadarTarget] = [] targets: list[RadarTarget] = []
@@ -180,8 +180,11 @@ class RadarDataWorker(QThread):
# Extract detections from FPGA CFAR flags # Extract detections from FPGA CFAR flags
det_indices = np.argwhere(frame.detections > 0) det_indices = np.argwhere(frame.detections > 0)
r_res = self._settings.range_resolution wf = WaveformConfig()
v_res = self._settings.velocity_resolution r_res = wf.range_resolution_m
v_res = wf.velocity_resolution_mps
n_doppler = frame.detections.shape[1] if frame.detections.ndim == 2 else 32
doppler_center = n_doppler // 2
for idx in det_indices: for idx in det_indices:
rbin, dbin = idx rbin, dbin = idx
@@ -190,8 +193,9 @@ class RadarDataWorker(QThread):
# Convert bin indices to physical units # Convert bin indices to physical units
range_m = float(rbin) * r_res range_m = float(rbin) * r_res
# Doppler: centre bin (16) = 0 m/s; positive bins = approaching # Doppler: centre bin = 0 m/s; positive bins = approaching.
velocity_ms = float(dbin - 16) * v_res # Derived from frame shape — mirrors processing.py:520.
velocity_ms = float(dbin - doppler_center) * v_res
# Apply pitch correction if GPS data available # Apply pitch correction if GPS data available
raw_elev = 0.0 # FPGA doesn't send elevation per-detection raw_elev = 0.0 # FPGA doesn't send elevation per-detection