57de32b172
Fixes 25 remaining manual lint errors after auto-fix pass (94 auto-fixed earlier): - GUI_V6.py: noqa on availability imports, bare except, unused vars, F811 redefs - GUI_V6_Demo.py: unused app variable - v7/models.py: noqa F401 on 8 try/except availability-check imports - FPGA cosim: unused header/status/span vars, ambiguous 'l' renamed to 'line', E701 while-on-one-line split, F841 padding vars annotated Also adds v7/ module, GUI_PyQt_Map.py, and GUI_V7_PyQt.py to version control. Expands CI lint job to cover all 21 maintained Python files (was 4). All 58 Python tests pass. Zero ruff errors on all target files.
308 lines
10 KiB
Python
308 lines
10 KiB
Python
"""
|
|
v7.hardware — Hardware interface classes for the PLFM Radar GUI V7.
|
|
|
|
Provides two USB hardware interfaces:
|
|
- FT2232HQInterface (PRIMARY — USB 2.0, VID 0x0403 / PID 0x6010)
|
|
- STM32USBInterface (USB CDC for commands and GPS)
|
|
"""
|
|
|
|
import struct
|
|
import logging
|
|
from typing import List, Dict, Optional
|
|
|
|
from .models import (
|
|
USB_AVAILABLE, FTDI_AVAILABLE,
|
|
RadarSettings,
|
|
)
|
|
|
|
if USB_AVAILABLE:
|
|
import usb.core
|
|
import usb.util
|
|
|
|
if FTDI_AVAILABLE:
|
|
from pyftdi.ftdi import Ftdi
|
|
from pyftdi.usbtools import UsbTools
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# FT2232HQ Interface — PRIMARY data path (USB 2.0)
|
|
# =============================================================================
|
|
|
|
class FT2232HQInterface:
    """
    Interface for FT2232HQ (USB 2.0 Hi-Speed) in synchronous FIFO mode.

    This is the **primary** radar data interface.
    VID/PID: 0x0403 / 0x6010

    Every method reports failure through its return value (``[]``, ``False``
    or ``None``) and the module logger; none of them raises to the caller.
    """

    VID = 0x0403
    PID = 0x6010

    # FTDI interface (port) number appended to every generated device URL.
    URL_INTERFACE = 1

    def __init__(self):
        self.ftdi: Optional[object] = None
        self.is_open: bool = False

    # ---- enumeration -------------------------------------------------------

    @classmethod
    def _device_url(cls, descriptor) -> str:
        """Build a pyftdi URL that selects the device described by *descriptor*.

        *descriptor* is expected to expose ``vid``, ``pid``, ``sn``, ``bus``
        and ``address`` attributes (the fields of pyftdi's USB device
        descriptor); any missing attribute is treated as unknown.

        The serial number is used as the locator when it can be embedded in
        a URL unambiguously, otherwise ``bus:address`` (hexadecimal) is used.
        With neither available the URL only names vendor and product.
        """
        vid = getattr(descriptor, "vid", None)
        pid = getattr(descriptor, "pid", None)
        vid = cls.VID if vid is None else vid
        pid = cls.PID if pid is None else pid
        serial = getattr(descriptor, "sn", None) or ""
        bus = getattr(descriptor, "bus", None)
        address = getattr(descriptor, "address", None)

        # The locator field of an ftdi:// URL may be a serial number or a
        # device index, so an all-digit serial would be ambiguous; URL
        # delimiters and whitespace inside a serial would break parsing.
        serial_usable = (
            bool(serial)
            and serial.isprintable()
            and not serial.isdigit()
            and not set(serial) & set(":/? ")
        )

        netloc = f"0x{vid:04x}:0x{pid:04x}"
        if serial_usable:
            netloc += f":{serial}"
        elif bus is not None and address is not None:
            netloc += f":{bus:x}:{address:x}"
        return f"ftdi://{netloc}/{cls.URL_INTERFACE}"

    def list_devices(self) -> List[Dict]:
        """List available FT2232H devices using pyftdi.

        Returns:
            One dict per device with a human-readable ``"description"`` and
            a ``"url"`` suitable for :meth:`open_device`. Empty when pyftdi
            is unavailable or enumeration fails.
        """
        if not FTDI_AVAILABLE:
            logger.warning("pyftdi not available — cannot enumerate FT2232H devices")
            return []

        try:
            devices = []
            for entry in UsbTools.find_all([(self.VID, self.PID)]):
                # find_all yields (descriptor, interface_count) pairs; only
                # the descriptor identifies the device. A bare descriptor is
                # accepted as well.
                descriptor = entry if hasattr(entry, "vid") else entry[0]
                devices.append({
                    "description": f"FT2232H Device {descriptor}",
                    "url": self._device_url(descriptor),
                })
            return devices
        except Exception as e:
            logger.error("Error listing FT2232H devices: %s", e)
            return []

    # ---- open / close ------------------------------------------------------

    def open_device(self, device_url: str) -> bool:
        """Open FT2232H device in synchronous FIFO mode.

        Any device previously opened through this object is closed first.

        Args:
            device_url: pyftdi device URL, e.g. the ``"url"`` value of an
                entry returned by :meth:`list_devices`.

        Returns:
            True when the device was opened and configured, False otherwise
            (the reason is logged).
        """
        if not FTDI_AVAILABLE:
            logger.error("pyftdi not available — cannot open device")
            return False

        # Do not leak a handle that is still held from an earlier open.
        self.close()

        ftdi = None
        try:
            ftdi = Ftdi()
            ftdi.open_from_url(device_url)

            # Synchronous FIFO mode
            ftdi.set_bitmode(0xFF, Ftdi.BitMode.SYNCFF)

            # Low-latency timer (2 ms)
            ftdi.set_latency_timer(2)

            # Purge stale data
            ftdi.purge_buffers()
        except Exception as e:
            logger.error("Error opening FT2232H device: %s", e)
            # The USB handle may already be open if only the configuration
            # step failed; release it instead of just dropping the object.
            if ftdi is not None:
                try:
                    ftdi.close()
                except Exception as close_error:
                    logger.debug(
                        "Error releasing FT2232H handle after failed open: %s",
                        close_error,
                    )
            self.ftdi = None
            self.is_open = False
            return False

        self.ftdi = ftdi
        self.is_open = True
        logger.info("FT2232H device opened: %s", device_url)
        return True

    def close(self):
        """Close FT2232H device.

        Safe to call repeatedly and on an object that was never opened; the
        object is always left in the closed state.
        """
        handle, self.ftdi = self.ftdi, None
        self.is_open = False
        if handle is None:
            return
        try:
            handle.close()
        except Exception as e:
            logger.error("Error closing FT2232H device: %s", e)

    # ---- data I/O ----------------------------------------------------------

    def read_data(self, bytes_to_read: int = 4096) -> Optional[bytes]:
        """Read data from FT2232H.

        Args:
            bytes_to_read: Maximum number of bytes requested from the device.

        Returns:
            The received bytes, or None when the device is not open, no data
            was returned, or the read failed (failure is logged).
        """
        if not self.is_open or self.ftdi is None:
            return None

        try:
            data = self.ftdi.read_data(bytes_to_read)
        except Exception as e:
            logger.error("Error reading from FT2232H: %s", e)
            return None
        return bytes(data) if data else None
