From f895c0244c6dad78518b3438aa8f8abb3b45223a Mon Sep 17 00:00:00 2001 From: Serhii Date: Sun, 19 Apr 2026 19:28:03 +0300 Subject: [PATCH 1/3] fix(v7): align live host-DSP units with replay path Use WaveformConfig for live range/velocity conversion in RadarDataWorker and add headless AST-based regression checks in test_v7.py. Before: RadarDataWorker._run_host_dsp used RadarSettings.velocity_resolution (default 1.0 in models.py:113), while ReplayWorker used WaveformConfig (~5.343 m/s/bin). Live GUI under-reported velocity by factor ~5.34x. Fix: local WaveformConfig() in _run_host_dsp, mirroring ReplayWorker pattern. Doppler center derived from frame shape, matching processing.py:520. Test: TestLiveReplayPhysicalUnitsParity in test_v7.py uses ast.parse on workers.py (no v7.workers import, headless-CI-safe despite PyQt6 dep) and asserts AST Call/Attribute/BinOp nodes for both RadarDataWorker and ReplayWorker paths. --- 9_Firmware/9_3_GUI/test_v7.py | 126 +++++++++++++++++++++++++++++++ 9_Firmware/9_3_GUI/v7/workers.py | 18 +++-- 2 files changed, 137 insertions(+), 7 deletions(-) diff --git a/9_Firmware/9_3_GUI/test_v7.py b/9_Firmware/9_3_GUI/test_v7.py index 636c5d4..0d93862 100644 --- a/9_Firmware/9_3_GUI/test_v7.py +++ b/9_Firmware/9_3_GUI/test_v7.py @@ -586,6 +586,132 @@ class TestSoftwareFPGA(unittest.TestCase): self.assertEqual(fpga.agc_holdoff, 0x0F) +# ============================================================================ +# Test: live vs replay physical-unit parity — regression guard for unit drift +# +# Uses AST parse of workers.py (not inspect.getsource / import) so the test +# runs in headless CI without PyQt6 — v7.workers imports PyQt6 unconditionally +# at workers.py:24, and other worker tests here already use skipUnless( +# _pyqt6_available()). Contract enforcement must not be gated on GUI deps. +# +# Asserts on AST nodes (Call / Attribute / BinOp), not source substrings, so +# false-pass on comments or docstring wording is impossible. +# ============================================================================ + + +class TestLiveReplayPhysicalUnitsParity(unittest.TestCase): + """Contract: live path (RadarDataWorker._run_host_dsp) and replay path + (ReplayWorker._emit_frame) both derive bin-to-physical conversion from + WaveformConfig — same source of truth, identical (range_m, velocity_ms) + for identical detections. + + Regression context: before the fix, live path used + RadarSettings.velocity_resolution (default 1.0 in models.py:113) while + replay used WaveformConfig.velocity_resolution_mps (~5.343). Live GUI + therefore under-reported velocity by factor ~5.34x vs replay for + identical frames. See test_v7.py:449 for the WaveformConfig pin. + """ + + @staticmethod + def _parse_method(class_name: str, method_name: str): + """Return AST FunctionDef for class_name.method_name from workers.py, + without importing v7.workers (PyQt6-independent).""" + import ast + from pathlib import Path + path = Path(__file__).parent / "v7" / "workers.py" + tree = ast.parse(path.read_text(encoding="utf-8")) + for node in tree.body: + if isinstance(node, ast.ClassDef) and node.name == class_name: + for item in node.body: + if isinstance(item, ast.FunctionDef) and item.name == method_name: + return item + raise RuntimeError(f"{class_name}.{method_name} not found in workers.py") + + @staticmethod + def _has_attribute_chain(tree, chain): + """True if AST tree contains a dotted attribute access matching chain. + + Chain ('self', '_settings', 'range_resolution') matches + ``self._settings.range_resolution`` exactly. + """ + import ast + for n in ast.walk(tree): + if isinstance(n, ast.Attribute): + parts = [n.attr] + cur = n.value + while isinstance(cur, ast.Attribute): + parts.append(cur.attr) + cur = cur.value + if isinstance(cur, ast.Name): + parts.append(cur.id) + parts.reverse() + if tuple(parts) == tuple(chain): + return True + return False + + @staticmethod + def _has_call_to(tree, func_name): + """True if AST tree contains a call to a bare name (func_name()).""" + import ast + for n in ast.walk(tree): + if (isinstance(n, ast.Call) and isinstance(n.func, ast.Name) + and n.func.id == func_name): + return True + return False + + @staticmethod + def _has_dbin_minus(tree, literal): + """True if AST tree contains ``dbin - `` binary op.""" + import ast + for n in ast.walk(tree): + if (isinstance(n, ast.BinOp) and isinstance(n.op, ast.Sub) + and isinstance(n.left, ast.Name) and n.left.id == "dbin" + and isinstance(n.right, ast.Constant) + and n.right.value == literal): + return True + return False + + def test_live_path_uses_waveform_config(self): + """_run_host_dsp must call WaveformConfig() and read + wf.range_resolution_m / wf.velocity_resolution_mps — not + self._settings.range_resolution / velocity_resolution.""" + method = self._parse_method("RadarDataWorker", "_run_host_dsp") + self.assertTrue(self._has_call_to(method, "WaveformConfig"), + "Live path must instantiate WaveformConfig().") + self.assertTrue(self._has_attribute_chain(method, ("wf", "range_resolution_m")), + "Live path must read wf.range_resolution_m.") + self.assertTrue(self._has_attribute_chain(method, ("wf", "velocity_resolution_mps")), + "Live path must read wf.velocity_resolution_mps. " + "RadarSettings.velocity_resolution default 1.0 caused ~5.34x " + "underreport vs replay (test_v7.py:449 pins ~5.343).") + self.assertFalse(self._has_attribute_chain( + method, ("self", "_settings", "range_resolution")), + "Live path still reads stale RadarSettings.range_resolution.") + self.assertFalse(self._has_attribute_chain( + method, ("self", "_settings", "velocity_resolution")), + "Live path still reads stale RadarSettings.velocity_resolution.") + + def test_live_path_doppler_center_not_hardcoded(self): + """_run_host_dsp must derive doppler_center from frame shape, not + use hardcoded ``dbin - 16`` — mirrors processing.py:520.""" + method = self._parse_method("RadarDataWorker", "_run_host_dsp") + self.assertFalse(self._has_dbin_minus(method, 16), + "Hardcoded doppler_center=16 breaks if frame shape changes. " + "Use frame.detections.shape[1] // 2 like processing.py:520.") + + def test_replay_path_still_uses_waveform_config(self): + """Parity half: replay path (ReplayWorker._emit_frame) must keep + reading self._waveform.range_resolution_m / velocity_resolution_mps — + guards against someone breaking the replay side of the invariant.""" + method = self._parse_method("ReplayWorker", "_emit_frame") + self.assertTrue(self._has_attribute_chain( + method, ("self", "_waveform", "range_resolution_m")), + "Replay path lost WaveformConfig range source of truth.") + self.assertTrue(self._has_attribute_chain( + method, ("self", "_waveform", "velocity_resolution_mps")), + "Replay path lost WaveformConfig velocity source of truth.") + + class TestSoftwareFPGASignalChain(unittest.TestCase): """SoftwareFPGA.process_chirps with real co-sim data.""" diff --git a/9_Firmware/9_3_GUI/v7/workers.py b/9_Firmware/9_3_GUI/v7/workers.py index 6bf115f..727c72b 100644 --- a/9_Firmware/9_3_GUI/v7/workers.py +++ b/9_Firmware/9_3_GUI/v7/workers.py @@ -23,7 +23,7 @@ import numpy as np from PyQt6.QtCore import QThread, QObject, QTimer, pyqtSignal -from .models import RadarTarget, GPSData, RadarSettings +from .models import RadarTarget, GPSData, RadarSettings, WaveformConfig from .hardware import ( RadarAcquisition, RadarFrame, @@ -169,8 +169,8 @@ class RadarDataWorker(QThread): The FPGA already does: FFT, MTI, CFAR, DC notch. Host-side DSP adds: clustering, tracking, geo-coordinate mapping. - Bin-to-physical conversion uses RadarSettings.range_resolution - and velocity_resolution (should be calibrated to actual waveform). + Bin-to-physical conversion uses WaveformConfig defaults to keep + live and replay units aligned (same source of truth as ReplayWorker). """ targets: list[RadarTarget] = [] @@ -180,8 +180,11 @@ class RadarDataWorker(QThread): # Extract detections from FPGA CFAR flags det_indices = np.argwhere(frame.detections > 0) - r_res = self._settings.range_resolution - v_res = self._settings.velocity_resolution + wf = WaveformConfig() + r_res = wf.range_resolution_m + v_res = wf.velocity_resolution_mps + n_doppler = frame.detections.shape[1] if frame.detections.ndim == 2 else 32 + doppler_center = n_doppler // 2 for idx in det_indices: rbin, dbin = idx @@ -190,8 +193,9 @@ class RadarDataWorker(QThread): # Convert bin indices to physical units range_m = float(rbin) * r_res - # Doppler: centre bin (16) = 0 m/s; positive bins = approaching - velocity_ms = float(dbin - 16) * v_res + # Doppler: centre bin = 0 m/s; positive bins = approaching. + # Derived from frame shape — mirrors processing.py:520. + velocity_ms = float(dbin - doppler_center) * v_res # Apply pitch correction if GPS data available raw_elev = 0.0 # FPGA doesn't send elevation per-detection From ebd96c90ce9bcc40ddc29afe377698c97808496f Mon Sep 17 00:00:00 2001 From: Jason <83615043+JJassonn69@users.noreply.github.com> Date: Tue, 21 Apr 2026 02:35:53 +0545 Subject: [PATCH 2/3] fix(v7): store WaveformConfig on self; add set_waveform parity; fix magic 32 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move WaveformConfig() from per-frame local in _run_host_dsp to self._waveform in __init__, mirroring ReplayWorker pattern. - Add set_waveform() to RadarDataWorker for injection symmetry with ReplayWorker.set_waveform() — live path is now configurable. - Replace hardcoded fallback 32 with self._waveform.n_doppler_bins. - Update AST contract tests: WaveformConfig() check moves to __init__ parse; attribute chains updated from ("wf", ...) to ("self", "_waveform", ...) to match renamed accessor. --- 9_Firmware/9_3_GUI/test_v7.py | 21 ++++++++++++--------- 9_Firmware/9_3_GUI/v7/workers.py | 15 +++++++++------ 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/9_Firmware/9_3_GUI/test_v7.py b/9_Firmware/9_3_GUI/test_v7.py index 0d93862..7000d1b 100644 --- a/9_Firmware/9_3_GUI/test_v7.py +++ b/9_Firmware/9_3_GUI/test_v7.py @@ -672,16 +672,19 @@ class TestLiveReplayPhysicalUnitsParity(unittest.TestCase): return False def test_live_path_uses_waveform_config(self): - """_run_host_dsp must call WaveformConfig() and read - wf.range_resolution_m / wf.velocity_resolution_mps — not - self._settings.range_resolution / velocity_resolution.""" + """RadarDataWorker.__init__ must instantiate WaveformConfig() into + self._waveform; _run_host_dsp must read self._waveform.range_resolution_m + / velocity_resolution_mps — not self._settings equivalents.""" + init = self._parse_method("RadarDataWorker", "__init__") + self.assertTrue(self._has_call_to(init, "WaveformConfig"), + "RadarDataWorker.__init__ must instantiate WaveformConfig() into self._waveform.") method = self._parse_method("RadarDataWorker", "_run_host_dsp") - self.assertTrue(self._has_call_to(method, "WaveformConfig"), - "Live path must instantiate WaveformConfig().") - self.assertTrue(self._has_attribute_chain(method, ("wf", "range_resolution_m")), - "Live path must read wf.range_resolution_m.") - self.assertTrue(self._has_attribute_chain(method, ("wf", "velocity_resolution_mps")), - "Live path must read wf.velocity_resolution_mps. " + self.assertTrue( + self._has_attribute_chain(method, ("self", "_waveform", "range_resolution_m")), + "Live path must read self._waveform.range_resolution_m.") + self.assertTrue( + self._has_attribute_chain(method, ("self", "_waveform", "velocity_resolution_mps")), + "Live path must read self._waveform.velocity_resolution_mps. " "RadarSettings.velocity_resolution default 1.0 caused ~5.34x " "underreport vs replay (test_v7.py:449 pins ~5.343).") self.assertFalse(self._has_attribute_chain( diff --git a/9_Firmware/9_3_GUI/v7/workers.py b/9_Firmware/9_3_GUI/v7/workers.py index 727c72b..6d80615 100644 --- a/9_Firmware/9_3_GUI/v7/workers.py +++ b/9_Firmware/9_3_GUI/v7/workers.py @@ -84,6 +84,7 @@ class RadarDataWorker(QThread): self._recorder = recorder self._gps = gps_data_ref self._settings = settings or RadarSettings() + self._waveform = WaveformConfig() self._running = False # Frame queue for production RadarAcquisition → this thread @@ -97,6 +98,9 @@ class RadarDataWorker(QThread): self._byte_count = 0 self._error_count = 0 + def set_waveform(self, wf: "WaveformConfig") -> None: + self._waveform = wf + def stop(self): self._running = False if self._acquisition: @@ -169,8 +173,8 @@ class RadarDataWorker(QThread): The FPGA already does: FFT, MTI, CFAR, DC notch. Host-side DSP adds: clustering, tracking, geo-coordinate mapping. - Bin-to-physical conversion uses WaveformConfig defaults to keep - live and replay units aligned (same source of truth as ReplayWorker). + Bin-to-physical conversion uses self._waveform (WaveformConfig) to keep + live and replay units aligned. Override via set_waveform() if needed. """ targets: list[RadarTarget] = [] @@ -180,10 +184,9 @@ class RadarDataWorker(QThread): # Extract detections from FPGA CFAR flags det_indices = np.argwhere(frame.detections > 0) - wf = WaveformConfig() - r_res = wf.range_resolution_m - v_res = wf.velocity_resolution_mps - n_doppler = frame.detections.shape[1] if frame.detections.ndim == 2 else 32 + r_res = self._waveform.range_resolution_m + v_res = self._waveform.velocity_resolution_mps + n_doppler = frame.detections.shape[1] if frame.detections.ndim == 2 else self._waveform.n_doppler_bins doppler_center = n_doppler // 2 for idx in det_indices: From f48448970b1a26b0540ab77199ac58cc5c3b9631 Mon Sep 17 00:00:00 2001 From: Jason <83615043+JJassonn69@users.noreply.github.com> Date: Tue, 21 Apr 2026 02:40:21 +0545 Subject: [PATCH 3/3] fix(v7): wrap long n_doppler fallback line for ruff E501 Line exceeded 100-char limit; wrap with parentheses to stay within project line-length setting. --- 9_Firmware/9_3_GUI/v7/workers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/9_Firmware/9_3_GUI/v7/workers.py b/9_Firmware/9_3_GUI/v7/workers.py index 6d80615..16c28e8 100644 --- a/9_Firmware/9_3_GUI/v7/workers.py +++ b/9_Firmware/9_3_GUI/v7/workers.py @@ -186,7 +186,8 @@ class RadarDataWorker(QThread): det_indices = np.argwhere(frame.detections > 0) r_res = self._waveform.range_resolution_m v_res = self._waveform.velocity_resolution_mps - n_doppler = frame.detections.shape[1] if frame.detections.ndim == 2 else self._waveform.n_doppler_bins + n_doppler = (frame.detections.shape[1] if frame.detections.ndim == 2 + else self._waveform.n_doppler_bins) doppler_center = n_doppler // 2 for idx in det_indices: