""" V7-specific unit tests for the PLFM Radar GUI V7 modules. Tests cover: - v7.models: RadarTarget, RadarSettings, GPSData, ProcessingConfig - v7.processing: RadarProcessor, USBPacketParser, apply_pitch_correction - v7.workers: polar_to_geographic - v7.hardware: STM32USBInterface (basic), production protocol re-exports Does NOT require a running Qt event loop — only unit-testable components. Run with: python -m unittest test_v7 -v """ import os import struct import unittest from dataclasses import asdict import numpy as np # ============================================================================= # Test: v7.models # ============================================================================= class TestRadarTarget(unittest.TestCase): """RadarTarget dataclass.""" def test_defaults(self): t = _models().RadarTarget(id=1, range=1000.0, velocity=5.0, azimuth=45.0, elevation=2.0) self.assertEqual(t.id, 1) self.assertEqual(t.range, 1000.0) self.assertEqual(t.snr, 0.0) self.assertEqual(t.track_id, -1) self.assertEqual(t.classification, "unknown") def test_to_dict(self): t = _models().RadarTarget(id=1, range=500.0, velocity=-10.0, azimuth=0.0, elevation=0.0, snr=15.0) d = t.to_dict() self.assertIsInstance(d, dict) self.assertEqual(d["range"], 500.0) self.assertEqual(d["snr"], 15.0) class TestRadarSettings(unittest.TestCase): """RadarSettings — verify stale STM32 fields are removed.""" def test_no_stale_fields(self): """chirp_duration, freq_min/max, prf1/2 must NOT exist.""" s = _models().RadarSettings() d = asdict(s) for stale in ["chirp_duration_1", "chirp_duration_2", "freq_min", "freq_max", "prf1", "prf2", "chirps_per_position"]: self.assertNotIn(stale, d, f"Stale field '{stale}' still present") def test_has_physical_conversion_fields(self): s = _models().RadarSettings() self.assertIsInstance(s.range_resolution, float) self.assertIsInstance(s.velocity_resolution, float) self.assertGreater(s.range_resolution, 0) self.assertGreater(s.velocity_resolution, 0) def test_defaults(self): s = _models().RadarSettings() self.assertEqual(s.system_frequency, 10e9) self.assertEqual(s.coverage_radius, 50000) self.assertEqual(s.max_distance, 50000) class TestGPSData(unittest.TestCase): def test_to_dict(self): g = _models().GPSData(latitude=41.9, longitude=12.5, altitude=100.0, pitch=2.5) d = g.to_dict() self.assertAlmostEqual(d["latitude"], 41.9) self.assertAlmostEqual(d["pitch"], 2.5) class TestProcessingConfig(unittest.TestCase): def test_defaults(self): cfg = _models().ProcessingConfig() self.assertTrue(cfg.clustering_enabled) self.assertTrue(cfg.tracking_enabled) self.assertFalse(cfg.mti_enabled) self.assertFalse(cfg.cfar_enabled) class TestNoCrcmodDependency(unittest.TestCase): """crcmod was removed — verify it's not exported.""" def test_no_crcmod_available(self): models = _models() self.assertFalse(hasattr(models, "CRCMOD_AVAILABLE"), "CRCMOD_AVAILABLE should be removed from models") # ============================================================================= # Test: v7.processing # ============================================================================= class TestApplyPitchCorrection(unittest.TestCase): def test_positive_pitch(self): from v7.processing import apply_pitch_correction self.assertAlmostEqual(apply_pitch_correction(10.0, 3.0), 7.0) def test_zero_pitch(self): from v7.processing import apply_pitch_correction self.assertAlmostEqual(apply_pitch_correction(5.0, 0.0), 5.0) class TestRadarProcessorMTI(unittest.TestCase): def test_mti_order1(self): from v7.processing import RadarProcessor from v7.models import ProcessingConfig proc = RadarProcessor() proc.set_config(ProcessingConfig(mti_enabled=True, mti_order=1)) frame1 = np.ones((64, 32)) frame2 = np.ones((64, 32)) * 3 result1 = proc.mti_filter(frame1) np.testing.assert_array_equal(result1, np.zeros((64, 32)), err_msg="First frame should be zeros (no history)") result2 = proc.mti_filter(frame2) expected = frame2 - frame1 np.testing.assert_array_almost_equal(result2, expected) def test_mti_order2(self): from v7.processing import RadarProcessor from v7.models import ProcessingConfig proc = RadarProcessor() proc.set_config(ProcessingConfig(mti_enabled=True, mti_order=2)) f1 = np.ones((4, 4)) f2 = np.ones((4, 4)) * 2 f3 = np.ones((4, 4)) * 5 proc.mti_filter(f1) # zeros (need 3 frames) proc.mti_filter(f2) # zeros result = proc.mti_filter(f3) # Order 2: x[n] - 2*x[n-1] + x[n-2] = 5 - 4 + 1 = 2 np.testing.assert_array_almost_equal(result, np.ones((4, 4)) * 2) class TestRadarProcessorCFAR(unittest.TestCase): def test_cfar_1d_detects_peak(self): from v7.processing import RadarProcessor signal = np.ones(64) * 10 signal[32] = 500 # inject a strong target det = RadarProcessor.cfar_1d(signal, guard=2, train=4, threshold_factor=3.0, cfar_type="CA-CFAR") self.assertTrue(det[32], "Should detect strong peak at bin 32") def test_cfar_1d_no_false_alarm(self): from v7.processing import RadarProcessor signal = np.ones(64) * 10 # uniform — no target det = RadarProcessor.cfar_1d(signal, guard=2, train=4, threshold_factor=3.0) self.assertEqual(det.sum(), 0, "Should have no detections in flat noise") class TestRadarProcessorProcessFrame(unittest.TestCase): def test_process_frame_returns_shapes(self): from v7.processing import RadarProcessor proc = RadarProcessor() frame = np.random.randn(64, 32) * 10 frame[20, 8] = 5000 # inject a target power, mask = proc.process_frame(frame) self.assertEqual(power.shape, (64, 32)) self.assertEqual(mask.shape, (64, 32)) self.assertEqual(mask.dtype, bool) class TestRadarProcessorWindowing(unittest.TestCase): def test_hann_window(self): from v7.processing import RadarProcessor data = np.ones((4, 32)) windowed = RadarProcessor.apply_window(data, "Hann") # Hann window tapers to ~0 at edges self.assertLess(windowed[0, 0], 0.1) self.assertGreater(windowed[0, 16], 0.5) def test_none_window(self): from v7.processing import RadarProcessor data = np.ones((4, 32)) result = RadarProcessor.apply_window(data, "None") np.testing.assert_array_equal(result, data) class TestRadarProcessorDCNotch(unittest.TestCase): def test_dc_removal(self): from v7.processing import RadarProcessor data = np.ones((4, 8)) * 100 data[0, :] += 50 # DC offset in range bin 0 result = RadarProcessor.dc_notch(data) # Mean along axis=1 should be ~0 row_means = np.mean(result, axis=1) for m in row_means: self.assertAlmostEqual(m, 0, places=10) class TestRadarProcessorClustering(unittest.TestCase): def test_clustering_empty(self): from v7.processing import RadarProcessor result = RadarProcessor.clustering([], eps=100, min_samples=2) self.assertEqual(result, []) class TestUSBPacketParser(unittest.TestCase): def test_parse_gps_text(self): from v7.processing import USBPacketParser parser = USBPacketParser() data = b"GPS:41.9028,12.4964,100.0,2.5\r\n" gps = parser.parse_gps_data(data) self.assertIsNotNone(gps) self.assertAlmostEqual(gps.latitude, 41.9028, places=3) self.assertAlmostEqual(gps.longitude, 12.4964, places=3) self.assertAlmostEqual(gps.altitude, 100.0) self.assertAlmostEqual(gps.pitch, 2.5) def test_parse_gps_text_invalid(self): from v7.processing import USBPacketParser parser = USBPacketParser() self.assertIsNone(parser.parse_gps_data(b"NOT_GPS_DATA")) self.assertIsNone(parser.parse_gps_data(b"")) self.assertIsNone(parser.parse_gps_data(None)) def test_parse_binary_gps(self): from v7.processing import USBPacketParser parser = USBPacketParser() # Build a valid binary GPS packet pkt = bytearray(b"GPSB") pkt += struct.pack(">d", 41.9028) # lat pkt += struct.pack(">d", 12.4964) # lon pkt += struct.pack(">f", 100.0) # alt pkt += struct.pack(">f", 2.5) # pitch # Simple checksum cksum = sum(pkt) & 0xFFFF pkt += struct.pack(">H", cksum) self.assertEqual(len(pkt), 30) gps = parser.parse_gps_data(bytes(pkt)) self.assertIsNotNone(gps) self.assertAlmostEqual(gps.latitude, 41.9028, places=3) def test_no_crc16_func_attribute(self): """crcmod was removed — USBPacketParser should not have crc16_func.""" from v7.processing import USBPacketParser parser = USBPacketParser() self.assertFalse(hasattr(parser, "crc16_func"), "crc16_func should be removed (crcmod dead code)") def test_no_multi_prf_unwrap(self): """multi_prf_unwrap was removed (never called, prf fields removed).""" from v7.processing import RadarProcessor self.assertFalse(hasattr(RadarProcessor, "multi_prf_unwrap"), "multi_prf_unwrap should be removed") # ============================================================================= # Test: v7.workers — polar_to_geographic # ============================================================================= def _pyqt6_available(): try: import PyQt6.QtCore # noqa: F401 return True except ImportError: return False @unittest.skipUnless(_pyqt6_available(), "PyQt6 not installed") class TestPolarToGeographic(unittest.TestCase): def test_north_bearing(self): from v7.workers import polar_to_geographic lat, lon = polar_to_geographic(0.0, 0.0, 1000.0, 0.0) # Moving 1km north from equator self.assertGreater(lat, 0.0) self.assertAlmostEqual(lon, 0.0, places=4) def test_east_bearing(self): from v7.workers import polar_to_geographic lat, lon = polar_to_geographic(0.0, 0.0, 1000.0, 90.0) self.assertAlmostEqual(lat, 0.0, places=4) self.assertGreater(lon, 0.0) def test_zero_range(self): from v7.workers import polar_to_geographic lat, lon = polar_to_geographic(41.9, 12.5, 0.0, 0.0) self.assertAlmostEqual(lat, 41.9, places=6) self.assertAlmostEqual(lon, 12.5, places=6) # ============================================================================= # Test: v7.hardware — production protocol re-exports # ============================================================================= class TestHardwareReExports(unittest.TestCase): """Verify hardware.py re-exports all production protocol classes.""" def test_exports(self): from v7.hardware import ( FT2232HConnection, RadarProtocol, STM32USBInterface, ) # Verify these are actual classes/types, not None self.assertTrue(callable(FT2232HConnection)) self.assertTrue(callable(RadarProtocol)) self.assertTrue(callable(STM32USBInterface)) def test_stm32_list_devices_no_crash(self): from v7.hardware import STM32USBInterface stm = STM32USBInterface() self.assertFalse(stm.is_open) # list_devices should return empty list (no USB in test env), not crash devs = stm.list_devices() self.assertIsInstance(devs, list) # ============================================================================= # Test: v7.__init__ — clean exports # ============================================================================= class TestV7Init(unittest.TestCase): """Verify top-level v7 package exports.""" def test_no_crcmod_export(self): import v7 self.assertFalse(hasattr(v7, "CRCMOD_AVAILABLE"), "CRCMOD_AVAILABLE should not be in v7.__all__") def test_key_exports(self): import v7 # Core exports (no PyQt6 required) for name in ["RadarTarget", "RadarSettings", "GPSData", "ProcessingConfig", "FT2232HConnection", "RadarProtocol", "RadarProcessor"]: self.assertTrue(hasattr(v7, name), f"v7 missing export: {name}") # PyQt6-dependent exports — only present when PyQt6 is installed if _pyqt6_available(): for name in ["RadarDataWorker", "RadarMapWidget", "RadarDashboard"]: self.assertTrue(hasattr(v7, name), f"v7 missing export: {name}") # ============================================================================= # Test: AGC Visualization data model # ============================================================================= class TestAGCVisualizationV7(unittest.TestCase): """AGC visualization ring buffer and data model tests (no Qt required).""" def _make_deque(self, maxlen=256): from collections import deque return deque(maxlen=maxlen) def test_ring_buffer_basics(self): d = self._make_deque(maxlen=4) for i in range(6): d.append(i) self.assertEqual(list(d), [2, 3, 4, 5]) def test_gain_range_4bit(self): """AGC gain is 4-bit (0-15).""" from radar_protocol import StatusResponse for g in [0, 7, 15]: sr = StatusResponse(agc_current_gain=g) self.assertEqual(sr.agc_current_gain, g) def test_peak_range_8bit(self): """Peak magnitude is 8-bit (0-255).""" from radar_protocol import StatusResponse for p in [0, 128, 255]: sr = StatusResponse(agc_peak_magnitude=p) self.assertEqual(sr.agc_peak_magnitude, p) def test_saturation_accumulation(self): """Saturation ring buffer sum tracks total events.""" sat = self._make_deque(maxlen=256) for s in [0, 5, 0, 10, 3]: sat.append(s) self.assertEqual(sum(sat), 18) def test_mode_label_logic(self): """AGC mode string from enable field.""" from radar_protocol import StatusResponse self.assertEqual( "AUTO" if StatusResponse(agc_enable=1).agc_enable else "MANUAL", "AUTO") self.assertEqual( "AUTO" if StatusResponse(agc_enable=0).agc_enable else "MANUAL", "MANUAL") def test_history_len_default(self): """Default history length should be 256.""" d = self._make_deque(maxlen=256) self.assertEqual(d.maxlen, 256) def test_color_thresholds(self): """Saturation color: green=0, warning=1-10, error>10.""" from v7.models import DARK_SUCCESS, DARK_WARNING, DARK_ERROR def pick_color(total): if total > 10: return DARK_ERROR if total > 0: return DARK_WARNING return DARK_SUCCESS self.assertEqual(pick_color(0), DARK_SUCCESS) self.assertEqual(pick_color(5), DARK_WARNING) self.assertEqual(pick_color(11), DARK_ERROR) # ============================================================================= # Test: v7.models.WaveformConfig # ============================================================================= class TestWaveformConfig(unittest.TestCase): """WaveformConfig dataclass and derived physical properties.""" def test_defaults(self): from v7.models import WaveformConfig wc = WaveformConfig() self.assertEqual(wc.sample_rate_hz, 4e6) self.assertEqual(wc.bandwidth_hz, 500e6) self.assertEqual(wc.chirp_duration_s, 300e-6) self.assertEqual(wc.center_freq_hz, 10.525e9) self.assertEqual(wc.n_range_bins, 64) self.assertEqual(wc.n_doppler_bins, 32) self.assertEqual(wc.fft_size, 1024) self.assertEqual(wc.decimation_factor, 16) def test_range_resolution(self): """range_resolution_m should be ~5.62 m/bin with ADI defaults.""" from v7.models import WaveformConfig wc = WaveformConfig() self.assertAlmostEqual(wc.range_resolution_m, 5.621, places=1) def test_velocity_resolution(self): """velocity_resolution_mps should be ~1.484 m/s/bin.""" from v7.models import WaveformConfig wc = WaveformConfig() self.assertAlmostEqual(wc.velocity_resolution_mps, 1.484, places=2) def test_max_range(self): """max_range_m = range_resolution * n_range_bins.""" from v7.models import WaveformConfig wc = WaveformConfig() self.assertAlmostEqual(wc.max_range_m, wc.range_resolution_m * 64, places=1) def test_max_velocity(self): """max_velocity_mps = velocity_resolution * n_doppler_bins / 2.""" from v7.models import WaveformConfig wc = WaveformConfig() self.assertAlmostEqual( wc.max_velocity_mps, wc.velocity_resolution_mps * 16, places=2, ) def test_custom_params(self): """Non-default parameters correctly change derived values.""" from v7.models import WaveformConfig wc1 = WaveformConfig() wc2 = WaveformConfig(bandwidth_hz=1e9) # double BW → halve range res self.assertAlmostEqual(wc2.range_resolution_m, wc1.range_resolution_m / 2, places=2) def test_zero_center_freq_velocity(self): """Zero center freq should cause ZeroDivisionError in velocity calc.""" from v7.models import WaveformConfig wc = WaveformConfig(center_freq_hz=0.0) with self.assertRaises(ZeroDivisionError): _ = wc.velocity_resolution_mps # ============================================================================= # Test: v7.software_fpga.SoftwareFPGA # ============================================================================= class TestSoftwareFPGA(unittest.TestCase): """SoftwareFPGA register interface and signal chain.""" def _make_fpga(self): from v7.software_fpga import SoftwareFPGA return SoftwareFPGA() def test_reset_defaults(self): """Register reset values match FPGA RTL (radar_system_top.v).""" fpga = self._make_fpga() self.assertEqual(fpga.detect_threshold, 10_000) self.assertEqual(fpga.gain_shift, 0) self.assertFalse(fpga.cfar_enable) self.assertEqual(fpga.cfar_guard, 2) self.assertEqual(fpga.cfar_train, 8) self.assertEqual(fpga.cfar_alpha, 0x30) self.assertEqual(fpga.cfar_mode, 0) self.assertFalse(fpga.mti_enable) self.assertEqual(fpga.dc_notch_width, 0) self.assertFalse(fpga.agc_enable) self.assertEqual(fpga.agc_target, 200) self.assertEqual(fpga.agc_attack, 1) self.assertEqual(fpga.agc_decay, 1) self.assertEqual(fpga.agc_holdoff, 4) def test_setter_detect_threshold(self): fpga = self._make_fpga() fpga.set_detect_threshold(5000) self.assertEqual(fpga.detect_threshold, 5000) def test_setter_detect_threshold_clamp_16bit(self): fpga = self._make_fpga() fpga.set_detect_threshold(0x1FFFF) # 17-bit self.assertEqual(fpga.detect_threshold, 0xFFFF) def test_setter_gain_shift_clamp_4bit(self): fpga = self._make_fpga() fpga.set_gain_shift(0xFF) self.assertEqual(fpga.gain_shift, 0x0F) def test_setter_cfar_enable(self): fpga = self._make_fpga() fpga.set_cfar_enable(True) self.assertTrue(fpga.cfar_enable) fpga.set_cfar_enable(False) self.assertFalse(fpga.cfar_enable) def test_setter_cfar_guard_clamp_4bit(self): fpga = self._make_fpga() fpga.set_cfar_guard(0x1F) self.assertEqual(fpga.cfar_guard, 0x0F) def test_setter_cfar_train_min_1(self): """CFAR train cells clamped to min 1.""" fpga = self._make_fpga() fpga.set_cfar_train(0) self.assertEqual(fpga.cfar_train, 1) def test_setter_cfar_train_clamp_5bit(self): fpga = self._make_fpga() fpga.set_cfar_train(0x3F) self.assertEqual(fpga.cfar_train, 0x1F) def test_setter_cfar_alpha_clamp_8bit(self): fpga = self._make_fpga() fpga.set_cfar_alpha(0x1FF) self.assertEqual(fpga.cfar_alpha, 0xFF) def test_setter_cfar_mode_clamp_2bit(self): fpga = self._make_fpga() fpga.set_cfar_mode(7) self.assertEqual(fpga.cfar_mode, 3) def test_setter_mti_enable(self): fpga = self._make_fpga() fpga.set_mti_enable(True) self.assertTrue(fpga.mti_enable) def test_setter_dc_notch_clamp_3bit(self): fpga = self._make_fpga() fpga.set_dc_notch_width(0xFF) self.assertEqual(fpga.dc_notch_width, 7) def test_setter_agc_params_selective(self): """set_agc_params only changes provided fields.""" fpga = self._make_fpga() fpga.set_agc_params(target=100) self.assertEqual(fpga.agc_target, 100) self.assertEqual(fpga.agc_attack, 1) # unchanged fpga.set_agc_params(attack=3, decay=5) self.assertEqual(fpga.agc_attack, 3) self.assertEqual(fpga.agc_decay, 5) self.assertEqual(fpga.agc_target, 100) # unchanged def test_setter_agc_params_clamp(self): fpga = self._make_fpga() fpga.set_agc_params(target=0xFFF, attack=0xFF, decay=0xFF, holdoff=0xFF) self.assertEqual(fpga.agc_target, 0xFF) self.assertEqual(fpga.agc_attack, 0x0F) self.assertEqual(fpga.agc_decay, 0x0F) self.assertEqual(fpga.agc_holdoff, 0x0F) class TestSoftwareFPGASignalChain(unittest.TestCase): """SoftwareFPGA.process_chirps with real co-sim data.""" COSIM_DIR = os.path.join( os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim", "real_data", "hex" ) def _cosim_available(self): return os.path.isfile(os.path.join(self.COSIM_DIR, "doppler_map_i.npy")) def test_process_chirps_returns_radar_frame(self): """process_chirps produces a RadarFrame with correct shapes.""" if not self._cosim_available(): self.skipTest("co-sim data not found") from v7.software_fpga import SoftwareFPGA from radar_protocol import RadarFrame # Load decimated range data as minimal input (32 chirps x 64 bins) dec_i = np.load(os.path.join(self.COSIM_DIR, "decimated_range_i.npy")) dec_q = np.load(os.path.join(self.COSIM_DIR, "decimated_range_q.npy")) # Build fake 1024-sample chirps from decimated data (pad with zeros) n_chirps = dec_i.shape[0] iq_i = np.zeros((n_chirps, 1024), dtype=np.int64) iq_q = np.zeros((n_chirps, 1024), dtype=np.int64) # Put decimated data into first 64 bins so FFT has something iq_i[:, :dec_i.shape[1]] = dec_i iq_q[:, :dec_q.shape[1]] = dec_q fpga = SoftwareFPGA() frame = fpga.process_chirps(iq_i, iq_q, frame_number=42, timestamp=1.0) self.assertIsInstance(frame, RadarFrame) self.assertEqual(frame.frame_number, 42) self.assertAlmostEqual(frame.timestamp, 1.0) self.assertEqual(frame.range_doppler_i.shape, (64, 32)) self.assertEqual(frame.range_doppler_q.shape, (64, 32)) self.assertEqual(frame.magnitude.shape, (64, 32)) self.assertEqual(frame.detections.shape, (64, 32)) self.assertEqual(frame.range_profile.shape, (64,)) self.assertEqual(frame.detection_count, int(frame.detections.sum())) def test_cfar_enable_changes_detections(self): """Enabling CFAR vs simple threshold should yield different detection counts.""" if not self._cosim_available(): self.skipTest("co-sim data not found") from v7.software_fpga import SoftwareFPGA iq_i = np.zeros((32, 1024), dtype=np.int64) iq_q = np.zeros((32, 1024), dtype=np.int64) # Inject a single strong tone in bin 10 of every chirp iq_i[:, 10] = 5000 iq_q[:, 10] = 3000 fpga_thresh = SoftwareFPGA() fpga_thresh.set_detect_threshold(1) # very low → many detections frame_thresh = fpga_thresh.process_chirps(iq_i, iq_q) fpga_cfar = SoftwareFPGA() fpga_cfar.set_cfar_enable(True) fpga_cfar.set_cfar_alpha(0x10) # low alpha → more detections frame_cfar = fpga_cfar.process_chirps(iq_i, iq_q) # Just verify both produce valid frames — exact counts depend on chain self.assertIsNotNone(frame_thresh) self.assertIsNotNone(frame_cfar) self.assertEqual(frame_thresh.magnitude.shape, (64, 32)) self.assertEqual(frame_cfar.magnitude.shape, (64, 32)) class TestQuantizeRawIQ(unittest.TestCase): """quantize_raw_iq utility function.""" def test_3d_input(self): """3-D (frames, chirps, samples) → uses first frame.""" from v7.software_fpga import quantize_raw_iq raw = np.random.randn(5, 32, 1024) + 1j * np.random.randn(5, 32, 1024) iq_i, iq_q = quantize_raw_iq(raw) self.assertEqual(iq_i.shape, (32, 1024)) self.assertEqual(iq_q.shape, (32, 1024)) self.assertTrue(np.all(np.abs(iq_i) <= 32767)) self.assertTrue(np.all(np.abs(iq_q) <= 32767)) def test_2d_input(self): """2-D (chirps, samples) → works directly.""" from v7.software_fpga import quantize_raw_iq raw = np.random.randn(32, 1024) + 1j * np.random.randn(32, 1024) iq_i, _iq_q = quantize_raw_iq(raw) self.assertEqual(iq_i.shape, (32, 1024)) def test_zero_input(self): """All-zero complex input → all-zero output.""" from v7.software_fpga import quantize_raw_iq raw = np.zeros((32, 1024), dtype=np.complex128) iq_i, iq_q = quantize_raw_iq(raw) self.assertTrue(np.all(iq_i == 0)) self.assertTrue(np.all(iq_q == 0)) def test_peak_target_scaling(self): """Peak of output should be near peak_target.""" from v7.software_fpga import quantize_raw_iq raw = np.zeros((32, 1024), dtype=np.complex128) raw[0, 0] = 1.0 + 0j # single peak iq_i, _iq_q = quantize_raw_iq(raw, peak_target=500) # The peak I value should be exactly 500 (sole max) self.assertEqual(int(iq_i[0, 0]), 500) # ============================================================================= # Test: v7.replay (ReplayEngine, detect_format) # ============================================================================= class TestDetectFormat(unittest.TestCase): """detect_format auto-detection logic.""" COSIM_DIR = os.path.join( os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim", "real_data", "hex" ) def test_cosim_dir(self): if not os.path.isdir(self.COSIM_DIR): self.skipTest("co-sim dir not found") from v7.replay import detect_format, ReplayFormat self.assertEqual(detect_format(self.COSIM_DIR), ReplayFormat.COSIM_DIR) def test_npy_file(self): """A .npy file → RAW_IQ_NPY.""" from v7.replay import detect_format, ReplayFormat import tempfile with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: np.save(f, np.zeros((2, 32, 1024), dtype=np.complex128)) tmp = f.name try: self.assertEqual(detect_format(tmp), ReplayFormat.RAW_IQ_NPY) finally: os.unlink(tmp) def test_h5_file(self): """A .h5 file → HDF5.""" from v7.replay import detect_format, ReplayFormat self.assertEqual(detect_format("/tmp/fake_recording.h5"), ReplayFormat.HDF5) def test_unknown_extension_raises(self): from v7.replay import detect_format with self.assertRaises(ValueError): detect_format("/tmp/data.csv") def test_empty_dir_raises(self): """Directory without co-sim files → ValueError.""" from v7.replay import detect_format import tempfile with tempfile.TemporaryDirectory() as td, self.assertRaises(ValueError): detect_format(td) class TestReplayEngineCosim(unittest.TestCase): """ReplayEngine loading from FPGA co-sim directory.""" COSIM_DIR = os.path.join( os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim", "real_data", "hex" ) def _available(self): return os.path.isfile(os.path.join(self.COSIM_DIR, "doppler_map_i.npy")) def test_load_cosim(self): if not self._available(): self.skipTest("co-sim data not found") from v7.replay import ReplayEngine, ReplayFormat engine = ReplayEngine(self.COSIM_DIR) self.assertEqual(engine.fmt, ReplayFormat.COSIM_DIR) self.assertEqual(engine.total_frames, 1) def test_get_frame_cosim(self): if not self._available(): self.skipTest("co-sim data not found") from v7.replay import ReplayEngine from radar_protocol import RadarFrame engine = ReplayEngine(self.COSIM_DIR) frame = engine.get_frame(0) self.assertIsInstance(frame, RadarFrame) self.assertEqual(frame.range_doppler_i.shape, (64, 32)) self.assertEqual(frame.magnitude.shape, (64, 32)) def test_get_frame_out_of_range(self): if not self._available(): self.skipTest("co-sim data not found") from v7.replay import ReplayEngine engine = ReplayEngine(self.COSIM_DIR) with self.assertRaises(IndexError): engine.get_frame(1) with self.assertRaises(IndexError): engine.get_frame(-1) class TestReplayEngineRawIQ(unittest.TestCase): """ReplayEngine loading from raw IQ .npy cube.""" def test_load_raw_iq_synthetic(self): """Synthetic raw IQ cube loads and produces correct frame count.""" import tempfile from v7.replay import ReplayEngine, ReplayFormat from v7.software_fpga import SoftwareFPGA raw = np.random.randn(3, 32, 1024) + 1j * np.random.randn(3, 32, 1024) with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: np.save(f, raw) tmp = f.name try: fpga = SoftwareFPGA() engine = ReplayEngine(tmp, software_fpga=fpga) self.assertEqual(engine.fmt, ReplayFormat.RAW_IQ_NPY) self.assertEqual(engine.total_frames, 3) finally: os.unlink(tmp) def test_get_frame_raw_iq_synthetic(self): """get_frame on raw IQ runs SoftwareFPGA and returns RadarFrame.""" import tempfile from v7.replay import ReplayEngine from v7.software_fpga import SoftwareFPGA from radar_protocol import RadarFrame raw = np.random.randn(2, 32, 1024) + 1j * np.random.randn(2, 32, 1024) with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: np.save(f, raw) tmp = f.name try: fpga = SoftwareFPGA() engine = ReplayEngine(tmp, software_fpga=fpga) frame = engine.get_frame(0) self.assertIsInstance(frame, RadarFrame) self.assertEqual(frame.range_doppler_i.shape, (64, 32)) self.assertEqual(frame.frame_number, 0) finally: os.unlink(tmp) def test_raw_iq_no_fpga_raises(self): """Raw IQ get_frame without SoftwareFPGA → RuntimeError.""" import tempfile from v7.replay import ReplayEngine raw = np.random.randn(1, 32, 1024) + 1j * np.random.randn(1, 32, 1024) with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: np.save(f, raw) tmp = f.name try: engine = ReplayEngine(tmp) with self.assertRaises(RuntimeError): engine.get_frame(0) finally: os.unlink(tmp) class TestReplayEngineHDF5(unittest.TestCase): """ReplayEngine loading from HDF5 recordings.""" def _skip_no_h5py(self): try: import h5py # noqa: F401 except ImportError: self.skipTest("h5py not installed") def test_load_hdf5_synthetic(self): """Synthetic HDF5 loads and iterates frames.""" self._skip_no_h5py() import tempfile import h5py from v7.replay import ReplayEngine, ReplayFormat from radar_protocol import RadarFrame with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as f: tmp = f.name try: with h5py.File(tmp, "w") as hf: hf.attrs["creator"] = "test" hf.attrs["range_bins"] = 64 hf.attrs["doppler_bins"] = 32 grp = hf.create_group("frames") for i in range(3): fg = grp.create_group(f"frame_{i:06d}") fg.attrs["timestamp"] = float(i) fg.attrs["frame_number"] = i fg.attrs["detection_count"] = 0 fg.create_dataset("range_doppler_i", data=np.zeros((64, 32), dtype=np.int16)) fg.create_dataset("range_doppler_q", data=np.zeros((64, 32), dtype=np.int16)) fg.create_dataset("magnitude", data=np.zeros((64, 32), dtype=np.float64)) fg.create_dataset("detections", data=np.zeros((64, 32), dtype=np.uint8)) fg.create_dataset("range_profile", data=np.zeros(64, dtype=np.float64)) engine = ReplayEngine(tmp) self.assertEqual(engine.fmt, ReplayFormat.HDF5) self.assertEqual(engine.total_frames, 3) frame = engine.get_frame(1) self.assertIsInstance(frame, RadarFrame) self.assertEqual(frame.frame_number, 1) self.assertEqual(frame.range_doppler_i.shape, (64, 32)) engine.close() finally: os.unlink(tmp) # ============================================================================= # Test: v7.processing.extract_targets_from_frame # ============================================================================= class TestExtractTargetsFromFrame(unittest.TestCase): """extract_targets_from_frame bin-to-physical conversion.""" def _make_frame(self, det_cells=None): """Create a minimal RadarFrame with optional detection cells.""" from radar_protocol import RadarFrame frame = RadarFrame() if det_cells: for rbin, dbin in det_cells: frame.detections[rbin, dbin] = 1 frame.magnitude[rbin, dbin] = 1000.0 frame.detection_count = int(frame.detections.sum()) frame.timestamp = 1.0 return frame def test_no_detections(self): from v7.processing import extract_targets_from_frame frame = self._make_frame() targets = extract_targets_from_frame(frame) self.assertEqual(len(targets), 0) def test_single_detection_range(self): """Detection at range bin 10 → range = 10 * range_resolution.""" from v7.processing import extract_targets_from_frame frame = self._make_frame(det_cells=[(10, 16)]) # dbin=16 = center → vel=0 targets = extract_targets_from_frame(frame, range_resolution=5.621) self.assertEqual(len(targets), 1) self.assertAlmostEqual(targets[0].range, 10 * 5.621, places=2) self.assertAlmostEqual(targets[0].velocity, 0.0, places=2) def test_velocity_sign(self): """Doppler bin < center → negative velocity, > center → positive.""" from v7.processing import extract_targets_from_frame frame = self._make_frame(det_cells=[(5, 10), (5, 20)]) targets = extract_targets_from_frame(frame, velocity_resolution=1.484) # dbin=10: vel = (10-16)*1.484 = -8.904 (approaching) # dbin=20: vel = (20-16)*1.484 = +5.936 (receding) self.assertLess(targets[0].velocity, 0) self.assertGreater(targets[1].velocity, 0) def test_snr_positive_for_nonzero_mag(self): from v7.processing import extract_targets_from_frame frame = self._make_frame(det_cells=[(3, 16)]) targets = extract_targets_from_frame(frame) self.assertGreater(targets[0].snr, 0) def test_gps_georef(self): """With GPS data, targets get non-zero lat/lon.""" from v7.processing import extract_targets_from_frame from v7.models import GPSData gps = GPSData(latitude=41.9, longitude=12.5, altitude=0.0, pitch=0.0, heading=90.0) frame = self._make_frame(det_cells=[(10, 16)]) targets = extract_targets_from_frame( frame, range_resolution=100.0, gps=gps) # Should be roughly east of radar position self.assertAlmostEqual(targets[0].latitude, 41.9, places=2) self.assertGreater(targets[0].longitude, 12.5) def test_multiple_detections(self): from v7.processing import extract_targets_from_frame frame = self._make_frame(det_cells=[(0, 0), (10, 10), (63, 31)]) targets = extract_targets_from_frame(frame) self.assertEqual(len(targets), 3) # IDs should be sequential 0, 1, 2 self.assertEqual([t.id for t in targets], [0, 1, 2]) # ============================================================================= # Helper: lazy import of v7.models # ============================================================================= def _models(): import v7.models return v7.models if __name__ == "__main__": unittest.main()