Files
PLFM_RADAR/9_Firmware/9_3_GUI/test_radar_dashboard.py
T
Jason 3ef6416e3f feat: AGC phase 7 — AGC Monitor visualization tab with throttled redraws
Add AGC Monitor tab to both tkinter and PyQt6 dashboards with:
- Real-time strip charts: gain history, peak magnitude, saturation count
- Color-coded indicator labels (green/yellow/red thresholds)
- Ring buffer architecture (deque maxlen=256, ~60s at 10 Hz)
- Fill-between saturation area with auto-scaling Y axis
- Throttled matplotlib redraws (500ms interval via time.monotonic)
  to prevent GUI hang from 20 Hz mock-mode status packets

Tests: 82 dashboard + 38 v7 = 120 total, all passing. Ruff: clean.
2026-04-13 20:42:01 +05:45

951 lines
37 KiB
Python

#!/usr/bin/env python3
"""
Tests for AERIS-10 Radar Dashboard protocol parsing, command building,
data recording, and acquisition logic.
Run: python -m pytest test_radar_dashboard.py -v
or: python test_radar_dashboard.py
"""
import struct
import time
import queue
import os
import tempfile
import unittest
import numpy as np
from radar_protocol import (
RadarProtocol, FT2232HConnection, DataRecorder, RadarAcquisition,
RadarFrame, StatusResponse, Opcode,
HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE,
NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS,
DATA_PACKET_SIZE,
_HARDWARE_ONLY_OPCODES,
)
class TestRadarProtocol(unittest.TestCase):
    """Test packet parsing and command building against usb_data_interface.v.

    The ``_make_data_packet`` and ``_make_status_packet`` helpers assemble
    synthetic wire-format packets so the parser can be exercised without
    hardware attached.
    """

    # ----------------------------------------------------------------
    # Command building
    # ----------------------------------------------------------------
    def test_build_command_trigger(self):
        """Opcode 0x01, value 1 → {0x01, 0x00, 0x0001}."""
        cmd = RadarProtocol.build_command(0x01, 1)
        self.assertEqual(len(cmd), 4)
        word = struct.unpack(">I", cmd)[0]
        self.assertEqual((word >> 24) & 0xFF, 0x01)  # opcode
        self.assertEqual((word >> 16) & 0xFF, 0x00)  # addr
        self.assertEqual(word & 0xFFFF, 1)  # value

    def test_build_command_cfar_alpha(self):
        """Opcode 0x23, value 0x30 (alpha=3.0 Q4.4)."""
        cmd = RadarProtocol.build_command(0x23, 0x30)
        word = struct.unpack(">I", cmd)[0]
        self.assertEqual((word >> 24) & 0xFF, 0x23)
        self.assertEqual(word & 0xFFFF, 0x30)

    def test_build_command_status_request(self):
        """Opcode 0xFF, value 0."""
        cmd = RadarProtocol.build_command(0xFF, 0)
        word = struct.unpack(">I", cmd)[0]
        self.assertEqual((word >> 24) & 0xFF, 0xFF)
        self.assertEqual(word & 0xFFFF, 0)

    def test_build_command_with_addr(self):
        """Command with non-zero addr field."""
        cmd = RadarProtocol.build_command(0x10, 500, addr=0x42)
        word = struct.unpack(">I", cmd)[0]
        self.assertEqual((word >> 24) & 0xFF, 0x10)
        self.assertEqual((word >> 16) & 0xFF, 0x42)
        self.assertEqual(word & 0xFFFF, 500)

    def test_build_command_value_clamp(self):
        """Value > 0xFFFF should be masked to 16 bits."""
        # NOTE(review): 0x1FFFF yields 0xFFFF under both masking and
        # saturation, so this input cannot tell the two behaviours apart.
        cmd = RadarProtocol.build_command(0x01, 0x1FFFF)
        word = struct.unpack(">I", cmd)[0]
        self.assertEqual(word & 0xFFFF, 0xFFFF)

    # ----------------------------------------------------------------
    # Data packet parsing
    # ----------------------------------------------------------------
    def _make_data_packet(self, range_i=100, range_q=200,
                          dop_i=300, dop_q=400, detection=0):
        """Build a synthetic 11-byte data packet matching FT2232H format.

        Layout: header byte, then big-endian signed 16-bit ``range_q``,
        ``range_i``, ``dop_i``, ``dop_q`` (in that wire order), one
        detection byte (only bit 0 is kept) and the footer byte.

        Raises:
            struct.error: if any I/Q value does not fit in a signed
                16-bit integer. Values are packed as given instead of
                being masked first, so an out-of-range input fails loudly
                rather than wrapping into a different sample.
        """
        samples = struct.pack(">4h", range_q, range_i, dop_i, dop_q)
        return (bytes([HEADER_BYTE]) + samples
                + bytes([detection & 0x01, FOOTER_BYTE]))

    def test_parse_data_packet_basic(self):
        """Positive I/Q values and a cleared detection flag parse back unchanged."""
        raw = self._make_data_packet(100, 200, 300, 400, 0)
        result = RadarProtocol.parse_data_packet(raw)
        self.assertIsNotNone(result)
        self.assertEqual(result["range_i"], 100)
        self.assertEqual(result["range_q"], 200)
        self.assertEqual(result["doppler_i"], 300)
        self.assertEqual(result["doppler_q"], 400)
        self.assertEqual(result["detection"], 0)

    def test_parse_data_packet_with_detection(self):
        """A set detection bit is reported as 1."""
        raw = self._make_data_packet(0, 0, 0, 0, 1)
        result = RadarProtocol.parse_data_packet(raw)
        self.assertIsNotNone(result)
        self.assertEqual(result["detection"], 1)

    def test_parse_data_packet_negative_values(self):
        """Signed 16-bit values should round-trip correctly."""
        raw = self._make_data_packet(-1000, -2000, -500, 32000, 0)
        result = RadarProtocol.parse_data_packet(raw)
        self.assertIsNotNone(result)
        self.assertEqual(result["range_i"], -1000)
        self.assertEqual(result["range_q"], -2000)
        self.assertEqual(result["doppler_i"], -500)
        self.assertEqual(result["doppler_q"], 32000)

    def test_parse_data_packet_too_short(self):
        """A 2-byte buffer is rejected with None."""
        self.assertIsNone(RadarProtocol.parse_data_packet(b"\xAA\x00"))

    def test_parse_data_packet_wrong_header(self):
        """A packet whose first byte is 0x00 is rejected with None."""
        raw = self._make_data_packet()
        bad = b"\x00" + raw[1:]
        self.assertIsNone(RadarProtocol.parse_data_packet(bad))

    # ----------------------------------------------------------------
    # Status packet parsing
    # ----------------------------------------------------------------
    def _make_status_packet(self, mode=1, stream=7, threshold=10000,
                            long_chirp=3000, long_listen=13700,
                            guard=17540, short_chirp=50,
                            short_listen=17450, chirps=32, range_mode=0,
                            st_flags=0, st_detail=0, st_busy=0,
                            agc_gain=0, agc_peak=0, agc_sat=0, agc_enable=0):
        """Build a 26-byte status response matching FPGA format (Build 26).

        Layout: status header byte, six big-endian 32-bit words, footer
        byte. Every field is masked to its bit width before being placed,
        so oversized arguments are truncated rather than spilling into a
        neighbouring field.
        """
        # Word 0: {0xFF[31:24], mode[23:22], stream[21:19], 3'b000[18:16], threshold[15:0]}
        w0 = (0xFF << 24) | ((mode & 0x03) << 22) | ((stream & 0x07) << 19) | (threshold & 0xFFFF)
        # Word 1: {long_chirp, long_listen}
        w1 = ((long_chirp & 0xFFFF) << 16) | (long_listen & 0xFFFF)
        # Word 2: {guard, short_chirp}
        w2 = ((guard & 0xFFFF) << 16) | (short_chirp & 0xFFFF)
        # Word 3: {short_listen, 10'd0, chirps[5:0]}
        w3 = ((short_listen & 0xFFFF) << 16) | (chirps & 0x3F)
        # Word 4: {agc_current_gain[3:0], agc_peak_magnitude[7:0],
        #          agc_saturation_count[7:0], agc_enable, 9'd0, range_mode[1:0]}
        w4 = (((agc_gain & 0x0F) << 28) | ((agc_peak & 0xFF) << 20) |
              ((agc_sat & 0xFF) << 12) | ((agc_enable & 0x01) << 11) |
              (range_mode & 0x03))
        # Word 5: {7'd0, self_test_busy, 8'd0, self_test_detail[7:0],
        #          3'd0, self_test_flags[4:0]}
        w5 = ((st_busy & 0x01) << 24) | ((st_detail & 0xFF) << 8) | (st_flags & 0x1F)
        words = struct.pack(">6I", w0, w1, w2, w3, w4, w5)
        return bytes([STATUS_HEADER_BYTE]) + words + bytes([FOOTER_BYTE])

    def test_parse_status_defaults(self):
        """Every field of the helper's default packet parses back to its input."""
        raw = self._make_status_packet()
        sr = RadarProtocol.parse_status_packet(raw)
        self.assertIsNotNone(sr)
        self.assertEqual(sr.radar_mode, 1)
        self.assertEqual(sr.stream_ctrl, 7)
        self.assertEqual(sr.cfar_threshold, 10000)
        self.assertEqual(sr.long_chirp, 3000)
        self.assertEqual(sr.long_listen, 13700)
        self.assertEqual(sr.guard, 17540)
        self.assertEqual(sr.short_chirp, 50)
        self.assertEqual(sr.short_listen, 17450)
        self.assertEqual(sr.chirps_per_elev, 32)
        self.assertEqual(sr.range_mode, 0)

    def test_parse_status_range_mode(self):
        """range_mode=2 in word 4 is parsed back as 2."""
        raw = self._make_status_packet(range_mode=2)
        sr = RadarProtocol.parse_status_packet(raw)
        self.assertEqual(sr.range_mode, 2)

    def test_parse_status_too_short(self):
        """A 21-byte buffer (shorter than the 26-byte packet) is rejected."""
        self.assertIsNone(RadarProtocol.parse_status_packet(b"\xBB" + b"\x00" * 20))

    def test_parse_status_wrong_header(self):
        """A status packet whose first byte is 0xAA is rejected."""
        raw = self._make_status_packet()
        bad = b"\xAA" + raw[1:]
        self.assertIsNone(RadarProtocol.parse_status_packet(bad))

    def test_parse_status_wrong_footer(self):
        """A status packet with a zeroed footer byte is rejected."""
        raw = bytearray(self._make_status_packet())
        raw[25] = 0x00  # corrupt footer (was at index 21 in old 5-word format)
        self.assertIsNone(RadarProtocol.parse_status_packet(bytes(raw)))

    def test_parse_status_self_test_all_pass(self):
        """Status with all self-test flags set (all tests pass)."""
        raw = self._make_status_packet(st_flags=0x1F, st_detail=0xA5, st_busy=0)
        sr = RadarProtocol.parse_status_packet(raw)
        self.assertIsNotNone(sr)
        self.assertEqual(sr.self_test_flags, 0x1F)
        self.assertEqual(sr.self_test_detail, 0xA5)
        self.assertEqual(sr.self_test_busy, 0)

    def test_parse_status_self_test_busy(self):
        """Status with self-test busy flag set."""
        raw = self._make_status_packet(st_flags=0x00, st_detail=0x00, st_busy=1)
        sr = RadarProtocol.parse_status_packet(raw)
        self.assertIsNotNone(sr)
        self.assertEqual(sr.self_test_busy, 1)
        self.assertEqual(sr.self_test_flags, 0)
        self.assertEqual(sr.self_test_detail, 0)

    def test_parse_status_self_test_partial_fail(self):
        """Status with partial self-test failures (flags=0b10110)."""
        raw = self._make_status_packet(st_flags=0b10110, st_detail=0x42, st_busy=0)
        sr = RadarProtocol.parse_status_packet(raw)
        self.assertIsNotNone(sr)
        self.assertEqual(sr.self_test_flags, 0b10110)
        self.assertEqual(sr.self_test_detail, 0x42)
        self.assertEqual(sr.self_test_busy, 0)
        # T0 (BRAM) failed, T1 (CIC) passed, T2 (FFT) passed, T3 (arith) failed, T4 (ADC) passed
        self.assertFalse(sr.self_test_flags & 0x01)  # T0 fail
        self.assertTrue(sr.self_test_flags & 0x02)  # T1 pass
        self.assertTrue(sr.self_test_flags & 0x04)  # T2 pass
        self.assertFalse(sr.self_test_flags & 0x08)  # T3 fail
        self.assertTrue(sr.self_test_flags & 0x10)  # T4 pass

    def test_parse_status_self_test_zero_word5(self):
        """Status with zero word 5 (self-test never run)."""
        raw = self._make_status_packet()
        sr = RadarProtocol.parse_status_packet(raw)
        self.assertEqual(sr.self_test_flags, 0)
        self.assertEqual(sr.self_test_detail, 0)
        self.assertEqual(sr.self_test_busy, 0)

    def test_status_packet_is_26_bytes(self):
        """Verify status packet is exactly 26 bytes."""
        raw = self._make_status_packet()
        self.assertEqual(len(raw), 26)

    # ----------------------------------------------------------------
    # Boundary detection
    # ----------------------------------------------------------------
    def test_find_boundaries_mixed(self):
        """Data/status/data packets separated by junk bytes are found in order."""
        data_pkt = self._make_data_packet()
        status_pkt = self._make_status_packet()
        buf = b"\x00\x00" + data_pkt + b"\x00" + status_pkt + data_pkt
        boundaries = RadarProtocol.find_packet_boundaries(buf)
        self.assertEqual(len(boundaries), 3)
        self.assertEqual(boundaries[0][2], "data")
        self.assertEqual(boundaries[1][2], "status")
        self.assertEqual(boundaries[2][2], "data")

    def test_find_boundaries_empty(self):
        """An empty buffer yields an empty boundary list."""
        self.assertEqual(RadarProtocol.find_packet_boundaries(b""), [])

    def test_find_boundaries_truncated(self):
        """Truncated packet should not be returned."""
        data_pkt = self._make_data_packet()
        buf = data_pkt[:6]  # truncated (less than 11-byte packet size)
        boundaries = RadarProtocol.find_packet_boundaries(buf)
        self.assertEqual(len(boundaries), 0)
