From 385a54d971fedefdeb5a47bd61d6c057129de2f7 Mon Sep 17 00:00:00 2001 From: Jason <83615043+JJassonn69@users.noreply.github.com> Date: Tue, 7 Apr 2026 21:10:12 +0300 Subject: [PATCH] refactor(host): remove FT601 support from radar_protocol.py Strip all FT601/ftd3xx references from the core protocol module: - Remove FT601Connection class and ftd3xx import block - Remove _build_packets_ft601() 35-byte packet builder - Remove compact: bool parameter from RadarAcquisition - Remove dual-path parsing logic (compact vs FT601) - Rename parse_data_packet_compact -> parse_data_packet - Unify DATA_PACKET_SIZE to single 11-byte constant The 50T production board uses FT2232H exclusively. FT601 remains in out-of-scope legacy files (GUI_V6, etc). --- 9_Firmware/9_3_GUI/radar_protocol.py | 320 +++------------------------ 1 file changed, 27 insertions(+), 293 deletions(-) diff --git a/9_Firmware/9_3_GUI/radar_protocol.py b/9_Firmware/9_3_GUI/radar_protocol.py index 9432929..af7adbb 100644 --- a/9_Firmware/9_3_GUI/radar_protocol.py +++ b/9_Firmware/9_3_GUI/radar_protocol.py @@ -5,21 +5,12 @@ AERIS-10 Radar Protocol Layer Pure-logic module for USB packet parsing and command building. No GUI dependencies — safe to import from tests and headless scripts. -Supports two USB interfaces: - - FT601 USB 3.0 (32-bit, 200T dev board) via ftd3xx - - FT2232H USB 2.0 (8-bit, 50T production board) via pyftdi +USB Interface: FT2232H USB 2.0 (8-bit, 50T production board) via pyftdi -USB Packet Protocol (FT601, 35-byte): - TX (FPGA→Host): - Data packet: [0xAA] [range 4×32b] [doppler 4×32b] [det 1B] [0x55] - Status packet: [0xBB] [status 6×32b] [0x55] - RX (Host→FPGA): - Command word: {opcode[31:24], addr[23:16], value[15:0]} - -USB Packet Protocol (FT2232H, 11-byte compact): +USB Packet Protocol (11-byte): TX (FPGA→Host): Data packet: [0xAA] [range_q 2B] [range_i 2B] [dop_re 2B] [dop_im 2B] [det 1B] [0x55] - Status packet: [0xBB] [status 6×32b] [0x55] (same 26-byte format) + Status packet: [0xBB] [status 6×32b] [0x55] RX (Host→FPGA): Command: 4 bytes received sequentially {opcode, addr, value_hi, value_lo} """ @@ -48,9 +39,8 @@ FOOTER_BYTE = 0x55 STATUS_HEADER_BYTE = 0xBB # Packet sizes -DATA_PACKET_SIZE_FT601 = 35 # FT601: 1 + 16 + 16 + 1 + 1 -DATA_PACKET_SIZE_FT2232H = 11 # FT2232H: 1 + 4 + 2 + 2 + 1 + 1 -STATUS_PACKET_SIZE = 26 # Same for both: 1 + 24 + 1 +DATA_PACKET_SIZE = 11 # 1 + 4 + 2 + 2 + 1 + 1 +STATUS_PACKET_SIZE = 26 # 1 + 24 + 1 NUM_RANGE_BINS = 64 NUM_DOPPLER_BINS = 32 @@ -148,7 +138,7 @@ class RadarProtocol: def build_command(opcode: int, value: int, addr: int = 0) -> bytes: """ Build a 32-bit command word: {opcode[31:24], addr[23:16], value[15:0]}. - Returns 4 bytes, big-endian (MSB first as FT601 expects). + Returns 4 bytes, big-endian (MSB first). """ word = ((opcode & 0xFF) << 24) | ((addr & 0xFF) << 16) | (value & 0xFFFF) return struct.pack(">I", word) @@ -156,70 +146,11 @@ class RadarProtocol: @staticmethod def parse_data_packet(raw: bytes) -> Optional[Dict[str, Any]]: """ - Parse a single data packet from the FPGA byte stream. + Parse an 11-byte data packet from the FT2232H byte stream. Returns dict with keys: 'range_i', 'range_q', 'doppler_i', 'doppler_q', 'detection', or None if invalid. - Packet format (all streams enabled): - [0xAA] [range 4×4B] [doppler 4×4B] [det 1B] [0x55] - = 1 + 16 + 16 + 1 + 1 = 35 bytes - - With byte-enables, the FT601 delivers only valid bytes. - Header/footer/detection use BE=0001 → 1 byte each. - Range/doppler use BE=1111 → 4 bytes each × 4 transfers. - - In practice, the range data word 0 contains the full 32-bit value - {range_q[15:0], range_i[15:0]}. Words 1–3 are shifted copies. - Similarly, doppler word 0 = {doppler_real, doppler_imag}. - """ - if len(raw) < 3: - return None - if raw[0] != HEADER_BYTE: - return None - - result = {} - pos = 1 - - # Range data: 4 × 4 bytes, only word 0 matters - if pos + 16 <= len(raw): - range_word0 = struct.unpack_from(">I", raw, pos)[0] - result["range_i"] = _to_signed16(range_word0 & 0xFFFF) - result["range_q"] = _to_signed16((range_word0 >> 16) & 0xFFFF) - pos += 16 - else: - return None - - # Doppler data: 4 × 4 bytes, only word 0 matters - # Word 0 layout: {doppler_real[31:16], doppler_imag[15:0]} - if pos + 16 <= len(raw): - dop_word0 = struct.unpack_from(">I", raw, pos)[0] - result["doppler_q"] = _to_signed16(dop_word0 & 0xFFFF) - result["doppler_i"] = _to_signed16((dop_word0 >> 16) & 0xFFFF) - pos += 16 - else: - return None - - # Detection: 1 byte - if pos + 1 <= len(raw): - result["detection"] = raw[pos] & 0x01 - pos += 1 - else: - return None - - # Footer - if pos < len(raw) and raw[pos] == FOOTER_BYTE: - pos += 1 - - return result - - @staticmethod - def parse_data_packet_compact(raw: bytes) -> Optional[Dict[str, Any]]: - """ - Parse a compact 11-byte data packet from the FT2232H byte stream. - Returns dict with keys: 'range_i', 'range_q', 'doppler_i', 'doppler_q', - 'detection', or None if invalid. - - Compact packet format (FT2232H, 11 bytes): + Packet format (11 bytes): Byte 0: 0xAA (header) Bytes 1-2: range_q[15:0] MSB first Bytes 3-4: range_i[15:0] MSB first @@ -228,7 +159,7 @@ class RadarProtocol: Byte 9: {7'b0, cfar_detection} Byte 10: 0x55 (footer) """ - if len(raw) < DATA_PACKET_SIZE_FT2232H: + if len(raw) < DATA_PACKET_SIZE: return None if raw[0] != HEADER_BYTE: return None @@ -292,23 +223,16 @@ class RadarProtocol: return sr @staticmethod - def find_packet_boundaries(buf: bytes, - compact: bool = False) -> List[Tuple[int, int, str]]: + def find_packet_boundaries(buf: bytes) -> List[Tuple[int, int, str]]: """ Scan buffer for packet start markers (0xAA data, 0xBB status). Returns list of (start_idx, expected_end_idx, packet_type). - - Args: - buf: Raw byte buffer from USB read. - compact: If True, use 11-byte compact packets (FT2232H). - If False, use 35-byte packets (FT601, default). """ - data_size = DATA_PACKET_SIZE_FT2232H if compact else DATA_PACKET_SIZE_FT601 packets = [] i = 0 while i < len(buf): if buf[i] == HEADER_BYTE: - end = i + data_size + end = i + DATA_PACKET_SIZE if end <= len(buf): packets.append((i, end, "data")) i = end @@ -327,151 +251,6 @@ class RadarProtocol: return packets -# ============================================================================ -# FT601 USB Connection -# ============================================================================ - -# Optional ftd3xx import -try: - import ftd3xx - FTD3XX_AVAILABLE = True -except ImportError: - FTD3XX_AVAILABLE = False - - -class FT601Connection: - """ - FT601 USB 3.0 FIFO bridge communication. - Supports ftd3xx (native D3XX) or mock mode. - """ - - def __init__(self, mock: bool = True): - self._mock = mock - self._device = None - self._lock = threading.Lock() - self.is_open = False - # Mock state - self._mock_frame_num = 0 - self._mock_rng = np.random.RandomState(42) - - def open(self, device_index: int = 0) -> bool: - if self._mock: - self.is_open = True - log.info("FT601 mock device opened (no hardware)") - return True - - if not FTD3XX_AVAILABLE: - log.error("ftd3xx not installed — cannot open real FT601 device") - return False - - try: - self._device = ftd3xx.create(device_index, ftd3xx.CONFIGURATION_CHANNEL_0) - if self._device is None: - log.error("ftd3xx.create returned None") - return False - self.is_open = True - log.info(f"FT601 device {device_index} opened") - return True - except Exception as e: - log.error(f"FT601 open failed: {e}") - return False - - def close(self): - if self._device is not None: - try: - self._device.close() - except Exception: - pass - self._device = None - self.is_open = False - - def read(self, size: int = 4096) -> Optional[bytes]: - """Read raw bytes from FT601. Returns None on error/timeout.""" - if not self.is_open: - return None - - if self._mock: - return self._mock_read(size) - - with self._lock: - try: - buf = self._device.readPipe(0x82, size, raw=True) - return bytes(buf) if buf else None - except Exception as e: - log.error(f"FT601 read error: {e}") - return None - - def write(self, data: bytes) -> bool: - """Write raw bytes to FT601.""" - if not self.is_open: - return False - - if self._mock: - log.info(f"FT601 mock write: {data.hex()}") - return True - - with self._lock: - try: - self._device.writePipe(0x02, data, len(data)) - return True - except Exception as e: - log.error(f"FT601 write error: {e}") - return False - - def _mock_read(self, size: int) -> bytes: - """ - Generate synthetic radar data packets for testing. - Simulates a batch of packets with a target near range bin 20, Doppler bin 8. - """ - time.sleep(0.05) # Simulate USB latency - self._mock_frame_num += 1 - - buf = bytearray() - num_packets = min(32, size // 35) - for _ in range(num_packets): - rbin = self._mock_rng.randint(0, NUM_RANGE_BINS) - dbin = self._mock_rng.randint(0, NUM_DOPPLER_BINS) - - # Simulate range profile with a target at bin ~20 and noise - range_i = int(self._mock_rng.normal(0, 100)) - range_q = int(self._mock_rng.normal(0, 100)) - if abs(rbin - 20) < 3: - range_i += 5000 - range_q += 3000 - - # Simulate Doppler with target at Doppler bin ~8 - dop_i = int(self._mock_rng.normal(0, 50)) - dop_q = int(self._mock_rng.normal(0, 50)) - if abs(rbin - 20) < 3 and abs(dbin - 8) < 2: - dop_i += 8000 - dop_q += 4000 - - detection = 1 if (abs(rbin - 20) < 2 and abs(dbin - 8) < 2) else 0 - - # Build packet - pkt = bytearray() - pkt.append(HEADER_BYTE) - - rword = (((range_q & 0xFFFF) << 16) | (range_i & 0xFFFF)) & 0xFFFFFFFF - pkt += struct.pack(">I", rword) - pkt += struct.pack(">I", ((rword << 8) & 0xFFFFFFFF)) - pkt += struct.pack(">I", ((rword << 16) & 0xFFFFFFFF)) - pkt += struct.pack(">I", ((rword << 24) & 0xFFFFFFFF)) - - dword = (((dop_i & 0xFFFF) << 16) | (dop_q & 0xFFFF)) & 0xFFFFFFFF - pkt += struct.pack(">I", dword) - pkt += struct.pack(">I", ((dword << 8) & 0xFFFFFFFF)) - pkt += struct.pack(">I", ((dword << 16) & 0xFFFFFFFF)) - pkt += struct.pack(">I", ((dword << 24) & 0xFFFFFFFF)) - - pkt.append(detection & 0x01) - pkt.append(FOOTER_BYTE) - - buf += pkt - - return bytes(buf) - - # ============================================================================ # FT2232H USB 2.0 Connection (pyftdi, 245 Synchronous FIFO) # ============================================================================ @@ -576,13 +355,14 @@ class FT2232HConnection: def _mock_read(self, size: int) -> bytes: """ Generate synthetic compact radar data packets (11-byte) for testing. - Same target simulation as FT601 mock but using compact format. + Generate synthetic 11-byte radar data packets for testing. + Simulates a batch of packets with a target near range bin 20, Doppler bin 8. """ - time.sleep(0.05) # Simulate USB latency + time.sleep(0.05) self._mock_frame_num += 1 buf = bytearray() - num_packets = min(32, size // DATA_PACKET_SIZE_FT2232H) + num_packets = min(32, size // DATA_PACKET_SIZE) for _ in range(num_packets): rbin = self._mock_rng.randint(0, NUM_RANGE_BINS) dbin = self._mock_rng.randint(0, NUM_DOPPLER_BINS) @@ -780,11 +560,10 @@ class ReplayConnection: """ def __init__(self, npy_dir: str, use_mti: bool = True, - replay_fps: float = 5.0, compact: bool = False): + replay_fps: float = 5.0): self._npy_dir = npy_dir self._use_mti = use_mti self._replay_fps = max(replay_fps, 0.1) - self._compact = compact # True = FT2232H 11-byte packets self._lock = threading.Lock() self.is_open = False self._packets: bytes = b"" @@ -958,8 +737,7 @@ class ReplayConnection: det = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=bool) det_count = int(det.sum()) - pkt_fmt = "compact" if self._compact else "FT601" - log.info(f"Replay: rebuilt {NUM_CELLS} packets ({pkt_fmt}, " + log.info(f"Replay: rebuilt {NUM_CELLS} packets (" f"MTI={'ON' if self._mti_enable else 'OFF'}, " f"DC_notch={self._dc_notch_width}, " f"CFAR={'ON' if self._cfar_enable else 'OFF'} " @@ -970,14 +748,11 @@ class ReplayConnection: range_i = self._range_i_vec range_q = self._range_q_vec - if self._compact: - return self._build_packets_compact(range_i, range_q, dop_i, dop_q, det) - else: - return self._build_packets_ft601(range_i, range_q, dop_i, dop_q, det) + return self._build_packets_data(range_i, range_q, dop_i, dop_q, det) - def _build_packets_compact(self, range_i, range_q, dop_i, dop_q, det) -> bytes: - """Build compact 11-byte packets for FT2232H interface.""" - buf = bytearray(NUM_CELLS * DATA_PACKET_SIZE_FT2232H) + def _build_packets_data(self, range_i, range_q, dop_i, dop_q, det) -> bytes: + """Build 11-byte data packets for FT2232H interface.""" + buf = bytearray(NUM_CELLS * DATA_PACKET_SIZE) pos = 0 for rbin in range(NUM_RANGE_BINS): ri = int(np.clip(range_i[rbin], -32768, 32767)) @@ -999,40 +774,6 @@ class ReplayConnection: return bytes(buf) - def _build_packets_ft601(self, range_i, range_q, dop_i, dop_q, det) -> bytes: - """Build 35-byte packets for FT601 interface.""" - buf = bytearray(NUM_CELLS * DATA_PACKET_SIZE_FT601) - pos = 0 - for rbin in range(NUM_RANGE_BINS): - ri = int(np.clip(range_i[rbin], -32768, 32767)) & 0xFFFF - rq = int(np.clip(range_q[rbin], -32768, 32767)) & 0xFFFF - rword = ((rq << 16) | ri) & 0xFFFFFFFF - rw0 = struct.pack(">I", rword) - rw1 = struct.pack(">I", (rword << 8) & 0xFFFFFFFF) - rw2 = struct.pack(">I", (rword << 16) & 0xFFFFFFFF) - rw3 = struct.pack(">I", (rword << 24) & 0xFFFFFFFF) - for dbin in range(NUM_DOPPLER_BINS): - di = int(np.clip(dop_i[rbin, dbin], -32768, 32767)) & 0xFFFF - dq = int(np.clip(dop_q[rbin, dbin], -32768, 32767)) & 0xFFFF - d = 1 if det[rbin, dbin] else 0 - - dword = ((di << 16) | dq) & 0xFFFFFFFF - - buf[pos] = HEADER_BYTE - pos += 1 - buf[pos:pos+4] = rw0; pos += 4 - buf[pos:pos+4] = rw1; pos += 4 - buf[pos:pos+4] = rw2; pos += 4 - buf[pos:pos+4] = rw3; pos += 4 - buf[pos:pos+4] = struct.pack(">I", dword); pos += 4 - buf[pos:pos+4] = struct.pack(">I", (dword << 8) & 0xFFFFFFFF); pos += 4 - buf[pos:pos+4] = struct.pack(">I", (dword << 16) & 0xFFFFFFFF); pos += 4 - buf[pos:pos+4] = struct.pack(">I", (dword << 24) & 0xFFFFFFFF); pos += 4 - buf[pos] = d; pos += 1 - buf[pos] = FOOTER_BYTE; pos += 1 - - return bytes(buf) - # ============================================================================ # Data Recorder (HDF5) @@ -1112,20 +853,18 @@ class DataRecorder: class RadarAcquisition(threading.Thread): """ - Background thread: reads from USB (FT601 or FT2232H), parses packets, + Background thread: reads from USB (FT2232H), parses 11-byte packets, assembles frames, and pushes complete frames to the display queue. """ def __init__(self, connection, frame_queue: queue.Queue, recorder: Optional[DataRecorder] = None, - status_callback=None, - compact: bool = False): + status_callback=None): super().__init__(daemon=True) self.conn = connection self.frame_queue = frame_queue self.recorder = recorder self._status_callback = status_callback - self._compact = compact # True for FT2232H 11-byte packets self._stop_event = threading.Event() self._frame = RadarFrame() self._sample_idx = 0 @@ -1135,23 +874,18 @@ class RadarAcquisition(threading.Thread): self._stop_event.set() def run(self): - log.info(f"Acquisition thread started (compact={self._compact})") + log.info("Acquisition thread started") while not self._stop_event.is_set(): raw = self.conn.read(4096) if raw is None or len(raw) == 0: time.sleep(0.01) continue - packets = RadarProtocol.find_packet_boundaries( - raw, compact=self._compact) + packets = RadarProtocol.find_packet_boundaries(raw) for start, end, ptype in packets: if ptype == "data": - if self._compact: - parsed = RadarProtocol.parse_data_packet_compact( - raw[start:end]) - else: - parsed = RadarProtocol.parse_data_packet( - raw[start:end]) + parsed = RadarProtocol.parse_data_packet( + raw[start:end]) if parsed is not None: self._ingest_sample(parsed) elif ptype == "status":