# Source path: PLFM_RADAR/9_Firmware/9_3_GUI/GUI_V6
# Snapshot: 2026-03-10 01:24:45 +00:00 (616 lines, 23 KiB, served as plain text)

import tkinter as tk
from tkinter import ttk, messagebox
import threading
import queue
import time
import struct
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.patches as patches
import logging
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
from scipy import signal
from sklearn.cluster import DBSCAN
from filterpy.kalman import KalmanFilter
import crcmod
import math
import webbrowser
import tempfile
import os
# Configure logging first. The optional-import fallbacks below call
# logging.warning(); if that ran before basicConfig(), the logging module would
# install its default root handler and this call would silently do nothing,
# leaving the whole application without the INFO level and timestamped format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# pyusb is optional: without it direct USB access is unavailable.
try:
    import usb.core
    import usb.util
    USB_AVAILABLE = True
except ImportError:
    USB_AVAILABLE = False
    logging.warning("pyusb not available. USB functionality will be disabled.")

# pyftdi is optional: without it the pyftdi transport is unavailable.
try:
    from pyftdi.ftdi import Ftdi
    from pyftdi.usbtools import UsbTools
    from pyftdi.ftdi import FtdiError
    FTDI_AVAILABLE = True
except ImportError:
    FTDI_AVAILABLE = False
    logging.warning("pyftdi not available. FTDI functionality will be disabled.")
# Dark theme colors, as "#rrggbb" hex strings.
# DARK_BG is applied to the root window in RadarGUI.__init__; the remaining
# entries are presumably consumed by configure_dark_theme(), whose body is not
# included in this revision - TODO confirm.
DARK_BG = "#2b2b2b"
DARK_FG = "#e0e0e0"
DARK_ACCENT = "#3c3f41"
DARK_HIGHLIGHT = "#4e5254"
DARK_BORDER = "#555555"
DARK_TEXT = "#cccccc"
DARK_BUTTON = "#3c3f41"
DARK_BUTTON_HOVER = "#4e5254"
DARK_TREEVIEW = "#3c3f41"
DARK_TREEVIEW_ALT = "#404040"
@dataclass
class RadarTarget:
    """One radar detection record.

    The first five fields are required; the rest default to neutral values.
    NOTE(review): units are not stated anywhere in this file - confirm them
    against the packet parser before relying on them.
    """

    id: int
    range: float
    velocity: float
    azimuth: int
    elevation: int
    latitude: float = 0.0
    longitude: float = 0.0
    snr: float = 0.0
    timestamp: float = 0.0
    # -1 is the default; presumably "not assigned to a track yet" - TODO confirm.
    track_id: int = -1
@dataclass
class RadarSettings:
    """Tunable radar parameters, all with defaults.

    NOTE(review): apart from the fields commented below, units are not stated
    in this file - confirm before changing defaults.
    """

    system_frequency: float = 10e9
    chirp_duration_1: float = 30e-6  # Long chirp duration
    chirp_duration_2: float = 0.5e-6  # Short chirp duration
    chirps_per_position: int = 32
    freq_min: float = 10e6
    freq_max: float = 30e6
    prf1: float = 1000
    prf2: float = 2000
    max_distance: float = 50000
    map_size: float = 50000  # Map size in meters
@dataclass
class GPSData:
    """A position/attitude sample; every field is required."""

    latitude: float
    longitude: float
    altitude: float
    pitch: float  # Pitch angle in degrees
    timestamp: float
class MapGenerator:
    """Placeholder: the MapGenerator implementation is not included in this revision."""