class TestFT2232HConnection(unittest.TestCase):
    """Test mock FT2232H connection."""

    def _open_mock(self):
        """Return an opened mock connection that is closed after the test.

        The close is registered with ``addCleanup`` so it also runs when an
        assertion fails part-way through a test.
        """
        conn = FT2232HConnection(mock=True)
        conn.open()
        self.addCleanup(conn.close)
        return conn

    def test_mock_open_close(self):
        """open() succeeds and is_open tracks open/close."""
        # close() is the behaviour under test here, so it is called
        # explicitly instead of through the cleanup helper.
        conn = FT2232HConnection(mock=True)
        self.assertTrue(conn.open())
        self.assertTrue(conn.is_open)
        conn.close()
        self.assertFalse(conn.is_open)

    def test_mock_read_returns_data(self):
        """A read on an open mock connection returns a non-empty buffer."""
        conn = self._open_mock()
        data = conn.read(4096)
        self.assertIsNotNone(data)
        self.assertGreater(len(data), 0)

    def test_mock_read_contains_valid_packets(self):
        """Mock data should contain parseable data packets."""
        conn = self._open_mock()
        raw = conn.read(4096)
        packets = RadarProtocol.find_packet_boundaries(raw)
        self.assertGreater(len(packets), 0)
        for start, end, ptype in packets:
            if ptype == "data":
                result = RadarProtocol.parse_data_packet(raw[start:end])
                self.assertIsNotNone(result)

    def test_mock_write(self):
        """Writing a 4-byte command to an open mock connection returns True."""
        conn = self._open_mock()
        cmd = RadarProtocol.build_command(0x01, 1)
        self.assertTrue(conn.write(cmd))

    def test_read_when_closed(self):
        """read() on a never-opened connection returns None."""
        conn = FT2232HConnection(mock=True)
        self.assertIsNone(conn.read())

    def test_write_when_closed(self):
        """write() on a never-opened connection returns False."""
        conn = FT2232HConnection(mock=True)
        self.assertFalse(conn.write(b"\x00\x00\x00\x00"))
