feat: Phase C co-sim tests + Doppler compare wired into CI
- Add 15 Tier 5 pipeline co-sim tests (cross-layer): decimator, MTI, Doppler, DC notch, CFAR, and full-chain golden match - Add 4 SoftwareFPGA tests (test_v7): golden fixture integrity, process_chirps wiring validation against manual chain - Wire compare_doppler.py into run_regression.sh with 3 scenarios (stationary, moving, two_targets) replacing single unvalidated run - Harden _load_npy to raise on missing files; expand sentinel check to all 14 golden artifacts - DC notch test verifies both zero and pass-through bins Test counts: 172 python + 55 cross-layer + 21 MCU + 29 FPGA = 277 total
This commit is contained in:
@@ -315,6 +315,67 @@ run_mf_cosim() {
|
|||||||
PASS=$((PASS + 1))
|
PASS=$((PASS + 1))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helper: compile, run, and compare a Doppler co-sim scenario
|
||||||
|
# run_doppler_cosim <scenario_name> <define_flag>
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
run_doppler_cosim() {
|
||||||
|
local name="$1"
|
||||||
|
local define="$2"
|
||||||
|
local vvp="tb/tb_doppler_cosim_${name}.vvp"
|
||||||
|
|
||||||
|
printf " %-45s " "Doppler Co-Sim ($name)"
|
||||||
|
|
||||||
|
# Compile — build command as string to handle optional define
|
||||||
|
local cmd="iverilog -g2001 -DSIMULATION"
|
||||||
|
if [[ -n "$define" ]]; then
|
||||||
|
cmd="$cmd $define"
|
||||||
|
fi
|
||||||
|
cmd="$cmd -o $vvp tb/tb_doppler_cosim.v doppler_processor.v xfft_16.v fft_engine.v"
|
||||||
|
|
||||||
|
if ! eval "$cmd" 2>/tmp/iverilog_err_$$; then
|
||||||
|
echo -e "${RED}COMPILE FAIL${NC}"
|
||||||
|
ERRORS="$ERRORS\n Doppler Co-Sim ($name): compile error ($(head -1 /tmp/iverilog_err_$$))"
|
||||||
|
FAIL=$((FAIL + 1))
|
||||||
|
return
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Run TB
|
||||||
|
local output
|
||||||
|
output=$(timeout 120 vvp "$vvp" 2>&1) || true
|
||||||
|
rm -f "$vvp"
|
||||||
|
|
||||||
|
# Check TB internal pass/fail
|
||||||
|
local tb_fail
|
||||||
|
tb_fail=$(echo "$output" | grep -Ec '^\[FAIL' || true)
|
||||||
|
if [[ "$tb_fail" -gt 0 ]]; then
|
||||||
|
echo -e "${RED}FAIL${NC} (TB internal failure)"
|
||||||
|
ERRORS="$ERRORS\n Doppler Co-Sim ($name): TB internal failure"
|
||||||
|
FAIL=$((FAIL + 1))
|
||||||
|
return
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Run Python compare
|
||||||
|
if command -v python3 >/dev/null 2>&1; then
|
||||||
|
local compare_out
|
||||||
|
local compare_rc=0
|
||||||
|
compare_out=$(python3 tb/cosim/compare_doppler.py "$name" 2>&1) || compare_rc=$?
|
||||||
|
if [[ "$compare_rc" -ne 0 ]]; then
|
||||||
|
echo -e "${RED}FAIL${NC} (compare_doppler.py mismatch)"
|
||||||
|
ERRORS="$ERRORS\n Doppler Co-Sim ($name): Python compare failed"
|
||||||
|
FAIL=$((FAIL + 1))
|
||||||
|
return
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
echo -e "${YELLOW}SKIP${NC} (RTL passed, python3 not found — compare skipped)"
|
||||||
|
SKIP=$((SKIP + 1))
|
||||||
|
return
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo -e "${GREEN}PASS${NC} (RTL + Python compare)"
|
||||||
|
PASS=$((PASS + 1))
|
||||||
|
}
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Helper: compile and run a single testbench
|
# Helper: compile and run a single testbench
|
||||||
# run_test <name> <vvp_path> <iverilog_args...>
|
# run_test <name> <vvp_path> <iverilog_args...>
|
||||||
@@ -429,9 +490,9 @@ run_test "Chirp Contract" \
|
|||||||
tb/tb_chirp_ctr_reg.vvp \
|
tb/tb_chirp_ctr_reg.vvp \
|
||||||
tb/tb_chirp_contract.v plfm_chirp_controller.v
|
tb/tb_chirp_contract.v plfm_chirp_controller.v
|
||||||
|
|
||||||
run_test "Doppler Processor (DSP48)" \
|
run_doppler_cosim "stationary" ""
|
||||||
tb/tb_doppler_reg.vvp \
|
run_doppler_cosim "moving" "-DSCENARIO_MOVING"
|
||||||
tb/tb_doppler_cosim.v doppler_processor.v xfft_16.v fft_engine.v
|
run_doppler_cosim "two_targets" "-DSCENARIO_TWO"
|
||||||
|
|
||||||
run_test "Threshold Detector (detection bugs)" \
|
run_test "Threshold Detector (detection bugs)" \
|
||||||
tb/tb_threshold_detector.vvp \
|
tb/tb_threshold_detector.vvp \
|
||||||
|
|||||||
@@ -666,6 +666,231 @@ class TestSoftwareFPGASignalChain(unittest.TestCase):
|
|||||||
self.assertEqual(frame_cfar.magnitude.shape, (64, 32))
|
self.assertEqual(frame_cfar.magnitude.shape, (64, 32))
|
||||||
|
|
||||||
|
|
||||||
|
class TestGoldenReferenceReplayFixtures(unittest.TestCase):
|
||||||
|
"""Golden replay fixtures and SoftwareFPGA smoke tests.
|
||||||
|
|
||||||
|
Phase C3 adds two fixture-integrity tests that verify the committed
|
||||||
|
`golden_reference.py` pipeline still reproduces the checked-in `.npy`
|
||||||
|
artifacts, plus one `SoftwareFPGA.process_chirps()` smoke test that checks
|
||||||
|
the GUI replay path still produces a valid `RadarFrame` shape.
|
||||||
|
"""
|
||||||
|
|
||||||
|
COSIM_DIR = os.path.join(
|
||||||
|
os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim",
|
||||||
|
"real_data", "hex"
|
||||||
|
)
|
||||||
|
|
||||||
|
REQUIRED_COSIM_FILES = (
|
||||||
|
"range_fft_all_i.npy",
|
||||||
|
"range_fft_all_q.npy",
|
||||||
|
"decimated_range_i.npy",
|
||||||
|
"decimated_range_q.npy",
|
||||||
|
"fullchain_cfar_flags.npy",
|
||||||
|
"fullchain_cfar_mag.npy",
|
||||||
|
)
|
||||||
|
|
||||||
|
def _cosim_available(self):
|
||||||
|
return all(
|
||||||
|
os.path.isfile(os.path.join(self.COSIM_DIR, name))
|
||||||
|
for name in self.REQUIRED_COSIM_FILES
|
||||||
|
)
|
||||||
|
|
||||||
|
def _load(self, name):
|
||||||
|
return np.load(os.path.join(self.COSIM_DIR, name))
|
||||||
|
|
||||||
|
def test_decimator_matches_golden(self):
|
||||||
|
"""Golden decimator helper must reproduce committed decimated output."""
|
||||||
|
if not self._cosim_available():
|
||||||
|
self.skipTest("co-sim data not found")
|
||||||
|
import sys as _sys
|
||||||
|
from pathlib import Path as _Path
|
||||||
|
_gr_dir = str(
|
||||||
|
_Path(__file__).resolve().parents[1]
|
||||||
|
/ "9_2_FPGA" / "tb" / "cosim" / "real_data"
|
||||||
|
)
|
||||||
|
if _gr_dir not in _sys.path:
|
||||||
|
_sys.path.insert(0, _gr_dir)
|
||||||
|
from golden_reference import run_range_bin_decimator
|
||||||
|
|
||||||
|
range_i = self._load("range_fft_all_i.npy")
|
||||||
|
range_q = self._load("range_fft_all_q.npy")
|
||||||
|
expected_i = self._load("decimated_range_i.npy")
|
||||||
|
expected_q = self._load("decimated_range_q.npy")
|
||||||
|
|
||||||
|
got_i, got_q = run_range_bin_decimator(range_i, range_q)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(
|
||||||
|
got_i,
|
||||||
|
expected_i,
|
||||||
|
err_msg="Golden decimator I drifted from committed fixture",
|
||||||
|
)
|
||||||
|
np.testing.assert_array_equal(
|
||||||
|
got_q,
|
||||||
|
expected_q,
|
||||||
|
err_msg="Golden decimator Q drifted from committed fixture",
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_full_chain_cfar_output_matches_golden(self):
|
||||||
|
"""Golden full pipeline must reproduce committed CFAR golden data."""
|
||||||
|
if not self._cosim_available():
|
||||||
|
self.skipTest("co-sim data not found")
|
||||||
|
import sys as _sys
|
||||||
|
from pathlib import Path as _Path
|
||||||
|
_gr_dir = str(
|
||||||
|
_Path(__file__).resolve().parents[1]
|
||||||
|
/ "9_2_FPGA" / "tb" / "cosim" / "real_data"
|
||||||
|
)
|
||||||
|
if _gr_dir not in _sys.path:
|
||||||
|
_sys.path.insert(0, _gr_dir)
|
||||||
|
from golden_reference import (
|
||||||
|
run_range_bin_decimator, run_mti_canceller,
|
||||||
|
run_doppler_fft, run_dc_notch, run_cfar_ca,
|
||||||
|
)
|
||||||
|
|
||||||
|
range_i = self._load("range_fft_all_i.npy")
|
||||||
|
range_q = self._load("range_fft_all_q.npy")
|
||||||
|
expected_flags = self._load("fullchain_cfar_flags.npy")
|
||||||
|
expected_mag = self._load("fullchain_cfar_mag.npy")
|
||||||
|
|
||||||
|
# Run the same chain as golden_reference.py:main()
|
||||||
|
twiddle_16 = os.path.join(
|
||||||
|
os.path.dirname(__file__), "..", "9_2_FPGA",
|
||||||
|
"fft_twiddle_16.mem",
|
||||||
|
)
|
||||||
|
if not os.path.exists(twiddle_16):
|
||||||
|
self.skipTest("fft_twiddle_16.mem not found")
|
||||||
|
|
||||||
|
dec_i, dec_q = run_range_bin_decimator(range_i, range_q)
|
||||||
|
mti_i, mti_q = run_mti_canceller(dec_i, dec_q, enable=True)
|
||||||
|
dop_i, dop_q = run_doppler_fft(
|
||||||
|
mti_i, mti_q, twiddle_file_16=twiddle_16,
|
||||||
|
)
|
||||||
|
notch_i, notch_q = run_dc_notch(dop_i, dop_q, width=2)
|
||||||
|
flags, mag, _thr = run_cfar_ca(
|
||||||
|
notch_i, notch_q, guard=2, train=8, alpha_q44=0x30, mode="CA",
|
||||||
|
)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(
|
||||||
|
flags,
|
||||||
|
expected_flags,
|
||||||
|
err_msg="Golden full-chain CFAR flags mismatch committed fixture",
|
||||||
|
)
|
||||||
|
np.testing.assert_array_equal(
|
||||||
|
mag,
|
||||||
|
expected_mag,
|
||||||
|
err_msg="Golden full-chain CFAR magnitudes mismatch committed fixture",
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_software_fpga_frame_shape_from_range_fft(self):
|
||||||
|
"""SoftwareFPGA.process_chirps on range_fft data produces correct shape.
|
||||||
|
|
||||||
|
NOTE: process_chirps re-runs the range FFT, so we can't feed it
|
||||||
|
post-FFT data and expect golden match. This test verifies that the
|
||||||
|
RadarFrame output has the right shape and that the chain doesn't crash
|
||||||
|
when given realistic-amplitude input.
|
||||||
|
"""
|
||||||
|
if not self._cosim_available():
|
||||||
|
self.skipTest("co-sim data not found")
|
||||||
|
from v7.software_fpga import SoftwareFPGA
|
||||||
|
from radar_protocol import RadarFrame
|
||||||
|
|
||||||
|
# Use decimated data padded to 1024 as input (so range FFT has content)
|
||||||
|
dec_i = self._load("decimated_range_i.npy")
|
||||||
|
dec_q = self._load("decimated_range_q.npy")
|
||||||
|
iq_i = np.zeros((32, 1024), dtype=np.int64)
|
||||||
|
iq_q = np.zeros((32, 1024), dtype=np.int64)
|
||||||
|
iq_i[:, :dec_i.shape[1]] = dec_i
|
||||||
|
iq_q[:, :dec_q.shape[1]] = dec_q
|
||||||
|
|
||||||
|
fpga = SoftwareFPGA()
|
||||||
|
fpga.set_mti_enable(True)
|
||||||
|
fpga.set_cfar_enable(True)
|
||||||
|
fpga.set_dc_notch_width(2)
|
||||||
|
frame = fpga.process_chirps(iq_i, iq_q, frame_number=99)
|
||||||
|
|
||||||
|
self.assertIsInstance(frame, RadarFrame)
|
||||||
|
self.assertEqual(frame.range_doppler_i.shape, (64, 32))
|
||||||
|
self.assertEqual(frame.magnitude.shape, (64, 32))
|
||||||
|
self.assertEqual(frame.detections.shape, (64, 32))
|
||||||
|
self.assertEqual(frame.range_profile.shape, (64,))
|
||||||
|
self.assertEqual(frame.frame_number, 99)
|
||||||
|
|
||||||
|
def test_software_fpga_wiring_matches_manual_chain(self):
|
||||||
|
"""SoftwareFPGA.process_chirps must call stages in the correct order
|
||||||
|
with the correct parameters.
|
||||||
|
|
||||||
|
Feeds identical synthetic IQ through both SoftwareFPGA and a manual
|
||||||
|
golden_reference chain, then asserts the CFAR output matches.
|
||||||
|
This catches wiring bugs: wrong stage order, wrong default params,
|
||||||
|
missing DC notch, etc.
|
||||||
|
"""
|
||||||
|
from v7.software_fpga import SoftwareFPGA, TWIDDLE_1024, TWIDDLE_16
|
||||||
|
if not os.path.exists(TWIDDLE_1024) or not os.path.exists(TWIDDLE_16):
|
||||||
|
self.skipTest("twiddle files not found")
|
||||||
|
|
||||||
|
import sys as _sys
|
||||||
|
from pathlib import Path as _Path
|
||||||
|
_gr_dir = str(
|
||||||
|
_Path(__file__).resolve().parents[1]
|
||||||
|
/ "9_2_FPGA" / "tb" / "cosim" / "real_data"
|
||||||
|
)
|
||||||
|
if _gr_dir not in _sys.path:
|
||||||
|
_sys.path.insert(0, _gr_dir)
|
||||||
|
from golden_reference import (
|
||||||
|
run_range_fft, run_range_bin_decimator, run_mti_canceller,
|
||||||
|
run_doppler_fft, run_dc_notch, run_cfar_ca,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Deterministic synthetic input — small int16 values to avoid overflow
|
||||||
|
rng = np.random.RandomState(42)
|
||||||
|
iq_i = rng.randint(-500, 500, size=(32, 1024), dtype=np.int64)
|
||||||
|
iq_q = rng.randint(-500, 500, size=(32, 1024), dtype=np.int64)
|
||||||
|
|
||||||
|
# --- SoftwareFPGA path (what we're testing) ---
|
||||||
|
fpga = SoftwareFPGA()
|
||||||
|
fpga.set_mti_enable(True)
|
||||||
|
fpga.set_cfar_enable(True)
|
||||||
|
fpga.set_dc_notch_width(2)
|
||||||
|
frame = fpga.process_chirps(iq_i.copy(), iq_q.copy(), frame_number=0)
|
||||||
|
|
||||||
|
# --- Manual golden_reference chain (ground truth) ---
|
||||||
|
range_i = np.zeros_like(iq_i)
|
||||||
|
range_q = np.zeros_like(iq_q)
|
||||||
|
for c in range(32):
|
||||||
|
range_i[c], range_q[c] = run_range_fft(
|
||||||
|
iq_i[c], iq_q[c], twiddle_file=TWIDDLE_1024,
|
||||||
|
)
|
||||||
|
dec_i, dec_q = run_range_bin_decimator(range_i, range_q)
|
||||||
|
mti_i, mti_q = run_mti_canceller(dec_i, dec_q, enable=True)
|
||||||
|
dop_i, dop_q = run_doppler_fft(
|
||||||
|
mti_i, mti_q, twiddle_file_16=TWIDDLE_16,
|
||||||
|
)
|
||||||
|
notch_i, notch_q = run_dc_notch(dop_i, dop_q, width=2)
|
||||||
|
flags, mag, _thr = run_cfar_ca(
|
||||||
|
notch_i, notch_q, guard=2, train=8, alpha_q44=0x30, mode="CA",
|
||||||
|
)
|
||||||
|
|
||||||
|
# --- Compare ---
|
||||||
|
np.testing.assert_array_equal(
|
||||||
|
frame.detections, flags.astype(np.uint8),
|
||||||
|
err_msg="SoftwareFPGA CFAR flags differ from manual chain",
|
||||||
|
)
|
||||||
|
np.testing.assert_array_equal(
|
||||||
|
frame.magnitude, mag.astype(np.float64),
|
||||||
|
err_msg="SoftwareFPGA CFAR magnitudes differ from manual chain",
|
||||||
|
)
|
||||||
|
expected_rd_i = np.clip(notch_i, -32768, 32767).astype(np.int16)
|
||||||
|
expected_rd_q = np.clip(notch_q, -32768, 32767).astype(np.int16)
|
||||||
|
np.testing.assert_array_equal(
|
||||||
|
frame.range_doppler_i, expected_rd_i,
|
||||||
|
err_msg="SoftwareFPGA range_doppler_i differs from manual chain",
|
||||||
|
)
|
||||||
|
np.testing.assert_array_equal(
|
||||||
|
frame.range_doppler_q, expected_rd_q,
|
||||||
|
err_msg="SoftwareFPGA range_doppler_q differs from manual chain",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestQuantizeRawIQ(unittest.TestCase):
|
class TestQuantizeRawIQ(unittest.TestCase):
|
||||||
"""quantize_raw_iq utility function."""
|
"""quantize_raw_iq utility function."""
|
||||||
|
|
||||||
|
|||||||
@@ -1100,3 +1100,392 @@ class TestTier4BannedPatterns:
|
|||||||
+ "\n".join(f" {h}" for h in all_hits[:20])
|
+ "\n".join(f" {h}" for h in all_hits[:20])
|
||||||
+ ("\n ... and more" if len(all_hits) > 20 else "")
|
+ ("\n ... and more" if len(all_hits) > 20 else "")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ===================================================================
|
||||||
|
# TIER 5: Pipeline Co-Simulation (Python golden_reference round-trip)
|
||||||
|
# ===================================================================
|
||||||
|
#
|
||||||
|
# These tests load committed .npy golden reference data and run the
|
||||||
|
# Python bit-accurate model functions on intermediate outputs to verify
|
||||||
|
# stage-to-stage consistency. This catches:
|
||||||
|
# - Golden reference function drift (model changed but .npy not regenerated)
|
||||||
|
# - Stage boundary mismatches (one stage's output shape/scale doesn't
|
||||||
|
# match the next stage's expected input)
|
||||||
|
# - Bit-accuracy regressions in the Python model
|
||||||
|
#
|
||||||
|
# The .npy files were generated by golden_reference.py:main() using
|
||||||
|
# real ADI Phaser test data. These tests do NOT re-run golden_reference
|
||||||
|
# main() — they verify that each processing function, when fed the
|
||||||
|
# committed intermediate outputs, reproduces the committed downstream
|
||||||
|
# outputs. This is fundamentally different from self-blessing because
|
||||||
|
# the .npy files are committed artifacts (not regenerated each run).
|
||||||
|
|
||||||
|
import numpy as np # noqa: E402
|
||||||
|
|
||||||
|
_COSIM_HEX_DIR = cp.REPO_ROOT / "9_Firmware" / "9_2_FPGA" / "tb" / "cosim" / "real_data" / "hex"
|
||||||
|
_GOLDEN_REF_DIR = cp.REPO_ROOT / "9_Firmware" / "9_2_FPGA" / "tb" / "cosim" / "real_data"
|
||||||
|
|
||||||
|
# Add golden_reference to path for imports
|
||||||
|
if str(_GOLDEN_REF_DIR) not in sys.path:
|
||||||
|
sys.path.insert(0, str(_GOLDEN_REF_DIR))
|
||||||
|
|
||||||
|
|
||||||
|
def _load_npy(name: str) -> np.ndarray:
|
||||||
|
"""Load a committed .npy file from the cosim hex directory.
|
||||||
|
|
||||||
|
Raises FileNotFoundError if the file is missing — use _skip_if_no_golden()
|
||||||
|
as a sentinel check at the start of each test to skip gracefully when the
|
||||||
|
entire golden dataset is absent.
|
||||||
|
"""
|
||||||
|
path = _COSIM_HEX_DIR / name
|
||||||
|
return np.load(str(path))
|
||||||
|
|
||||||
|
|
||||||
|
# Every Tier 5 artifact that is loaded by these tests. If any is missing, the
|
||||||
|
# committed golden dataset is incomplete and all Tier 5 tests are skipped.
|
||||||
|
_GOLDEN_SENTINELS = [
|
||||||
|
"decimated_range_i.npy",
|
||||||
|
"decimated_range_q.npy",
|
||||||
|
"doppler_map_i.npy",
|
||||||
|
"doppler_map_q.npy",
|
||||||
|
"fullchain_mti_i.npy",
|
||||||
|
"fullchain_mti_q.npy",
|
||||||
|
"fullchain_mti_doppler_i.npy",
|
||||||
|
"fullchain_mti_doppler_q.npy",
|
||||||
|
"fullchain_cfar_flags.npy",
|
||||||
|
"fullchain_cfar_mag.npy",
|
||||||
|
"fullchain_cfar_thr.npy",
|
||||||
|
"range_fft_all_i.npy",
|
||||||
|
"range_fft_all_q.npy",
|
||||||
|
"detection_mag.npy",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _skip_if_no_golden():
|
||||||
|
"""Skip test if committed golden .npy data is not available."""
|
||||||
|
for sentinel in _GOLDEN_SENTINELS:
|
||||||
|
if not (_COSIM_HEX_DIR / sentinel).exists():
|
||||||
|
pytest.skip(f"Golden data incomplete: {sentinel} not found")
|
||||||
|
|
||||||
|
|
||||||
|
class TestTier5PipelineDecimToMTI:
|
||||||
|
"""Verify decimated range → MTI canceller stage consistency."""
|
||||||
|
|
||||||
|
def test_mti_canceller_reproduces_committed_output(self):
|
||||||
|
"""run_mti_canceller(decimated) must match fullchain_mti .npy files."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_mti_canceller
|
||||||
|
|
||||||
|
dec_i = _load_npy("decimated_range_i.npy")
|
||||||
|
dec_q = _load_npy("decimated_range_q.npy")
|
||||||
|
expected_i = _load_npy("fullchain_mti_i.npy")
|
||||||
|
expected_q = _load_npy("fullchain_mti_q.npy")
|
||||||
|
|
||||||
|
got_i, got_q = run_mti_canceller(dec_i, dec_q, enable=True)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(got_i, expected_i,
|
||||||
|
err_msg="MTI I output drifted from committed golden data")
|
||||||
|
np.testing.assert_array_equal(got_q, expected_q,
|
||||||
|
err_msg="MTI Q output drifted from committed golden data")
|
||||||
|
|
||||||
|
def test_mti_passthrough_equals_input(self):
|
||||||
|
"""MTI with enable=False must pass through unchanged."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_mti_canceller
|
||||||
|
|
||||||
|
dec_i = _load_npy("decimated_range_i.npy")
|
||||||
|
dec_q = _load_npy("decimated_range_q.npy")
|
||||||
|
|
||||||
|
got_i, got_q = run_mti_canceller(dec_i, dec_q, enable=False)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(got_i, dec_i)
|
||||||
|
np.testing.assert_array_equal(got_q, dec_q)
|
||||||
|
|
||||||
|
def test_mti_first_chirp_is_zeroed(self):
|
||||||
|
"""MTI 2-pulse canceller must zero the first chirp (no previous data)."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_mti_canceller
|
||||||
|
|
||||||
|
dec_i = _load_npy("decimated_range_i.npy")
|
||||||
|
dec_q = _load_npy("decimated_range_q.npy")
|
||||||
|
|
||||||
|
got_i, got_q = run_mti_canceller(dec_i, dec_q, enable=True)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(got_i[0], 0,
|
||||||
|
err_msg="MTI chirp 0 should be all zeros")
|
||||||
|
np.testing.assert_array_equal(got_q[0], 0,
|
||||||
|
err_msg="MTI chirp 0 should be all zeros")
|
||||||
|
|
||||||
|
|
||||||
|
class TestTier5PipelineDoppler:
|
||||||
|
"""Verify Doppler FFT stage consistency against committed golden data."""
|
||||||
|
|
||||||
|
def test_doppler_no_mti_reproduces_committed_output(self):
|
||||||
|
"""run_doppler_fft(range_fft_all) must match doppler_map .npy (direct path).
|
||||||
|
|
||||||
|
NOTE: doppler_map was generated from full range FFT output (32, 1024),
|
||||||
|
NOT from decimated data. The function only processes the first 64
|
||||||
|
columns, but it needs the twiddle file that golden_reference.py used.
|
||||||
|
"""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_doppler_fft
|
||||||
|
|
||||||
|
range_i = _load_npy("range_fft_all_i.npy")
|
||||||
|
range_q = _load_npy("range_fft_all_q.npy")
|
||||||
|
expected_i = _load_npy("doppler_map_i.npy")
|
||||||
|
expected_q = _load_npy("doppler_map_q.npy")
|
||||||
|
|
||||||
|
twiddle_16 = str(cp.REPO_ROOT / "9_Firmware" / "9_2_FPGA" / "fft_twiddle_16.mem")
|
||||||
|
if not Path(twiddle_16).exists():
|
||||||
|
pytest.skip("fft_twiddle_16.mem not found")
|
||||||
|
|
||||||
|
got_i, got_q = run_doppler_fft(range_i, range_q, twiddle_file_16=twiddle_16)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(got_i, expected_i,
|
||||||
|
err_msg="Doppler I (direct path) drifted from committed golden data")
|
||||||
|
np.testing.assert_array_equal(got_q, expected_q,
|
||||||
|
err_msg="Doppler Q (direct path) drifted from committed golden data")
|
||||||
|
|
||||||
|
def test_doppler_mti_reproduces_committed_output(self):
|
||||||
|
"""run_doppler_fft(mti_output) must match fullchain_mti_doppler .npy."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_doppler_fft
|
||||||
|
|
||||||
|
mti_i = _load_npy("fullchain_mti_i.npy")
|
||||||
|
mti_q = _load_npy("fullchain_mti_q.npy")
|
||||||
|
expected_i = _load_npy("fullchain_mti_doppler_i.npy")
|
||||||
|
expected_q = _load_npy("fullchain_mti_doppler_q.npy")
|
||||||
|
|
||||||
|
twiddle_16 = str(cp.REPO_ROOT / "9_Firmware" / "9_2_FPGA" / "fft_twiddle_16.mem")
|
||||||
|
if not Path(twiddle_16).exists():
|
||||||
|
pytest.skip("fft_twiddle_16.mem not found")
|
||||||
|
|
||||||
|
got_i, got_q = run_doppler_fft(mti_i, mti_q, twiddle_file_16=twiddle_16)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(got_i, expected_i,
|
||||||
|
err_msg="Doppler I (MTI path) drifted from committed golden data")
|
||||||
|
np.testing.assert_array_equal(got_q, expected_q,
|
||||||
|
err_msg="Doppler Q (MTI path) drifted from committed golden data")
|
||||||
|
|
||||||
|
def test_doppler_output_shape(self):
|
||||||
|
"""Doppler output must be (64, 32) — 64 range bins x 32 Doppler bins."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_doppler_fft
|
||||||
|
|
||||||
|
dec_i = _load_npy("decimated_range_i.npy")
|
||||||
|
dec_q = _load_npy("decimated_range_q.npy")
|
||||||
|
|
||||||
|
twiddle_16 = str(
|
||||||
|
cp.REPO_ROOT / "9_Firmware" / "9_2_FPGA" / "fft_twiddle_16.mem"
|
||||||
|
)
|
||||||
|
tw = twiddle_16 if Path(twiddle_16).exists() else None
|
||||||
|
|
||||||
|
got_i, got_q = run_doppler_fft(dec_i, dec_q, twiddle_file_16=tw)
|
||||||
|
|
||||||
|
assert got_i.shape == (64, 32), f"Expected (64,32), got {got_i.shape}"
|
||||||
|
assert got_q.shape == (64, 32), f"Expected (64,32), got {got_q.shape}"
|
||||||
|
|
||||||
|
def test_doppler_chained_from_decimated_via_mti(self):
|
||||||
|
"""Full chain: decimated → MTI → Doppler must match committed output."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_mti_canceller, run_doppler_fft
|
||||||
|
|
||||||
|
dec_i = _load_npy("decimated_range_i.npy")
|
||||||
|
dec_q = _load_npy("decimated_range_q.npy")
|
||||||
|
expected_i = _load_npy("fullchain_mti_doppler_i.npy")
|
||||||
|
expected_q = _load_npy("fullchain_mti_doppler_q.npy")
|
||||||
|
|
||||||
|
twiddle_16 = str(cp.REPO_ROOT / "9_Firmware" / "9_2_FPGA" / "fft_twiddle_16.mem")
|
||||||
|
if not Path(twiddle_16).exists():
|
||||||
|
pytest.skip("fft_twiddle_16.mem not found")
|
||||||
|
|
||||||
|
mti_i, mti_q = run_mti_canceller(dec_i, dec_q, enable=True)
|
||||||
|
got_i, got_q = run_doppler_fft(mti_i, mti_q, twiddle_file_16=twiddle_16)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(got_i, expected_i,
|
||||||
|
err_msg="Chained decim→MTI→Doppler I mismatches committed golden")
|
||||||
|
np.testing.assert_array_equal(got_q, expected_q,
|
||||||
|
err_msg="Chained decim→MTI→Doppler Q mismatches committed golden")
|
||||||
|
|
||||||
|
|
||||||
|
class TestTier5PipelineCFAR:
|
||||||
|
"""Verify CFAR stage against committed golden data."""
|
||||||
|
|
||||||
|
# Parameters matching golden_reference.py:main() at lines 1266-1269
|
||||||
|
CFAR_GUARD = 2
|
||||||
|
CFAR_TRAIN = 8
|
||||||
|
CFAR_ALPHA = 0x30
|
||||||
|
CFAR_MODE = "CA"
|
||||||
|
DC_NOTCH_WIDTH = 2
|
||||||
|
|
||||||
|
def test_cfar_reproduces_committed_output(self):
|
||||||
|
"""Full chain: MTI Doppler → DC notch → CFAR must match committed .npy."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_dc_notch, run_cfar_ca
|
||||||
|
|
||||||
|
doppler_i = _load_npy("fullchain_mti_doppler_i.npy")
|
||||||
|
doppler_q = _load_npy("fullchain_mti_doppler_q.npy")
|
||||||
|
expected_flags = _load_npy("fullchain_cfar_flags.npy")
|
||||||
|
expected_mag = _load_npy("fullchain_cfar_mag.npy")
|
||||||
|
expected_thr = _load_npy("fullchain_cfar_thr.npy")
|
||||||
|
|
||||||
|
# Apply DC notch first (matching golden_reference.py:main() flow)
|
||||||
|
notch_i, notch_q = run_dc_notch(doppler_i, doppler_q,
|
||||||
|
width=self.DC_NOTCH_WIDTH)
|
||||||
|
|
||||||
|
got_flags, got_mag, got_thr = run_cfar_ca(
|
||||||
|
notch_i, notch_q,
|
||||||
|
guard=self.CFAR_GUARD,
|
||||||
|
train=self.CFAR_TRAIN,
|
||||||
|
alpha_q44=self.CFAR_ALPHA,
|
||||||
|
mode=self.CFAR_MODE,
|
||||||
|
)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(got_flags, expected_flags,
|
||||||
|
err_msg="CFAR flags drifted from committed golden data")
|
||||||
|
np.testing.assert_array_equal(got_mag, expected_mag,
|
||||||
|
err_msg="CFAR magnitudes drifted from committed golden data")
|
||||||
|
np.testing.assert_array_equal(got_thr, expected_thr,
|
||||||
|
err_msg="CFAR thresholds drifted from committed golden data")
|
||||||
|
|
||||||
|
def test_cfar_output_shapes(self):
|
||||||
|
"""CFAR outputs must be (64, 32)."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_dc_notch, run_cfar_ca
|
||||||
|
|
||||||
|
doppler_i = _load_npy("fullchain_mti_doppler_i.npy")
|
||||||
|
doppler_q = _load_npy("fullchain_mti_doppler_q.npy")
|
||||||
|
|
||||||
|
notch_i, notch_q = run_dc_notch(
|
||||||
|
doppler_i, doppler_q, width=self.DC_NOTCH_WIDTH,
|
||||||
|
)
|
||||||
|
flags, mag, thr = run_cfar_ca(
|
||||||
|
notch_i, notch_q,
|
||||||
|
guard=self.CFAR_GUARD,
|
||||||
|
train=self.CFAR_TRAIN,
|
||||||
|
alpha_q44=self.CFAR_ALPHA,
|
||||||
|
mode=self.CFAR_MODE,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert flags.shape == (64, 32)
|
||||||
|
assert mag.shape == (64, 32)
|
||||||
|
assert thr.shape == (64, 32)
|
||||||
|
|
||||||
|
def test_cfar_flags_are_boolean(self):
|
||||||
|
"""CFAR flags should be boolean (0 or 1 only)."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
flags = _load_npy("fullchain_cfar_flags.npy")
|
||||||
|
unique = set(np.unique(flags))
|
||||||
|
assert unique <= {0, 1}, \
|
||||||
|
f"CFAR flags contain non-boolean values: {unique}"
|
||||||
|
|
||||||
|
def test_cfar_threshold_ge_zero(self):
|
||||||
|
"""CFAR thresholds must be non-negative (unsigned in RTL)."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
thr = _load_npy("fullchain_cfar_thr.npy")
|
||||||
|
assert np.all(thr >= 0), "CFAR thresholds contain negative values"
|
||||||
|
|
||||||
|
def test_cfar_detection_implies_mag_gt_threshold(self):
|
||||||
|
"""Where CFAR flags are True, magnitude must exceed threshold."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
flags = _load_npy("fullchain_cfar_flags.npy")
|
||||||
|
mag = _load_npy("fullchain_cfar_mag.npy")
|
||||||
|
thr = _load_npy("fullchain_cfar_thr.npy")
|
||||||
|
|
||||||
|
detected = flags.astype(bool)
|
||||||
|
if not np.any(detected):
|
||||||
|
pytest.skip("No CFAR detections in golden data")
|
||||||
|
|
||||||
|
# Where detected, magnitude must be > threshold (strict inequality)
|
||||||
|
violations = detected & (mag <= thr)
|
||||||
|
n_violations = int(np.sum(violations))
|
||||||
|
assert n_violations == 0, (
|
||||||
|
f"{n_violations} cells detected but mag <= threshold"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_dc_notch_zeros_correct_bins(self):
|
||||||
|
"""DC notch with width=2 must zero bins {0,1,15,16,17,31}.
|
||||||
|
|
||||||
|
Logic: bin_within_sf < width OR bin_within_sf > (15 - width + 1)
|
||||||
|
width=2: < 2 → {0,1} per subframe; > 14 → {15} per subframe
|
||||||
|
Global: {0, 1, 15, 16, 17, 31}
|
||||||
|
"""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_dc_notch
|
||||||
|
|
||||||
|
doppler_i = _load_npy("fullchain_mti_doppler_i.npy")
|
||||||
|
doppler_q = _load_npy("fullchain_mti_doppler_q.npy")
|
||||||
|
|
||||||
|
notch_i, notch_q = run_dc_notch(doppler_i, doppler_q, width=2)
|
||||||
|
|
||||||
|
# Per subframe: bins < 2 → {0,1} and bins > 14 → {15}
|
||||||
|
expected_zero_bins = {0, 1, 15, 16, 17, 31}
|
||||||
|
for dbin in range(32):
|
||||||
|
if dbin in expected_zero_bins:
|
||||||
|
assert np.all(notch_i[:, dbin] == 0), \
|
||||||
|
f"DC notch failed: bin {dbin} should be zero"
|
||||||
|
assert np.all(notch_q[:, dbin] == 0), \
|
||||||
|
f"DC notch failed: bin {dbin} should be zero"
|
||||||
|
else:
|
||||||
|
assert np.array_equal(notch_i[:, dbin], doppler_i[:, dbin]), \
|
||||||
|
f"DC notch modified non-notched I bin {dbin}"
|
||||||
|
assert np.array_equal(notch_q[:, dbin], doppler_q[:, dbin]), \
|
||||||
|
f"DC notch modified non-notched Q bin {dbin}"
|
||||||
|
|
||||||
|
|
||||||
|
class TestTier5FullPipelineChain:
|
||||||
|
"""End-to-end: decimated → MTI → Doppler → DC notch → CFAR."""
|
||||||
|
|
||||||
|
def test_full_chain_matches_committed_cfar(self):
|
||||||
|
"""Chaining all stages from decimated input must match committed CFAR output."""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import (
|
||||||
|
run_mti_canceller, run_doppler_fft, run_dc_notch, run_cfar_ca,
|
||||||
|
)
|
||||||
|
|
||||||
|
dec_i = _load_npy("decimated_range_i.npy")
|
||||||
|
dec_q = _load_npy("decimated_range_q.npy")
|
||||||
|
expected_flags = _load_npy("fullchain_cfar_flags.npy")
|
||||||
|
expected_mag = _load_npy("fullchain_cfar_mag.npy")
|
||||||
|
|
||||||
|
twiddle_16 = str(cp.REPO_ROOT / "9_Firmware" / "9_2_FPGA" / "fft_twiddle_16.mem")
|
||||||
|
if not Path(twiddle_16).exists():
|
||||||
|
pytest.skip("fft_twiddle_16.mem not found")
|
||||||
|
|
||||||
|
# Run full chain
|
||||||
|
mti_i, mti_q = run_mti_canceller(dec_i, dec_q, enable=True)
|
||||||
|
dop_i, dop_q = run_doppler_fft(mti_i, mti_q, twiddle_file_16=twiddle_16)
|
||||||
|
notch_i, notch_q = run_dc_notch(dop_i, dop_q, width=2)
|
||||||
|
flags, mag, _thr = run_cfar_ca(
|
||||||
|
notch_i, notch_q, guard=2, train=8, alpha_q44=0x30, mode="CA",
|
||||||
|
)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(flags, expected_flags,
|
||||||
|
err_msg="Full chain flags mismatch — pipeline drift detected")
|
||||||
|
np.testing.assert_array_equal(mag, expected_mag,
|
||||||
|
err_msg="Full chain magnitudes mismatch — pipeline drift detected")
|
||||||
|
|
||||||
|
def test_detection_mag_matches_committed(self):
|
||||||
|
"""Simple threshold detection magnitude must match committed detection_mag.npy.
|
||||||
|
|
||||||
|
detection_mag.npy was generated from the DIRECT path (no decimator, no MTI):
|
||||||
|
range_fft_all → doppler_fft → run_detection
|
||||||
|
"""
|
||||||
|
_skip_if_no_golden()
|
||||||
|
from golden_reference import run_doppler_fft, run_detection
|
||||||
|
|
||||||
|
range_i = _load_npy("range_fft_all_i.npy")
|
||||||
|
range_q = _load_npy("range_fft_all_q.npy")
|
||||||
|
expected_mag = _load_npy("detection_mag.npy")
|
||||||
|
|
||||||
|
twiddle_16 = str(cp.REPO_ROOT / "9_Firmware" / "9_2_FPGA" / "fft_twiddle_16.mem")
|
||||||
|
if not Path(twiddle_16).exists():
|
||||||
|
pytest.skip("fft_twiddle_16.mem not found")
|
||||||
|
|
||||||
|
# Direct path (matching golden_reference.py:main() flow for detection_mag)
|
||||||
|
dop_i, dop_q = run_doppler_fft(range_i, range_q, twiddle_file_16=twiddle_16)
|
||||||
|
got_mag, _det = run_detection(dop_i, dop_q, threshold=10000)
|
||||||
|
|
||||||
|
np.testing.assert_array_equal(got_mag, expected_mag,
|
||||||
|
err_msg="Detection magnitude drifted from committed golden data")
|
||||||
|
|||||||
Reference in New Issue
Block a user