diff --git a/9_Firmware/9_3_GUI/GUI_V6 b/9_Firmware/9_3_GUI/GUI_V6 new file mode 100644 index 0000000..e1ba0cd --- /dev/null +++ b/9_Firmware/9_3_GUI/GUI_V6 @@ -0,0 +1,616 @@ +import tkinter as tk +from tkinter import ttk, messagebox +import threading +import queue +import time +import struct +import numpy as np +import matplotlib.pyplot as plt +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg +from matplotlib.figure import Figure +import matplotlib.patches as patches +import logging +from dataclasses import dataclass +from typing import Dict, List, Tuple, Optional +from scipy import signal +from sklearn.cluster import DBSCAN +from filterpy.kalman import KalmanFilter +import crcmod +import math +import webbrowser +import tempfile +import os + +try: + import usb.core + import usb.util + USB_AVAILABLE = True +except ImportError: + USB_AVAILABLE = False + logging.warning("pyusb not available. USB functionality will be disabled.") + +try: + from pyftdi.ftdi import Ftdi + from pyftdi.usbtools import UsbTools + from pyftdi.ftdi import FtdiError + FTDI_AVAILABLE = True +except ImportError: + FTDI_AVAILABLE = False + logging.warning("pyftdi not available. FTDI functionality will be disabled.") + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# Dark theme colors (same as before) +DARK_BG = "#2b2b2b" +DARK_FG = "#e0e0e0" +DARK_ACCENT = "#3c3f41" +DARK_HIGHLIGHT = "#4e5254" +DARK_BORDER = "#555555" +DARK_TEXT = "#cccccc" +DARK_BUTTON = "#3c3f41" +DARK_BUTTON_HOVER = "#4e5254" +DARK_TREEVIEW = "#3c3f41" +DARK_TREEVIEW_ALT = "#404040" + +@dataclass +class RadarTarget: + id: int + range: float + velocity: float + azimuth: int + elevation: int + latitude: float = 0.0 + longitude: float = 0.0 + snr: float = 0.0 + timestamp: float = 0.0 + track_id: int = -1 + +@dataclass +class RadarSettings: + system_frequency: float = 10e9 + chirp_duration_1: float = 30e-6 # Long chirp duration + chirp_duration_2: float = 0.5e-6 # Short chirp duration + chirps_per_position: int = 32 + freq_min: float = 10e6 + freq_max: float = 30e6 + prf1: float = 1000 + prf2: float = 2000 + max_distance: float = 50000 + map_size: float = 50000 # Map size in meters + +@dataclass +class GPSData: + latitude: float + longitude: float + altitude: float + pitch: float # Pitch angle in degrees + timestamp: float + +class MapGenerator: + # [MapGenerator class remains the same as before] + pass + +class FT601Interface: + """ + Interface for FT601 USB 3.0 SuperSpeed controller + """ + def __init__(self): + self.ftdi = None + self.is_open = False + self.device = None + self.ep_in = None + self.ep_out = None + + # FT601 specific parameters + self.channel = 0 # Default channel + self.fifo_mode = True + self.buffer_size = 512 # FT601 optimal buffer size + + def list_devices(self): + """List available FT601 devices using pyftdi""" + if not FTDI_AVAILABLE: + logging.warning("FTDI not available - please install pyftdi") + return [] + + try: + devices = [] + # FT601 vendor/product IDs + ft601_vid_pids = [ + (0x0403, 0x6030), # FT601 + (0x0403, 0x6031), # FT601Q + ] + + for vid, pid in ft601_vid_pids: + found_devices = usb.core.find(find_all=True, idVendor=vid, idProduct=pid) + for dev in found_devices: + try: + product = usb.util.get_string(dev, dev.iProduct) if dev.iProduct else "FT601 USB3.0" + serial = usb.util.get_string(dev, dev.iSerialNumber) if dev.iSerialNumber else "Unknown" + + # Create FTDI URL for the device + url = f"ftdi://{vid:04x}:{pid:04x}:{serial}/1" + + devices.append({ + 'description': f"{product} ({serial})", + 'vendor_id': vid, + 'product_id': pid, + 'url': url, + 'device': dev, + 'serial': serial + }) + except Exception as e: + devices.append({ + 'description': f"FT601 USB3.0 (VID:{vid:04X}, PID:{pid:04X})", + 'vendor_id': vid, + 'product_id': pid, + 'url': f"ftdi://{vid:04x}:{pid:04x}/1", + 'device': dev + }) + + return devices + except Exception as e: + logging.error(f"Error listing FT601 devices: {e}") + # Return mock devices for testing + return [ + {'description': 'FT601 USB3.0 Device A', + 'url': 'ftdi://device/1', + 'vendor_id': 0x0403, + 'product_id': 0x6030} + ] + + def open_device(self, device_url): + """Open FT601 device using pyftdi""" + if not FTDI_AVAILABLE: + logging.error("FTDI not available - cannot open device") + return False + + try: + self.ftdi = Ftdi() + + # Open device with FT601 specific configuration + self.ftdi.open_from_url(device_url) + + # Configure for FT601 SuperSpeed mode + # Set to 245 FIFO mode (similar to FT2232 but with 32-bit bus) + self.ftdi.set_bitmode(0xFF, Ftdi.BitMode.SYNCFF) + + # Set high baud rate for USB 3.0 (500MHz / 5 = 100MHz) + self.ftdi.set_frequency(100e6) # 100 MHz clock + + # Configure latency timer for optimal performance + self.ftdi.set_latency_timer(2) # 2ms latency + + # Set transfer size for large packets + self.ftdi.write_data_set_chunksize(self.buffer_size) + + # Purge buffers + self.ftdi.purge_buffers() + + self.is_open = True + logging.info(f"FT601 device opened: {device_url}") + return True + + except Exception as e: + logging.error(f"Error opening FT601 device: {e}") + return False + + def open_device_direct(self, device_info): + """Open FT601 device directly using USB (alternative method)""" + if not USB_AVAILABLE: + logging.error("USB not available - cannot open device") + return False + + try: + self.device = device_info['device'] + + # Detach kernel driver if active + if self.device.is_kernel_driver_active(0): + self.device.detach_kernel_driver(0) + + # Set configuration + self.device.set_configuration() + + # Get FT601 endpoints + cfg = self.device.get_active_configuration() + intf = cfg[(0,0)] + + # FT601 typically has: + # EP1 OUT (host to device) + # EP1 IN (device to host) + # EP2 OUT + # EP2 IN + + # Find bulk endpoints for high-speed transfer + self.ep_out = usb.util.find_descriptor( + intf, + custom_match=lambda e: + usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT and + e.bEndpointAddress & 0xF in [1, 2] # EP1 or EP2 + ) + + self.ep_in = usb.util.find_descriptor( + intf, + custom_match=lambda e: + usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN and + e.bEndpointAddress & 0xF in [1, 2] # EP1 or EP2 + ) + + if self.ep_out is None or self.ep_in is None: + logging.error("Could not find FT601 endpoints") + return False + + self.is_open = True + logging.info(f"FT601 device opened: {device_info['description']}") + return True + + except Exception as e: + logging.error(f"Error opening FT601 device: {e}") + return False + + def read_data(self, bytes_to_read=None): + """Read data from FT601 (32-bit word aligned)""" + if not self.is_open or (self.ftdi is None and self.device is None): + return None + + try: + if self.ftdi: + # Using pyftdi + # FT601 reads are 32-bit aligned + if bytes_to_read is None: + bytes_to_read = self.buffer_size + + # Ensure read size is multiple of 4 bytes + bytes_to_read = ((bytes_to_read + 3) // 4) * 4 + + data = self.ftdi.read_data(bytes_to_read) + if data: + return bytes(data) + return None + + elif self.device and self.ep_in: + # Direct USB access + if bytes_to_read is None: + bytes_to_read = 512 + + # FT601 maximum packet size + max_packet = 512 + + data = bytearray() + while len(data) < bytes_to_read: + chunk_size = min(max_packet, bytes_to_read - len(data)) + try: + chunk = self.ep_in.read(chunk_size, timeout=100) + data.extend(chunk) + except usb.core.USBError as e: + if e.errno == 110: # Timeout + break + raise + + return bytes(data) if data else None + + except Exception as e: + logging.error(f"Error reading from FT601: {e}") + return None + + def write_data(self, data): + """Write data to FT601 (32-bit word aligned)""" + if not self.is_open or (self.ftdi is None and self.device is None): + return False + + try: + if self.ftdi: + # Using pyftdi + # Ensure data length is multiple of 4 for 32-bit alignment + if len(data) % 4 != 0: + padding = 4 - (len(data) % 4) + data += b'\x00' * padding + + self.ftdi.write_data(data) + return True + + elif self.device and self.ep_out: + # Direct USB access + # FT601 supports large transfers + max_packet = 512 + + for i in range(0, len(data), max_packet): + chunk = data[i:i + max_packet] + self.ep_out.write(chunk, timeout=100) + + return True + + except Exception as e: + logging.error(f"Error writing to FT601: {e}") + return False + + def configure_burst_mode(self, enable=True): + """Configure FT601 burst mode for maximum throughput""" + if self.ftdi: + try: + # FT601 specific commands for burst mode + if enable: + # Enable burst mode + self.ftdi.set_bitmode(0xFF, Ftdi.BitMode.SYNCFF) + self.ftdi.write_data_set_chunksize(4096) # Larger chunks for burst + logging.info("FT601 burst mode enabled") + else: + # Disable burst mode + self.ftdi.set_bitmode(0xFF, Ftdi.BitMode.RESET) + logging.info("FT601 burst mode disabled") + return True + except Exception as e: + logging.error(f"Error configuring burst mode: {e}") + return False + return False + + def close(self): + """Close FT601 device""" + if self.ftdi and self.is_open: + try: + self.ftdi.close() + self.is_open = False + logging.info("FT601 device closed") + except Exception as e: + logging.error(f"Error closing FT601 device: {e}") + + if self.device and self.is_open: + try: + usb.util.dispose_resources(self.device) + self.is_open = False + except Exception as e: + logging.error(f"Error closing FT601 device: {e}") + +# [RadarProcessor class remains the same] +class RadarProcessor: + # ... (same as before) + pass + +class USBPacketParser: + # ... (same as before) + pass + +class RadarPacketParser: + # ... (same as before) + pass + +class RadarGUI: + def __init__(self, root): + self.root = root + self.root.title("Advanced Radar System GUI - FT601 USB 3.0") + self.root.geometry("1400x900") + + # Apply dark theme + self.root.configure(bg=DARK_BG) + + # Configure ttk style + self.style = ttk.Style() + self.style.theme_use('clam') + self.configure_dark_theme() + + # Initialize interfaces - Replace FTDI with FT601 + self.stm32_usb_interface = STM32USBInterface() + self.ft601_interface = FT601Interface() # Changed from FTDIInterface + self.radar_processor = RadarProcessor() + self.usb_packet_parser = USBPacketParser() + self.radar_packet_parser = RadarPacketParser() + self.map_generator = MapGenerator() + self.settings = RadarSettings() + + # Data queues + self.radar_data_queue = queue.Queue() + self.gps_data_queue = queue.Queue() + + # Thread control + self.running = False + self.radar_thread = None + self.gps_thread = None + + # Counters + self.received_packets = 0 + self.current_gps = GPSData(latitude=41.9028, longitude=12.4964, altitude=0, pitch=0.0, timestamp=0) + self.corrected_elevations = [] + self.map_file_path = None + self.google_maps_api_key = "YOUR_GOOGLE_MAPS_API_KEY" + + self.create_gui() + self.start_background_threads() + + def configure_dark_theme(self): + # ... (same as before) + pass + + def create_gui(self): + # ... (same as before, update device label) + pass + + def setup_main_tab(self): + """Setup the main radar display tab""" + # Control frame + control_frame = ttk.Frame(self.tab_main) + control_frame.pack(fill='x', padx=10, pady=5) + + # USB Device selection + ttk.Label(control_frame, text="STM32 USB Device:").grid(row=0, column=0, padx=5) + self.stm32_usb_combo = ttk.Combobox(control_frame, state="readonly", width=40) + self.stm32_usb_combo.grid(row=0, column=1, padx=5) + + # FT601 Device selection (replaces FTDI) + ttk.Label(control_frame, text="FT601 USB 3.0 Device:").grid(row=0, column=2, padx=5) + self.ft601_combo = ttk.Combobox(control_frame, state="readonly", width=40) + self.ft601_combo.grid(row=0, column=3, padx=5) + + # Burst mode checkbox (new for FT601) + self.burst_mode_var = tk.BooleanVar(value=True) + ttk.Checkbutton(control_frame, text="Burst Mode", + variable=self.burst_mode_var).grid(row=0, column=4, padx=5) + + ttk.Button(control_frame, text="Refresh Devices", + command=self.refresh_devices).grid(row=0, column=5, padx=5) + + self.start_button = ttk.Button(control_frame, text="Start Radar", + command=self.start_radar) + self.start_button.grid(row=0, column=6, padx=5) + + self.stop_button = ttk.Button(control_frame, text="Stop Radar", + command=self.stop_radar, state="disabled") + self.stop_button.grid(row=0, column=7, padx=5) + + # GPS and Pitch info + self.gps_label = ttk.Label(control_frame, text="GPS: Waiting for data...") + self.gps_label.grid(row=1, column=0, columnspan=4, sticky='w', padx=5, pady=2) + + # Pitch display + self.pitch_label = ttk.Label(control_frame, text="Pitch: --.--°") + self.pitch_label.grid(row=1, column=4, columnspan=2, padx=5, pady=2) + + # Status info with FT601 specific info + self.status_label = ttk.Label(control_frame, + text="Status: Ready - FT601 USB 3.0") + self.status_label.grid(row=1, column=6, columnspan=2, sticky='e', padx=5, pady=2) + + # [Rest of setup_main_tab remains the same] + # ... + + def refresh_devices(self): + """Refresh available USB devices""" + # STM32 USB devices + stm32_devices = self.stm32_usb_interface.list_devices() + stm32_names = [dev['description'] for dev in stm32_devices] + self.stm32_usb_combo['values'] = stm32_names + + # FT601 devices (replaces FTDI) + ft601_devices = self.ft601_interface.list_devices() + ft601_names = [dev['description'] for dev in ft601_devices] + self.ft601_combo['values'] = ft601_names + + if stm32_names: + self.stm32_usb_combo.current(0) + if ft601_names: + self.ft601_combo.current(0) + + def start_radar(self): + """Start radar operation with FT601""" + try: + # Open STM32 USB device + stm32_index = self.stm32_usb_combo.current() + if stm32_index == -1: + messagebox.showerror("Error", "Please select an STM32 USB device") + return + + stm32_devices = self.stm32_usb_interface.list_devices() + if stm32_index >= len(stm32_devices): + messagebox.showerror("Error", "Invalid STM32 device selection") + return + + if not self.stm32_usb_interface.open_device(stm32_devices[stm32_index]): + messagebox.showerror("Error", "Failed to open STM32 USB device") + return + + # Open FT601 device + ft601_index = self.ft601_combo.current() + if ft601_index != -1: + ft601_devices = self.ft601_interface.list_devices() + if ft601_index < len(ft601_devices): + # Try direct USB first, fallback to pyftdi + if not self.ft601_interface.open_device_direct(ft601_devices[ft601_index]): + device_url = ft601_devices[ft601_index]['url'] + if not self.ft601_interface.open_device(device_url): + logging.warning("Failed to open FT601 device, continuing without radar data") + messagebox.showwarning("Warning", "Failed to open FT601 device") + else: + # Configure burst mode if enabled + if self.burst_mode_var.get(): + self.ft601_interface.configure_burst_mode(True) + else: + logging.warning("No FT601 device selected, continuing without radar data") + else: + logging.warning("No FT601 device selected, continuing without radar data") + + # Send start flag to STM32 + if not self.stm32_usb_interface.send_start_flag(): + messagebox.showerror("Error", "Failed to send start flag to STM32") + return + + # Send settings to STM32 + self.apply_settings() + + # Start radar operation + self.running = True + self.start_button.config(state="disabled") + self.stop_button.config(state="normal") + self.status_label.config(text="Status: Radar running - FT601 USB 3.0 active") + + logging.info("Radar system started successfully with FT601 USB 3.0") + + except Exception as e: + messagebox.showerror("Error", f"Failed to start radar: {e}") + logging.error(f"Start radar error: {e}") + + def stop_radar(self): + """Stop radar operation""" + self.running = False + self.start_button.config(state="normal") + self.stop_button.config(state="disabled") + self.status_label.config(text="Status: Radar stopped") + + self.stm32_usb_interface.close() + self.ft601_interface.close() + + logging.info("Radar system stopped") + + def process_radar_data(self): + """Process incoming radar data from FT601""" + buffer = bytearray() + while True: + if self.running and self.ft601_interface.is_open: + try: + # Read from FT601 (supports larger transfers) + data = self.ft601_interface.read_data(4096) + if data: + buffer.extend(data) + + # Process packets (32-bit aligned) + while len(buffer) >= 8: # Minimum packet size + # Try to find valid packet + packet = self.radar_packet_parser.parse_packet(bytes(buffer)) + if packet: + self.process_radar_packet(packet) + # Remove processed packet from buffer + packet_length = self.get_packet_length(packet) + if packet_length > 0: + buffer = buffer[packet_length:] + self.received_packets += 1 + else: + # No valid packet found, shift buffer + if len(buffer) > 4: + buffer = buffer[1:] + else: + break + + except Exception as e: + logging.error(f"Error processing radar data: {e}") + time.sleep(0.1) + else: + time.sleep(0.1) + + def get_packet_length(self, packet): + """Calculate packet length including header and footer""" + # This should match your packet structure + return 64 # Example: 64-byte packets + + # [Rest of the methods remain the same] + # ... + +def main(): + """Main application entry point""" + try: + root = tk.Tk() + app = RadarGUI(root) + root.mainloop() + except Exception as e: + logging.error(f"Application error: {e}") + messagebox.showerror("Fatal Error", f"Application failed to start: {e}") + +if __name__ == "__main__": + main() \ No newline at end of file