class TestDataRecorder(unittest.TestCase):
    """Test HDF5 recording (skipped if h5py not available)."""

    def setUp(self):
        """Create a scratch directory that is removed after every test.

        ``TemporaryDirectory.cleanup`` deletes the directory together with
        whatever was written into it, so cleanup no longer depends on the
        recorder having produced exactly one file.
        """
        scratch = tempfile.TemporaryDirectory()
        self.addCleanup(scratch.cleanup)
        self.tmpdir = scratch.name
        self.filepath = os.path.join(self.tmpdir, "test_recording.h5")

    @unittest.skipUnless(
        (lambda: (
            __import__("importlib.util")
            and __import__("importlib").util.find_spec("h5py") is not None
        ))(),
        "h5py not installed"
    )
    def test_record_and_stop(self):
        """Record three frames, stop, and verify the resulting HDF5 layout."""
        import h5py

        rec = DataRecorder()
        rec.start(self.filepath)
        self.assertTrue(rec.recording)

        # Record 3 frames
        for i in range(3):
            frame = RadarFrame()
            frame.frame_number = i
            frame.timestamp = time.time()
            # Random payloads are fine: only the stored shape is checked.
            frame.magnitude = np.random.rand(NUM_RANGE_BINS, NUM_DOPPLER_BINS)
            frame.range_profile = np.random.rand(NUM_RANGE_BINS)
            rec.record_frame(frame)

        rec.stop()
        self.assertFalse(rec.recording)

        # Verify HDF5 contents
        with h5py.File(self.filepath, "r") as f:
            self.assertEqual(f.attrs["total_frames"], 3)
            self.assertIn("frames", f)
            self.assertIn("frame_000000", f["frames"])
            self.assertIn("frame_000002", f["frames"])
            mag = f["frames/frame_000001/magnitude"][:]
            self.assertEqual(mag.shape, (NUM_RANGE_BINS, NUM_DOPPLER_BINS))
