# Source file: PLFM_RADAR/9_Firmware/9_3_GUI/v7/hardware.py (Python, 191 lines, 6.7 KiB)

"""
v7.hardware — Hardware interface classes for the PLFM Radar GUI V7.
Provides:
- FT2232H radar data + command interface via production radar_protocol module
- ReplayConnection for offline .npy replay via production radar_protocol module
- STM32USBInterface for GPS data only (USB CDC)
The FT2232H interface uses the production protocol layer (radar_protocol.py)
which sends 4-byte {opcode, addr, value_hi, value_lo} register commands and
parses 0xAA data / 0xBB status packets from the FPGA. The old magic-packet
and 'SET'...'END' binary settings protocol has been removed — it was
incompatible with the FPGA register interface.
"""
import importlib.util
import logging
import pathlib
import sys
from typing import ClassVar
from .models import USB_AVAILABLE
if USB_AVAILABLE:
import usb.core
import usb.util
def _load_radar_protocol():
    """Load radar_protocol.py by absolute path without mutating sys.path.

    The file is expected in the directory that contains this package
    (two levels above this module's own file). The module is registered in
    ``sys.modules`` under the name ``radar_protocol`` so repeated calls
    return the same object.

    Returns:
        The loaded (or already cached) ``radar_protocol`` module.

    Raises:
        ImportError: If no import spec or loader can be built for the file.
        Exception: Any error raised while executing the module (for example
            ``FileNotFoundError`` when the file is missing) propagates
            unchanged.
    """
    mod_name = "radar_protocol"
    if mod_name in sys.modules:
        return sys.modules[mod_name]

    proto_path = pathlib.Path(__file__).resolve().parent.parent / "radar_protocol.py"
    spec = importlib.util.spec_from_file_location(mod_name, proto_path)
    if spec is None or spec.loader is None:
        raise ImportError(
            f"Cannot build an import spec for {proto_path}",
            name=mod_name,
            path=str(proto_path),
        )

    mod = importlib.util.module_from_spec(spec)
    # Register before executing so imports of "radar_protocol" triggered
    # while the module body runs resolve to this same object.
    sys.modules[mod_name] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Never leave a half-initialised module cached: the next call would
        # otherwise return the broken object instead of retrying the load.
        sys.modules.pop(mod_name, None)
        raise
    return mod
logger = logging.getLogger(__name__)

# The production protocol module is loaded once at import time. Every name
# below is re-exported from it so the v7 package has a single source of
# truth for FPGA comms.
_rp = _load_radar_protocol()

# Connections
FT2232HConnection = _rp.FT2232HConnection
ReplayConnection = _rp.ReplayConnection

# Protocol layer
Opcode = _rp.Opcode
RadarProtocol = _rp.RadarProtocol
StatusResponse = _rp.StatusResponse

# Acquisition and recording
DataRecorder = _rp.DataRecorder
RadarAcquisition = _rp.RadarAcquisition
RadarFrame = _rp.RadarFrame
# =============================================================================
# STM32 USB CDC Interface — GPS data ONLY
# =============================================================================
class STM32USBInterface:
"""
Interface for STM32 USB CDC (Virtual COM Port).
Used ONLY for receiving GPS data from the MCU.
FPGA register commands are sent via FT2232H (see FT2232HConnection
from radar_protocol.py). The old send_start_flag() / send_settings()
methods have been removed — they used an incompatible magic-packet
protocol that the FPGA does not understand.
"""
STM32_VID_PIDS: ClassVar[list[tuple[int, int]]] = [
(0x0483, 0x5740), # STM32 Virtual COM Port
(0x0483, 0x3748), # STM32 Discovery
(0x0483, 0x374B),
(0x0483, 0x374D),
(0x0483, 0x374E),
(0x0483, 0x3752),
]
def __init__(self):
self.device = None
self.is_open: bool = False
self.ep_in = None
self.ep_out = None
# ---- enumeration -------------------------------------------------------
def list_devices(self) -> list[dict]:
"""List available STM32 USB CDC devices."""
if not USB_AVAILABLE:
logger.warning("pyusb not available — cannot enumerate STM32 devices")
return []
devices = []
try:
for vid, pid in self.STM32_VID_PIDS:
found = usb.core.find(find_all=True, idVendor=vid, idProduct=pid)
for dev in found:
try:
product = (usb.util.get_string(dev, dev.iProduct)
if dev.iProduct else "STM32 CDC")
serial = (usb.util.get_string(dev, dev.iSerialNumber)
if dev.iSerialNumber else "Unknown")
devices.append({
"description": f"{product} ({serial})",
"vendor_id": vid,
"product_id": pid,
"device": dev,
})
except (usb.core.USBError, ValueError):
devices.append({
"description": f"STM32 CDC (VID:{vid:04X}, PID:{pid:04X})",
"vendor_id": vid,
"product_id": pid,
"device": dev,
})
except (usb.core.USBError, ValueError) as e:
logger.error(f"Error listing STM32 devices: {e}")
return devices
# ---- open / close ------------------------------------------------------
def open_device(self, device_info: dict) -> bool:
"""Open STM32 USB CDC device."""
if not USB_AVAILABLE:
logger.error("pyusb not available — cannot open STM32 device")
return False
try:
self.device = device_info["device"]
if self.device.is_kernel_driver_active(0):
self.device.detach_kernel_driver(0)
self.device.set_configuration()
cfg = self.device.get_active_configuration()
intf = cfg[(0, 0)]
self.ep_out = usb.util.find_descriptor(
intf,
custom_match=lambda e: (
usb.util.endpoint_direction(e.bEndpointAddress)
== usb.util.ENDPOINT_OUT
),
)
self.ep_in = usb.util.find_descriptor(
intf,
custom_match=lambda e: (
usb.util.endpoint_direction(e.bEndpointAddress)
== usb.util.ENDPOINT_IN
),
)
if self.ep_out is None or self.ep_in is None:
logger.error("Could not find STM32 CDC endpoints")
return False
self.is_open = True
logger.info(f"STM32 USB device opened: {device_info.get('description', '')}")
return True
except (usb.core.USBError, ValueError) as e:
logger.error(f"Error opening STM32 device: {e}")
return False
def close(self):
"""Close STM32 USB device."""
if self.device and self.is_open:
try:
usb.util.dispose_resources(self.device)
except usb.core.USBError as e:
logger.error(f"Error closing STM32 device: {e}")
self.is_open = False
self.device = None
self.ep_in = None
self.ep_out = None
# ---- GPS data I/O ------------------------------------------------------
def read_data(self, size: int = 64, timeout: int = 1000) -> bytes | None:
"""Read GPS data from STM32 via USB CDC."""
if not self.is_open or self.ep_in is None:
return None
try:
data = self.ep_in.read(size, timeout=timeout)
return bytes(data)
except usb.core.USBError:
# Timeout or other USB error
return None