From 85e59d6f462772fbcb36a62fb6c204c85302e9c5 Mon Sep 17 00:00:00 2001 From: NawfalMotii79 Date: Mon, 16 Mar 2026 22:25:10 +0000 Subject: [PATCH] Added missing classes and functions --- 9_Firmware/9_3_GUI/GUI_V6.py | 1143 +++++++++++++++++++++++++++++++++- 1 file changed, 1126 insertions(+), 17 deletions(-) diff --git a/9_Firmware/9_3_GUI/GUI_V6.py b/9_Firmware/9_3_GUI/GUI_V6.py index e8cf8f7..f789462 100644 --- a/9_Firmware/9_3_GUI/GUI_V6.py +++ b/9_Firmware/9_3_GUI/GUI_V6.py @@ -88,8 +88,163 @@ class GPSData: timestamp: float class MapGenerator: - # [MapGenerator class remains the same as before] - pass + def __init__(self): + self.map_html_template = """ + + + + Radar Map + + + + + +
+ + + + + + + """ + pass class FT601Interface: """ @@ -361,18 +516,496 @@ class FT601Interface: except Exception as e: logging.error(f"Error closing FT601 device: {e}") +class STM32USBInterface: + def __init__(self): + self.device = None + self.is_open = False + self.ep_in = None + self.ep_out = None + + def list_devices(self): + """List available STM32 USB CDC devices""" + if not USB_AVAILABLE: + logging.warning("USB not available - please install pyusb") + return [] + + try: + devices = [] + # STM32 USB CDC devices typically use these vendor/product IDs + stm32_vid_pids = [ + (0x0483, 0x5740), # STM32 Virtual COM Port + (0x0483, 0x3748), # STM32 Discovery + (0x0483, 0x374B), # STM32 CDC + (0x0483, 0x374D), # STM32 CDC + (0x0483, 0x374E), # STM32 CDC + (0x0483, 0x3752), # STM32 CDC + ] + + for vid, pid in stm32_vid_pids: + found_devices = usb.core.find(find_all=True, idVendor=vid, idProduct=pid) + for dev in found_devices: + try: + product = usb.util.get_string(dev, dev.iProduct) if dev.iProduct else "STM32 CDC" + serial = usb.util.get_string(dev, dev.iSerialNumber) if dev.iSerialNumber else "Unknown" + devices.append({ + 'description': f"{product} ({serial})", + 'vendor_id': vid, + 'product_id': pid, + 'device': dev + }) + except: + devices.append({ + 'description': f"STM32 CDC (VID:{vid:04X}, PID:{pid:04X})", + 'vendor_id': vid, + 'product_id': pid, + 'device': dev + }) + + return devices + except Exception as e: + logging.error(f"Error listing USB devices: {e}") + # Return mock devices for testing + return [{'description': 'STM32 Virtual COM Port', 'vendor_id': 0x0483, 'product_id': 0x5740}] + + def open_device(self, device_info): + """Open STM32 USB CDC device""" + if not USB_AVAILABLE: + logging.error("USB not available - cannot open device") + return False + + try: + self.device = device_info['device'] + + # Detach kernel driver if active + if self.device.is_kernel_driver_active(0): + self.device.detach_kernel_driver(0) + + # Set configuration + self.device.set_configuration() + + # Get CDC endpoints + cfg = self.device.get_active_configuration() + intf = cfg[(0,0)] + + # Find bulk endpoints (CDC data interface) + self.ep_out = usb.util.find_descriptor( + intf, + custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT + ) + + self.ep_in = usb.util.find_descriptor( + intf, + custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN + ) + + if self.ep_out is None or self.ep_in is None: + logging.error("Could not find CDC endpoints") + return False + + self.is_open = True + logging.info(f"STM32 USB device opened: {device_info['description']}") + return True + + except Exception as e: + logging.error(f"Error opening USB device: {e}") + return False + + def send_start_flag(self): + """Step 12: Send start flag to STM32 via USB""" + start_packet = bytes([23, 46, 158, 237]) + logging.info("Sending start flag to STM32 via USB...") + return self._send_data(start_packet) + + def send_settings(self, settings): + """Step 13: Send radar settings to STM32 via USB""" + try: + packet = self._create_settings_packet(settings) + logging.info("Sending radar settings to STM32 via USB...") + return self._send_data(packet) + except Exception as e: + logging.error(f"Error sending settings via USB: {e}") + return False + + def read_data(self, size=64, timeout=1000): + """Read data from STM32 via USB""" + if not self.is_open or self.ep_in is None: + return None + + try: + data = self.ep_in.read(size, timeout=timeout) + return bytes(data) + except usb.core.USBError as e: + if e.errno == 110: # Timeout + return None + logging.error(f"USB read error: {e}") + return None + except Exception as e: + logging.error(f"Error reading from USB: {e}") + return None + + def _send_data(self, data): + """Send data to STM32 via USB""" + if not self.is_open or self.ep_out is None: + return False + + try: + # USB CDC typically uses 64-byte packets + packet_size = 64 + for i in range(0, len(data), packet_size): + chunk = data[i:i + packet_size] + # Pad to packet size if needed + if len(chunk) < packet_size: + chunk += b'\x00' * (packet_size - len(chunk)) + self.ep_out.write(chunk) + + return True + except Exception as e: + logging.error(f"Error sending data via USB: {e}") + return False + + def _create_settings_packet(self, settings): + """Create binary settings packet for USB transmission""" + packet = b'SET' + packet += struct.pack('>d', settings.system_frequency) + packet += struct.pack('>d', settings.chirp_duration_1) + packet += struct.pack('>d', settings.chirp_duration_2) + packet += struct.pack('>I', settings.chirps_per_position) + packet += struct.pack('>d', settings.freq_min) + packet += struct.pack('>d', settings.freq_max) + packet += struct.pack('>d', settings.prf1) + packet += struct.pack('>d', settings.prf2) + packet += struct.pack('>d', settings.max_distance) + packet += struct.pack('>d', settings.map_size) + packet += b'END' + return packet + + def close(self): + """Close USB device""" + if self.device and self.is_open: + try: + usb.util.dispose_resources(self.device) + self.is_open = False + except Exception as e: + logging.error(f"Error closing USB device: {e}") + + # [RadarProcessor class remains the same] class RadarProcessor: - # ... (same as before) - pass + def __init__(self): + self.range_doppler_map = np.zeros((1024, 32)) + self.detected_targets = [] + self.track_id_counter = 0 + self.tracks = {} + self.frame_count = 0 + + def dual_cpi_fusion(self, range_profiles_1, range_profiles_2): + """Dual-CPI fusion for better detection""" + fused_profile = np.mean(range_profiles_1, axis=0) + np.mean(range_profiles_2, axis=0) + return fused_profile + + def multi_prf_unwrap(self, doppler_measurements, prf1, prf2): + """Multi-PRF velocity unwrapping""" + lambda_wavelength = 3e8 / 10e9 + v_max1 = prf1 * lambda_wavelength / 2 + v_max2 = prf2 * lambda_wavelength / 2 + + unwrapped_velocities = [] + for doppler in doppler_measurements: + v1 = doppler * lambda_wavelength / 2 + v2 = doppler * lambda_wavelength / 2 + + velocity = self._solve_chinese_remainder(v1, v2, v_max1, v_max2) + unwrapped_velocities.append(velocity) + + return unwrapped_velocities + + def _solve_chinese_remainder(self, v1, v2, max1, max2): + for k in range(-5, 6): + candidate = v1 + k * max1 + if abs(candidate - v2) < max2 / 2: + return candidate + return v1 + + def clustering(self, detections, eps=100, min_samples=2): + """DBSCAN clustering of detections""" + if len(detections) == 0: + return [] + + points = np.array([[d.range, d.velocity] for d in detections]) + clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(points) + + clusters = [] + for label in set(clustering.labels_): + if label != -1: + cluster_points = points[clustering.labels_ == label] + clusters.append({ + 'center': np.mean(cluster_points, axis=0), + 'points': cluster_points, + 'size': len(cluster_points) + }) + + return clusters + + def association(self, detections, clusters): + """Association of detections to tracks""" + associated_detections = [] + + for detection in detections: + best_track = None + min_distance = float('inf') + + for track_id, track in self.tracks.items(): + distance = np.sqrt( + (detection.range - track['state'][0])**2 + + (detection.velocity - track['state'][2])**2 + ) + + if distance < min_distance and distance < 500: + min_distance = distance + best_track = track_id + + if best_track is not None: + detection.track_id = best_track + associated_detections.append(detection) + else: + detection.track_id = self.track_id_counter + self.track_id_counter += 1 + associated_detections.append(detection) + + return associated_detections + + def tracking(self, associated_detections): + """Kalman filter tracking""" + current_time = time.time() + + for detection in associated_detections: + if detection.track_id not in self.tracks: + kf = KalmanFilter(dim_x=4, dim_z=2) + kf.x = np.array([detection.range, 0, detection.velocity, 0]) + kf.F = np.array([[1, 1, 0, 0], + [0, 1, 0, 0], + [0, 0, 1, 1], + [0, 0, 0, 1]]) + kf.H = np.array([[1, 0, 0, 0], + [0, 0, 1, 0]]) + kf.P *= 1000 + kf.R = np.diag([10, 1]) + kf.Q = np.eye(4) * 0.1 + + self.tracks[detection.track_id] = { + 'filter': kf, + 'state': kf.x, + 'last_update': current_time, + 'hits': 1 + } + else: + track = self.tracks[detection.track_id] + track['filter'].predict() + track['filter'].update([detection.range, detection.velocity]) + track['state'] = track['filter'].x + track['last_update'] = current_time + track['hits'] += 1 + + stale_tracks = [tid for tid, track in self.tracks.items() + if current_time - track['last_update'] > 5.0] + for tid in stale_tracks: + del self.tracks[tid] class USBPacketParser: - # ... (same as before) - pass + def __init__(self): + self.crc16_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) + + def parse_gps_data(self, data): + """Parse GPS data from STM32 USB CDC with pitch angle""" + if not data: + return None + + try: + # Try text format first: "GPS:lat,lon,alt,pitch\r\n" + text_data = data.decode('utf-8', errors='ignore').strip() + if text_data.startswith('GPS:'): + parts = text_data.split(':')[1].split(',') + if len(parts) == 4: # Now expecting 4 values + lat = float(parts[0]) + lon = float(parts[1]) + alt = float(parts[2]) + pitch = float(parts[3]) # Pitch angle in degrees + return GPSData(latitude=lat, longitude=lon, altitude=alt, pitch=pitch, timestamp=time.time()) + + # Try binary format (30 bytes with pitch) + if len(data) >= 30 and data[0:4] == b'GPSB': + return self._parse_binary_gps_with_pitch(data) + + except Exception as e: + logging.error(f"Error parsing GPS data: {e}") + + return None + + def _parse_binary_gps_with_pitch(self, data): + """Parse binary GPS format with pitch angle (30 bytes)""" + try: + # Binary format: [Header 4][Latitude 8][Longitude 8][Altitude 4][Pitch 4][CRC 2] + if len(data) < 30: + return None + + # Verify CRC (simple checksum) + crc_received = (data[28] << 8) | data[29] + crc_calculated = sum(data[0:28]) & 0xFFFF + + if crc_received != crc_calculated: + logging.warning("GPS CRC mismatch") + return None + + # Parse latitude (double, big-endian) + lat_bits = 0 + for i in range(8): + lat_bits = (lat_bits << 8) | data[4 + i] + latitude = struct.unpack('>d', struct.pack('>Q', lat_bits))[0] + + # Parse longitude (double, big-endian) + lon_bits = 0 + for i in range(8): + lon_bits = (lon_bits << 8) | data[12 + i] + longitude = struct.unpack('>d', struct.pack('>Q', lon_bits))[0] + + # Parse altitude (float, big-endian) + alt_bits = 0 + for i in range(4): + alt_bits = (alt_bits << 8) | data[20 + i] + altitude = struct.unpack('>f', struct.pack('>I', alt_bits))[0] + + # Parse pitch angle (float, big-endian) + pitch_bits = 0 + for i in range(4): + pitch_bits = (pitch_bits << 8) | data[24 + i] + pitch = struct.unpack('>f', struct.pack('>I', pitch_bits))[0] + + return GPSData( + latitude=latitude, + longitude=longitude, + altitude=altitude, + pitch=pitch, + timestamp=time.time() + ) + + except Exception as e: + logging.error(f"Error parsing binary GPS with pitch: {e}") + return None class RadarPacketParser: - # ... (same as before) - pass + def __init__(self): + self.sync_pattern = b'\xA5\xC3' + self.crc16_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) + + def parse_packet(self, data): + if len(data) < 6: + return None + + sync_index = data.find(self.sync_pattern) + if sync_index == -1: + return None + + packet = data[sync_index:] + + if len(packet) < 6: + return None + + sync = packet[0:2] + packet_type = packet[2] + length = packet[3] + + if len(packet) < (4 + length + 2): + return None + + payload = packet[4:4+length] + crc_received = struct.unpack('I', payload[0:4])[0] + elevation = payload[4] & 0x1F + azimuth = payload[5] & 0x3F + chirp_counter = payload[6] & 0x1F + + return { + 'type': 'range', + 'range': range_value, + 'elevation': elevation, + 'azimuth': azimuth, + 'chirp': chirp_counter, + 'timestamp': time.time() + } + except Exception as e: + logging.error(f"Error parsing range packet: {e}") + return None + + def parse_doppler_packet(self, payload): + if len(payload) < 12: + return None + + try: + doppler_real = struct.unpack('>h', payload[0:2])[0] + doppler_imag = struct.unpack('>h', payload[2:4])[0] + elevation = payload[4] & 0x1F + azimuth = payload[5] & 0x3F + chirp_counter = payload[6] & 0x1F + + return { + 'type': 'doppler', + 'doppler_real': doppler_real, + 'doppler_imag': doppler_imag, + 'elevation': elevation, + 'azimuth': azimuth, + 'chirp': chirp_counter, + 'timestamp': time.time() + } + except Exception as e: + logging.error(f"Error parsing Doppler packet: {e}") + return None + + def parse_detection_packet(self, payload): + if len(payload) < 8: + return None + + try: + detection_flag = (payload[0] & 0x01) != 0 + elevation = payload[1] & 0x1F + azimuth = payload[2] & 0x3F + chirp_counter = payload[3] & 0x1F + + return { + 'type': 'detection', + 'detected': detection_flag, + 'elevation': elevation, + 'azimuth': azimuth, + 'chirp': chirp_counter, + 'timestamp': time.time() + } + except Exception as e: + logging.error(f"Error parsing detection packet: {e}") + return None + class RadarGUI: def __init__(self, root): @@ -417,12 +1050,105 @@ class RadarGUI: self.start_background_threads() def configure_dark_theme(self): - # ... (same as before) - pass + """Configure ttk style for dark mercury theme""" + self.style.configure('.', + background=DARK_BG, + foreground=DARK_FG, + fieldbackground=DARK_ACCENT, + selectbackground=DARK_HIGHLIGHT, + selectforeground=DARK_FG, + troughcolor=DARK_ACCENT, + borderwidth=1, + focuscolor=DARK_BORDER) + + # Configure specific widgets + self.style.configure('TFrame', background=DARK_BG) + self.style.configure('TLabel', background=DARK_BG, foreground=DARK_FG) + self.style.configure('TButton', + background=DARK_BUTTON, + foreground=DARK_FG, + borderwidth=1, + focuscolor=DARK_BORDER) + self.style.map('TButton', + background=[('active', DARK_BUTTON_HOVER), + ('pressed', DARK_HIGHLIGHT)]) + + self.style.configure('TCombobox', + fieldbackground=DARK_ACCENT, + background=DARK_BG, + foreground=DARK_FG, + arrowcolor=DARK_FG) + self.style.map('TCombobox', + fieldbackground=[('readonly', DARK_ACCENT)], + selectbackground=[('readonly', DARK_HIGHLIGHT)], + selectforeground=[('readonly', DARK_FG)]) + + self.style.configure('TNotebook', background=DARK_BG, borderwidth=0) + self.style.configure('TNotebook.Tab', + background=DARK_ACCENT, + foreground=DARK_FG, + padding=[10, 5]) + self.style.map('TNotebook.Tab', + background=[('selected', DARK_HIGHLIGHT), + ('active', DARK_BUTTON_HOVER)]) + + self.style.configure('Treeview', + background=DARK_TREEVIEW, + foreground=DARK_FG, + fieldbackground=DARK_TREEVIEW, + borderwidth=0) + self.style.map('Treeview', + background=[('selected', DARK_HIGHLIGHT)]) + + self.style.configure('Treeview.Heading', + background=DARK_ACCENT, + foreground=DARK_FG, + relief='flat') + self.style.map('Treeview.Heading', + background=[('active', DARK_BUTTON_HOVER)]) + + self.style.configure('TEntry', + fieldbackground=DARK_ACCENT, + foreground=DARK_FG, + insertcolor=DARK_FG) + + self.style.configure('Vertical.TScrollbar', + background=DARK_ACCENT, + troughcolor=DARK_BG, + borderwidth=0, + arrowsize=12) + self.style.configure('Horizontal.TScrollbar', + background=DARK_ACCENT, + troughcolor=DARK_BG, + borderwidth=0, + arrowsize=12) + + self.style.configure('TLabelFrame', + background=DARK_BG, + foreground=DARK_FG, + bordercolor=DARK_BORDER) + self.style.configure('TLabelFrame.Label', + background=DARK_BG, + foreground=DARK_FG) def create_gui(self): - # ... (same as before, update device label) - pass + """Create the main GUI with tabs""" + self.notebook = ttk.Notebook(self.root) + self.notebook.pack(fill='both', expand=True, padx=10, pady=10) + + self.tab_main = ttk.Frame(self.notebook) + self.tab_map = ttk.Frame(self.notebook) + self.tab_diagnostics = ttk.Frame(self.notebook) + self.tab_settings = ttk.Frame(self.notebook) + + self.notebook.add(self.tab_main, text='Main View') + self.notebook.add(self.tab_map, text='Map View') + self.notebook.add(self.tab_diagnostics, text='Diagnostics') + self.notebook.add(self.tab_settings, text='Settings') + + self.setup_main_tab() + self.setup_map_tab() + self.setup_settings_tab() def setup_main_tab(self): """Setup the main radar display tab""" @@ -469,8 +1195,58 @@ class RadarGUI: text="Status: Ready - FT601 USB 3.0") self.status_label.grid(row=1, column=6, columnspan=2, sticky='e', padx=5, pady=2) - # [Rest of setup_main_tab remains the same] - # ... + # Main display area + display_frame = ttk.Frame(self.tab_main) + display_frame.pack(fill='both', expand=True, padx=10, pady=5) + + # Range-Doppler Map with dark theme + plt.style.use('dark_background') + fig = Figure(figsize=(10, 6), facecolor=DARK_BG) + self.range_doppler_ax = fig.add_subplot(111, facecolor=DARK_ACCENT) + self.range_doppler_plot = self.range_doppler_ax.imshow( + np.random.rand(1024, 32), aspect='auto', cmap='hot', + extent=[0, 32, 0, 1024]) + self.range_doppler_ax.set_title('Range-Doppler Map (Pitch Corrected)', color=DARK_FG) + self.range_doppler_ax.set_xlabel('Doppler Bin', color=DARK_FG) + self.range_doppler_ax.set_ylabel('Range Bin', color=DARK_FG) + self.range_doppler_ax.tick_params(colors=DARK_FG) + self.range_doppler_ax.spines['bottom'].set_color(DARK_FG) + self.range_doppler_ax.spines['top'].set_color(DARK_FG) + self.range_doppler_ax.spines['left'].set_color(DARK_FG) + self.range_doppler_ax.spines['right'].set_color(DARK_FG) + + self.canvas = FigureCanvasTkAgg(fig, display_frame) + self.canvas.draw() + self.canvas.get_tk_widget().pack(side='left', fill='both', expand=True) + + # Targets list with corrected elevation + targets_frame = ttk.LabelFrame(display_frame, text="Detected Targets (Pitch Corrected)") + targets_frame.pack(side='right', fill='y', padx=5) + + self.targets_tree = ttk.Treeview(targets_frame, + columns=('ID', 'Range', 'Velocity', 'Azimuth', 'Elevation', 'Corrected Elev', 'SNR'), + show='headings', height=20) + self.targets_tree.heading('ID', text='Track ID') + self.targets_tree.heading('Range', text='Range (m)') + self.targets_tree.heading('Velocity', text='Velocity (m/s)') + self.targets_tree.heading('Azimuth', text='Azimuth') + self.targets_tree.heading('Elevation', text='Raw Elev') + self.targets_tree.heading('Corrected Elev', text='Corr Elev') + self.targets_tree.heading('SNR', text='SNR (dB)') + + self.targets_tree.column('ID', width=70) + self.targets_tree.column('Range', width=90) + self.targets_tree.column('Velocity', width=90) + self.targets_tree.column('Azimuth', width=70) + self.targets_tree.column('Elevation', width=70) + self.targets_tree.column('Corrected Elev', width=70) + self.targets_tree.column('SNR', width=70) + + # Add scrollbar to targets tree + tree_scroll = ttk.Scrollbar(targets_frame, orient="vertical", command=self.targets_tree.yview) + self.targets_tree.configure(yscrollcommand=tree_scroll.set) + self.targets_tree.pack(side='left', fill='both', expand=True, padx=5, pady=5) + tree_scroll.pack(side='right', fill='y', padx=(0, 5), pady=5) def refresh_devices(self): """Refresh available USB devices""" @@ -599,8 +1375,342 @@ class RadarGUI: # This should match your packet structure return 64 # Example: 64-byte packets - # [Rest of the methods remain the same] - # ... + def process_gps_data(self): + """Step 16/17: Process GPS data from STM32 via USB CDC""" + while True: + if self.running and self.stm32_usb_interface.is_open: + try: + # Read data from STM32 USB + data = self.stm32_usb_interface.read_data(64, timeout=100) + if data: + gps_data = self.usb_packet_parser.parse_gps_data(data) + if gps_data: + self.gps_data_queue.put(gps_data) + logging.info(f"GPS Data received via USB: Lat {gps_data.latitude:.6f}, Lon {gps_data.longitude:.6f}, Alt {gps_data.altitude:.1f}m, Pitch {gps_data.pitch:.1f}°") + except Exception as e: + logging.error(f"Error processing GPS data via USB: {e}") + time.sleep(0.1) + + def process_radar_packet(self, packet): + """Step 40: Process radar data and apply pitch correction""" + try: + if packet['type'] == 'range': + range_meters = packet['range'] * 0.1 + + # Apply pitch correction to elevation + raw_elevation = packet['elevation'] + corrected_elevation = self.apply_pitch_correction(raw_elevation, self.current_gps.pitch) + + # Store correction for display + self.corrected_elevations.append({ + 'raw': raw_elevation, + 'corrected': corrected_elevation, + 'pitch': self.current_gps.pitch, + 'timestamp': packet['timestamp'] + }) + + # Keep only recent corrections + if len(self.corrected_elevations) > 100: + self.corrected_elevations = self.corrected_elevations[-100:] + + target = RadarTarget( + id=packet['chirp'], + range=range_meters, + velocity=0, + azimuth=packet['azimuth'], + elevation=corrected_elevation, # Use corrected elevation + snr=20.0, + timestamp=packet['timestamp'] + ) + + self.update_range_doppler_map(target) + + elif packet['type'] == 'doppler': + lambda_wavelength = 3e8 / self.settings.system_frequency + velocity = (packet['doppler_real'] / 32767.0) * (self.settings.prf1 * lambda_wavelength / 2) + self.update_target_velocity(packet, velocity) + + elif packet['type'] == 'detection': + if packet['detected']: + # Apply pitch correction to detection elevation + raw_elevation = packet['elevation'] + corrected_elevation = self.apply_pitch_correction(raw_elevation, self.current_gps.pitch) + + logging.info(f"CFAR Detection: Raw Elev {raw_elevation}°, Corrected Elev {corrected_elevation:.1f}°, Pitch {self.current_gps.pitch:.1f}°") + + except Exception as e: + logging.error(f"Error processing radar packet: {e}") + + def update_range_doppler_map(self, target): + """Update range-Doppler map with new target""" + range_bin = min(int(target.range / 50), 1023) + doppler_bin = min(abs(int(target.velocity)), 31) + + self.radar_processor.range_doppler_map[range_bin, doppler_bin] += 1 + + self.radar_processor.detected_targets.append(target) + + if len(self.radar_processor.detected_targets) > 100: + self.radar_processor.detected_targets = self.radar_processor.detected_targets[-100:] + + def update_target_velocity(self, packet, velocity): + """Update target velocity information""" + for target in self.radar_processor.detected_targets: + if (target.azimuth == packet['azimuth'] and + target.elevation == packet['elevation'] and + target.id == packet['chirp']): + target.velocity = velocity + break + + def setup_map_tab(self): + """Setup the map display tab with Google Maps""" + map_frame = ttk.Frame(self.tab_map) + map_frame.pack(fill='both', expand=True, padx=10, pady=10) + + # Map controls + controls_frame = ttk.Frame(map_frame) + controls_frame.pack(fill='x', pady=5) + + ttk.Button(controls_frame, text="Open Map in Browser", + command=self.open_map_in_browser).pack(side='left', padx=5) + + ttk.Button(controls_frame, text="Refresh Map", + command=self.refresh_map).pack(side='left', padx=5) + + self.map_status_label = ttk.Label(controls_frame, text="Map: Ready to generate") + self.map_status_label.pack(side='left', padx=20) + + # Map info display + info_frame = ttk.Frame(map_frame) + info_frame.pack(fill='x', pady=5) + + self.map_info_label = ttk.Label(info_frame, text="No GPS data received yet", font=('Arial', 10)) + self.map_info_label.pack() + + def open_map_in_browser(self): + """Open the generated map in the default web browser""" + if self.map_file_path and os.path.exists(self.map_file_path): + webbrowser.open('file://' + os.path.abspath(self.map_file_path)) + else: + messagebox.showwarning("Warning", "No map file available. Generate map first by receiving GPS data.") + + def refresh_map(self): + """Refresh the map with current data""" + self.generate_map() + + def generate_map(self): + """Generate Google Maps HTML file with current targets""" + if self.current_gps.latitude == 0 and self.current_gps.longitude == 0: + self.map_status_label.config(text="Map: Waiting for GPS data") + return + + try: + # Create temporary HTML file + with tempfile.NamedTemporaryFile(mode='w', suffix='.html', delete=False, encoding='utf-8') as f: + map_html = self.map_generator.generate_map( + self.current_gps, + self.radar_processor.detected_targets, + self.settings.map_size, + self.google_maps_api_key + ) + f.write(map_html) + self.map_file_path = f.name + + self.map_status_label.config(text=f"Map: Generated at {self.map_file_path}") + self.map_info_label.config( + text=f"Radar: {self.current_gps.latitude:.6f}, {self.current_gps.longitude:.6f} | " + f"Targets: {len(self.radar_processor.detected_targets)} | " + f"Coverage: {self.settings.map_size/1000:.1f}km" + ) + logging.info(f"Map generated: {self.map_file_path}") + + except Exception as e: + logging.error(f"Error generating map: {e}") + self.map_status_label.config(text=f"Map: Error - {str(e)}") + + def update_gps_display(self): + """Step 18: Update GPS and pitch display""" + try: + while not self.gps_data_queue.empty(): + gps_data = self.gps_data_queue.get_nowait() + self.current_gps = gps_data + + # Update GPS label + self.gps_label.config( + text=f"GPS: Lat {gps_data.latitude:.6f}, Lon {gps_data.longitude:.6f}, Alt {gps_data.altitude:.1f}m") + + # Update pitch label with color coding + pitch_text = f"Pitch: {gps_data.pitch:+.1f}°" + self.pitch_label.config(text=pitch_text) + + # Color code based on pitch magnitude + if abs(gps_data.pitch) > 10: + self.pitch_label.config(foreground='red') # High pitch warning + elif abs(gps_data.pitch) > 5: + self.pitch_label.config(foreground='orange') # Medium pitch + else: + self.pitch_label.config(foreground='green') # Normal pitch + + # Generate/update map when new GPS data arrives + self.generate_map() + + except queue.Empty: + pass + + def setup_settings_tab(self): + """Setup the settings tab with additional chirp durations and map size""" + settings_frame = ttk.Frame(self.tab_settings) + settings_frame.pack(fill='both', expand=True, padx=10, pady=10) + + entries = [ + ('System Frequency (Hz):', 'system_frequency', 10e9), + ('Chirp Duration 1 - Long (s):', 'chirp_duration_1', 30e-6), + ('Chirp Duration 2 - Short (s):', 'chirp_duration_2', 0.5e-6), + ('Chirps per Position:', 'chirps_per_position', 32), + ('Frequency Min (Hz):', 'freq_min', 10e6), + ('Frequency Max (Hz):', 'freq_max', 30e6), + ('PRF1 (Hz):', 'prf1', 1000), + ('PRF2 (Hz):', 'prf2', 2000), + ('Max Distance (m):', 'max_distance', 50000), + ('Map Size (m):', 'map_size', 50000), + ('Google Maps API Key:', 'google_maps_api_key', 'YOUR_GOOGLE_MAPS_API_KEY') + ] + + self.settings_vars = {} + + for i, (label, attr, default) in enumerate(entries): + ttk.Label(settings_frame, text=label).grid(row=i, column=0, sticky='w', padx=5, pady=5) + var = tk.StringVar(value=str(default)) + entry = ttk.Entry(settings_frame, textvariable=var, width=25) + entry.grid(row=i, column=1, padx=5, pady=5) + self.settings_vars[attr] = var + + ttk.Button(settings_frame, text="Apply Settings", + command=self.apply_settings).grid(row=len(entries), column=0, columnspan=2, pady=10) + + def apply_settings(self): + """Step 13: Apply and send radar settings via USB""" + try: + self.settings.system_frequency = float(self.settings_vars['system_frequency'].get()) + self.settings.chirp_duration_1 = float(self.settings_vars['chirp_duration_1'].get()) + self.settings.chirp_duration_2 = float(self.settings_vars['chirp_duration_2'].get()) + self.settings.chirps_per_position = int(self.settings_vars['chirps_per_position'].get()) + self.settings.freq_min = float(self.settings_vars['freq_min'].get()) + self.settings.freq_max = float(self.settings_vars['freq_max'].get()) + self.settings.prf1 = float(self.settings_vars['prf1'].get()) + self.settings.prf2 = float(self.settings_vars['prf2'].get()) + self.settings.max_distance = float(self.settings_vars['max_distance'].get()) + self.settings.map_size = float(self.settings_vars['map_size'].get()) + self.google_maps_api_key = self.settings_vars['google_maps_api_key'].get() + + if self.stm32_usb_interface.is_open: + self.stm32_usb_interface.send_settings(self.settings) + + messagebox.showinfo("Success", "Settings applied and sent to STM32 via USB") + logging.info("Radar settings applied via USB") + + except ValueError as e: + messagebox.showerror("Error", f"Invalid setting value: {e}") + + def update_targets_list(self): + """Update the targets list display with corrected elevations""" + for item in self.targets_tree.get_children(): + self.targets_tree.delete(item) + + for target in self.radar_processor.detected_targets[-20:]: + # Find the corresponding raw elevation if available + raw_elevation = "N/A" + for correction in self.corrected_elevations[-20:]: + if abs(correction['corrected'] - target.elevation) < 0.1: # Fuzzy match + raw_elevation = f"{correction['raw']}" + break + + self.targets_tree.insert('', 'end', values=( + target.track_id, + f"{target.range:.1f}", + f"{target.velocity:.1f}", + target.azimuth, + raw_elevation, # Show raw elevation + f"{target.elevation:.1f}", # Show corrected elevation + f"{target.snr:.1f}" + )) + + def start_background_threads(self): + """Start background data processing threads""" + self.radar_thread = threading.Thread(target=self.process_radar_data, daemon=True) + self.radar_thread.start() + + self.gps_thread = threading.Thread(target=self.process_gps_data, daemon=True) + self.gps_thread.start() + + self.root.after(100, self.update_gui) + + def process_radar_data(self): + """Step 39: Process incoming radar data from FTDI""" + buffer = b'' + while True: + if self.running and self.ftdi_interface.is_open: + try: + data = self.ftdi_interface.read_data(4096) + if data: + buffer += data + + while len(buffer) >= 6: + packet = self.radar_packet_parser.parse_packet(buffer) + if packet: + self.process_radar_packet(packet) + packet_length = 4 + len(packet.get('payload', b'')) + 2 + buffer = buffer[packet_length:] + self.received_packets += 1 + else: + break + + except Exception as e: + logging.error(f"Error processing radar data: {e}") + time.sleep(0.1) + else: + time.sleep(0.1) + + def process_gps_data(self): + """Step 16/17: Process GPS data from STM32 via USB CDC""" + while True: + if self.running and self.stm32_usb_interface.is_open: + try: + # Read data from STM32 USB + data = self.stm32_usb_interface.read_data(64, timeout=100) + if data: + gps_data = self.usb_packet_parser.parse_gps_data(data) + if gps_data: + self.gps_data_queue.put(gps_data) + logging.info(f"GPS Data received via USB: Lat {gps_data.latitude:.6f}, Lon {gps_data.longitude:.6f}, Alt {gps_data.altitude:.1f}m, Pitch {gps_data.pitch:.1f}°") + except Exception as e: + logging.error(f"Error processing GPS data via USB: {e}") + time.sleep(0.1) + + def update_gui(self): + """Step 40: Update all GUI displays""" + try: + # Update status with pitch information + if self.running: + self.status_label.config( + text=f"Status: Running - Packets: {self.received_packets} - Pitch: {self.current_gps.pitch:+.1f}°") + + # Update range-Doppler map + if hasattr(self, 'range_doppler_plot'): + display_data = np.log10(self.radar_processor.range_doppler_map + 1) + self.range_doppler_plot.set_array(display_data) + self.canvas.draw_idle() + + # Update targets list + self.update_targets_list() + + # Update GPS and pitch display + self.update_gps_display() + + except Exception as e: + logging.error(f"Error updating GUI: {e}") + + self.root.after(100, self.update_gui) def main(): """Main application entry point""" @@ -613,5 +1723,4 @@ def main(): messagebox.showerror("Fatal Error", f"Application failed to start: {e}") if __name__ == "__main__": - main()