class TestRadarAcquisition(unittest.TestCase):
    """Test acquisition thread with mock connection."""

    def test_acquisition_produces_frames(self):
        """A frame delivered by the acquisition thread has the expected shape.

        Best-effort: if no frame arrives within the timeout the test still
        passes, because a full frame may not be assembled on a fast CI run.
        """
        conn = FT2232HConnection(mock=True)
        conn.open()
        try:
            fq = queue.Queue(maxsize=16)
            acq = RadarAcquisition(conn, fq)
            acq.start()
            try:
                # Wait for at least one frame (mock produces ~32 samples per
                # read, need 2048 for a full frame, so may take a few seconds).
                # With 32 packets per read at 50ms interval a full frame takes
                # ~3.2s. Allow up to 10s.
                try:
                    frame = fq.get(timeout=10)
                except queue.Empty:
                    frame = None
            finally:
                # Always stop the thread, even if waiting raised unexpectedly.
                acq.stop()
                acq.join(timeout=3)
        finally:
            conn.close()

        if frame is not None:
            self.assertIsInstance(frame, RadarFrame)
            self.assertEqual(frame.magnitude.shape,
                             (NUM_RANGE_BINS, NUM_DOPPLER_BINS))
        # If no frame arrived in timeout, that's still OK for a fast CI run

    def test_acquisition_stop(self):
        """stop() followed by join() leaves the acquisition thread dead."""
        conn = FT2232HConnection(mock=True)
        conn.open()
        try:
            fq = queue.Queue(maxsize=4)
            acq = RadarAcquisition(conn, fq)
            acq.start()
            time.sleep(0.2)
            acq.stop()
            acq.join(timeout=3)
            self.assertFalse(acq.is_alive())
        finally:
            # Close the connection even when the liveness assertion fails.
            conn.close()
class TestRadarFrameDefaults(unittest.TestCase):
    """Check the state of a freshly constructed RadarFrame."""

    def test_default_shapes(self):
        """All 2-D maps are 64x32, the range profile has 64 bins, no detections."""
        frame = RadarFrame()
        map_shape = (64, 32)
        for attr in ("range_doppler_i", "range_doppler_q",
                     "magnitude", "detections"):
            self.assertEqual(getattr(frame, attr).shape, map_shape)
        self.assertEqual(frame.range_profile.shape, (64,))
        self.assertEqual(frame.detection_count, 0)

    def test_default_zeros(self):
        """Magnitude and detection maps start out all-zero."""
        frame = RadarFrame()
        for grid in (frame.magnitude, frame.detections):
            self.assertTrue(np.all(grid == 0))
