test: update test_radar_dashboard for FT2232H-only protocol

Align test suite with FT601 removal from radar_protocol.py:
- Replace FT601Connection with FT2232HConnection throughout
- Rewrite _make_data_packet() to build 11-byte packets (was 35-byte)
- Update data packet roundtrip test for 11-byte format
- Fix truncation test threshold (20 -> 6 bytes, since packets are 11)
- Update ReplayConnection frame_len assertions (35 -> 11 per packet)

57 passed, 1 skipped (h5py), 0 failed.
This commit is contained in:
Jason
2026-04-07 21:15:36 +03:00
parent 385a54d971
commit c1d12c4130
+30 -44
View File
@@ -16,10 +16,11 @@ import unittest
import numpy as np
from radar_protocol import (
RadarProtocol, FT601Connection, DataRecorder, RadarAcquisition,
RadarProtocol, FT2232HConnection, DataRecorder, RadarAcquisition,
RadarFrame, StatusResponse, Opcode,
HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE,
NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS,
DATA_PACKET_SIZE,
_HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES,
)
@@ -72,23 +73,13 @@ class TestRadarProtocol(unittest.TestCase):
# ----------------------------------------------------------------
def _make_data_packet(self, range_i=100, range_q=200,
dop_i=300, dop_q=400, detection=0):
"""Build a synthetic 35-byte data packet matching FPGA format."""
"""Build a synthetic 11-byte data packet matching FT2232H format."""
pkt = bytearray()
pkt.append(HEADER_BYTE)
# Range: word 0 = {range_q[15:0], range_i[15:0]}
rword = (((range_q & 0xFFFF) << 16) | (range_i & 0xFFFF)) & 0xFFFFFFFF
pkt += struct.pack(">I", rword)
# Words 1-3: shifted copies (don't matter for parsing)
for shift in [8, 16, 24]:
pkt += struct.pack(">I", ((rword << shift) & 0xFFFFFFFF))
# Doppler: word 0 = {dop_i[15:0], dop_q[15:0]}
dword = (((dop_i & 0xFFFF) << 16) | (dop_q & 0xFFFF)) & 0xFFFFFFFF
pkt += struct.pack(">I", dword)
for shift in [8, 16, 24]:
pkt += struct.pack(">I", ((dword << shift) & 0xFFFFFFFF))
pkt += struct.pack(">h", range_q & 0xFFFF if range_q >= 0 else range_q)
pkt += struct.pack(">h", range_i & 0xFFFF if range_i >= 0 else range_i)
pkt += struct.pack(">h", dop_i & 0xFFFF if dop_i >= 0 else dop_i)
pkt += struct.pack(">h", dop_q & 0xFFFF if dop_q >= 0 else dop_q)
pkt.append(detection & 0x01)
pkt.append(FOOTER_BYTE)
return bytes(pkt)
@@ -265,23 +256,23 @@ class TestRadarProtocol(unittest.TestCase):
def test_find_boundaries_truncated(self):
"""Truncated packet should not be returned."""
data_pkt = self._make_data_packet()
buf = data_pkt[:20] # truncated
buf = data_pkt[:6] # truncated (less than 11-byte packet size)
boundaries = RadarProtocol.find_packet_boundaries(buf)
self.assertEqual(len(boundaries), 0)
class TestFT601Connection(unittest.TestCase):
"""Test mock FT601 connection."""
class TestFT2232HConnection(unittest.TestCase):
"""Test mock FT2232H connection."""
def test_mock_open_close(self):
conn = FT601Connection(mock=True)
conn = FT2232HConnection(mock=True)
self.assertTrue(conn.open())
self.assertTrue(conn.is_open)
conn.close()
self.assertFalse(conn.is_open)
def test_mock_read_returns_data(self):
conn = FT601Connection(mock=True)
conn = FT2232HConnection(mock=True)
conn.open()
data = conn.read(4096)
self.assertIsNotNone(data)
@@ -290,7 +281,7 @@ class TestFT601Connection(unittest.TestCase):
def test_mock_read_contains_valid_packets(self):
"""Mock data should contain parseable data packets."""
conn = FT601Connection(mock=True)
conn = FT2232HConnection(mock=True)
conn.open()
raw = conn.read(4096)
packets = RadarProtocol.find_packet_boundaries(raw)
@@ -302,18 +293,18 @@ class TestFT601Connection(unittest.TestCase):
conn.close()
def test_mock_write(self):
conn = FT601Connection(mock=True)
conn = FT2232HConnection(mock=True)
conn.open()
cmd = RadarProtocol.build_command(0x01, 1)
self.assertTrue(conn.write(cmd))
conn.close()
def test_read_when_closed(self):
conn = FT601Connection(mock=True)
conn = FT2232HConnection(mock=True)
self.assertIsNone(conn.read())
def test_write_when_closed(self):
conn = FT601Connection(mock=True)
conn = FT2232HConnection(mock=True)
self.assertFalse(conn.write(b"\x00\x00\x00\x00"))
@@ -365,7 +356,7 @@ class TestRadarAcquisition(unittest.TestCase):
"""Test acquisition thread with mock connection."""
def test_acquisition_produces_frames(self):
conn = FT601Connection(mock=True)
conn = FT2232HConnection(mock=True)
conn.open()
fq = queue.Queue(maxsize=16)
acq = RadarAcquisition(conn, fq)
@@ -392,7 +383,7 @@ class TestRadarAcquisition(unittest.TestCase):
# If no frame arrived in timeout, that's still OK for a fast CI run
def test_acquisition_stop(self):
conn = FT601Connection(mock=True)
conn = FT2232HConnection(mock=True)
conn.open()
fq = queue.Queue(maxsize=4)
acq = RadarAcquisition(conn, fq)
@@ -438,25 +429,20 @@ class TestEndToEnd(unittest.TestCase):
self.assertEqual(word & 0xFFFF, 42)
def test_data_packet_roundtrip(self):
"""Build a data packet, parse it, verify values match."""
# Build packet manually
"""Build an 11-byte data packet, parse it, verify values match."""
ri, rq, di, dq = 1234, -5678, 9012, -3456
pkt = bytearray()
pkt.append(HEADER_BYTE)
ri, rq, di, dq = 1234, -5678, 9012, -3456
rword = (((rq & 0xFFFF) << 16) | (ri & 0xFFFF)) & 0xFFFFFFFF
pkt += struct.pack(">I", rword)
for s in [8, 16, 24]:
pkt += struct.pack(">I", (rword << s) & 0xFFFFFFFF)
dword = (((di & 0xFFFF) << 16) | (dq & 0xFFFF)) & 0xFFFFFFFF
pkt += struct.pack(">I", dword)
for s in [8, 16, 24]:
pkt += struct.pack(">I", (dword << s) & 0xFFFFFFFF)
pkt += struct.pack(">h", rq)
pkt += struct.pack(">h", ri)
pkt += struct.pack(">h", di)
pkt += struct.pack(">h", dq)
pkt.append(1)
pkt.append(FOOTER_BYTE)
self.assertEqual(len(pkt), DATA_PACKET_SIZE)
result = RadarProtocol.parse_data_packet(bytes(pkt))
self.assertIsNotNone(result)
self.assertEqual(result["range_i"], ri)
@@ -497,8 +483,8 @@ class TestReplayConnection(unittest.TestCase):
from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=True)
conn.open()
# Each packet is 35 bytes, total = 2048 * 35
expected_bytes = NUM_CELLS * 35
# Each packet is 11 bytes, total = 2048 * 11
expected_bytes = NUM_CELLS * DATA_PACKET_SIZE
self.assertEqual(conn._frame_len, expected_bytes)
conn.close()
@@ -548,7 +534,7 @@ class TestReplayConnection(unittest.TestCase):
from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=False)
conn.open()
self.assertEqual(conn._frame_len, NUM_CELLS * 35)
self.assertEqual(conn._frame_len, NUM_CELLS * DATA_PACKET_SIZE)
# No-MTI with DC notch=2 and default CFAR → 0 detections
raw = conn._packets
boundaries = RadarProtocol.find_packet_boundaries(raw)