class FT601Interface:
    """
    Interface for FT601 USB 3.0 SuperSpeed controller.

    Two transports are supported and at most one is active at a time:

    * pyftdi (``open_device``), tracked by ``self.ftdi``;
    * direct pyusb endpoint access (``open_device_direct``), tracked by
      ``self.device`` / ``self.ep_in`` / ``self.ep_out``.

    Every handle is reset to ``None`` when an open fails or the device is
    closed, so a stale handle from an earlier session can never shadow the
    transport that is currently open.
    """

    def __init__(self):
        self.ftdi = None
        self.is_open = False
        self.device = None
        self.ep_in = None
        self.ep_out = None
        # FT601 specific parameters
        self.channel = 0  # Default channel
        self.fifo_mode = True
        self.buffer_size = 512  # FT601 optimal buffer size

    def list_devices(self):
        """List available FT601 devices.

        Enumeration is done with pyusb; each entry also carries a pyftdi URL.

        Returns:
            A list of dicts with the keys ``description``, ``vendor_id``,
            ``product_id``, ``url`` and (for real devices) ``device`` and
            ``serial``. Empty when pyftdi or pyusb is not installed.
        """
        if not FTDI_AVAILABLE:
            logging.warning("FTDI not available - please install pyftdi")
            return []
        if not USB_AVAILABLE:
            # Enumeration below goes through usb.core, not pyftdi.
            logging.warning("pyusb not available - cannot enumerate FT601 devices")
            return []
        try:
            # FT601 vendor/product IDs
            # NOTE(review): confirm these IDs against the board's USB descriptor.
            ft601_vid_pids = [
                (0x0403, 0x6030),  # FT601
                (0x0403, 0x6031),  # FT601Q
            ]
            devices = []
            for vid, pid in ft601_vid_pids:
                for dev in usb.core.find(find_all=True, idVendor=vid, idProduct=pid):
                    devices.append(self._describe_device(dev, vid, pid))
            return devices
        except Exception as e:
            logging.error(f"Error listing FT601 devices: {e}")
            # Return mock devices for testing
            # NOTE(review): this entry has no 'device' key and cannot be
            # opened; consider returning [] outside of test builds.
            return [
                {'description': 'FT601 USB3.0 Device A',
                 'url': 'ftdi://device/1',
                 'vendor_id': 0x0403,
                 'product_id': 0x6030}
            ]

    @staticmethod
    def _describe_device(dev, vid, pid):
        """Build the device-info dict for one enumerated pyusb device."""
        try:
            product = usb.util.get_string(dev, dev.iProduct) if dev.iProduct else "FT601 USB3.0"
            serial = usb.util.get_string(dev, dev.iSerialNumber) if dev.iSerialNumber else None
            # Only put a real serial number in the URL; a literal "Unknown"
            # would be treated as a serial to match and never resolve.
            if serial:
                url = f"ftdi://{vid:04x}:{pid:04x}:{serial}/1"
            else:
                url = f"ftdi://{vid:04x}:{pid:04x}/1"
                serial = "Unknown"
            return {
                'description': f"{product} ({serial})",
                'vendor_id': vid,
                'product_id': pid,
                'url': url,
                'device': dev,
                'serial': serial,
            }
        except Exception as e:
            # String descriptors may be unreadable (permissions, no langid);
            # fall back to a description built from the IDs alone.
            logging.debug(f"Could not read FT601 string descriptors: {e}")
            return {
                'description': f"FT601 USB3.0 (VID:{vid:04X}, PID:{pid:04X})",
                'vendor_id': vid,
                'product_id': pid,
                'url': f"ftdi://{vid:04x}:{pid:04x}/1",
                'device': dev,
                'serial': "Unknown",
            }

    def open_device(self, device_url):
        """Open FT601 device using pyftdi.

        Args:
            device_url: pyftdi URL, as found under ``url`` in ``list_devices``.

        Returns:
            True when the device was opened and configured, False otherwise.
            On failure no handle is kept on the instance.
        """
        if not FTDI_AVAILABLE:
            logging.error("FTDI not available - cannot open device")
            return False
        ftdi = None
        try:
            ftdi = Ftdi()
            # Open device with FT601 specific configuration
            ftdi.open_from_url(device_url)
            # Configure for FT601 SuperSpeed mode
            # Set to 245 FIFO mode (similar to FT2232 but with 32-bit bus)
            ftdi.set_bitmode(0xFF, Ftdi.BitMode.SYNCFF)
            # Set high baud rate for USB 3.0 (500MHz / 5 = 100MHz)
            ftdi.set_frequency(100e6)  # 100 MHz clock
            # Configure latency timer for optimal performance
            ftdi.set_latency_timer(2)  # 2ms latency
            # Set transfer size for large packets
            ftdi.write_data_set_chunksize(self.buffer_size)
            # Purge buffers
            ftdi.purge_buffers()
        except Exception as e:
            logging.error(f"Error opening FT601 device: {e}")
            # Release a partially opened device instead of leaving it on self.
            if ftdi is not None:
                try:
                    ftdi.close()
                except Exception as close_error:
                    logging.debug(f"Error releasing FT601 device after failed open: {close_error}")
            return False
        self.ftdi = ftdi
        self.is_open = True
        logging.info(f"FT601 device opened: {device_url}")
        return True

    def open_device_direct(self, device_info):
        """Open FT601 device directly using USB (alternative method).

        Args:
            device_info: One entry from ``list_devices``; its ``device`` value
                is the pyusb device to open.

        Returns:
            True when both an IN and an OUT endpoint were found, False
            otherwise. On failure no handle is kept on the instance.
        """
        if not USB_AVAILABLE:
            logging.error("USB not available - cannot open device")
            return False
        try:
            device = device_info['device']
            # Detach kernel driver if active
            try:
                if device.is_kernel_driver_active(0):
                    device.detach_kernel_driver(0)
            except NotImplementedError:
                # Some backends/platforms have no kernel-driver concept;
                # that must not prevent the device from being opened.
                pass
            # Set configuration
            device.set_configuration()
            # Get FT601 endpoints
            cfg = device.get_active_configuration()
            intf = cfg[(0, 0)]
            # FT601 typically has:
            # EP1 OUT (host to device)
            # EP1 IN (device to host)
            # EP2 OUT
            # EP2 IN
            ep_out = self._find_endpoint(intf, usb.util.ENDPOINT_OUT)
            ep_in = self._find_endpoint(intf, usb.util.ENDPOINT_IN)
            if ep_out is None or ep_in is None:
                logging.error("Could not find FT601 endpoints")
                return False
        except Exception as e:
            logging.error(f"Error opening FT601 device: {e}")
            return False
        # Publish the handles only once the whole open sequence succeeded.
        self.device = device
        self.ep_out = ep_out
        self.ep_in = ep_in
        self.is_open = True
        logging.info(f"FT601 device opened: {device_info['description']}")
        return True

    @staticmethod
    def _find_endpoint(intf, direction):
        """Return the first EP1/EP2 endpoint of ``intf`` with the given direction, or None."""
        return usb.util.find_descriptor(
            intf,
            custom_match=lambda e: (
                usb.util.endpoint_direction(e.bEndpointAddress) == direction
                and (e.bEndpointAddress & 0xF) in (1, 2)  # EP1 or EP2
            ),
        )

    @staticmethod
    def _is_usb_timeout(error):
        """Tell whether a pyusb error is a read timeout."""
        # Newer pyusb releases raise a dedicated subclass; older ones only
        # expose the errno, which is 110 (ETIMEDOUT) on Linux.
        timeout_cls = getattr(usb.core, "USBTimeoutError", None)
        if timeout_cls is not None and isinstance(error, timeout_cls):
            return True
        return getattr(error, "errno", None) == 110

    def read_data(self, bytes_to_read=None):
        """Read data from FT601 (32-bit word aligned).

        Args:
            bytes_to_read: Requested size; defaults to ``self.buffer_size``
                for pyftdi and 512 for direct USB. The pyftdi path rounds it
                up to a multiple of 4.

        Returns:
            The bytes received, or None when the device is not open, nothing
            was received, or an error occurred (the error is logged).
        """
        if not self.is_open or (self.ftdi is None and self.device is None):
            return None
        try:
            if self.ftdi is not None:
                # Using pyftdi
                # FT601 reads are 32-bit aligned
                if bytes_to_read is None:
                    bytes_to_read = self.buffer_size
                # Ensure read size is multiple of 4 bytes
                bytes_to_read = ((bytes_to_read + 3) // 4) * 4
                data = self.ftdi.read_data(bytes_to_read)
                return bytes(data) if data else None
            if self.device is not None and self.ep_in is not None:
                # Direct USB access
                if bytes_to_read is None:
                    bytes_to_read = 512
                # FT601 maximum packet size
                max_packet = 512
                data = bytearray()
                while len(data) < bytes_to_read:
                    chunk_size = min(max_packet, bytes_to_read - len(data))
                    try:
                        data.extend(self.ep_in.read(chunk_size, timeout=100))
                    except usb.core.USBError as e:
                        if self._is_usb_timeout(e):
                            # No more data for now: return what we have.
                            break
                        raise
                return bytes(data) if data else None
        except Exception as e:
            logging.error(f"Error reading from FT601: {e}")
        return None

    def write_data(self, data):
        """Write data to FT601 (32-bit word aligned).

        On the pyftdi path the payload is zero-padded to a multiple of 4
        bytes. The caller's buffer is never modified.
        NOTE(review): the direct-USB path sends the payload unpadded, as it
        always has - confirm whether it should be padded too.

        Returns:
            True when the data was handed to the device, False otherwise.
        """
        if not self.is_open or (self.ftdi is None and self.device is None):
            return False
        try:
            # Work on a private copy so padding never mutates a caller's bytearray.
            payload = bytes(data)
            if self.ftdi is not None:
                # Using pyftdi
                # Ensure data length is multiple of 4 for 32-bit alignment
                remainder = len(payload) % 4
                if remainder:
                    payload += b'\x00' * (4 - remainder)
                self.ftdi.write_data(payload)
                return True
            if self.device is not None and self.ep_out is not None:
                # Direct USB access
                # FT601 supports large transfers
                max_packet = 512
                for offset in range(0, len(payload), max_packet):
                    self.ep_out.write(payload[offset:offset + max_packet], timeout=100)
                return True
        except Exception as e:
            logging.error(f"Error writing to FT601: {e}")
        return False

    def configure_burst_mode(self, enable=True):
        """Configure FT601 burst mode for maximum throughput.

        Only the pyftdi transport is affected.

        Returns:
            True on success; False when no pyftdi handle is open or the
            configuration failed.
        """
        if self.ftdi is None:
            return False
        try:
            # FT601 specific commands for burst mode
            if enable:
                # Enable burst mode
                self.ftdi.set_bitmode(0xFF, Ftdi.BitMode.SYNCFF)
                self.ftdi.write_data_set_chunksize(4096)  # Larger chunks for burst
                logging.info("FT601 burst mode enabled")
            else:
                # Disable burst mode
                self.ftdi.set_bitmode(0xFF, Ftdi.BitMode.RESET)
                logging.info("FT601 burst mode disabled")
            return True
        except Exception as e:
            logging.error(f"Error configuring burst mode: {e}")
            return False

    def close(self):
        """Close FT601 device.

        Both transports are released independently and every handle is reset,
        so the instance can be reopened with either transport afterwards.
        Calling this on an already closed instance is a no-op.
        """
        if self.ftdi is not None:
            try:
                self.ftdi.close()
                logging.info("FT601 device closed")
            except Exception as e:
                logging.error(f"Error closing FT601 device: {e}")
        if self.device is not None:
            try:
                usb.util.dispose_resources(self.device)
            except Exception as e:
                logging.error(f"Error closing FT601 device: {e}")
        self.ftdi = None
        self.device = None
        self.ep_in = None
        self.ep_out = None
        self.is_open = False
class RadarProcessor:
    """Placeholder: the RadarProcessor implementation is not included in this revision."""
class USBPacketParser:
    """Placeholder: the USBPacketParser implementation is not included in this revision."""
class RadarPacketParser:
    """Placeholder: the RadarPacketParser implementation is not included in this revision.

    NOTE(review): RadarGUI.process_radar_data calls ``parse_packet`` on an
    instance of this class, so the real implementation must provide it.
    """
class RadarGUI:
    """Main window of the radar application (FT601 USB 3.0 variant).

    NOTE(review): ``STM32USBInterface``, ``start_background_threads``,
    ``apply_settings``, ``process_radar_packet`` and ``self.tab_main`` are used
    here but not defined in the visible part of this file - confirm they are
    provided by the parts that were left out.
    """

    def __init__(self, root):
        """Build the window, create the device interfaces and start the worker threads."""
        self.root = root
        self.root.title("Advanced Radar System GUI - FT601 USB 3.0")
        self.root.geometry("1400x900")
        # Apply dark theme
        self.root.configure(bg=DARK_BG)
        # Configure ttk style
        self.style = ttk.Style()
        self.style.theme_use('clam')
        self.configure_dark_theme()
        # Initialize interfaces - Replace FTDI with FT601
        self.stm32_usb_interface = STM32USBInterface()
        self.ft601_interface = FT601Interface()  # Changed from FTDIInterface
        self.radar_processor = RadarProcessor()
        self.usb_packet_parser = USBPacketParser()
        self.radar_packet_parser = RadarPacketParser()
        self.map_generator = MapGenerator()
        self.settings = RadarSettings()
        # Data queues
        self.radar_data_queue = queue.Queue()
        self.gps_data_queue = queue.Queue()
        # Thread control
        self.running = False
        self.radar_thread = None
        self.gps_thread = None
        # Counters
        self.received_packets = 0
        self.current_gps = GPSData(latitude=41.9028, longitude=12.4964, altitude=0, pitch=0.0, timestamp=0)
        self.corrected_elevations = []
        self.map_file_path = None
        self.google_maps_api_key = "YOUR_GOOGLE_MAPS_API_KEY"
        self.create_gui()
        self.start_background_threads()

    def configure_dark_theme(self):
        """Placeholder: the theme setup is not included in this revision."""
        # ... (same as before)
        pass

    def create_gui(self):
        """Placeholder: the widget construction is not included in this revision."""
        # ... (same as before, update device label)
        pass

    def setup_main_tab(self):
        """Setup the main radar display tab"""
        # Control frame
        control_frame = ttk.Frame(self.tab_main)
        control_frame.pack(fill='x', padx=10, pady=5)
        # USB Device selection
        ttk.Label(control_frame, text="STM32 USB Device:").grid(row=0, column=0, padx=5)
        self.stm32_usb_combo = ttk.Combobox(control_frame, state="readonly", width=40)
        self.stm32_usb_combo.grid(row=0, column=1, padx=5)
        # FT601 Device selection (replaces FTDI)
        ttk.Label(control_frame, text="FT601 USB 3.0 Device:").grid(row=0, column=2, padx=5)
        self.ft601_combo = ttk.Combobox(control_frame, state="readonly", width=40)
        self.ft601_combo.grid(row=0, column=3, padx=5)
        # Burst mode checkbox (new for FT601)
        self.burst_mode_var = tk.BooleanVar(value=True)
        ttk.Checkbutton(control_frame, text="Burst Mode",
                        variable=self.burst_mode_var).grid(row=0, column=4, padx=5)
        ttk.Button(control_frame, text="Refresh Devices",
                   command=self.refresh_devices).grid(row=0, column=5, padx=5)
        self.start_button = ttk.Button(control_frame, text="Start Radar",
                                       command=self.start_radar)
        self.start_button.grid(row=0, column=6, padx=5)
        self.stop_button = ttk.Button(control_frame, text="Stop Radar",
                                      command=self.stop_radar, state="disabled")
        self.stop_button.grid(row=0, column=7, padx=5)
        # GPS and Pitch info
        self.gps_label = ttk.Label(control_frame, text="GPS: Waiting for data...")
        self.gps_label.grid(row=1, column=0, columnspan=4, sticky='w', padx=5, pady=2)
        # Pitch display
        self.pitch_label = ttk.Label(control_frame, text="Pitch: --.--")
        self.pitch_label.grid(row=1, column=4, columnspan=2, padx=5, pady=2)
        # Status info with FT601 specific info
        self.status_label = ttk.Label(control_frame,
                                      text="Status: Ready - FT601 USB 3.0")
        self.status_label.grid(row=1, column=6, columnspan=2, sticky='e', padx=5, pady=2)
        # [Rest of setup_main_tab remains the same]
        # ...

    def refresh_devices(self):
        """Refresh available USB devices"""
        # STM32 USB devices
        stm32_devices = self.stm32_usb_interface.list_devices()
        stm32_names = [dev['description'] for dev in stm32_devices]
        self.stm32_usb_combo['values'] = stm32_names
        # FT601 devices (replaces FTDI)
        ft601_devices = self.ft601_interface.list_devices()
        ft601_names = [dev['description'] for dev in ft601_devices]
        self.ft601_combo['values'] = ft601_names
        if stm32_names:
            self.stm32_usb_combo.current(0)
        if ft601_names:
            self.ft601_combo.current(0)

    def start_radar(self):
        """Start radar operation with FT601.

        The STM32 link is mandatory; the FT601 link is optional (the radar
        starts without radar data when it cannot be opened). Any device
        opened here is closed again when the start sequence fails.
        """
        try:
            # Open STM32 USB device
            stm32_index = self.stm32_usb_combo.current()
            if stm32_index == -1:
                messagebox.showerror("Error", "Please select an STM32 USB device")
                return
            stm32_devices = self.stm32_usb_interface.list_devices()
            if stm32_index >= len(stm32_devices):
                messagebox.showerror("Error", "Invalid STM32 device selection")
                return
            if not self.stm32_usb_interface.open_device(stm32_devices[stm32_index]):
                messagebox.showerror("Error", "Failed to open STM32 USB device")
                return
            # Open FT601 device
            self._open_ft601()
            # Send start flag to STM32
            if not self.stm32_usb_interface.send_start_flag():
                # Do not keep the hardware claimed after an aborted start.
                self._close_interfaces()
                messagebox.showerror("Error", "Failed to send start flag to STM32")
                return
            # Send settings to STM32
            self.apply_settings()
            # Start radar operation
            self.running = True
            self.start_button.config(state="disabled")
            self.stop_button.config(state="normal")
            self.status_label.config(text="Status: Radar running - FT601 USB 3.0 active")
            logging.info("Radar system started successfully with FT601 USB 3.0")
        except Exception as e:
            # Leave the system in a clean "stopped" state before reporting.
            self.running = False
            self._close_interfaces()
            messagebox.showerror("Error", f"Failed to start radar: {e}")
            logging.error(f"Start radar error: {e}")

    def _open_ft601(self):
        """Open the selected FT601 (direct USB first, pyftdi as fallback) and apply burst mode."""
        ft601_index = self.ft601_combo.current()
        if ft601_index == -1:
            logging.warning("No FT601 device selected, continuing without radar data")
            return
        ft601_devices = self.ft601_interface.list_devices()
        if ft601_index >= len(ft601_devices):
            logging.warning("No FT601 device selected, continuing without radar data")
            return
        selected = ft601_devices[ft601_index]
        # Try direct USB first, fallback to pyftdi
        opened = (self.ft601_interface.open_device_direct(selected)
                  or self.ft601_interface.open_device(selected['url']))
        if not opened:
            logging.warning("Failed to open FT601 device, continuing without radar data")
            messagebox.showwarning("Warning", "Failed to open FT601 device")
            return
        # Configure burst mode if enabled, whichever transport was opened;
        # configure_burst_mode() itself ignores transports it does not support.
        if self.burst_mode_var.get():
            self.ft601_interface.configure_burst_mode(True)

    def _close_interfaces(self):
        """Close both device interfaces, logging (not raising) any error."""
        for interface in (self.stm32_usb_interface, self.ft601_interface):
            try:
                interface.close()
            except Exception as e:
                logging.error(f"Error closing device interface: {e}")

    def stop_radar(self):
        """Stop radar operation"""
        self.running = False
        self.start_button.config(state="normal")
        self.stop_button.config(state="disabled")
        self.status_label.config(text="Status: Radar stopped")
        self.stm32_usb_interface.close()
        self.ft601_interface.close()
        logging.info("Radar system stopped")

    def process_radar_data(self):
        """Process incoming radar data from FT601.

        Runs forever; intended as the body of a background thread. While the
        radar is stopped or the FT601 is closed it idles in 0.1 s steps.
        """
        buffer = bytearray()
        while True:
            if not (self.running and self.ft601_interface.is_open):
                # Bytes left over from a previous session would misalign the
                # next one, so start every session with an empty buffer.
                buffer.clear()
                time.sleep(0.1)
                continue
            try:
                # Read from FT601 (supports larger transfers)
                data = self.ft601_interface.read_data(4096)
                if not data:
                    # Nothing available: yield briefly instead of spinning.
                    time.sleep(0.001)
                    continue
                buffer.extend(data)
                self._drain_packets(buffer)
            except Exception as e:
                logging.error(f"Error processing radar data: {e}")
                time.sleep(0.1)

    def _drain_packets(self, buffer):
        """Parse and dispatch every packet found in ``buffer``, consuming it in place."""
        # Process packets (32-bit aligned)
        # NOTE(review): a packet that has only partly arrived is resynced away
        # byte by byte here - confirm parse_packet's contract for short input.
        while len(buffer) >= 8:  # Minimum packet size
            # Try to find valid packet
            packet = self.radar_packet_parser.parse_packet(bytes(buffer))
            if packet:
                self.process_radar_packet(packet)
                # Remove processed packet from buffer; always consume at least
                # one byte so a bad length can never stall the loop.
                packet_length = self.get_packet_length(packet)
                del buffer[:max(packet_length, 1)]
                self.received_packets += 1
            else:
                # No valid packet found, shift buffer
                del buffer[:1]

    def get_packet_length(self, packet):
        """Calculate packet length including header and footer"""
        # This should match your packet structure
        return 64  # Example: 64-byte packets

    # [Rest of the methods remain the same]
    # ...
def main():
    """Main application entry point.

    Creates the Tk root, builds the GUI and runs the event loop. Any failure
    is logged with its traceback and, when a display is available, shown in an
    error dialog.
    """
    try:
        root = tk.Tk()
        app = RadarGUI(root)  # keep a reference for the lifetime of the event loop
        root.mainloop()
    except Exception as e:
        # logging.exception keeps the traceback, which a plain error() drops.
        logging.exception(f"Application error: {e}")
        try:
            messagebox.showerror("Fatal Error", f"Application failed to start: {e}")
        except tk.TclError:
            # The dialog needs a working Tk; if Tk itself is what failed, the
            # log entry above is the only report we can give.
            logging.error("Could not display the fatal error dialog")


if __name__ == "__main__":
    main()