class TestEndToEnd(unittest.TestCase):
    """End-to-end: build command → parse response → verify round-trip."""

    def test_command_roundtrip_all_opcodes(self):
        """Verify all opcodes produce valid 4-byte commands.

        The list includes the AGC opcodes 0x28-0x2C so that it covers the
        same opcode set that the enum test in this file checks against the
        RTL register map.
        """
        opcodes = [0x01, 0x02, 0x03, 0x04, 0x10, 0x11, 0x12,
                   0x13, 0x14, 0x15, 0x16, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25,
                   0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C,
                   0x30, 0x31, 0xFF]
        for op in opcodes:
            # subTest reports every failing opcode instead of only the first.
            with self.subTest(opcode=f"0x{op:02X}"):
                cmd = RadarProtocol.build_command(op, 42)
                self.assertEqual(len(cmd), 4, f"opcode 0x{op:02X}")
                word = struct.unpack(">I", cmd)[0]
                self.assertEqual((word >> 24) & 0xFF, op)
                self.assertEqual(word & 0xFFFF, 42)

    def test_data_packet_roundtrip(self):
        """Build an 11-byte data packet, parse it, verify values match."""
        ri, rq, di, dq = 1234, -5678, 9012, -3456
        # Wire order is range_q, range_i, doppler_i, doppler_q, then the
        # detection byte (set to 1 here).
        pkt = (bytes([HEADER_BYTE])
               + struct.pack(">4h", rq, ri, di, dq)
               + bytes([1, FOOTER_BYTE]))
        self.assertEqual(len(pkt), DATA_PACKET_SIZE)
        result = RadarProtocol.parse_data_packet(pkt)
        self.assertIsNotNone(result)
        self.assertEqual(result["range_i"], ri)
        self.assertEqual(result["range_q"], rq)
        self.assertEqual(result["doppler_i"], di)
        self.assertEqual(result["doppler_q"], dq)
        self.assertEqual(result["detection"], 1)
class TestReplayConnection(unittest.TestCase):
    """Test ReplayConnection with real .npy data files.

    Every test is skipped when the recorded data set is not present.
    Connections opened through ``_open_replay`` are closed by a registered
    cleanup, so they are released even when an assertion fails.
    """

    NPY_DIR = os.path.join(
        os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim",
        "real_data", "hex"
    )

    def _npy_available(self):
        """Check if the npy data files exist."""
        return os.path.isfile(os.path.join(self.NPY_DIR,
                                           "fullchain_mti_doppler_i.npy"))

    def _make_replay(self, **kwargs):
        """Return an unopened ReplayConnection, skipping if data is missing.

        ``kwargs`` are forwarded unchanged to the ReplayConnection constructor.
        """
        if not self._npy_available():
            self.skipTest("npy data files not found")
        # Imported lazily, as the individual tests did before.
        from radar_protocol import ReplayConnection
        return ReplayConnection(self.NPY_DIR, **kwargs)

    def _open_replay(self, **kwargs):
        """Return an opened ReplayConnection that is closed after the test."""
        conn = self._make_replay(**kwargs)
        conn.open()
        self.addCleanup(conn.close)
        return conn

    def _count_detections(self, raw):
        """Count packets in ``raw`` whose parsed detection flag is truthy.

        Fails the test with a clear message if a located packet cannot be
        parsed, instead of raising AttributeError on a None result.
        """
        count = 0
        for start, end, _ptype in RadarProtocol.find_packet_boundaries(raw):
            parsed = RadarProtocol.parse_data_packet(raw[start:end])
            self.assertIsNotNone(parsed, f"unparseable packet at {start}:{end}")
            if parsed.get("detection", 0):
                count += 1
        return count

    def test_replay_open_close(self):
        """ReplayConnection opens and closes without error."""
        # open()/close() are under test, so they are called explicitly.
        conn = self._make_replay(use_mti=True)
        self.assertTrue(conn.open())
        self.assertTrue(conn.is_open)
        conn.close()
        self.assertFalse(conn.is_open)

    def test_replay_packet_count(self):
        """Replay builds exactly NUM_CELLS (2048) packets."""
        conn = self._open_replay(use_mti=True)
        # Each packet is 11 bytes, total = 2048 * 11
        expected_bytes = NUM_CELLS * DATA_PACKET_SIZE
        self.assertEqual(conn._frame_len, expected_bytes)

    def test_replay_packets_parseable(self):
        """Every packet from replay can be parsed by RadarProtocol."""
        conn = self._open_replay(use_mti=True)
        raw = conn._packets
        boundaries = RadarProtocol.find_packet_boundaries(raw)
        self.assertEqual(len(boundaries), NUM_CELLS)
        parsed_count = 0
        det_count = 0
        for start, end, ptype in boundaries:
            self.assertEqual(ptype, "data")
            result = RadarProtocol.parse_data_packet(raw[start:end])
            self.assertIsNotNone(result)
            parsed_count += 1
            if result["detection"]:
                det_count += 1
        self.assertEqual(parsed_count, NUM_CELLS)
        # Default: MTI=ON, DC_notch=2, CFAR CA g=2 t=8 a=0x30 → 4 detections
        self.assertEqual(det_count, 4)

    def test_replay_read_loops(self):
        """Read returns data and loops back around."""
        conn = self._open_replay(use_mti=True, replay_fps=1000)
        total_read = 0
        for _ in range(100):
            chunk = conn.read(1024)
            self.assertIsNotNone(chunk)
            total_read += len(chunk)
        self.assertGreater(total_read, 0)

    def test_replay_no_mti(self):
        """ReplayConnection works with use_mti=False (CFAR still runs)."""
        conn = self._open_replay(use_mti=False)
        self.assertEqual(conn._frame_len, NUM_CELLS * DATA_PACKET_SIZE)
        # No-MTI with DC notch=2 and default CFAR → 0 detections
        self.assertEqual(self._count_detections(conn._packets), 0)

    def test_replay_write_returns_true(self):
        """Write on replay connection returns True."""
        conn = self._open_replay()
        self.assertTrue(conn.write(b"\x01\x00\x00\x01"))

    def test_replay_adjustable_param_cfar_guard(self):
        """Changing CFAR guard via write() triggers re-processing."""
        conn = self._open_replay(use_mti=True)
        # Initial: guard=2 → 4 detections
        self.assertFalse(conn._needs_rebuild)
        # Send CFAR_GUARD=4
        cmd = RadarProtocol.build_command(0x21, 4)
        conn.write(cmd)
        self.assertTrue(conn._needs_rebuild)
        self.assertEqual(conn._cfar_guard, 4)
        # Read triggers rebuild
        conn.read(1024)
        self.assertFalse(conn._needs_rebuild)

    def test_replay_adjustable_param_mti_toggle(self):
        """Toggling MTI via write() triggers re-processing."""
        conn = self._open_replay(use_mti=True)
        # Disable MTI
        cmd = RadarProtocol.build_command(0x26, 0)
        conn.write(cmd)
        self.assertTrue(conn._needs_rebuild)
        self.assertFalse(conn._mti_enable)
        # Read to trigger rebuild, then count detections
        conn.read(1024)  # triggers rebuild
        # No-MTI with default CFAR → 0 detections
        self.assertEqual(self._count_detections(conn._packets), 0)

    def test_replay_adjustable_param_dc_notch(self):
        """Changing DC notch width via write() triggers re-processing."""
        conn = self._open_replay(use_mti=True)
        # Change DC notch to 0 (no notch)
        cmd = RadarProtocol.build_command(0x27, 0)
        conn.write(cmd)
        self.assertTrue(conn._needs_rebuild)
        self.assertEqual(conn._dc_notch_width, 0)
        conn.read(1024)  # triggers rebuild
        # DC notch=0 with MTI → 6 detections (more noise passes through)
        self.assertEqual(self._count_detections(conn._packets), 6)

    def test_replay_hardware_opcode_ignored(self):
        """Hardware-only opcodes don't trigger rebuild."""
        conn = self._open_replay(use_mti=True)
        # Send TRIGGER (hardware-only)
        cmd = RadarProtocol.build_command(0x01, 1)
        conn.write(cmd)
        self.assertFalse(conn._needs_rebuild)
        # Send STREAM_CONTROL (hardware-only, opcode 0x04)
        cmd = RadarProtocol.build_command(0x04, 7)
        conn.write(cmd)
        self.assertFalse(conn._needs_rebuild)

    def test_replay_same_value_no_rebuild(self):
        """Setting same value as current doesn't trigger rebuild."""
        conn = self._open_replay(use_mti=True)
        # CFAR guard already 2
        cmd = RadarProtocol.build_command(0x21, 2)
        conn.write(cmd)
        self.assertFalse(conn._needs_rebuild)

    def test_replay_self_test_opcodes_are_hardware_only(self):
        """Self-test opcodes 0x30/0x31 are hardware-only (ignored in replay)."""
        conn = self._open_replay(use_mti=True)
        # Send self-test trigger
        cmd = RadarProtocol.build_command(0x30, 1)
        conn.write(cmd)
        self.assertFalse(conn._needs_rebuild)
        # Send self-test status request
        cmd = RadarProtocol.build_command(0x31, 0)
        conn.write(cmd)
        self.assertFalse(conn._needs_rebuild)
