feat(gui): add FT601Connection class, USB interface selection in V65/V7

- Add FT601Connection in radar_protocol.py using ftd3xx library with
  proper setChipConfiguration re-enumeration handling (close, wait 2s,
  re-open) and 4-byte write alignment
- Add USB Interface dropdown to V65 Tk GUI (FT2232H default, FT601 option)
- Add USB Interface combo to V7 PyQt dashboard with Live/File mode toggle
- Fix mock frame_start bit 7 in both FT2232H and FT601 connections
- Use FPGA range data from USB packets instead of recomputing in Python
- Export FT601Connection from v7/hardware.py and v7/__init__.py
- Add 7 FT601Connection tests (91 total in test_GUI_V65_Tk.py)
This commit is contained in:
Jason
2026-04-16 16:19:13 +05:45
parent f393e96d69
commit b22cadb429
6 changed files with 336 additions and 28 deletions
+200 -6
View File
@@ -6,6 +6,7 @@ Pure-logic module for USB packet parsing and command building.
No GUI dependencies — safe to import from tests and headless scripts.
USB Interface: FT2232H USB 2.0 (8-bit, 50T production board) via pyftdi
FT601 USB 3.0 (32-bit, 200T premium board) via ftd3xx
USB Packet Protocol (11-byte):
TX (FPGA→Host):
@@ -22,7 +23,7 @@ import queue
import logging
import contextlib
from dataclasses import dataclass, field
from typing import Any
from typing import Any, ClassVar
from enum import IntEnum
@@ -200,7 +201,9 @@ class RadarProtocol:
range_i = _to_signed16(struct.unpack_from(">H", raw, 3)[0])
doppler_i = _to_signed16(struct.unpack_from(">H", raw, 5)[0])
doppler_q = _to_signed16(struct.unpack_from(">H", raw, 7)[0])
detection = raw[9] & 0x01
det_byte = raw[9]
detection = det_byte & 0x01
frame_start = (det_byte >> 7) & 0x01
return {
"range_i": range_i,
@@ -208,6 +211,7 @@ class RadarProtocol:
"doppler_i": doppler_i,
"doppler_q": doppler_q,
"detection": detection,
"frame_start": frame_start,
}
@staticmethod
@@ -433,7 +437,191 @@ class FT2232HConnection:
pkt += struct.pack(">h", np.clip(range_i, -32768, 32767))
pkt += struct.pack(">h", np.clip(dop_i, -32768, 32767))
pkt += struct.pack(">h", np.clip(dop_q, -32768, 32767))
pkt.append(detection & 0x01)
# Bit 7 = frame_start (sample_counter == 0), bit 0 = detection
det_byte = (detection & 0x01) | (0x80 if idx == 0 else 0x00)
pkt.append(det_byte)
pkt.append(FOOTER_BYTE)
buf += pkt
self._mock_seq_idx = (start_idx + num_packets) % NUM_CELLS
return bytes(buf)
# ============================================================================
# FT601 USB 3.0 Connection (premium board only)
# ============================================================================
# Optional ftd3xx import (FTDI's proprietary driver for FT60x USB 3.0 chips).
# pyftdi does NOT support FT601 — it only handles USB 2.0 chips (FT232H, etc.)
try:
import ftd3xx # type: ignore[import-untyped]
FTD3XX_AVAILABLE = True
_Ftd3xxError: type = ftd3xx.FTD3XXError # type: ignore[attr-defined]
except ImportError:
FTD3XX_AVAILABLE = False
_Ftd3xxError = OSError # fallback for type-checking; never raised
class FT601Connection:
"""
FT601 USB 3.0 SuperSpeed FIFO bridge — premium board only.
The FT601 has a 32-bit data bus and runs at 100 MHz.
VID:PID = 0x0403:0x6030 or 0x6031 (FTDI FT60x).
Requires the ``ftd3xx`` library (``pip install ftd3xx`` on Windows,
or ``libft60x`` on Linux). This is FTDI's proprietary USB 3.0 driver;
``pyftdi`` only supports USB 2.0 and will NOT work with FT601.
Public contract matches FT2232HConnection so callers can swap freely.
"""
VID = 0x0403
PID_LIST: ClassVar[list[int]] = [0x6030, 0x6031]
def __init__(self, mock: bool = True):
self._mock = mock
self._dev = None
self._lock = threading.Lock()
self.is_open = False
# Mock state (reuses same synthetic data pattern)
self._mock_frame_num = 0
self._mock_rng = np.random.RandomState(42)
def open(self, device_index: int = 0) -> bool:
if self._mock:
self.is_open = True
log.info("FT601 mock device opened (no hardware)")
return True
if not FTD3XX_AVAILABLE:
log.error(
"ftd3xx library required for FT601 hardware — "
"install with: pip install ftd3xx"
)
return False
try:
self._dev = ftd3xx.create(device_index, ftd3xx.OPEN_BY_INDEX)
if self._dev is None:
log.error("No FT601 device found at index %d", device_index)
return False
# Verify chip configuration — only reconfigure if needed.
# setChipConfiguration triggers USB re-enumeration, which
# invalidates the device handle and requires a re-open cycle.
cfg = self._dev.getChipConfiguration()
needs_reconfig = (
cfg.FIFOMode != 0 # 245 FIFO mode
or cfg.ChannelConfig != 0 # 1 channel, 32-bit
or cfg.OptionalFeatureSupport != 0
)
if needs_reconfig:
cfg.FIFOMode = 0
cfg.ChannelConfig = 0
cfg.OptionalFeatureSupport = 0
self._dev.setChipConfiguration(cfg)
# Device re-enumerates — close stale handle, wait, re-open
self._dev.close()
self._dev = None
import time
time.sleep(2.0) # wait for USB re-enumeration
self._dev = ftd3xx.create(device_index, ftd3xx.OPEN_BY_INDEX)
if self._dev is None:
log.error("FT601 not found after reconfiguration")
return False
log.info("FT601 reconfigured and re-opened (index %d)", device_index)
self.is_open = True
log.info("FT601 device opened (index %d)", device_index)
return True
except (OSError, _Ftd3xxError) as e:
log.error("FT601 open failed: %s", e)
self._dev = None
return False
def close(self):
if self._dev is not None:
with contextlib.suppress(Exception):
self._dev.close()
self._dev = None
self.is_open = False
def read(self, size: int = 4096) -> bytes | None:
"""Read raw bytes from FT601. Returns None on error/timeout."""
if not self.is_open:
return None
if self._mock:
return self._mock_read(size)
with self._lock:
try:
data = self._dev.readPipe(0x82, size, raw=True)
return bytes(data) if data else None
except (OSError, _Ftd3xxError) as e:
log.error("FT601 read error: %s", e)
return None
def write(self, data: bytes) -> bool:
"""Write raw bytes to FT601. Data must be 4-byte aligned for 32-bit bus."""
if not self.is_open:
return False
if self._mock:
log.info(f"FT601 mock write: {data.hex()}")
return True
# Pad to 4-byte alignment (FT601 32-bit bus requirement).
# NOTE: Radar commands are already 4 bytes, so this should be a no-op.
remainder = len(data) % 4
if remainder:
data = data + b"\x00" * (4 - remainder)
with self._lock:
try:
written = self._dev.writePipe(0x02, data, raw=True)
return written == len(data)
except (OSError, _Ftd3xxError) as e:
log.error("FT601 write error: %s", e)
return False
def _mock_read(self, size: int) -> bytes:
"""Generate synthetic radar packets (same pattern as FT2232H mock)."""
time.sleep(0.05)
self._mock_frame_num += 1
buf = bytearray()
num_packets = min(NUM_CELLS, size // DATA_PACKET_SIZE)
start_idx = getattr(self, "_mock_seq_idx", 0)
for n in range(num_packets):
idx = (start_idx + n) % NUM_CELLS
rbin = idx // NUM_DOPPLER_BINS
dbin = idx % NUM_DOPPLER_BINS
range_i = int(self._mock_rng.normal(0, 100))
range_q = int(self._mock_rng.normal(0, 100))
if abs(rbin - 20) < 3:
range_i += 5000
range_q += 3000
dop_i = int(self._mock_rng.normal(0, 50))
dop_q = int(self._mock_rng.normal(0, 50))
if abs(rbin - 20) < 3 and abs(dbin - 8) < 2:
dop_i += 8000
dop_q += 4000
detection = 1 if (abs(rbin - 20) < 2 and abs(dbin - 8) < 2) else 0
pkt = bytearray()
pkt.append(HEADER_BYTE)
pkt += struct.pack(">h", np.clip(range_q, -32768, 32767))
pkt += struct.pack(">h", np.clip(range_i, -32768, 32767))
pkt += struct.pack(">h", np.clip(dop_i, -32768, 32767))
pkt += struct.pack(">h", np.clip(dop_q, -32768, 32767))
# Bit 7 = frame_start (sample_counter == 0), bit 0 = detection
det_byte = (detection & 0x01) | (0x80 if idx == 0 else 0x00)
pkt.append(det_byte)
pkt.append(FOOTER_BYTE)
buf += pkt
@@ -600,6 +788,12 @@ class RadarAcquisition(threading.Thread):
if sample.get("detection", 0):
self._frame.detections[rbin, dbin] = 1
self._frame.detection_count += 1
# Accumulate FPGA range profile data (matched-filter output)
# Each sample carries the range_i/range_q for this range bin.
# Accumulate magnitude across Doppler bins for the range profile.
ri = int(sample.get("range_i", 0))
rq = int(sample.get("range_q", 0))
self._frame.range_profile[rbin] += abs(ri) + abs(rq)
self._sample_idx += 1
@@ -607,11 +801,11 @@ class RadarAcquisition(threading.Thread):
self._finalize_frame()
def _finalize_frame(self):
"""Complete frame: compute range profile, push to queue, record."""
"""Complete frame: push to queue, record."""
self._frame.timestamp = time.time()
self._frame.frame_number = self._frame_num
# Range profile = sum of magnitude across Doppler bins
self._frame.range_profile = np.sum(self._frame.magnitude, axis=1)
# range_profile is already accumulated from FPGA range_i/range_q
# data in _ingest_sample(). No need to synthesize from doppler magnitude.
# Push to display queue (drop old if backed up)
try: