"""Tkinter GUI for a radar front end.

An STM32 (USB CDC) link carries control packets and GPS/pitch telemetry; an
FTDI FT2232H link carries framed radar data.  Received elevation values are
corrected with the antenna pitch before they are displayed.
"""

import errno
import logging
import math
import queue
import struct
import threading
import time
import tkinter as tk
from dataclasses import dataclass
from tkinter import messagebox, ttk
from typing import Optional

import crcmod
import numpy as np
from filterpy.kalman import KalmanFilter
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from sklearn.cluster import DBSCAN

# Configure logging BEFORE the optional imports below: calling logging.warning()
# on an unconfigured root logger installs a default handler, which would turn a
# later basicConfig() call into a no-op and silently drop this format/level.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

try:
    import usb.core
    import usb.util

    USB_AVAILABLE = True
except ImportError:
    USB_AVAILABLE = False
    logging.warning("pyusb not available. USB CDC functionality will be disabled.")

try:
    from pyftdi.ftdi import Ftdi
    from pyftdi.usbtools import UsbTools

    FTDI_AVAILABLE = True
except ImportError:
    FTDI_AVAILABLE = False
    logging.warning("pyftdi not available. FTDI functionality will be disabled.")


@dataclass
class RadarTarget:
    """One target entry shown in the targets list."""

    id: int
    range: float
    velocity: float
    azimuth: int
    elevation: int
    snr: float
    timestamp: float
    track_id: int = -1
    # Elevation as received from the radar, before pitch correction.  None when
    # the creator of the target did not supply it.
    raw_elevation: Optional[int] = None


@dataclass
class RadarSettings:
    """Radar configuration sent to the STM32 by ``STM32USBInterface.send_settings``."""

    system_frequency: float = 10e9
    chirp_duration: float = 30e-6
    chirps_per_position: int = 32
    freq_min: float = 10e6
    freq_max: float = 30e6
    prf1: float = 1000
    prf2: float = 2000
    max_distance: float = 50000


@dataclass
class GPSData:
    """Position and antenna pitch reported by the STM32."""

    latitude: float
    longitude: float
    altitude: float
    pitch: float  # Pitch angle in degrees
    timestamp: float


class STM32USBInterface:
    """Thin pyusb wrapper for the STM32 USB CDC link (control out, GPS in)."""

    # Outgoing data is split into chunks of this size and zero-padded.
    PACKET_SIZE = 64

    def __init__(self):
        self.device = None
        self.is_open = False
        self.ep_in = None
        self.ep_out = None

    def list_devices(self):
        """List available STM32 USB CDC devices.

        Returns:
            A list of dicts with ``description``, ``vendor_id``, ``product_id``
            and ``device`` keys; an empty list when pyusb is missing.  If
            enumeration itself raises, a single mock entry (without a
            ``device`` key) is returned for testing.
        """
        if not USB_AVAILABLE:
            logging.warning("USB not available - please install pyusb")
            return []

        try:
            # STM32 USB CDC devices typically use these vendor/product IDs
            stm32_vid_pids = [
                (0x0483, 0x5740),  # STM32 Virtual COM Port
                (0x0483, 0x3748),  # STM32 Discovery
                (0x0483, 0x374B),  # STM32 CDC
                (0x0483, 0x374D),  # STM32 CDC
                (0x0483, 0x374E),  # STM32 CDC
                (0x0483, 0x3752),  # STM32 CDC
            ]
            devices = []
            for vid, pid in stm32_vid_pids:
                for dev in usb.core.find(find_all=True, idVendor=vid, idProduct=pid):
                    devices.append(self._describe_device(dev, vid, pid))
            return devices
        except Exception as e:
            logging.error(f"Error listing USB devices: {e}")
            # Return mock devices for testing
            return [
                {"description": "STM32 Virtual COM Port", "vendor_id": 0x0483, "product_id": 0x5740}
            ]

    @staticmethod
    def _describe_device(dev, vid, pid):
        """Build the device-info dict for *dev*, tolerating unreadable string descriptors."""
        try:
            product = usb.util.get_string(dev, dev.iProduct) if dev.iProduct else "STM32 CDC"
            serial = (
                usb.util.get_string(dev, dev.iSerialNumber) if dev.iSerialNumber else "Unknown"
            )
            description = f"{product} ({serial})"
        except Exception:
            # Reading string descriptors can fail (e.g. permissions); fall back to IDs.
            description = f"STM32 CDC (VID:{vid:04X}, PID:{pid:04X})"
        return {
            "description": description,
            "vendor_id": vid,
            "product_id": pid,
            "device": dev,
        }

    def open_device(self, device_info):
        """Open STM32 USB CDC device.

        Args:
            device_info: An entry returned by :meth:`list_devices`.

        Returns:
            True when an IN and an OUT endpoint were found, False otherwise.
        """
        if not USB_AVAILABLE:
            logging.error("USB not available - cannot open device")
            return False

        try:
            self.device = device_info["device"]

            # Detach kernel driver if active
            self._detach_kernel_driver(0)

            # Set configuration
            self.device.set_configuration()

            cfg = self.device.get_active_configuration()
            interface, self.ep_out, self.ep_in = self._find_endpoints(cfg)

            if self.ep_out is None or self.ep_in is None:
                logging.error("Could not find CDC endpoints")
                return False

            if interface.bInterfaceNumber != 0:
                self._detach_kernel_driver(interface.bInterfaceNumber)

            self.is_open = True
            logging.info(f"STM32 USB device opened: {device_info['description']}")
            return True

        except Exception as e:
            logging.error(f"Error opening USB device: {e}")
            return False

    def _detach_kernel_driver(self, interface_number):
        """Detach an active kernel driver from *interface_number*, where supported."""
        try:
            if self.device.is_kernel_driver_active(interface_number):
                self.device.detach_kernel_driver(interface_number)
        except NotImplementedError:
            # pyusb raises this on back ends without kernel-driver handling
            # (e.g. Windows); there is nothing to detach in that case.
            logging.debug("Kernel driver detach not supported on this platform")

    @staticmethod
    def _find_endpoints(cfg):
        """Return ``(interface, ep_out, ep_in)`` for the first interface with both directions.

        Interface 0 is tried first.  The remaining interfaces are then scanned,
        because a CDC-ACM device normally exposes its data endpoints on a
        separate data interface rather than on the control interface.
        """

        def endpoint(intf, direction):
            return usb.util.find_descriptor(
                intf,
                custom_match=lambda e: (
                    usb.util.endpoint_direction(e.bEndpointAddress) == direction
                ),
            )

        candidates = [cfg[(0, 0)]]
        candidates.extend(intf for intf in cfg if intf.bInterfaceNumber != 0)
        for intf in candidates:
            ep_out = endpoint(intf, usb.util.ENDPOINT_OUT)
            ep_in = endpoint(intf, usb.util.ENDPOINT_IN)
            if ep_out is not None and ep_in is not None:
                return intf, ep_out, ep_in
        return None, None, None

    def send_start_flag(self):
        """Step 12: Send start flag to STM32 via USB"""
        start_packet = bytes([23, 46, 158, 237])
        logging.info("Sending start flag to STM32 via USB...")
        return self._send_data(start_packet)

    def send_settings(self, settings):
        """Step 13: Send radar settings to STM32 via USB.

        Returns:
            True when the packet was written, False on any error.
        """
        try:
            packet = self._create_settings_packet(settings)
            logging.info("Sending radar settings to STM32 via USB...")
            return self._send_data(packet)
        except Exception as e:
            logging.error(f"Error sending settings via USB: {e}")
            return False

    def read_data(self, size=64, timeout=1000):
        """Read data from STM32 via USB.

        Returns:
            The received bytes, or None when the device is closed, the read
            timed out, or an error occurred.
        """
        if not self.is_open or self.ep_in is None:
            return None

        try:
            return bytes(self.ep_in.read(size, timeout=timeout))
        except usb.core.USBError as e:
            # A timeout just means "no data yet".  110 is kept for
            # compatibility; errno.ETIMEDOUT covers platforms where the
            # numeric value differs.
            if e.errno not in (errno.ETIMEDOUT, 110):
                logging.error(f"USB read error: {e}")
            return None
        except Exception as e:
            logging.error(f"Error reading from USB: {e}")
            return None

    def _send_data(self, data):
        """Send data to STM32 via USB in zero-padded ``PACKET_SIZE`` chunks."""
        if not self.is_open or self.ep_out is None:
            return False

        try:
            packet_size = self.PACKET_SIZE
            for offset in range(0, len(data), packet_size):
                # ljust zero-pads the final (short) chunk to the packet size.
                self.ep_out.write(data[offset : offset + packet_size].ljust(packet_size, b"\x00"))
            return True
        except Exception as e:
            logging.error(f"Error sending data via USB: {e}")
            return False

    def _create_settings_packet(self, settings):
        """Create binary settings packet for USB transmission.

        Layout: ``b"SET"``, then big-endian fields in the order
        system_frequency (d), chirp_duration (d), chirps_per_position (I),
        freq_min (d), freq_max (d), prf1 (d), prf2 (d), max_distance (d),
        then ``b"END"``.
        """
        return b"".join(
            (
                b"SET",
                struct.pack(">d", settings.system_frequency),
                struct.pack(">d", settings.chirp_duration),
                struct.pack(">I", settings.chirps_per_position),
                struct.pack(">d", settings.freq_min),
                struct.pack(">d", settings.freq_max),
                struct.pack(">d", settings.prf1),
                struct.pack(">d", settings.prf2),
                struct.pack(">d", settings.max_distance),
                b"END",
            )
        )

    def close(self):
        """Close USB device"""
        if self.device and self.is_open:
            try:
                usb.util.dispose_resources(self.device)
            except Exception as e:
                logging.error(f"Error closing USB device: {e}")
            finally:
                # Mark the link closed even if disposal failed, so readers and
                # writers stop using stale endpoint handles.
                self.is_open = False
                self.ep_in = None
                self.ep_out = None


class FTDIInterface:
    """pyftdi wrapper for the FT2232H radar data link."""

    def __init__(self):
        self.ftdi = None
        self.is_open = False

    def list_devices(self):
        """List available FTDI devices using pyftdi.

        Returns:
            A list of dicts with ``description`` and ``url`` keys; an empty
            list when pyftdi is missing.  If enumeration raises, a single mock
            entry is returned for testing.
        """
        if not FTDI_AVAILABLE:
            logging.warning("FTDI not available - please install pyftdi")
            return []

        try:
            devices = []
            # FT2232H vendor/product ID.  find_all yields
            # (UsbDeviceDescriptor, interface_count) pairs.
            for descriptor, _interface_count in UsbTools.find_all([(0x0403, 0x6010)]):
                devices.append(
                    {
                        "description": f"FTDI Device {descriptor.description or 'FT2232H'}"
                        f" ({descriptor.sn or 'no serial'})",
                        "url": self._build_url(descriptor),
                    }
                )
            return devices
        except Exception as e:
            logging.error(f"Error listing FTDI devices: {e}")
            # Return mock devices for testing
            return [{"description": "FT2232H Device A", "url": "ftdi://device/1"}]

    @staticmethod
    def _build_url(descriptor):
        """Build a pyftdi URL selecting interface 1 of the described device."""
        # NOTE(review): URL built per the pyftdi scheme
        # ftdi://vendor:product[:serial|:bus:address]/interface - confirm on hardware.
        base = f"ftdi://0x{descriptor.vid:04x}:0x{descriptor.pid:04x}"
        if descriptor.sn:
            return f"{base}:{descriptor.sn}/1"
        if descriptor.bus is not None and descriptor.address is not None:
            return f"{base}:{descriptor.bus:x}:{descriptor.address:x}/1"
        return f"{base}/1"

    def open_device(self, device_url):
        """Open FTDI device using pyftdi and switch it to synchronous FIFO mode."""
        if not FTDI_AVAILABLE:
            logging.error("FTDI not available - cannot open device")
            return False

        try:
            self.ftdi = Ftdi()
            self.ftdi.open_from_url(device_url)

            # Configure for synchronous FIFO mode
            self.ftdi.set_bitmode(0xFF, Ftdi.BitMode.SYNCFF)

            # Set latency timer
            self.ftdi.set_latency_timer(2)

            # Purge buffers
            self.ftdi.purge_buffers()

            self.is_open = True
            logging.info(f"FTDI device opened: {device_url}")
            return True

        except Exception as e:
            logging.error(f"Error opening FTDI device: {e}")
            return False

    def read_data(self, bytes_to_read):
        """Read up to *bytes_to_read* bytes; return None when closed, empty or on error."""
        if not self.is_open or self.ftdi is None:
            return None

        try:
            data = self.ftdi.read_data(bytes_to_read)
            return bytes(data) if data else None
        except Exception as e:
            logging.error(f"Error reading from FTDI: {e}")
            return None

    def close(self):
        """Close FTDI device"""
        if self.ftdi and self.is_open:
            self.ftdi.close()
            self.is_open = False


class RadarProcessor:
    """Holds the range-Doppler map, recent targets and Kalman tracks."""

    def __init__(self):
        self.range_doppler_map = np.zeros((1024, 32))
        self.detected_targets = []
        self.track_id_counter = 0
        self.tracks = {}
        self.frame_count = 0

    def dual_cpi_fusion(self, range_profiles_1, range_profiles_2):
        """Dual-CPI fusion: sum of the per-CPI means taken along axis 0."""
        return np.mean(range_profiles_1, axis=0) + np.mean(range_profiles_2, axis=0)

    def multi_prf_unwrap(self, doppler_measurements, prf1, prf2):
        """Multi-PRF velocity unwrapping.

        The wavelength is fixed at 3e8 / 10e9 m.  Both PRF hypotheses are
        derived from the same Doppler value, as in the original implementation.
        """
        lambda_wavelength = 3e8 / 10e9
        v_max1 = prf1 * lambda_wavelength / 2
        v_max2 = prf2 * lambda_wavelength / 2

        unwrapped_velocities = []
        for doppler in doppler_measurements:
            v1 = doppler * lambda_wavelength / 2
            v2 = doppler * lambda_wavelength / 2
            unwrapped_velocities.append(self._solve_chinese_remainder(v1, v2, v_max1, v_max2))
        return unwrapped_velocities

    def _solve_chinese_remainder(self, v1, v2, max1, max2):
        """Return the first ``v1 + k*max1`` (k in -5..5) within ``max2/2`` of *v2*, else *v1*."""
        for k in range(-5, 6):
            candidate = v1 + k * max1
            if abs(candidate - v2) < max2 / 2:
                return candidate
        return v1

    def clustering(self, detections, eps=100, min_samples=2):
        """DBSCAN clustering of detections in (range, velocity) space.

        Returns:
            One dict per cluster with ``center``, ``points`` and ``size``;
            noise points (label -1) are dropped.
        """
        if len(detections) == 0:
            return []

        points = np.array([[d.range, d.velocity] for d in detections])
        labels = DBSCAN(eps=eps, min_samples=min_samples).fit(points).labels_

        clusters = []
        for label in set(labels):
            if label == -1:
                continue
            cluster_points = points[labels == label]
            clusters.append(
                {
                    "center": np.mean(cluster_points, axis=0),
                    "points": cluster_points,
                    "size": len(cluster_points),
                }
            )
        return clusters

    def association(self, detections, clusters):
        """Association of detections to tracks.

        Each detection is assigned to the nearest existing track closer than
        500 in (range, velocity) space, otherwise it receives a new track id.
        *clusters* is accepted for interface compatibility but not used.
        """
        associated_detections = []

        for detection in detections:
            best_track = None
            min_distance = float("inf")

            for track_id, track in self.tracks.items():
                distance = np.sqrt(
                    (detection.range - track["state"][0]) ** 2
                    + (detection.velocity - track["state"][2]) ** 2
                )
                if distance < min_distance and distance < 500:
                    min_distance = distance
                    best_track = track_id

            if best_track is None:
                best_track = self.track_id_counter
                self.track_id_counter += 1
            detection.track_id = best_track
            associated_detections.append(detection)

        return associated_detections

    def tracking(self, associated_detections):
        """Kalman filter tracking: create or update one filter per track id.

        Tracks that have not been updated for more than 5 seconds are removed.
        """
        current_time = time.time()

        for detection in associated_detections:
            track = self.tracks.get(detection.track_id)
            if track is None:
                # State is [range, range rate, velocity, velocity rate].
                kf = KalmanFilter(dim_x=4, dim_z=2)
                kf.x = np.array([detection.range, 0, detection.velocity, 0])
                kf.F = np.array([[1, 1, 0, 0], [0, 1, 0, 0], [0, 0, 1, 1], [0, 0, 0, 1]])
                kf.H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
                kf.P *= 1000
                kf.R = np.diag([10, 1])
                kf.Q = np.eye(4) * 0.1

                self.tracks[detection.track_id] = {
                    "filter": kf,
                    "state": kf.x,
                    "last_update": current_time,
                    "hits": 1,
                }
            else:
                track["filter"].predict()
                track["filter"].update([detection.range, detection.velocity])
                track["state"] = track["filter"].x
                track["last_update"] = current_time
                track["hits"] += 1

        stale_tracks = [
            tid
            for tid, track in self.tracks.items()
            if current_time - track["last_update"] > 5.0
        ]
        for tid in stale_tracks:
            del self.tracks[tid]


class USBPacketParser:
    """Parses GPS/pitch messages received from the STM32."""

    def __init__(self):
        self.crc16_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000)

    def parse_gps_data(self, data):
        """Parse GPS data from STM32 USB CDC with pitch angle.

        Accepts the text form ``GPS:lat,lon,alt,pitch`` or the 30-byte binary
        form starting with ``b"GPSB"``.

        Returns:
            A ``GPSData`` instance, or None when *data* is empty, unrecognised
            or malformed.
        """
        if not data:
            return None

        try:
            # Try text format first: "GPS:lat,lon,alt,pitch\r\n"
            text_data = data.decode("utf-8", errors="ignore").strip()
            if text_data.startswith("GPS:"):
                parts = text_data.split(":")[1].split(",")
                if len(parts) == 4:
                    lat, lon, alt, pitch = (float(part) for part in parts)
                    return GPSData(
                        latitude=lat,
                        longitude=lon,
                        altitude=alt,
                        pitch=pitch,
                        timestamp=time.time(),
                    )

            # Try binary format (30 bytes with pitch)
            if len(data) >= 30 and data[0:4] == b"GPSB":
                return self._parse_binary_gps_with_pitch(data)

        except Exception as e:
            logging.error(f"Error parsing GPS data: {e}")

        return None

    def _parse_binary_gps_with_pitch(self, data):
        """Parse binary GPS format with pitch angle (30 bytes).

        Binary format, all big-endian:
        [Header 4][Latitude 8][Longitude 8][Altitude 4][Pitch 4][CRC 2]
        The "CRC" is a 16-bit additive checksum over the first 28 bytes.
        """
        try:
            if len(data) < 30:
                return None

            # Verify CRC (simple checksum)
            (crc_received,) = struct.unpack_from(">H", data, 28)
            crc_calculated = sum(data[0:28]) & 0xFFFF
            if crc_received != crc_calculated:
                logging.warning("GPS CRC mismatch")
                return None

            # Two doubles followed by two floats, starting after the header.
            latitude, longitude, altitude, pitch = struct.unpack_from(">ddff", data, 4)

            return GPSData(
                latitude=latitude,
                longitude=longitude,
                altitude=altitude,
                pitch=pitch,
                timestamp=time.time(),
            )

        except Exception as e:
            logging.error(f"Error parsing binary GPS with pitch: {e}")
            return None


class RadarPacketParser:
    """Parses framed radar packets: [sync 2][type 1][length 1][payload][CRC 2]."""

    # NOTE(review): part of this class was lost from the reviewed source (text
    # between the CRC unpack and the range unpack).  The packet-type codes, the
    # little-endian CRC position and the CRC coverage below are reconstructions
    # - confirm them against the firmware before relying on them.
    PACKET_TYPE_RANGE = 0x01
    PACKET_TYPE_DOPPLER = 0x02
    PACKET_TYPE_DETECTION = 0x03

    HEADER_SIZE = 4
    CRC_SIZE = 2

    def __init__(self):
        self.sync_pattern = b"\xa5\xc3"
        self.crc16_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000)

    def calculate_crc(self, data):
        """Return the CRC-16 of *data* using ``self.crc16_func``."""
        return self.crc16_func(data)

    def parse_packet(self, data):
        """Parse the first framed packet found in *data*.

        Returns:
            A dict describing the packet (see the ``parse_*_packet`` methods),
            or None when no complete, valid packet is present.
        """
        if len(data) < 6:
            return None

        sync_index = data.find(self.sync_pattern)
        if sync_index == -1:
            return None

        packet = data[sync_index:]
        if len(packet) < 6:
            return None

        packet_type = packet[2]
        length = packet[3]
        payload_end = self.HEADER_SIZE + length

        if len(packet) < payload_end + self.CRC_SIZE:
            return None

        payload = packet[self.HEADER_SIZE : payload_end]
        crc_received = struct.unpack("<H", packet[payload_end : payload_end + self.CRC_SIZE])[0]
        crc_calculated = self.calculate_crc(packet[:payload_end])
        if crc_received != crc_calculated:
            logging.warning(
                f"CRC mismatch: got {crc_received:04X}, calculated {crc_calculated:04X}"
            )
            return None

        if packet_type == self.PACKET_TYPE_RANGE:
            return self.parse_range_packet(payload)
        if packet_type == self.PACKET_TYPE_DOPPLER:
            return self.parse_doppler_packet(payload)
        if packet_type == self.PACKET_TYPE_DETECTION:
            return self.parse_detection_packet(payload)

        logging.warning(f"Unknown packet type: {packet_type:02X}")
        return None

    def extract_packets(self, buffer):
        """Parse every complete packet in *buffer*.

        Returns:
            ``(packets, remaining)`` where *packets* is a list of parsed packet
            dicts and *remaining* is the unconsumed tail to prepend to the next
            read (at most one incomplete frame).
        """
        packets = []
        sync_len = len(self.sync_pattern)

        while True:
            start = buffer.find(self.sync_pattern)
            if start == -1:
                # Keep a trailing byte that may be the first half of a sync
                # word split across two reads; everything else is noise.
                tail = buffer[-1:]
                return packets, tail if tail == self.sync_pattern[:1] else b""

            buffer = buffer[start:]
            if len(buffer) < self.HEADER_SIZE:
                return packets, buffer

            frame_len = self.HEADER_SIZE + buffer[3] + self.CRC_SIZE
            if len(buffer) < frame_len:
                return packets, buffer

            packet = self.parse_packet(buffer[:frame_len])
            if packet is None:
                # False sync or corrupt frame: step past the sync word only,
                # so a real frame starting inside this span is not lost.
                buffer = buffer[sync_len:]
                continue

            packets.append(packet)
            buffer = buffer[frame_len:]

    def parse_range_packet(self, payload):
        """Parse a range payload: big-endian u32 range, then elevation/azimuth/chirp bytes."""
        # The fields read below need 7 bytes.
        # NOTE(review): the original minimum length was lost with the garbled
        # text - confirm against the protocol.
        if len(payload) < 7:
            return None

        try:
            range_value = struct.unpack(">I", payload[0:4])[0]
            elevation = payload[4] & 0x1F
            azimuth = payload[5] & 0x3F
            chirp_counter = payload[6] & 0x1F

            return {
                "type": "range",
                "range": range_value,
                "elevation": elevation,
                "azimuth": azimuth,
                "chirp": chirp_counter,
                "timestamp": time.time(),
            }
        except Exception as e:
            logging.error(f"Error parsing range packet: {e}")
            return None

    def parse_doppler_packet(self, payload):
        """Parse a Doppler payload: two big-endian i16 (real, imag), then angle/chirp bytes."""
        if len(payload) < 12:
            return None

        try:
            doppler_real = struct.unpack(">h", payload[0:2])[0]
            doppler_imag = struct.unpack(">h", payload[2:4])[0]
            elevation = payload[4] & 0x1F
            azimuth = payload[5] & 0x3F
            chirp_counter = payload[6] & 0x1F

            return {
                "type": "doppler",
                "doppler_real": doppler_real,
                "doppler_imag": doppler_imag,
                "elevation": elevation,
                "azimuth": azimuth,
                "chirp": chirp_counter,
                "timestamp": time.time(),
            }
        except Exception as e:
            logging.error(f"Error parsing Doppler packet: {e}")
            return None

    def parse_detection_packet(self, payload):
        """Parse a detection payload: flag bit, then elevation/azimuth/chirp bytes."""
        if len(payload) < 8:
            return None

        try:
            detection_flag = (payload[0] & 0x01) != 0
            elevation = payload[1] & 0x1F
            azimuth = payload[2] & 0x3F
            chirp_counter = payload[3] & 0x1F

            return {
                "type": "detection",
                "detected": detection_flag,
                "elevation": elevation,
                "azimuth": azimuth,
                "chirp": chirp_counter,
                "timestamp": time.time(),
            }
        except Exception as e:
            logging.error(f"Error parsing detection packet: {e}")
            return None


class RadarGUI:
    """Main application window: device control, range-Doppler map, targets, settings."""

    def __init__(self, root):
        self.root = root
        self.root.title("Advanced Radar System GUI - USB CDC with Pitch Correction")
        self.root.geometry("1400x900")

        # Initialize interfaces
        self.stm32_usb_interface = STM32USBInterface()
        self.ftdi_interface = FTDIInterface()
        self.radar_processor = RadarProcessor()
        self.usb_packet_parser = USBPacketParser()
        self.radar_packet_parser = RadarPacketParser()
        self.settings = RadarSettings()

        # Device lists backing the two combo boxes (filled by refresh_devices)
        self._stm32_devices = []
        self._ftdi_devices = []

        # Data queues
        self.radar_data_queue = queue.Queue()
        self.gps_data_queue = queue.Queue()

        # Thread control
        self.running = False
        self.radar_thread = None
        self.gps_thread = None

        # Counters
        self.received_packets = 0
        self.current_gps = GPSData(
            latitude=41.9028, longitude=12.4964, altitude=0, pitch=0.0, timestamp=0
        )
        self.corrected_elevations = []  # Store corrected elevation values

        self.create_gui()
        self.start_background_threads()

    def create_gui(self):
        """Create the main GUI with tabs"""
        self.notebook = ttk.Notebook(self.root)
        self.notebook.pack(fill="both", expand=True, padx=10, pady=10)

        self.tab_main = ttk.Frame(self.notebook)
        self.tab_map = ttk.Frame(self.notebook)
        self.tab_diagnostics = ttk.Frame(self.notebook)
        self.tab_settings = ttk.Frame(self.notebook)

        self.notebook.add(self.tab_main, text="Main View")
        self.notebook.add(self.tab_map, text="Map View")
        self.notebook.add(self.tab_diagnostics, text="Diagnostics")
        self.notebook.add(self.tab_settings, text="Settings")

        self.setup_main_tab()
        self.setup_map_tab()
        self.setup_settings_tab()

    def setup_main_tab(self):
        """Setup the main radar display tab"""
        # Control frame
        control_frame = ttk.Frame(self.tab_main)
        control_frame.pack(fill="x", padx=10, pady=5)

        # USB Device selection
        ttk.Label(control_frame, text="STM32 USB Device:").grid(row=0, column=0, padx=5)
        self.stm32_usb_combo = ttk.Combobox(control_frame, state="readonly", width=40)
        self.stm32_usb_combo.grid(row=0, column=1, padx=5)

        ttk.Label(control_frame, text="FTDI Device:").grid(row=0, column=2, padx=5)
        self.ftdi_combo = ttk.Combobox(control_frame, state="readonly", width=30)
        self.ftdi_combo.grid(row=0, column=3, padx=5)

        ttk.Button(control_frame, text="Refresh Devices", command=self.refresh_devices).grid(
            row=0, column=4, padx=5
        )

        self.start_button = ttk.Button(control_frame, text="Start Radar", command=self.start_radar)
        self.start_button.grid(row=0, column=5, padx=5)

        self.stop_button = ttk.Button(
            control_frame, text="Stop Radar", command=self.stop_radar, state="disabled"
        )
        self.stop_button.grid(row=0, column=6, padx=5)

        # GPS and Pitch info
        self.gps_label = ttk.Label(control_frame, text="GPS: Waiting for data...")
        self.gps_label.grid(row=1, column=0, columnspan=4, sticky="w", padx=5, pady=2)

        # Pitch display
        self.pitch_label = ttk.Label(control_frame, text="Pitch: --.--°")
        self.pitch_label.grid(row=1, column=4, columnspan=2, padx=5, pady=2)

        # Status info
        self.status_label = ttk.Label(control_frame, text="Status: Ready")
        self.status_label.grid(row=1, column=6, sticky="e", padx=5, pady=2)

        # Main display area
        display_frame = ttk.Frame(self.tab_main)
        display_frame.pack(fill="both", expand=True, padx=10, pady=5)

        # Range-Doppler Map
        fig = Figure(figsize=(10, 6))
        self.range_doppler_ax = fig.add_subplot(111)
        self.range_doppler_plot = self.range_doppler_ax.imshow(
            np.random.rand(1024, 32), aspect="auto", cmap="hot", extent=[0, 32, 0, 1024]
        )
        self.range_doppler_ax.set_title("Range-Doppler Map (Pitch Corrected)")
        self.range_doppler_ax.set_xlabel("Doppler Bin")
        self.range_doppler_ax.set_ylabel("Range Bin")

        self.canvas = FigureCanvasTkAgg(fig, display_frame)
        self.canvas.draw()
        self.canvas.get_tk_widget().pack(side="left", fill="both", expand=True)

        # Targets list with corrected elevation
        targets_frame = ttk.LabelFrame(display_frame, text="Detected Targets (Pitch Corrected)")
        targets_frame.pack(side="right", fill="y", padx=5)

        # (column id, heading text, width)
        columns = [
            ("ID", "Track ID", 70),
            ("Range", "Range (m)", 90),
            ("Velocity", "Velocity (m/s)", 90),
            ("Azimuth", "Azimuth", 70),
            ("Elevation", "Raw Elev", 70),
            ("Corrected Elev", "Corr Elev", 70),
            ("SNR", "SNR (dB)", 70),
        ]
        self.targets_tree = ttk.Treeview(
            targets_frame,
            columns=tuple(column_id for column_id, _, _ in columns),
            show="headings",
            height=20,
        )
        for column_id, heading, width in columns:
            self.targets_tree.heading(column_id, text=heading)
            self.targets_tree.column(column_id, width=width)

        self.targets_tree.pack(fill="both", expand=True, padx=5, pady=5)

    def setup_map_tab(self):
        """Setup the map display tab"""
        self.map_frame = ttk.Frame(self.tab_map)
        self.map_frame.pack(fill="both", expand=True, padx=10, pady=10)

        # Map placeholder
        self.map_label = ttk.Label(
            self.map_frame,
            text="Map will be displayed here after GPS data is received",
            font=("Arial", 12),
        )
        self.map_label.pack(expand=True)

    def setup_settings_tab(self):
        """Setup the settings tab"""
        settings_frame = ttk.Frame(self.tab_settings)
        settings_frame.pack(fill="both", expand=True, padx=10, pady=10)

        entries = [
            ("System Frequency (Hz):", "system_frequency"),
            ("Chirp Duration (s):", "chirp_duration"),
            ("Chirps per Position:", "chirps_per_position"),
            ("Frequency Min (Hz):", "freq_min"),
            ("Frequency Max (Hz):", "freq_max"),
            ("PRF1 (Hz):", "prf1"),
            ("PRF2 (Hz):", "prf2"),
            ("Max Distance (m):", "max_distance"),
        ]

        self.settings_vars = {}

        for row, (label, attr) in enumerate(entries):
            ttk.Label(settings_frame, text=label).grid(
                row=row, column=0, sticky="w", padx=5, pady=5
            )
            # Seed each entry from the current settings object so the form and
            # the RadarSettings defaults cannot drift apart.
            var = tk.StringVar(value=str(getattr(self.settings, attr)))
            entry = ttk.Entry(settings_frame, textvariable=var, width=20)
            entry.grid(row=row, column=1, padx=5, pady=5)
            self.settings_vars[attr] = var

        ttk.Button(settings_frame, text="Apply Settings", command=self.apply_settings).grid(
            row=len(entries), column=0, columnspan=2, pady=10
        )

    def apply_pitch_correction(self, raw_elevation, pitch_angle):
        """Apply pitch correction to an elevation angle.

        Args:
            raw_elevation: measured elevation from radar (degrees)
            pitch_angle: antenna pitch angle from IMU (degrees), positive when
                the antenna is tilted up

        Returns:
            ``raw_elevation - pitch_angle`` wrapped into the range [0, 180).
        """
        # Python's % with a positive divisor never returns a negative value, so
        # a single modulo is the whole normalisation.  The subtraction is done
        # in degrees directly; a radians round-trip only adds rounding error.
        return (raw_elevation - pitch_angle) % 180

    def refresh_devices(self):
        """Refresh available USB devices"""
        # STM32 USB devices
        self._stm32_devices = self.stm32_usb_interface.list_devices()
        stm32_names = [dev["description"] for dev in self._stm32_devices]
        self.stm32_usb_combo["values"] = stm32_names

        # FTDI devices
        self._ftdi_devices = self.ftdi_interface.list_devices()
        ftdi_names = [dev["description"] for dev in self._ftdi_devices]
        self.ftdi_combo["values"] = ftdi_names

        if stm32_names:
            self.stm32_usb_combo.current(0)
        if ftdi_names:
            self.ftdi_combo.current(0)

    def start_radar(self):
        """Step 11: Start button pressed - Begin radar operation"""
        try:
            # Open STM32 USB device.  The list cached by refresh_devices is
            # used so the combo index always refers to the entry the user saw.
            stm32_index = self.stm32_usb_combo.current()
            if stm32_index == -1:
                messagebox.showerror("Error", "Please select an STM32 USB device")
                return

            if stm32_index >= len(self._stm32_devices):
                messagebox.showerror("Error", "Invalid STM32 device selection")
                return

            if not self.stm32_usb_interface.open_device(self._stm32_devices[stm32_index]):
                messagebox.showerror("Error", "Failed to open STM32 USB device")
                return

            # Open FTDI device
            self._open_selected_ftdi()

            # Step 12: Send start flag to STM32 via USB
            if not self.stm32_usb_interface.send_start_flag():
                self._close_interfaces()
                messagebox.showerror("Error", "Failed to send start flag to STM32")
                return

            # Step 13: Send settings to STM32 via USB
            self.apply_settings()

            # Start radar operation
            self.running = True
            self.start_button.config(state="disabled")
            self.stop_button.config(state="normal")
            self.status_label.config(text="Status: Radar running - Waiting for GPS data...")

            logging.info("Radar system started successfully via USB CDC")

        except Exception as e:
            # Do not leave half-opened devices behind after a failed start.
            self.running = False
            self._close_interfaces()
            messagebox.showerror("Error", f"Failed to start radar: {e}")
            logging.error(f"Start radar error: {e}")

    def _open_selected_ftdi(self):
        """Open the FTDI device selected in the combo box; radar data is optional."""
        if not FTDI_AVAILABLE:
            logging.warning("FTDI not available, continuing without radar data")
            return

        ftdi_index = self.ftdi_combo.current()
        if ftdi_index == -1:
            logging.warning("No FTDI device selected, continuing without radar data")
            return

        if ftdi_index < len(self._ftdi_devices):
            device_url = self._ftdi_devices[ftdi_index]["url"]
            if not self.ftdi_interface.open_device(device_url):
                logging.warning("Failed to open FTDI device, continuing without radar data")

    def _close_interfaces(self):
        """Close both hardware links."""
        self.stm32_usb_interface.close()
        self.ftdi_interface.close()

    def stop_radar(self):
        """Stop radar operation"""
        self.running = False
        self.start_button.config(state="normal")
        self.stop_button.config(state="disabled")
        self.status_label.config(text="Status: Radar stopped")

        self._close_interfaces()

        logging.info("Radar system stopped")

    def apply_settings(self):
        """Step 13: Apply and send radar settings via USB"""
        try:
            # Parse everything first so an invalid field cannot leave
            # self.settings half-updated.
            parsed = {
                attr: (int if attr == "chirps_per_position" else float)(var.get())
                for attr, var in self.settings_vars.items()
            }
        except ValueError as e:
            messagebox.showerror("Error", f"Invalid setting value: {e}")
            return

        for attr, value in parsed.items():
            setattr(self.settings, attr, value)

        if not self.stm32_usb_interface.is_open:
            messagebox.showinfo("Success", "Settings applied (STM32 not connected, nothing sent)")
            logging.info("Radar settings applied locally")
            return

        if self.stm32_usb_interface.send_settings(self.settings):
            messagebox.showinfo("Success", "Settings applied and sent to STM32 via USB")
            logging.info("Radar settings applied via USB")
        else:
            messagebox.showerror("Error", "Settings applied but could not be sent to STM32")

    def start_background_threads(self):
        """Start background data processing threads"""
        self.radar_thread = threading.Thread(target=self.process_radar_data, daemon=True)
        self.radar_thread.start()

        self.gps_thread = threading.Thread(target=self.process_gps_data, daemon=True)
        self.gps_thread.start()

        self.root.after(100, self.update_gui)

    def process_radar_data(self):
        """Step 39: Process incoming radar data from FTDI"""
        buffer = b""
        while True:
            if not (self.running and self.ftdi_interface.is_open):
                buffer = b""  # do not carry a stale partial frame across sessions
                time.sleep(0.1)
                continue

            try:
                data = self.ftdi_interface.read_data(4096)
                if not data:
                    # Nothing available: yield briefly instead of spinning.
                    time.sleep(0.01)
                    continue

                buffer += data
                # The parser reports how much of the buffer it consumed, so
                # framing stays aligned whatever the payload length.
                packets, buffer = self.radar_packet_parser.extract_packets(buffer)
                for packet in packets:
                    self.process_radar_packet(packet)
                    self.received_packets += 1

            except Exception as e:
                logging.error(f"Error processing radar data: {e}")
                time.sleep(0.1)

    def process_gps_data(self):
        """Step 16/17: Process GPS data from STM32 via USB CDC"""
        while True:
            if self.running and self.stm32_usb_interface.is_open:
                try:
                    # Read data from STM32 USB
                    data = self.stm32_usb_interface.read_data(64, timeout=100)
                    gps_data = self.usb_packet_parser.parse_gps_data(data) if data else None
                    if gps_data:
                        self.gps_data_queue.put(gps_data)
                        logging.info(
                            "GPS Data received via USB: "
                            f"Lat {gps_data.latitude:.6f}, "
                            f"Lon {gps_data.longitude:.6f}, "
                            f"Alt {gps_data.altitude:.1f}m, "
                            f"Pitch {gps_data.pitch:.1f}°"
                        )
                except Exception as e:
                    logging.error(f"Error processing GPS data via USB: {e}")

            time.sleep(0.1)

    def process_radar_packet(self, packet):
        """Step 40: Process radar data and apply pitch correction"""
        try:
            packet_type = packet["type"]

            if packet_type == "range":
                range_meters = packet["range"] * 0.1

                # Apply pitch correction to elevation
                raw_elevation = packet["elevation"]
                pitch = self.current_gps.pitch
                corrected_elevation = self.apply_pitch_correction(raw_elevation, pitch)

                # Store correction for display
                self.corrected_elevations.append(
                    {
                        "raw": raw_elevation,
                        "corrected": corrected_elevation,
                        "pitch": pitch,
                        "timestamp": packet["timestamp"],
                    }
                )

                # Keep only recent corrections
                if len(self.corrected_elevations) > 100:
                    self.corrected_elevations = self.corrected_elevations[-100:]

                target = RadarTarget(
                    id=packet["chirp"],
                    range=range_meters,
                    velocity=0,
                    azimuth=packet["azimuth"],
                    elevation=corrected_elevation,  # Use corrected elevation
                    snr=20.0,
                    timestamp=packet["timestamp"],
                    raw_elevation=raw_elevation,
                )

                self.update_range_doppler_map(target)

            elif packet_type == "doppler":
                lambda_wavelength = 3e8 / self.settings.system_frequency
                velocity = (packet["doppler_real"] / 32767.0) * (
                    self.settings.prf1 * lambda_wavelength / 2
                )
                self.update_target_velocity(packet, velocity)

            elif packet_type == "detection":
                if packet["detected"]:
                    # Apply pitch correction to detection elevation
                    raw_elevation = packet["elevation"]
                    corrected_elevation = self.apply_pitch_correction(
                        raw_elevation, self.current_gps.pitch
                    )

                    logging.info(
                        f"CFAR Detection: Raw Elev {raw_elevation}°, "
                        f"Corrected Elev {corrected_elevation:.1f}°, "
                        f"Pitch {self.current_gps.pitch:.1f}°"
                    )

        except Exception as e:
            logging.error(f"Error processing radar packet: {e}")

    def update_range_doppler_map(self, target):
        """Update range-Doppler map with new target"""
        range_bin = min(int(target.range / 50), 1023)
        doppler_bin = min(abs(int(target.velocity)), 31)

        processor = self.radar_processor
        processor.range_doppler_map[range_bin, doppler_bin] += 1

        processor.detected_targets.append(target)

        if len(processor.detected_targets) > 100:
            processor.detected_targets = processor.detected_targets[-100:]

    def update_target_velocity(self, packet, velocity):
        """Update target velocity information"""
        for target in self.radar_processor.detected_targets:
            # Packets carry the RAW elevation, so compare against the raw value
            # stored on the target; the corrected elevation only equals it when
            # the pitch is zero.
            elevation = (
                target.raw_elevation if target.raw_elevation is not None else target.elevation
            )
            if (
                target.azimuth == packet["azimuth"]
                and elevation == packet["elevation"]
                and target.id == packet["chirp"]
            ):
                target.velocity = velocity
                break

    def update_gps_display(self):
        """Step 18: Update GPS and pitch display"""
        while True:
            try:
                gps_data = self.gps_data_queue.get_nowait()
            except queue.Empty:
                return

            self.current_gps = gps_data

            # Update GPS label
            self.gps_label.config(
                text=(
                    f"GPS: Lat {gps_data.latitude:.6f}, "
                    f"Lon {gps_data.longitude:.6f}, "
                    f"Alt {gps_data.altitude:.1f}m"
                )
            )

            # Update pitch label with color coding
            self.pitch_label.config(text=f"Pitch: {gps_data.pitch:+.1f}°")

            # Color code based on pitch magnitude
            pitch_magnitude = abs(gps_data.pitch)
            if pitch_magnitude > 10:
                color = "red"  # High pitch warning
            elif pitch_magnitude > 5:
                color = "orange"  # Medium pitch
            else:
                color = "green"  # Normal pitch
            self.pitch_label.config(foreground=color)

            # Update map
            self.update_map_display(gps_data)

    def update_targets_list(self):
        """Update the targets list display with corrected elevations"""
        for item in self.targets_tree.get_children():
            self.targets_tree.delete(item)

        for target in self.radar_processor.detected_targets[-20:]:
            raw_elevation = "N/A" if target.raw_elevation is None else f"{target.raw_elevation}"

            self.targets_tree.insert(
                "",
                "end",
                values=(
                    target.track_id,
                    f"{target.range:.1f}",
                    f"{target.velocity:.1f}",
                    target.azimuth,
                    raw_elevation,  # Show raw elevation
                    f"{target.elevation:.1f}",  # Show corrected elevation
                    f"{target.snr:.1f}",
                ),
            )

    def update_gui(self):
        """Step 40: Update all GUI displays"""
        try:
            # Update status with pitch information
            if self.running:
                self.status_label.config(
                    text=(
                        f"Status: Running - Packets: {self.received_packets} - "
                        f"Pitch: {self.current_gps.pitch:+.1f}°"
                    )
                )

            # Update range-Doppler map
            if hasattr(self, "range_doppler_plot"):
                display_data = np.log10(self.radar_processor.range_doppler_map + 1)
                self.range_doppler_plot.set_array(display_data)
                # Rescale the colour limits; otherwise they stay at the range
                # of the initial placeholder image and busy cells saturate.
                self.range_doppler_plot.set_clim(
                    vmin=0.0, vmax=max(float(display_data.max()), 1.0)
                )
                self.canvas.draw_idle()

            # Update targets list
            self.update_targets_list()

            # Update GPS and pitch display
            self.update_gps_display()

        except Exception as e:
            logging.error(f"Error updating GUI: {e}")

        self.root.after(100, self.update_gui)

    def update_map_display(self, gps_data):
        """Step 18: Update map display with current GPS position"""
        try:
            coverage_km = self.settings.max_distance / 1000
            self.map_label.config(
                text=f"Radar Position: {gps_data.latitude:.6f}, {gps_data.longitude:.6f}\n"
                f"Altitude: {gps_data.altitude:.1f}m\n"
                f"Pitch: {gps_data.pitch:+.1f}°\n"
                f"Coverage: {coverage_km:g}km radius\n"
                f"Map centered on GPS coordinates"
            )

        except Exception as e:
            logging.error(f"Error updating map display: {e}")


def main():
    """Main application entry point"""
    try:
        root = tk.Tk()
        _app = RadarGUI(root)
        root.mainloop()
    except Exception as e:
        logging.error(f"Application error: {e}")
        messagebox.showerror("Fatal Error", f"Application failed to start: {e}")


if __name__ == "__main__":
    main()