class TestOpcodeEnum(unittest.TestCase):
    """Check the Opcode enum against the RTL host register map (radar_system_top.v)."""

    def test_gain_shift_is_0x16(self):
        """GAIN_SHIFT is opcode 0x16 (radar_system_top.v:928)."""
        self.assertEqual(Opcode.GAIN_SHIFT, 0x16)

    def test_no_digital_gain_alias(self):
        """There is no DIGITAL_GAIN member; GAIN_SHIFT is the only name."""
        has_alias = hasattr(Opcode, 'DIGITAL_GAIN')
        self.assertFalse(has_alias)

    def test_self_test_trigger(self):
        """SELF_TEST_TRIGGER is opcode 0x30."""
        self.assertEqual(Opcode.SELF_TEST_TRIGGER, 0x30)

    def test_self_test_status(self):
        """SELF_TEST_STATUS is opcode 0x31."""
        self.assertEqual(Opcode.SELF_TEST_STATUS, 0x31)

    def test_self_test_in_hardware_only(self):
        """Both self-test opcodes are listed in _HARDWARE_ONLY_OPCODES."""
        for code in (0x30, 0x31):
            self.assertIn(code, _HARDWARE_ONLY_OPCODES)

    def test_0x16_in_hardware_only(self):
        """GAIN_SHIFT (0x16) is listed in _HARDWARE_ONLY_OPCODES."""
        self.assertIn(0x16, _HARDWARE_ONLY_OPCODES)

    def test_stream_control_is_0x04(self):
        """STREAM_CONTROL is opcode 0x04 (radar_system_top.v:906)."""
        self.assertEqual(Opcode.STREAM_CONTROL, 0x04)

    def test_legacy_aliases_removed(self):
        """None of the legacy alias names exist on the production enum."""
        legacy_names = ("TRIGGER", "PRF_DIV", "NUM_CHIRPS", "CHIRP_TIMER",
                        "STREAM_ENABLE", "THRESHOLD")
        for name in legacy_names:
            still_present = hasattr(Opcode, name)
            self.assertFalse(still_present,
                             f"Legacy alias Opcode.{name} should not exist")

    def test_radar_mode_names(self):
        """The canonical names for opcodes 0x01-0x04 exist with those values."""
        self.assertEqual(Opcode.RADAR_MODE, 0x01)
        self.assertEqual(Opcode.TRIGGER_PULSE, 0x02)
        self.assertEqual(Opcode.DETECT_THRESHOLD, 0x03)
        self.assertEqual(Opcode.STREAM_CONTROL, 0x04)

    def test_stale_opcodes_not_in_hardware_only(self):
        """The old, wrong opcode values are absent from _HARDWARE_ONLY_OPCODES."""
        # 0x05 was the wrong STREAM_ENABLE, 0x06 the wrong GAIN_SHIFT.
        for stale in (0x05, 0x06):
            self.assertNotIn(stale, _HARDWARE_ONLY_OPCODES)

    def test_all_rtl_opcodes_present(self):
        """Each RTL opcode (radar_system_top.v) has a matching enum member."""
        rtl_opcodes = {
            0x01, 0x02, 0x03, 0x04,
            0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16,
            0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27,
            0x28, 0x29, 0x2A, 0x2B, 0x2C,
            0x30, 0x31, 0xFF,
        }
        defined = {int(member) for member in Opcode}
        for op in rtl_opcodes:
            self.assertIn(op, defined, f"0x{op:02X} missing from Opcode enum")