|
|
|
|
|
|
# =============================================================================
|
|
# STM32 USB CDC Interface — commands & GPS data
|
|
# =============================================================================
|
|
|
|
class STM32USBInterface:
    """
    Interface for STM32 USB CDC (Virtual COM Port).

    Used to:
    - Send start flag and radar settings to the MCU
    - Receive GPS data from the MCU

    Every method reports failure through its return value and the module
    logger; none of them raises to the caller.
    """

    STM32_VID_PIDS = [
        (0x0483, 0x5740),  # STM32 Virtual COM Port
        (0x0483, 0x3748),  # STM32 Discovery
        (0x0483, 0x374B),
        (0x0483, 0x374D),
        (0x0483, 0x374E),
        (0x0483, 0x3752),
    ]

    # 4-byte magic sent by send_start_flag().
    START_FLAG = bytes([23, 46, 158, 237])

    # Every outgoing transfer is exactly this many bytes (zero padded).
    PACKET_SIZE = 64

    # Settings payload layout, big-endian without padding: three doubles,
    # one unsigned 32-bit integer, six doubles (see _create_settings_packet).
    _SETTINGS_STRUCT = struct.Struct(">dddIdddddd")

    def __init__(self):
        self.device = None
        self.is_open: bool = False
        self.ep_in = None
        self.ep_out = None

    # ---- enumeration -------------------------------------------------------

    @staticmethod
    def _describe(dev, vid: int, pid: int) -> str:
        """Return a display label for *dev*.

        Uses the product and serial strings when they can be read, and
        falls back to a generic VID/PID label when reading them fails.
        """
        try:
            product = (usb.util.get_string(dev, dev.iProduct)
                       if dev.iProduct else None) or "STM32 CDC"
            serial = (usb.util.get_string(dev, dev.iSerialNumber)
                      if dev.iSerialNumber else None) or "Unknown"
            return f"{product} ({serial})"
        except Exception:
            # Best effort: reading string descriptors can fail (e.g. when
            # access to the device is denied); the device is still listed.
            return f"STM32 CDC (VID:{vid:04X}, PID:{pid:04X})"

    def list_devices(self) -> List[Dict]:
        """List available STM32 USB CDC devices.

        Returns:
            One dict per device found for any pair in ``STM32_VID_PIDS``,
            with keys ``"description"``, ``"vendor_id"``, ``"product_id"``
            and ``"device"`` (the pyusb device object). Devices found
            before an enumeration error are still returned.
        """
        if not USB_AVAILABLE:
            logger.warning("pyusb not available — cannot enumerate STM32 devices")
            return []

        devices = []
        try:
            for vid, pid in self.STM32_VID_PIDS:
                for dev in usb.core.find(find_all=True, idVendor=vid, idProduct=pid):
                    devices.append({
                        "description": self._describe(dev, vid, pid),
                        "vendor_id": vid,
                        "product_id": pid,
                        "device": dev,
                    })
        except Exception as e:
            logger.error("Error listing STM32 devices: %s", e)
        return devices

    # ---- open / close ------------------------------------------------------

    @staticmethod
    def _find_endpoint(interface, direction):
        """Return the first endpoint of *interface* with the given direction, or None."""
        return usb.util.find_descriptor(
            interface,
            custom_match=lambda ep: (
                usb.util.endpoint_direction(ep.bEndpointAddress) == direction
            ),
        )

    def _release(self):
        """Dispose of the current device (if any) and reset to the closed state."""
        device, self.device = self.device, None
        self.is_open = False
        self.ep_in = None
        self.ep_out = None
        if device is None:
            return
        try:
            usb.util.dispose_resources(device)
        except Exception as e:
            logger.error("Error closing STM32 device: %s", e)

    def open_device(self, device_info: Dict) -> bool:
        """Open STM32 USB CDC device.

        Any device previously opened through this object is released first.
        On failure the object is left fully closed (no stale device or
        endpoint references).

        Args:
            device_info: An entry returned by :meth:`list_devices`; its
                ``"device"`` value is the pyusb device to open.

        Returns:
            True when both an IN and an OUT endpoint were found, False
            otherwise (the reason is logged).

        NOTE(review): only interface 0 / alternate setting 0 is searched for
        endpoints, as in the original code. Confirm against the firmware's
        USB descriptors that both data endpoints live on that interface.
        """
        if not USB_AVAILABLE:
            logger.error("pyusb not available — cannot open STM32 device")
            return False

        self._release()

        try:
            device = device_info["device"]
            self.device = device

            try:
                if device.is_kernel_driver_active(0):
                    device.detach_kernel_driver(0)
            except NotImplementedError:
                # The backend has no kernel-driver support on this platform,
                # so there is nothing to detach; this is not an open failure.
                logger.debug("Kernel driver detach not supported by USB backend")

            device.set_configuration()
            intf = device.get_active_configuration()[(0, 0)]

            self.ep_out = self._find_endpoint(intf, usb.util.ENDPOINT_OUT)
            self.ep_in = self._find_endpoint(intf, usb.util.ENDPOINT_IN)

            if self.ep_out is None or self.ep_in is None:
                logger.error("Could not find STM32 CDC endpoints")
                self._release()
                return False

            self.is_open = True
            logger.info(
                "STM32 USB device opened: %s", device_info.get("description", "")
            )
            return True
        except Exception as e:
            logger.error("Error opening STM32 device: %s", e)
            self._release()
            return False

    def close(self):
        """Close STM32 USB device.

        Safe to call repeatedly and on an object that was never opened.
        """
        self._release()

    # ---- commands ----------------------------------------------------------

    def send_start_flag(self) -> bool:
        """Send start flag to STM32 (4-byte magic).

        Returns:
            True when the packet was written, False otherwise.
        """
        logger.info("Sending start flag to STM32 via USB...")
        return self._send_data(self.START_FLAG)

    def send_settings(self, settings: "RadarSettings") -> bool:
        """Send radar settings binary packet to STM32.

        Returns:
            True when the packet was written; False when the settings could
            not be packed or the write failed.
        """
        try:
            packet = self._create_settings_packet(settings)
            logger.info("Sending radar settings to STM32 via USB...")
            return self._send_data(packet)
        except Exception as e:
            logger.error("Error sending settings via USB: %s", e)
            return False

    # ---- data I/O ----------------------------------------------------------

    def read_data(self, size: int = 64, timeout: int = 1000) -> Optional[bytes]:
        """Read data from STM32 via USB CDC.

        Args:
            size: Maximum number of bytes to read.
            timeout: Passed through as the endpoint read timeout.

        Returns:
            The received bytes, or None when the device is not open or the
            read failed (including a timeout).
        """
        if not self.is_open or self.ep_in is None:
            return None
        try:
            return bytes(self.ep_in.read(size, timeout=timeout))
        except Exception as e:
            # A timeout is routine while polling, so this stays at debug
            # level; the message still shows real USB errors when debugging.
            logger.debug("STM32 USB read returned no data: %s", e)
            return None

    # ---- internal helpers --------------------------------------------------

    def _send_data(self, data: bytes) -> bool:
        """Write *data* to the OUT endpoint in fixed-size packets.

        The data is split into ``PACKET_SIZE``-byte chunks and the last
        chunk is padded with zero bytes to the full packet size.

        Returns:
            True when every chunk was written (also for empty *data*, which
            writes nothing); False when not open or a write failed.
        """
        if not self.is_open or self.ep_out is None:
            return False
        size = self.PACKET_SIZE
        try:
            for offset in range(0, len(data), size):
                chunk = data[offset:offset + size].ljust(size, b"\x00")
                self.ep_out.write(chunk)
            return True
        except Exception as e:
            logger.error("Error sending data via USB: %s", e)
            return False

    @staticmethod
    def _create_settings_packet(settings: "RadarSettings") -> bytes:
        """Create binary settings packet: 'SET' ... 'END'.

        Layout (big-endian, no padding): ``b"SET"``, then system_frequency,
        chirp_duration_1, chirp_duration_2 as doubles, chirps_per_position
        as an unsigned 32-bit integer, then freq_min, freq_max, prf1, prf2,
        max_distance, map_size as doubles, then ``b"END"`` (82 bytes total).

        Raises:
            struct.error: If a field cannot be packed in its format.
        """
        payload = STM32USBInterface._SETTINGS_STRUCT.pack(
            settings.system_frequency,
            settings.chirp_duration_1,
            settings.chirp_duration_2,
            settings.chirps_per_position,
            settings.freq_min,
            settings.freq_max,
            settings.prf1,
            settings.prf2,
            settings.max_distance,
            settings.map_size,
        )
        return b"SET" + payload + b"END"
|