diff --git a/9_Firmware/9_3_GUI/GUI_V6.py b/9_Firmware/9_3_GUI/GUI_V6.py
index e8cf8f7..f789462 100644
--- a/9_Firmware/9_3_GUI/GUI_V6.py
+++ b/9_Firmware/9_3_GUI/GUI_V6.py
@@ -88,8 +88,163 @@ class GPSData:
timestamp: float
class MapGenerator:
- # [MapGenerator class remains the same as before]
- pass
+ def __init__(self):
+ self.map_html_template = """
+
+
+
+ Radar Map
+
+
+
+
+
+
+
+
+
+
+
+
+ """
+ pass
class FT601Interface:
"""
@@ -361,18 +516,496 @@ class FT601Interface:
except Exception as e:
logging.error(f"Error closing FT601 device: {e}")
+class STM32USBInterface:
+ def __init__(self):
+ self.device = None
+ self.is_open = False
+ self.ep_in = None
+ self.ep_out = None
+
+ def list_devices(self):
+ """List available STM32 USB CDC devices"""
+ if not USB_AVAILABLE:
+ logging.warning("USB not available - please install pyusb")
+ return []
+
+ try:
+ devices = []
+ # STM32 USB CDC devices typically use these vendor/product IDs
+ stm32_vid_pids = [
+ (0x0483, 0x5740), # STM32 Virtual COM Port
+ (0x0483, 0x3748), # STM32 Discovery
+ (0x0483, 0x374B), # STM32 CDC
+ (0x0483, 0x374D), # STM32 CDC
+ (0x0483, 0x374E), # STM32 CDC
+ (0x0483, 0x3752), # STM32 CDC
+ ]
+
+ for vid, pid in stm32_vid_pids:
+ found_devices = usb.core.find(find_all=True, idVendor=vid, idProduct=pid)
+ for dev in found_devices:
+ try:
+ product = usb.util.get_string(dev, dev.iProduct) if dev.iProduct else "STM32 CDC"
+ serial = usb.util.get_string(dev, dev.iSerialNumber) if dev.iSerialNumber else "Unknown"
+ devices.append({
+ 'description': f"{product} ({serial})",
+ 'vendor_id': vid,
+ 'product_id': pid,
+ 'device': dev
+ })
+ except:
+ devices.append({
+ 'description': f"STM32 CDC (VID:{vid:04X}, PID:{pid:04X})",
+ 'vendor_id': vid,
+ 'product_id': pid,
+ 'device': dev
+ })
+
+ return devices
+ except Exception as e:
+ logging.error(f"Error listing USB devices: {e}")
+ # Return mock devices for testing
+ return [{'description': 'STM32 Virtual COM Port', 'vendor_id': 0x0483, 'product_id': 0x5740}]
+
+ def open_device(self, device_info):
+ """Open STM32 USB CDC device"""
+ if not USB_AVAILABLE:
+ logging.error("USB not available - cannot open device")
+ return False
+
+ try:
+ self.device = device_info['device']
+
+ # Detach kernel driver if active
+ if self.device.is_kernel_driver_active(0):
+ self.device.detach_kernel_driver(0)
+
+ # Set configuration
+ self.device.set_configuration()
+
+ # Get CDC endpoints
+ cfg = self.device.get_active_configuration()
+ intf = cfg[(0,0)]
+
+ # Find bulk endpoints (CDC data interface)
+ self.ep_out = usb.util.find_descriptor(
+ intf,
+ custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
+ )
+
+ self.ep_in = usb.util.find_descriptor(
+ intf,
+ custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
+ )
+
+ if self.ep_out is None or self.ep_in is None:
+ logging.error("Could not find CDC endpoints")
+ return False
+
+ self.is_open = True
+ logging.info(f"STM32 USB device opened: {device_info['description']}")
+ return True
+
+ except Exception as e:
+ logging.error(f"Error opening USB device: {e}")
+ return False
+
+ def send_start_flag(self):
+ """Step 12: Send start flag to STM32 via USB"""
+ start_packet = bytes([23, 46, 158, 237])
+ logging.info("Sending start flag to STM32 via USB...")
+ return self._send_data(start_packet)
+
+ def send_settings(self, settings):
+ """Step 13: Send radar settings to STM32 via USB"""
+ try:
+ packet = self._create_settings_packet(settings)
+ logging.info("Sending radar settings to STM32 via USB...")
+ return self._send_data(packet)
+ except Exception as e:
+ logging.error(f"Error sending settings via USB: {e}")
+ return False
+
+ def read_data(self, size=64, timeout=1000):
+ """Read data from STM32 via USB"""
+ if not self.is_open or self.ep_in is None:
+ return None
+
+ try:
+ data = self.ep_in.read(size, timeout=timeout)
+ return bytes(data)
+ except usb.core.USBError as e:
+ if e.errno == 110: # Timeout
+ return None
+ logging.error(f"USB read error: {e}")
+ return None
+ except Exception as e:
+ logging.error(f"Error reading from USB: {e}")
+ return None
+
+ def _send_data(self, data):
+ """Send data to STM32 via USB"""
+ if not self.is_open or self.ep_out is None:
+ return False
+
+ try:
+ # USB CDC typically uses 64-byte packets
+ packet_size = 64
+ for i in range(0, len(data), packet_size):
+ chunk = data[i:i + packet_size]
+ # Pad to packet size if needed
+ if len(chunk) < packet_size:
+ chunk += b'\x00' * (packet_size - len(chunk))
+ self.ep_out.write(chunk)
+
+ return True
+ except Exception as e:
+ logging.error(f"Error sending data via USB: {e}")
+ return False
+
+ def _create_settings_packet(self, settings):
+ """Create binary settings packet for USB transmission"""
+ packet = b'SET'
+ packet += struct.pack('>d', settings.system_frequency)
+ packet += struct.pack('>d', settings.chirp_duration_1)
+ packet += struct.pack('>d', settings.chirp_duration_2)
+ packet += struct.pack('>I', settings.chirps_per_position)
+ packet += struct.pack('>d', settings.freq_min)
+ packet += struct.pack('>d', settings.freq_max)
+ packet += struct.pack('>d', settings.prf1)
+ packet += struct.pack('>d', settings.prf2)
+ packet += struct.pack('>d', settings.max_distance)
+ packet += struct.pack('>d', settings.map_size)
+ packet += b'END'
+ return packet
+
+ def close(self):
+ """Close USB device"""
+ if self.device and self.is_open:
+ try:
+ usb.util.dispose_resources(self.device)
+ self.is_open = False
+ except Exception as e:
+ logging.error(f"Error closing USB device: {e}")
+
+
# [RadarProcessor class remains the same]
class RadarProcessor:
- # ... (same as before)
- pass
+ def __init__(self):
+ self.range_doppler_map = np.zeros((1024, 32))
+ self.detected_targets = []
+ self.track_id_counter = 0
+ self.tracks = {}
+ self.frame_count = 0
+
+ def dual_cpi_fusion(self, range_profiles_1, range_profiles_2):
+ """Dual-CPI fusion for better detection"""
+ fused_profile = np.mean(range_profiles_1, axis=0) + np.mean(range_profiles_2, axis=0)
+ return fused_profile
+
+ def multi_prf_unwrap(self, doppler_measurements, prf1, prf2):
+ """Multi-PRF velocity unwrapping"""
+ lambda_wavelength = 3e8 / 10e9
+ v_max1 = prf1 * lambda_wavelength / 2
+ v_max2 = prf2 * lambda_wavelength / 2
+
+ unwrapped_velocities = []
+ for doppler in doppler_measurements:
+ v1 = doppler * lambda_wavelength / 2
+ v2 = doppler * lambda_wavelength / 2
+
+ velocity = self._solve_chinese_remainder(v1, v2, v_max1, v_max2)
+ unwrapped_velocities.append(velocity)
+
+ return unwrapped_velocities
+
+ def _solve_chinese_remainder(self, v1, v2, max1, max2):
+ for k in range(-5, 6):
+ candidate = v1 + k * max1
+ if abs(candidate - v2) < max2 / 2:
+ return candidate
+ return v1
+
+ def clustering(self, detections, eps=100, min_samples=2):
+ """DBSCAN clustering of detections"""
+ if len(detections) == 0:
+ return []
+
+ points = np.array([[d.range, d.velocity] for d in detections])
+ clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(points)
+
+ clusters = []
+ for label in set(clustering.labels_):
+ if label != -1:
+ cluster_points = points[clustering.labels_ == label]
+ clusters.append({
+ 'center': np.mean(cluster_points, axis=0),
+ 'points': cluster_points,
+ 'size': len(cluster_points)
+ })
+
+ return clusters
+
+ def association(self, detections, clusters):
+ """Association of detections to tracks"""
+ associated_detections = []
+
+ for detection in detections:
+ best_track = None
+ min_distance = float('inf')
+
+ for track_id, track in self.tracks.items():
+ distance = np.sqrt(
+ (detection.range - track['state'][0])**2 +
+ (detection.velocity - track['state'][2])**2
+ )
+
+ if distance < min_distance and distance < 500:
+ min_distance = distance
+ best_track = track_id
+
+ if best_track is not None:
+ detection.track_id = best_track
+ associated_detections.append(detection)
+ else:
+ detection.track_id = self.track_id_counter
+ self.track_id_counter += 1
+ associated_detections.append(detection)
+
+ return associated_detections
+
+ def tracking(self, associated_detections):
+ """Kalman filter tracking"""
+ current_time = time.time()
+
+ for detection in associated_detections:
+ if detection.track_id not in self.tracks:
+ kf = KalmanFilter(dim_x=4, dim_z=2)
+ kf.x = np.array([detection.range, 0, detection.velocity, 0])
+ kf.F = np.array([[1, 1, 0, 0],
+ [0, 1, 0, 0],
+ [0, 0, 1, 1],
+ [0, 0, 0, 1]])
+ kf.H = np.array([[1, 0, 0, 0],
+ [0, 0, 1, 0]])
+ kf.P *= 1000
+ kf.R = np.diag([10, 1])
+ kf.Q = np.eye(4) * 0.1
+
+ self.tracks[detection.track_id] = {
+ 'filter': kf,
+ 'state': kf.x,
+ 'last_update': current_time,
+ 'hits': 1
+ }
+ else:
+ track = self.tracks[detection.track_id]
+ track['filter'].predict()
+ track['filter'].update([detection.range, detection.velocity])
+ track['state'] = track['filter'].x
+ track['last_update'] = current_time
+ track['hits'] += 1
+
+ stale_tracks = [tid for tid, track in self.tracks.items()
+ if current_time - track['last_update'] > 5.0]
+ for tid in stale_tracks:
+ del self.tracks[tid]
class USBPacketParser:
- # ... (same as before)
- pass
+ def __init__(self):
+ self.crc16_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000)
+
+ def parse_gps_data(self, data):
+ """Parse GPS data from STM32 USB CDC with pitch angle"""
+ if not data:
+ return None
+
+ try:
+ # Try text format first: "GPS:lat,lon,alt,pitch\r\n"
+ text_data = data.decode('utf-8', errors='ignore').strip()
+ if text_data.startswith('GPS:'):
+ parts = text_data.split(':')[1].split(',')
+ if len(parts) == 4: # Now expecting 4 values
+ lat = float(parts[0])
+ lon = float(parts[1])
+ alt = float(parts[2])
+ pitch = float(parts[3]) # Pitch angle in degrees
+ return GPSData(latitude=lat, longitude=lon, altitude=alt, pitch=pitch, timestamp=time.time())
+
+ # Try binary format (30 bytes with pitch)
+ if len(data) >= 30 and data[0:4] == b'GPSB':
+ return self._parse_binary_gps_with_pitch(data)
+
+ except Exception as e:
+ logging.error(f"Error parsing GPS data: {e}")
+
+ return None
+
+ def _parse_binary_gps_with_pitch(self, data):
+ """Parse binary GPS format with pitch angle (30 bytes)"""
+ try:
+ # Binary format: [Header 4][Latitude 8][Longitude 8][Altitude 4][Pitch 4][CRC 2]
+ if len(data) < 30:
+ return None
+
+ # Verify CRC (simple checksum)
+ crc_received = (data[28] << 8) | data[29]
+ crc_calculated = sum(data[0:28]) & 0xFFFF
+
+ if crc_received != crc_calculated:
+ logging.warning("GPS CRC mismatch")
+ return None
+
+ # Parse latitude (double, big-endian)
+ lat_bits = 0
+ for i in range(8):
+ lat_bits = (lat_bits << 8) | data[4 + i]
+ latitude = struct.unpack('>d', struct.pack('>Q', lat_bits))[0]
+
+ # Parse longitude (double, big-endian)
+ lon_bits = 0
+ for i in range(8):
+ lon_bits = (lon_bits << 8) | data[12 + i]
+ longitude = struct.unpack('>d', struct.pack('>Q', lon_bits))[0]
+
+ # Parse altitude (float, big-endian)
+ alt_bits = 0
+ for i in range(4):
+ alt_bits = (alt_bits << 8) | data[20 + i]
+ altitude = struct.unpack('>f', struct.pack('>I', alt_bits))[0]
+
+ # Parse pitch angle (float, big-endian)
+ pitch_bits = 0
+ for i in range(4):
+ pitch_bits = (pitch_bits << 8) | data[24 + i]
+ pitch = struct.unpack('>f', struct.pack('>I', pitch_bits))[0]
+
+ return GPSData(
+ latitude=latitude,
+ longitude=longitude,
+ altitude=altitude,
+ pitch=pitch,
+ timestamp=time.time()
+ )
+
+ except Exception as e:
+ logging.error(f"Error parsing binary GPS with pitch: {e}")
+ return None
class RadarPacketParser:
- # ... (same as before)
- pass
+ def __init__(self):
+ self.sync_pattern = b'\xA5\xC3'
+ self.crc16_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000)
+
+ def parse_packet(self, data):
+ if len(data) < 6:
+ return None
+
+ sync_index = data.find(self.sync_pattern)
+ if sync_index == -1:
+ return None
+
+ packet = data[sync_index:]
+
+ if len(packet) < 6:
+ return None
+
+ sync = packet[0:2]
+ packet_type = packet[2]
+ length = packet[3]
+
+ if len(packet) < (4 + length + 2):
+ return None
+
+ payload = packet[4:4+length]
+ crc_received = struct.unpack('I', payload[0:4])[0]
+ elevation = payload[4] & 0x1F
+ azimuth = payload[5] & 0x3F
+ chirp_counter = payload[6] & 0x1F
+
+ return {
+ 'type': 'range',
+ 'range': range_value,
+ 'elevation': elevation,
+ 'azimuth': azimuth,
+ 'chirp': chirp_counter,
+ 'timestamp': time.time()
+ }
+ except Exception as e:
+ logging.error(f"Error parsing range packet: {e}")
+ return None
+
+ def parse_doppler_packet(self, payload):
+ if len(payload) < 12:
+ return None
+
+ try:
+ doppler_real = struct.unpack('>h', payload[0:2])[0]
+ doppler_imag = struct.unpack('>h', payload[2:4])[0]
+ elevation = payload[4] & 0x1F
+ azimuth = payload[5] & 0x3F
+ chirp_counter = payload[6] & 0x1F
+
+ return {
+ 'type': 'doppler',
+ 'doppler_real': doppler_real,
+ 'doppler_imag': doppler_imag,
+ 'elevation': elevation,
+ 'azimuth': azimuth,
+ 'chirp': chirp_counter,
+ 'timestamp': time.time()
+ }
+ except Exception as e:
+ logging.error(f"Error parsing Doppler packet: {e}")
+ return None
+
+ def parse_detection_packet(self, payload):
+ if len(payload) < 8:
+ return None
+
+ try:
+ detection_flag = (payload[0] & 0x01) != 0
+ elevation = payload[1] & 0x1F
+ azimuth = payload[2] & 0x3F
+ chirp_counter = payload[3] & 0x1F
+
+ return {
+ 'type': 'detection',
+ 'detected': detection_flag,
+ 'elevation': elevation,
+ 'azimuth': azimuth,
+ 'chirp': chirp_counter,
+ 'timestamp': time.time()
+ }
+ except Exception as e:
+ logging.error(f"Error parsing detection packet: {e}")
+ return None
+
class RadarGUI:
def __init__(self, root):
@@ -417,12 +1050,105 @@ class RadarGUI:
self.start_background_threads()
def configure_dark_theme(self):
- # ... (same as before)
- pass
+ """Configure ttk style for dark mercury theme"""
+ self.style.configure('.',
+ background=DARK_BG,
+ foreground=DARK_FG,
+ fieldbackground=DARK_ACCENT,
+ selectbackground=DARK_HIGHLIGHT,
+ selectforeground=DARK_FG,
+ troughcolor=DARK_ACCENT,
+ borderwidth=1,
+ focuscolor=DARK_BORDER)
+
+ # Configure specific widgets
+ self.style.configure('TFrame', background=DARK_BG)
+ self.style.configure('TLabel', background=DARK_BG, foreground=DARK_FG)
+ self.style.configure('TButton',
+ background=DARK_BUTTON,
+ foreground=DARK_FG,
+ borderwidth=1,
+ focuscolor=DARK_BORDER)
+ self.style.map('TButton',
+ background=[('active', DARK_BUTTON_HOVER),
+ ('pressed', DARK_HIGHLIGHT)])
+
+ self.style.configure('TCombobox',
+ fieldbackground=DARK_ACCENT,
+ background=DARK_BG,
+ foreground=DARK_FG,
+ arrowcolor=DARK_FG)
+ self.style.map('TCombobox',
+ fieldbackground=[('readonly', DARK_ACCENT)],
+ selectbackground=[('readonly', DARK_HIGHLIGHT)],
+ selectforeground=[('readonly', DARK_FG)])
+
+ self.style.configure('TNotebook', background=DARK_BG, borderwidth=0)
+ self.style.configure('TNotebook.Tab',
+ background=DARK_ACCENT,
+ foreground=DARK_FG,
+ padding=[10, 5])
+ self.style.map('TNotebook.Tab',
+ background=[('selected', DARK_HIGHLIGHT),
+ ('active', DARK_BUTTON_HOVER)])
+
+ self.style.configure('Treeview',
+ background=DARK_TREEVIEW,
+ foreground=DARK_FG,
+ fieldbackground=DARK_TREEVIEW,
+ borderwidth=0)
+ self.style.map('Treeview',
+ background=[('selected', DARK_HIGHLIGHT)])
+
+ self.style.configure('Treeview.Heading',
+ background=DARK_ACCENT,
+ foreground=DARK_FG,
+ relief='flat')
+ self.style.map('Treeview.Heading',
+ background=[('active', DARK_BUTTON_HOVER)])
+
+ self.style.configure('TEntry',
+ fieldbackground=DARK_ACCENT,
+ foreground=DARK_FG,
+ insertcolor=DARK_FG)
+
+ self.style.configure('Vertical.TScrollbar',
+ background=DARK_ACCENT,
+ troughcolor=DARK_BG,
+ borderwidth=0,
+ arrowsize=12)
+ self.style.configure('Horizontal.TScrollbar',
+ background=DARK_ACCENT,
+ troughcolor=DARK_BG,
+ borderwidth=0,
+ arrowsize=12)
+
+ self.style.configure('TLabelFrame',
+ background=DARK_BG,
+ foreground=DARK_FG,
+ bordercolor=DARK_BORDER)
+ self.style.configure('TLabelFrame.Label',
+ background=DARK_BG,
+ foreground=DARK_FG)
def create_gui(self):
- # ... (same as before, update device label)
- pass
+ """Create the main GUI with tabs"""
+ self.notebook = ttk.Notebook(self.root)
+ self.notebook.pack(fill='both', expand=True, padx=10, pady=10)
+
+ self.tab_main = ttk.Frame(self.notebook)
+ self.tab_map = ttk.Frame(self.notebook)
+ self.tab_diagnostics = ttk.Frame(self.notebook)
+ self.tab_settings = ttk.Frame(self.notebook)
+
+ self.notebook.add(self.tab_main, text='Main View')
+ self.notebook.add(self.tab_map, text='Map View')
+ self.notebook.add(self.tab_diagnostics, text='Diagnostics')
+ self.notebook.add(self.tab_settings, text='Settings')
+
+ self.setup_main_tab()
+ self.setup_map_tab()
+ self.setup_settings_tab()
def setup_main_tab(self):
"""Setup the main radar display tab"""
@@ -469,8 +1195,58 @@ class RadarGUI:
text="Status: Ready - FT601 USB 3.0")
self.status_label.grid(row=1, column=6, columnspan=2, sticky='e', padx=5, pady=2)
- # [Rest of setup_main_tab remains the same]
- # ...
+ # Main display area
+ display_frame = ttk.Frame(self.tab_main)
+ display_frame.pack(fill='both', expand=True, padx=10, pady=5)
+
+ # Range-Doppler Map with dark theme
+ plt.style.use('dark_background')
+ fig = Figure(figsize=(10, 6), facecolor=DARK_BG)
+ self.range_doppler_ax = fig.add_subplot(111, facecolor=DARK_ACCENT)
+ self.range_doppler_plot = self.range_doppler_ax.imshow(
+ np.random.rand(1024, 32), aspect='auto', cmap='hot',
+ extent=[0, 32, 0, 1024])
+ self.range_doppler_ax.set_title('Range-Doppler Map (Pitch Corrected)', color=DARK_FG)
+ self.range_doppler_ax.set_xlabel('Doppler Bin', color=DARK_FG)
+ self.range_doppler_ax.set_ylabel('Range Bin', color=DARK_FG)
+ self.range_doppler_ax.tick_params(colors=DARK_FG)
+ self.range_doppler_ax.spines['bottom'].set_color(DARK_FG)
+ self.range_doppler_ax.spines['top'].set_color(DARK_FG)
+ self.range_doppler_ax.spines['left'].set_color(DARK_FG)
+ self.range_doppler_ax.spines['right'].set_color(DARK_FG)
+
+ self.canvas = FigureCanvasTkAgg(fig, display_frame)
+ self.canvas.draw()
+ self.canvas.get_tk_widget().pack(side='left', fill='both', expand=True)
+
+ # Targets list with corrected elevation
+ targets_frame = ttk.LabelFrame(display_frame, text="Detected Targets (Pitch Corrected)")
+ targets_frame.pack(side='right', fill='y', padx=5)
+
+ self.targets_tree = ttk.Treeview(targets_frame,
+ columns=('ID', 'Range', 'Velocity', 'Azimuth', 'Elevation', 'Corrected Elev', 'SNR'),
+ show='headings', height=20)
+ self.targets_tree.heading('ID', text='Track ID')
+ self.targets_tree.heading('Range', text='Range (m)')
+ self.targets_tree.heading('Velocity', text='Velocity (m/s)')
+ self.targets_tree.heading('Azimuth', text='Azimuth')
+ self.targets_tree.heading('Elevation', text='Raw Elev')
+ self.targets_tree.heading('Corrected Elev', text='Corr Elev')
+ self.targets_tree.heading('SNR', text='SNR (dB)')
+
+ self.targets_tree.column('ID', width=70)
+ self.targets_tree.column('Range', width=90)
+ self.targets_tree.column('Velocity', width=90)
+ self.targets_tree.column('Azimuth', width=70)
+ self.targets_tree.column('Elevation', width=70)
+ self.targets_tree.column('Corrected Elev', width=70)
+ self.targets_tree.column('SNR', width=70)
+
+ # Add scrollbar to targets tree
+ tree_scroll = ttk.Scrollbar(targets_frame, orient="vertical", command=self.targets_tree.yview)
+ self.targets_tree.configure(yscrollcommand=tree_scroll.set)
+ self.targets_tree.pack(side='left', fill='both', expand=True, padx=5, pady=5)
+ tree_scroll.pack(side='right', fill='y', padx=(0, 5), pady=5)
def refresh_devices(self):
"""Refresh available USB devices"""
@@ -599,8 +1375,342 @@ class RadarGUI:
# This should match your packet structure
return 64 # Example: 64-byte packets
- # [Rest of the methods remain the same]
- # ...
+ def process_gps_data(self):
+ """Step 16/17: Process GPS data from STM32 via USB CDC"""
+ while True:
+ if self.running and self.stm32_usb_interface.is_open:
+ try:
+ # Read data from STM32 USB
+ data = self.stm32_usb_interface.read_data(64, timeout=100)
+ if data:
+ gps_data = self.usb_packet_parser.parse_gps_data(data)
+ if gps_data:
+ self.gps_data_queue.put(gps_data)
+ logging.info(f"GPS Data received via USB: Lat {gps_data.latitude:.6f}, Lon {gps_data.longitude:.6f}, Alt {gps_data.altitude:.1f}m, Pitch {gps_data.pitch:.1f}°")
+ except Exception as e:
+ logging.error(f"Error processing GPS data via USB: {e}")
+ time.sleep(0.1)
+
+ def process_radar_packet(self, packet):
+ """Step 40: Process radar data and apply pitch correction"""
+ try:
+ if packet['type'] == 'range':
+ range_meters = packet['range'] * 0.1
+
+ # Apply pitch correction to elevation
+ raw_elevation = packet['elevation']
+ corrected_elevation = self.apply_pitch_correction(raw_elevation, self.current_gps.pitch)
+
+ # Store correction for display
+ self.corrected_elevations.append({
+ 'raw': raw_elevation,
+ 'corrected': corrected_elevation,
+ 'pitch': self.current_gps.pitch,
+ 'timestamp': packet['timestamp']
+ })
+
+ # Keep only recent corrections
+ if len(self.corrected_elevations) > 100:
+ self.corrected_elevations = self.corrected_elevations[-100:]
+
+ target = RadarTarget(
+ id=packet['chirp'],
+ range=range_meters,
+ velocity=0,
+ azimuth=packet['azimuth'],
+ elevation=corrected_elevation, # Use corrected elevation
+ snr=20.0,
+ timestamp=packet['timestamp']
+ )
+
+ self.update_range_doppler_map(target)
+
+ elif packet['type'] == 'doppler':
+ lambda_wavelength = 3e8 / self.settings.system_frequency
+ velocity = (packet['doppler_real'] / 32767.0) * (self.settings.prf1 * lambda_wavelength / 2)
+ self.update_target_velocity(packet, velocity)
+
+ elif packet['type'] == 'detection':
+ if packet['detected']:
+ # Apply pitch correction to detection elevation
+ raw_elevation = packet['elevation']
+ corrected_elevation = self.apply_pitch_correction(raw_elevation, self.current_gps.pitch)
+
+ logging.info(f"CFAR Detection: Raw Elev {raw_elevation}°, Corrected Elev {corrected_elevation:.1f}°, Pitch {self.current_gps.pitch:.1f}°")
+
+ except Exception as e:
+ logging.error(f"Error processing radar packet: {e}")
+
+ def update_range_doppler_map(self, target):
+ """Update range-Doppler map with new target"""
+ range_bin = min(int(target.range / 50), 1023)
+ doppler_bin = min(abs(int(target.velocity)), 31)
+
+ self.radar_processor.range_doppler_map[range_bin, doppler_bin] += 1
+
+ self.radar_processor.detected_targets.append(target)
+
+ if len(self.radar_processor.detected_targets) > 100:
+ self.radar_processor.detected_targets = self.radar_processor.detected_targets[-100:]
+
+ def update_target_velocity(self, packet, velocity):
+ """Update target velocity information"""
+ for target in self.radar_processor.detected_targets:
+ if (target.azimuth == packet['azimuth'] and
+ target.elevation == packet['elevation'] and
+ target.id == packet['chirp']):
+ target.velocity = velocity
+ break
+
+ def setup_map_tab(self):
+ """Setup the map display tab with Google Maps"""
+ map_frame = ttk.Frame(self.tab_map)
+ map_frame.pack(fill='both', expand=True, padx=10, pady=10)
+
+ # Map controls
+ controls_frame = ttk.Frame(map_frame)
+ controls_frame.pack(fill='x', pady=5)
+
+ ttk.Button(controls_frame, text="Open Map in Browser",
+ command=self.open_map_in_browser).pack(side='left', padx=5)
+
+ ttk.Button(controls_frame, text="Refresh Map",
+ command=self.refresh_map).pack(side='left', padx=5)
+
+ self.map_status_label = ttk.Label(controls_frame, text="Map: Ready to generate")
+ self.map_status_label.pack(side='left', padx=20)
+
+ # Map info display
+ info_frame = ttk.Frame(map_frame)
+ info_frame.pack(fill='x', pady=5)
+
+ self.map_info_label = ttk.Label(info_frame, text="No GPS data received yet", font=('Arial', 10))
+ self.map_info_label.pack()
+
+ def open_map_in_browser(self):
+ """Open the generated map in the default web browser"""
+ if self.map_file_path and os.path.exists(self.map_file_path):
+ webbrowser.open('file://' + os.path.abspath(self.map_file_path))
+ else:
+ messagebox.showwarning("Warning", "No map file available. Generate map first by receiving GPS data.")
+
+ def refresh_map(self):
+ """Refresh the map with current data"""
+ self.generate_map()
+
+ def generate_map(self):
+ """Generate Google Maps HTML file with current targets"""
+ if self.current_gps.latitude == 0 and self.current_gps.longitude == 0:
+ self.map_status_label.config(text="Map: Waiting for GPS data")
+ return
+
+ try:
+ # Create temporary HTML file
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.html', delete=False, encoding='utf-8') as f:
+ map_html = self.map_generator.generate_map(
+ self.current_gps,
+ self.radar_processor.detected_targets,
+ self.settings.map_size,
+ self.google_maps_api_key
+ )
+ f.write(map_html)
+ self.map_file_path = f.name
+
+ self.map_status_label.config(text=f"Map: Generated at {self.map_file_path}")
+ self.map_info_label.config(
+ text=f"Radar: {self.current_gps.latitude:.6f}, {self.current_gps.longitude:.6f} | "
+ f"Targets: {len(self.radar_processor.detected_targets)} | "
+ f"Coverage: {self.settings.map_size/1000:.1f}km"
+ )
+ logging.info(f"Map generated: {self.map_file_path}")
+
+ except Exception as e:
+ logging.error(f"Error generating map: {e}")
+ self.map_status_label.config(text=f"Map: Error - {str(e)}")
+
+ def update_gps_display(self):
+ """Step 18: Update GPS and pitch display"""
+ try:
+ while not self.gps_data_queue.empty():
+ gps_data = self.gps_data_queue.get_nowait()
+ self.current_gps = gps_data
+
+ # Update GPS label
+ self.gps_label.config(
+ text=f"GPS: Lat {gps_data.latitude:.6f}, Lon {gps_data.longitude:.6f}, Alt {gps_data.altitude:.1f}m")
+
+ # Update pitch label with color coding
+ pitch_text = f"Pitch: {gps_data.pitch:+.1f}°"
+ self.pitch_label.config(text=pitch_text)
+
+ # Color code based on pitch magnitude
+ if abs(gps_data.pitch) > 10:
+ self.pitch_label.config(foreground='red') # High pitch warning
+ elif abs(gps_data.pitch) > 5:
+ self.pitch_label.config(foreground='orange') # Medium pitch
+ else:
+ self.pitch_label.config(foreground='green') # Normal pitch
+
+ # Generate/update map when new GPS data arrives
+ self.generate_map()
+
+ except queue.Empty:
+ pass
+
+ def setup_settings_tab(self):
+ """Setup the settings tab with additional chirp durations and map size"""
+ settings_frame = ttk.Frame(self.tab_settings)
+ settings_frame.pack(fill='both', expand=True, padx=10, pady=10)
+
+ entries = [
+ ('System Frequency (Hz):', 'system_frequency', 10e9),
+ ('Chirp Duration 1 - Long (s):', 'chirp_duration_1', 30e-6),
+ ('Chirp Duration 2 - Short (s):', 'chirp_duration_2', 0.5e-6),
+ ('Chirps per Position:', 'chirps_per_position', 32),
+ ('Frequency Min (Hz):', 'freq_min', 10e6),
+ ('Frequency Max (Hz):', 'freq_max', 30e6),
+ ('PRF1 (Hz):', 'prf1', 1000),
+ ('PRF2 (Hz):', 'prf2', 2000),
+ ('Max Distance (m):', 'max_distance', 50000),
+ ('Map Size (m):', 'map_size', 50000),
+ ('Google Maps API Key:', 'google_maps_api_key', 'YOUR_GOOGLE_MAPS_API_KEY')
+ ]
+
+ self.settings_vars = {}
+
+ for i, (label, attr, default) in enumerate(entries):
+ ttk.Label(settings_frame, text=label).grid(row=i, column=0, sticky='w', padx=5, pady=5)
+ var = tk.StringVar(value=str(default))
+ entry = ttk.Entry(settings_frame, textvariable=var, width=25)
+ entry.grid(row=i, column=1, padx=5, pady=5)
+ self.settings_vars[attr] = var
+
+ ttk.Button(settings_frame, text="Apply Settings",
+ command=self.apply_settings).grid(row=len(entries), column=0, columnspan=2, pady=10)
+
+ def apply_settings(self):
+ """Step 13: Apply and send radar settings via USB"""
+ try:
+ self.settings.system_frequency = float(self.settings_vars['system_frequency'].get())
+ self.settings.chirp_duration_1 = float(self.settings_vars['chirp_duration_1'].get())
+ self.settings.chirp_duration_2 = float(self.settings_vars['chirp_duration_2'].get())
+ self.settings.chirps_per_position = int(self.settings_vars['chirps_per_position'].get())
+ self.settings.freq_min = float(self.settings_vars['freq_min'].get())
+ self.settings.freq_max = float(self.settings_vars['freq_max'].get())
+ self.settings.prf1 = float(self.settings_vars['prf1'].get())
+ self.settings.prf2 = float(self.settings_vars['prf2'].get())
+ self.settings.max_distance = float(self.settings_vars['max_distance'].get())
+ self.settings.map_size = float(self.settings_vars['map_size'].get())
+ self.google_maps_api_key = self.settings_vars['google_maps_api_key'].get()
+
+ if self.stm32_usb_interface.is_open:
+ self.stm32_usb_interface.send_settings(self.settings)
+
+ messagebox.showinfo("Success", "Settings applied and sent to STM32 via USB")
+ logging.info("Radar settings applied via USB")
+
+ except ValueError as e:
+ messagebox.showerror("Error", f"Invalid setting value: {e}")
+
+ def update_targets_list(self):
+ """Update the targets list display with corrected elevations"""
+ for item in self.targets_tree.get_children():
+ self.targets_tree.delete(item)
+
+ for target in self.radar_processor.detected_targets[-20:]:
+ # Find the corresponding raw elevation if available
+ raw_elevation = "N/A"
+ for correction in self.corrected_elevations[-20:]:
+ if abs(correction['corrected'] - target.elevation) < 0.1: # Fuzzy match
+ raw_elevation = f"{correction['raw']}"
+ break
+
+ self.targets_tree.insert('', 'end', values=(
+ target.track_id,
+ f"{target.range:.1f}",
+ f"{target.velocity:.1f}",
+ target.azimuth,
+ raw_elevation, # Show raw elevation
+ f"{target.elevation:.1f}", # Show corrected elevation
+ f"{target.snr:.1f}"
+ ))
+
+ def start_background_threads(self):
+ """Start background data processing threads"""
+ self.radar_thread = threading.Thread(target=self.process_radar_data, daemon=True)
+ self.radar_thread.start()
+
+ self.gps_thread = threading.Thread(target=self.process_gps_data, daemon=True)
+ self.gps_thread.start()
+
+ self.root.after(100, self.update_gui)
+
+ def process_radar_data(self):
+ """Step 39: Process incoming radar data from FTDI"""
+ buffer = b''
+ while True:
+ if self.running and self.ftdi_interface.is_open:
+ try:
+ data = self.ftdi_interface.read_data(4096)
+ if data:
+ buffer += data
+
+ while len(buffer) >= 6:
+ packet = self.radar_packet_parser.parse_packet(buffer)
+ if packet:
+ self.process_radar_packet(packet)
+ packet_length = 4 + len(packet.get('payload', b'')) + 2
+ buffer = buffer[packet_length:]
+ self.received_packets += 1
+ else:
+ break
+
+ except Exception as e:
+ logging.error(f"Error processing radar data: {e}")
+ time.sleep(0.1)
+ else:
+ time.sleep(0.1)
+
+ def process_gps_data(self):
+ """Step 16/17: Process GPS data from STM32 via USB CDC"""
+ while True:
+ if self.running and self.stm32_usb_interface.is_open:
+ try:
+ # Read data from STM32 USB
+ data = self.stm32_usb_interface.read_data(64, timeout=100)
+ if data:
+ gps_data = self.usb_packet_parser.parse_gps_data(data)
+ if gps_data:
+ self.gps_data_queue.put(gps_data)
+ logging.info(f"GPS Data received via USB: Lat {gps_data.latitude:.6f}, Lon {gps_data.longitude:.6f}, Alt {gps_data.altitude:.1f}m, Pitch {gps_data.pitch:.1f}°")
+ except Exception as e:
+ logging.error(f"Error processing GPS data via USB: {e}")
+ time.sleep(0.1)
+
+ def update_gui(self):
+ """Step 40: Update all GUI displays"""
+ try:
+ # Update status with pitch information
+ if self.running:
+ self.status_label.config(
+ text=f"Status: Running - Packets: {self.received_packets} - Pitch: {self.current_gps.pitch:+.1f}°")
+
+ # Update range-Doppler map
+ if hasattr(self, 'range_doppler_plot'):
+ display_data = np.log10(self.radar_processor.range_doppler_map + 1)
+ self.range_doppler_plot.set_array(display_data)
+ self.canvas.draw_idle()
+
+ # Update targets list
+ self.update_targets_list()
+
+ # Update GPS and pitch display
+ self.update_gps_display()
+
+ except Exception as e:
+ logging.error(f"Error updating GUI: {e}")
+
+ self.root.after(100, self.update_gui)
def main():
"""Main application entry point"""
@@ -613,5 +1723,4 @@ def main():
messagebox.showerror("Fatal Error", f"Application failed to start: {e}")
if __name__ == "__main__":
-
main()