class TestStatusResponseDefaults(unittest.TestCase):
    """Check the self-test fields of the StatusResponse dataclass."""

    def test_default_self_test_fields(self):
        """A default-constructed StatusResponse has all self-test fields at 0."""
        status = StatusResponse()
        self.assertEqual(status.self_test_flags, 0)
        self.assertEqual(status.self_test_detail, 0)
        self.assertEqual(status.self_test_busy, 0)

    def test_self_test_fields_set(self):
        """Self-test fields passed to the constructor are stored unchanged."""
        fields = {
            "self_test_flags": 0x1F,
            "self_test_detail": 0xAB,
            "self_test_busy": 1,
        }
        status = StatusResponse(**fields)
        self.assertEqual(status.self_test_flags, 0x1F)
        self.assertEqual(status.self_test_detail, 0xAB)
        self.assertEqual(status.self_test_busy, 1)
class TestAGCOpcodes(unittest.TestCase):
    """Check that the AGC enum members carry the FPGA RTL values 0x28-0x2C."""

    def test_agc_enable_opcode(self):
        """AGC_ENABLE is opcode 0x28."""
        opcode = Opcode.AGC_ENABLE
        self.assertEqual(opcode, 0x28)

    def test_agc_target_opcode(self):
        """AGC_TARGET is opcode 0x29."""
        opcode = Opcode.AGC_TARGET
        self.assertEqual(opcode, 0x29)

    def test_agc_attack_opcode(self):
        """AGC_ATTACK is opcode 0x2A."""
        opcode = Opcode.AGC_ATTACK
        self.assertEqual(opcode, 0x2A)

    def test_agc_decay_opcode(self):
        """AGC_DECAY is opcode 0x2B."""
        opcode = Opcode.AGC_DECAY
        self.assertEqual(opcode, 0x2B)

    def test_agc_holdoff_opcode(self):
        """AGC_HOLDOFF is opcode 0x2C."""
        opcode = Opcode.AGC_HOLDOFF
        self.assertEqual(opcode, 0x2C)
class TestAGCStatusParsing(unittest.TestCase):
    """Check that the AGC fields packed into status word 4 parse correctly."""

    def _make_status_packet(self, **kwargs):
        """Build a status packet via the TestRadarProtocol helper."""
        return TestRadarProtocol()._make_status_packet(**kwargs)

    def test_agc_fields_default_zero(self):
        """A packet built without AGC arguments parses to all-zero AGC fields."""
        packet = self._make_status_packet()
        status = RadarProtocol.parse_status_packet(packet)
        self.assertEqual(status.agc_current_gain, 0)
        self.assertEqual(status.agc_peak_magnitude, 0)
        self.assertEqual(status.agc_saturation_count, 0)
        self.assertEqual(status.agc_enable, 0)

    def test_agc_fields_nonzero(self):
        """Non-zero AGC values survive the build/parse round trip."""
        agc = {"agc_gain": 7, "agc_peak": 200, "agc_sat": 15, "agc_enable": 1}
        packet = self._make_status_packet(**agc)
        status = RadarProtocol.parse_status_packet(packet)
        self.assertEqual(status.agc_current_gain, 7)
        self.assertEqual(status.agc_peak_magnitude, 200)
        self.assertEqual(status.agc_saturation_count, 15)
        self.assertEqual(status.agc_enable, 1)

    def test_agc_max_values(self):
        """Each AGC field round-trips at the top of its bit range."""
        agc = {"agc_gain": 15, "agc_peak": 255, "agc_sat": 255, "agc_enable": 1}
        packet = self._make_status_packet(**agc)
        status = RadarProtocol.parse_status_packet(packet)
        self.assertEqual(status.agc_current_gain, 15)
        self.assertEqual(status.agc_peak_magnitude, 255)
        self.assertEqual(status.agc_saturation_count, 255)
        self.assertEqual(status.agc_enable, 1)

    def test_agc_and_range_mode_coexist(self):
        """AGC fields and range_mode share one word without corrupting each other."""
        fields = {"agc_gain": 5, "agc_peak": 128, "agc_sat": 42,
                  "agc_enable": 1, "range_mode": 2}
        packet = self._make_status_packet(**fields)
        status = RadarProtocol.parse_status_packet(packet)
        self.assertEqual(status.agc_current_gain, 5)
        self.assertEqual(status.agc_peak_magnitude, 128)
        self.assertEqual(status.agc_saturation_count, 42)
        self.assertEqual(status.agc_enable, 1)
        self.assertEqual(status.range_mode, 2)
