fix: enforce strict ruff lint (17 rule sets) across entire repo
- Expand ruff config from E/F to 17 rule sets (B, RUF, SIM, PIE, T20, ARG, ERA, A, BLE, RET, ISC, TCH, UP, C4, PERF) - Fix 907 lint errors across all Python files (GUI, FPGA cosim, schematics scripts, simulations, utilities, tools) - Replace all blind except-Exception with specific exception types - Remove commented-out dead code (ERA001) from cosim/simulation files - Modernize typing: deprecated typing.List/Dict/Tuple to builtins - Fix unused args/loop vars, ambiguous unicode, perf anti-patterns - Delete legacy GUI files V1-V4 - Add V7 test suite, requirements files - All CI jobs pass: ruff (0 errors), py_compile, pytest (92/92), MCU tests (20/20), FPGA regression (25/25)
This commit is contained in:
@@ -1,141 +1,62 @@
|
||||
"""
|
||||
v7.hardware — Hardware interface classes for the PLFM Radar GUI V7.
|
||||
|
||||
Provides two USB hardware interfaces:
|
||||
- FT2232HQInterface (PRIMARY — USB 2.0, VID 0x0403 / PID 0x6010)
|
||||
- STM32USBInterface (USB CDC for commands and GPS)
|
||||
Provides:
|
||||
- FT2232H radar data + command interface via production radar_protocol module
|
||||
- ReplayConnection for offline .npy replay via production radar_protocol module
|
||||
- STM32USBInterface for GPS data only (USB CDC)
|
||||
|
||||
The FT2232H interface uses the production protocol layer (radar_protocol.py)
|
||||
which sends 4-byte {opcode, addr, value_hi, value_lo} register commands and
|
||||
parses 0xAA data / 0xBB status packets from the FPGA. The old magic-packet
|
||||
and 'SET'...'END' binary settings protocol has been removed — it was
|
||||
incompatible with the FPGA register interface.
|
||||
"""
|
||||
|
||||
import struct
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
from typing import List, Dict, Optional
|
||||
from typing import ClassVar
|
||||
|
||||
from .models import (
|
||||
USB_AVAILABLE, FTDI_AVAILABLE,
|
||||
RadarSettings,
|
||||
)
|
||||
from .models import USB_AVAILABLE
|
||||
|
||||
if USB_AVAILABLE:
|
||||
import usb.core
|
||||
import usb.util
|
||||
|
||||
if FTDI_AVAILABLE:
|
||||
from pyftdi.ftdi import Ftdi
|
||||
from pyftdi.usbtools import UsbTools
|
||||
# Import production protocol layer — single source of truth for FPGA comms
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
from radar_protocol import ( # noqa: F401 — re-exported for v7 package
|
||||
FT2232HConnection,
|
||||
ReplayConnection,
|
||||
RadarProtocol,
|
||||
Opcode,
|
||||
RadarAcquisition,
|
||||
RadarFrame,
|
||||
StatusResponse,
|
||||
DataRecorder,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# FT2232HQ Interface — PRIMARY data path (USB 2.0)
|
||||
# =============================================================================
|
||||
|
||||
class FT2232HQInterface:
|
||||
"""
|
||||
Interface for FT2232HQ (USB 2.0 Hi-Speed) in synchronous FIFO mode.
|
||||
|
||||
This is the **primary** radar data interface.
|
||||
VID/PID: 0x0403 / 0x6010
|
||||
"""
|
||||
|
||||
VID = 0x0403
|
||||
PID = 0x6010
|
||||
|
||||
def __init__(self):
|
||||
self.ftdi: Optional[object] = None
|
||||
self.is_open: bool = False
|
||||
|
||||
# ---- enumeration -------------------------------------------------------
|
||||
|
||||
def list_devices(self) -> List[Dict]:
|
||||
"""List available FT2232H devices using pyftdi."""
|
||||
if not FTDI_AVAILABLE:
|
||||
logger.warning("pyftdi not available — cannot enumerate FT2232H devices")
|
||||
return []
|
||||
|
||||
try:
|
||||
devices = []
|
||||
for device_desc in UsbTools.find_all([(self.VID, self.PID)]):
|
||||
devices.append({
|
||||
"description": f"FT2232H Device {device_desc}",
|
||||
"url": f"ftdi://{device_desc}/1",
|
||||
})
|
||||
return devices
|
||||
except Exception as e:
|
||||
logger.error(f"Error listing FT2232H devices: {e}")
|
||||
return []
|
||||
|
||||
# ---- open / close ------------------------------------------------------
|
||||
|
||||
def open_device(self, device_url: str) -> bool:
|
||||
"""Open FT2232H device in synchronous FIFO mode."""
|
||||
if not FTDI_AVAILABLE:
|
||||
logger.error("pyftdi not available — cannot open device")
|
||||
return False
|
||||
|
||||
try:
|
||||
self.ftdi = Ftdi()
|
||||
self.ftdi.open_from_url(device_url)
|
||||
|
||||
# Synchronous FIFO mode
|
||||
self.ftdi.set_bitmode(0xFF, Ftdi.BitMode.SYNCFF)
|
||||
|
||||
# Low-latency timer (2 ms)
|
||||
self.ftdi.set_latency_timer(2)
|
||||
|
||||
# Purge stale data
|
||||
self.ftdi.purge_buffers()
|
||||
|
||||
self.is_open = True
|
||||
logger.info(f"FT2232H device opened: {device_url}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error opening FT2232H device: {e}")
|
||||
self.ftdi = None
|
||||
return False
|
||||
|
||||
def close(self):
|
||||
"""Close FT2232H device."""
|
||||
if self.ftdi and self.is_open:
|
||||
try:
|
||||
self.ftdi.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing FT2232H device: {e}")
|
||||
finally:
|
||||
self.is_open = False
|
||||
self.ftdi = None
|
||||
|
||||
# ---- data I/O ----------------------------------------------------------
|
||||
|
||||
def read_data(self, bytes_to_read: int = 4096) -> Optional[bytes]:
|
||||
"""Read data from FT2232H."""
|
||||
if not self.is_open or self.ftdi is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
data = self.ftdi.read_data(bytes_to_read)
|
||||
if data:
|
||||
return bytes(data)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading from FT2232H: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# STM32 USB CDC Interface — commands & GPS data
|
||||
# STM32 USB CDC Interface — GPS data ONLY
|
||||
# =============================================================================
|
||||
|
||||
class STM32USBInterface:
|
||||
"""
|
||||
Interface for STM32 USB CDC (Virtual COM Port).
|
||||
|
||||
Used to:
|
||||
- Send start flag and radar settings to the MCU
|
||||
- Receive GPS data from the MCU
|
||||
Used ONLY for receiving GPS data from the MCU.
|
||||
|
||||
FPGA register commands are sent via FT2232H (see FT2232HConnection
|
||||
from radar_protocol.py). The old send_start_flag() / send_settings()
|
||||
methods have been removed — they used an incompatible magic-packet
|
||||
protocol that the FPGA does not understand.
|
||||
"""
|
||||
|
||||
STM32_VID_PIDS = [
|
||||
STM32_VID_PIDS: ClassVar[list[tuple[int, int]]] = [
|
||||
(0x0483, 0x5740), # STM32 Virtual COM Port
|
||||
(0x0483, 0x3748), # STM32 Discovery
|
||||
(0x0483, 0x374B),
|
||||
@@ -152,7 +73,7 @@ class STM32USBInterface:
|
||||
|
||||
# ---- enumeration -------------------------------------------------------
|
||||
|
||||
def list_devices(self) -> List[Dict]:
|
||||
def list_devices(self) -> list[dict]:
|
||||
"""List available STM32 USB CDC devices."""
|
||||
if not USB_AVAILABLE:
|
||||
logger.warning("pyusb not available — cannot enumerate STM32 devices")
|
||||
@@ -174,20 +95,20 @@ class STM32USBInterface:
|
||||
"product_id": pid,
|
||||
"device": dev,
|
||||
})
|
||||
except Exception:
|
||||
except (usb.core.USBError, ValueError):
|
||||
devices.append({
|
||||
"description": f"STM32 CDC (VID:{vid:04X}, PID:{pid:04X})",
|
||||
"vendor_id": vid,
|
||||
"product_id": pid,
|
||||
"device": dev,
|
||||
})
|
||||
except Exception as e:
|
||||
except (usb.core.USBError, ValueError) as e:
|
||||
logger.error(f"Error listing STM32 devices: {e}")
|
||||
return devices
|
||||
|
||||
# ---- open / close ------------------------------------------------------
|
||||
|
||||
def open_device(self, device_info: Dict) -> bool:
|
||||
def open_device(self, device_info: dict) -> bool:
|
||||
"""Open STM32 USB CDC device."""
|
||||
if not USB_AVAILABLE:
|
||||
logger.error("pyusb not available — cannot open STM32 device")
|
||||
@@ -225,7 +146,7 @@ class STM32USBInterface:
|
||||
self.is_open = True
|
||||
logger.info(f"STM32 USB device opened: {device_info.get('description', '')}")
|
||||
return True
|
||||
except Exception as e:
|
||||
except (usb.core.USBError, ValueError) as e:
|
||||
logger.error(f"Error opening STM32 device: {e}")
|
||||
return False
|
||||
|
||||
@@ -234,74 +155,22 @@ class STM32USBInterface:
|
||||
if self.device and self.is_open:
|
||||
try:
|
||||
usb.util.dispose_resources(self.device)
|
||||
except Exception as e:
|
||||
except usb.core.USBError as e:
|
||||
logger.error(f"Error closing STM32 device: {e}")
|
||||
self.is_open = False
|
||||
self.device = None
|
||||
self.ep_in = None
|
||||
self.ep_out = None
|
||||
|
||||
# ---- commands ----------------------------------------------------------
|
||||
# ---- GPS data I/O ------------------------------------------------------
|
||||
|
||||
def send_start_flag(self) -> bool:
|
||||
"""Send start flag to STM32 (4-byte magic)."""
|
||||
start_packet = bytes([23, 46, 158, 237])
|
||||
logger.info("Sending start flag to STM32 via USB...")
|
||||
return self._send_data(start_packet)
|
||||
|
||||
def send_settings(self, settings: RadarSettings) -> bool:
|
||||
"""Send radar settings binary packet to STM32."""
|
||||
try:
|
||||
packet = self._create_settings_packet(settings)
|
||||
logger.info("Sending radar settings to STM32 via USB...")
|
||||
return self._send_data(packet)
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending settings via USB: {e}")
|
||||
return False
|
||||
|
||||
# ---- data I/O ----------------------------------------------------------
|
||||
|
||||
def read_data(self, size: int = 64, timeout: int = 1000) -> Optional[bytes]:
|
||||
"""Read data from STM32 via USB CDC."""
|
||||
def read_data(self, size: int = 64, timeout: int = 1000) -> bytes | None:
|
||||
"""Read GPS data from STM32 via USB CDC."""
|
||||
if not self.is_open or self.ep_in is None:
|
||||
return None
|
||||
try:
|
||||
data = self.ep_in.read(size, timeout=timeout)
|
||||
return bytes(data)
|
||||
except Exception:
|
||||
except usb.core.USBError:
|
||||
# Timeout or other USB error
|
||||
return None
|
||||
|
||||
# ---- internal helpers --------------------------------------------------
|
||||
|
||||
def _send_data(self, data: bytes) -> bool:
|
||||
if not self.is_open or self.ep_out is None:
|
||||
return False
|
||||
try:
|
||||
packet_size = 64
|
||||
for i in range(0, len(data), packet_size):
|
||||
chunk = data[i : i + packet_size]
|
||||
if len(chunk) < packet_size:
|
||||
chunk += b"\x00" * (packet_size - len(chunk))
|
||||
self.ep_out.write(chunk)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending data via USB: {e}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _create_settings_packet(settings: RadarSettings) -> bytes:
|
||||
"""Create binary settings packet: 'SET' ... 'END'."""
|
||||
packet = b"SET"
|
||||
packet += struct.pack(">d", settings.system_frequency)
|
||||
packet += struct.pack(">d", settings.chirp_duration_1)
|
||||
packet += struct.pack(">d", settings.chirp_duration_2)
|
||||
packet += struct.pack(">I", settings.chirps_per_position)
|
||||
packet += struct.pack(">d", settings.freq_min)
|
||||
packet += struct.pack(">d", settings.freq_max)
|
||||
packet += struct.pack(">d", settings.prf1)
|
||||
packet += struct.pack(">d", settings.prf2)
|
||||
packet += struct.pack(">d", settings.max_distance)
|
||||
packet += struct.pack(">d", settings.map_size)
|
||||
packet += b"END"
|
||||
return packet
|
||||
|
||||
Reference in New Issue
Block a user