diff --git a/9_Firmware/9_3_GUI/test_radar_dashboard.py b/9_Firmware/9_3_GUI/test_radar_dashboard.py index 00d15cc..790bbef 100644 --- a/9_Firmware/9_3_GUI/test_radar_dashboard.py +++ b/9_Firmware/9_3_GUI/test_radar_dashboard.py @@ -16,10 +16,11 @@ import unittest import numpy as np from radar_protocol import ( - RadarProtocol, FT601Connection, DataRecorder, RadarAcquisition, + RadarProtocol, FT2232HConnection, DataRecorder, RadarAcquisition, RadarFrame, StatusResponse, Opcode, HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE, NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS, + DATA_PACKET_SIZE, _HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES, ) @@ -72,23 +73,13 @@ class TestRadarProtocol(unittest.TestCase): # ---------------------------------------------------------------- def _make_data_packet(self, range_i=100, range_q=200, dop_i=300, dop_q=400, detection=0): - """Build a synthetic 35-byte data packet matching FPGA format.""" + """Build a synthetic 11-byte data packet matching FT2232H format.""" pkt = bytearray() pkt.append(HEADER_BYTE) - - # Range: word 0 = {range_q[15:0], range_i[15:0]} - rword = (((range_q & 0xFFFF) << 16) | (range_i & 0xFFFF)) & 0xFFFFFFFF - pkt += struct.pack(">I", rword) - # Words 1-3: shifted copies (don't matter for parsing) - for shift in [8, 16, 24]: - pkt += struct.pack(">I", ((rword << shift) & 0xFFFFFFFF)) - - # Doppler: word 0 = {dop_i[15:0], dop_q[15:0]} - dword = (((dop_i & 0xFFFF) << 16) | (dop_q & 0xFFFF)) & 0xFFFFFFFF - pkt += struct.pack(">I", dword) - for shift in [8, 16, 24]: - pkt += struct.pack(">I", ((dword << shift) & 0xFFFFFFFF)) - + pkt += struct.pack(">h", range_q & 0xFFFF if range_q >= 0 else range_q) + pkt += struct.pack(">h", range_i & 0xFFFF if range_i >= 0 else range_i) + pkt += struct.pack(">h", dop_i & 0xFFFF if dop_i >= 0 else dop_i) + pkt += struct.pack(">h", dop_q & 0xFFFF if dop_q >= 0 else dop_q) pkt.append(detection & 0x01) pkt.append(FOOTER_BYTE) return bytes(pkt) @@ -265,23 +256,23 @@ class TestRadarProtocol(unittest.TestCase): def test_find_boundaries_truncated(self): """Truncated packet should not be returned.""" data_pkt = self._make_data_packet() - buf = data_pkt[:20] # truncated + buf = data_pkt[:6] # truncated (less than 11-byte packet size) boundaries = RadarProtocol.find_packet_boundaries(buf) self.assertEqual(len(boundaries), 0) -class TestFT601Connection(unittest.TestCase): - """Test mock FT601 connection.""" +class TestFT2232HConnection(unittest.TestCase): + """Test mock FT2232H connection.""" def test_mock_open_close(self): - conn = FT601Connection(mock=True) + conn = FT2232HConnection(mock=True) self.assertTrue(conn.open()) self.assertTrue(conn.is_open) conn.close() self.assertFalse(conn.is_open) def test_mock_read_returns_data(self): - conn = FT601Connection(mock=True) + conn = FT2232HConnection(mock=True) conn.open() data = conn.read(4096) self.assertIsNotNone(data) @@ -290,7 +281,7 @@ class TestFT601Connection(unittest.TestCase): def test_mock_read_contains_valid_packets(self): """Mock data should contain parseable data packets.""" - conn = FT601Connection(mock=True) + conn = FT2232HConnection(mock=True) conn.open() raw = conn.read(4096) packets = RadarProtocol.find_packet_boundaries(raw) @@ -302,18 +293,18 @@ class TestFT601Connection(unittest.TestCase): conn.close() def test_mock_write(self): - conn = FT601Connection(mock=True) + conn = FT2232HConnection(mock=True) conn.open() cmd = RadarProtocol.build_command(0x01, 1) self.assertTrue(conn.write(cmd)) conn.close() def test_read_when_closed(self): - conn = FT601Connection(mock=True) + conn = FT2232HConnection(mock=True) self.assertIsNone(conn.read()) def test_write_when_closed(self): - conn = FT601Connection(mock=True) + conn = FT2232HConnection(mock=True) self.assertFalse(conn.write(b"\x00\x00\x00\x00")) @@ -365,7 +356,7 @@ class TestRadarAcquisition(unittest.TestCase): """Test acquisition thread with mock connection.""" def test_acquisition_produces_frames(self): - conn = FT601Connection(mock=True) + conn = FT2232HConnection(mock=True) conn.open() fq = queue.Queue(maxsize=16) acq = RadarAcquisition(conn, fq) @@ -392,7 +383,7 @@ class TestRadarAcquisition(unittest.TestCase): # If no frame arrived in timeout, that's still OK for a fast CI run def test_acquisition_stop(self): - conn = FT601Connection(mock=True) + conn = FT2232HConnection(mock=True) conn.open() fq = queue.Queue(maxsize=4) acq = RadarAcquisition(conn, fq) @@ -438,25 +429,20 @@ class TestEndToEnd(unittest.TestCase): self.assertEqual(word & 0xFFFF, 42) def test_data_packet_roundtrip(self): - """Build a data packet, parse it, verify values match.""" - # Build packet manually + """Build an 11-byte data packet, parse it, verify values match.""" + ri, rq, di, dq = 1234, -5678, 9012, -3456 + pkt = bytearray() pkt.append(HEADER_BYTE) - - ri, rq, di, dq = 1234, -5678, 9012, -3456 - rword = (((rq & 0xFFFF) << 16) | (ri & 0xFFFF)) & 0xFFFFFFFF - pkt += struct.pack(">I", rword) - for s in [8, 16, 24]: - pkt += struct.pack(">I", (rword << s) & 0xFFFFFFFF) - - dword = (((di & 0xFFFF) << 16) | (dq & 0xFFFF)) & 0xFFFFFFFF - pkt += struct.pack(">I", dword) - for s in [8, 16, 24]: - pkt += struct.pack(">I", (dword << s) & 0xFFFFFFFF) - + pkt += struct.pack(">h", rq) + pkt += struct.pack(">h", ri) + pkt += struct.pack(">h", di) + pkt += struct.pack(">h", dq) pkt.append(1) pkt.append(FOOTER_BYTE) + self.assertEqual(len(pkt), DATA_PACKET_SIZE) + result = RadarProtocol.parse_data_packet(bytes(pkt)) self.assertIsNotNone(result) self.assertEqual(result["range_i"], ri) @@ -497,8 +483,8 @@ class TestReplayConnection(unittest.TestCase): from radar_protocol import ReplayConnection conn = ReplayConnection(self.NPY_DIR, use_mti=True) conn.open() - # Each packet is 35 bytes, total = 2048 * 35 - expected_bytes = NUM_CELLS * 35 + # Each packet is 11 bytes, total = 2048 * 11 + expected_bytes = NUM_CELLS * DATA_PACKET_SIZE self.assertEqual(conn._frame_len, expected_bytes) conn.close() @@ -548,7 +534,7 @@ class TestReplayConnection(unittest.TestCase): from radar_protocol import ReplayConnection conn = ReplayConnection(self.NPY_DIR, use_mti=False) conn.open() - self.assertEqual(conn._frame_len, NUM_CELLS * 35) + self.assertEqual(conn._frame_len, NUM_CELLS * DATA_PACKET_SIZE) # No-MTI with DC notch=2 and default CFAR → 0 detections raw = conn._packets boundaries = RadarProtocol.find_packet_boundaries(raw)