class TestAGCStatusResponseDefaults(unittest.TestCase):
    """Check the AGC fields of the StatusResponse dataclass."""

    def test_default_agc_fields(self):
        """A default-constructed StatusResponse has all AGC fields at 0."""
        status = StatusResponse()
        self.assertEqual(status.agc_current_gain, 0)
        self.assertEqual(status.agc_peak_magnitude, 0)
        self.assertEqual(status.agc_saturation_count, 0)
        self.assertEqual(status.agc_enable, 0)

    def test_agc_fields_set(self):
        """AGC fields passed to the constructor are stored unchanged."""
        fields = {
            "agc_current_gain": 7,
            "agc_peak_magnitude": 200,
            "agc_saturation_count": 15,
            "agc_enable": 1,
        }
        status = StatusResponse(**fields)
        self.assertEqual(status.agc_current_gain, 7)
        self.assertEqual(status.agc_peak_magnitude, 200)
        self.assertEqual(status.agc_saturation_count, 15)
        self.assertEqual(status.agc_enable, 1)
# =============================================================================
# AGC Visualization — ring buffer / data model tests
# =============================================================================
class TestAGCVisualizationHistory(unittest.TestCase):
    """Exercise the ring-buffer logic behind the AGC plots (no GUI required)."""

    def _make_deque(self, maxlen=256):
        """Return an empty bounded deque holding at most ``maxlen`` items."""
        from collections import deque
        ring = deque(maxlen=maxlen)
        return ring

    def test_ring_buffer_maxlen(self):
        """A full ring buffer drops its oldest entries on append."""
        ring = self._make_deque(maxlen=4)
        ring.extend(range(6))
        self.assertEqual(list(ring), [2, 3, 4, 5])
        self.assertEqual(len(ring), 4)

    def test_gain_history_accumulation(self):
        """Gain values read from status objects are stored in arrival order."""
        gains = [0, 3, 7, 15, 8, 2]
        history = self._make_deque(maxlen=256)
        for gain in gains:
            status = StatusResponse(agc_current_gain=gain)
            history.append(status.agc_current_gain)
        self.assertEqual(list(history), [0, 3, 7, 15, 8, 2])

    def test_peak_history_accumulation(self):
        """Peak magnitudes are stored in arrival order."""
        history = self._make_deque(maxlen=256)
        history.extend([0, 50, 200, 255, 128])
        self.assertEqual(list(history), [0, 50, 200, 255, 128])

    def test_saturation_total_computation(self):
        """Summing the saturation buffer yields the running total."""
        history = self._make_deque(maxlen=256)
        history.extend([0, 0, 5, 0, 12, 3])
        self.assertEqual(sum(history), 20)

    def test_saturation_color_thresholds(self):
        """Colour bands: green at 0, yellow for 1-10, red above 10."""
        def sat_color(total):
            if total <= 0:
                return "green"
            return "yellow" if total <= 10 else "red"

        expectations = ((0, "green"), (1, "yellow"), (10, "yellow"),
                        (11, "red"), (255, "red"))
        for total, colour in expectations:
            self.assertEqual(sat_color(total), colour)

    def test_ring_buffer_eviction_preserves_latest(self):
        """Once overflowed, the buffer holds only the newest values."""
        ring = self._make_deque(maxlen=8)
        ring.extend(range(20))
        self.assertEqual(list(ring), [12, 13, 14, 15, 16, 17, 18, 19])

    def test_empty_history_safe(self):
        """sum/len work on an empty buffer; max needs the emptiness guard."""
        ring = self._make_deque(maxlen=256)
        self.assertEqual(sum(ring), 0)
        self.assertEqual(len(ring), 0)
        # max() raises on an empty sequence, so mirror the guard used in
        # the visualization code.
        if ring:
            max_sat = max(ring)
        else:
            max_sat = 0
        self.assertEqual(max_sat, 0)

    def test_agc_mode_string(self):
        """The enable flag maps to the AUTO / MANUAL display string."""
        def mode_label(status):
            return "AUTO" if status.agc_enable else "MANUAL"

        self.assertEqual(mode_label(StatusResponse(agc_enable=1)), "AUTO")
        self.assertEqual(mode_label(StatusResponse(agc_enable=0)), "MANUAL")

    def test_xlim_scroll_logic(self):
        """X-axis window once the buffer has reached history_len entries."""
        history_len = 8
        ring = self._make_deque(maxlen=history_len)
        ring.extend(range(10))
        n = len(ring)
        # Ten pushes into a maxlen=8 buffer leave exactly 8 entries.
        self.assertEqual(n, history_len)
        # xlim is (0, n) when static or (n - history_len, n) when scrolling.
        left_edge = max(0, n - history_len)
        self.assertEqual(left_edge, 0)
        self.assertEqual(n, 8)

    def test_sat_autoscale_ylim(self):
        """Saturation y-limit is max(max_sat * 1.5, 5)."""
        def y_limit(max_sat):
            return max(max_sat * 1.5, 5)

        # No saturation
        self.assertEqual(y_limit(0), 5)
        # Some saturation
        self.assertAlmostEqual(y_limit(10), 15.0)
        # High saturation
        self.assertAlmostEqual(y_limit(200), 300.0)
# Allow running the suite directly: python test_radar_dashboard.py
if __name__ == "__main__":
    unittest.main(verbosity=2)