#!/usr/bin/env python3 """ AERIS-10 Radar Dashboard — Board Bring-Up Edition =================================================== Real-time visualization and control for the AERIS-10 phased-array radar via FT601 USB 3.0 interface. Features: - FT601 USB reader with packet parsing (matches usb_data_interface.v) - Real-time range-Doppler magnitude heatmap (64x32) - CFAR detection overlay (flagged cells highlighted) - Range profile waterfall plot (range vs. time) - Host command sender (opcodes 0x01-0x27, 0x30, 0xFF) - Configuration panel for all radar parameters - HDF5 data recording for offline analysis - Mock mode for development/testing without hardware Usage: python radar_dashboard.py # Launch with mock data python radar_dashboard.py --live # Launch with FT601 hardware python radar_dashboard.py --record # Launch with HDF5 recording """ import sys import os import time import queue import logging import argparse import threading from typing import Optional, Dict from collections import deque import numpy as np import tkinter as tk from tkinter import ttk, filedialog import matplotlib matplotlib.use("TkAgg") from matplotlib.figure import Figure from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg # Import protocol layer (no GUI deps) from radar_protocol import ( RadarProtocol, FT601Connection, ReplayConnection, DataRecorder, RadarAcquisition, RadarFrame, StatusResponse, Opcode, NUM_RANGE_BINS, NUM_DOPPLER_BINS, WATERFALL_DEPTH, ) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) log = logging.getLogger("radar_dashboard") # ============================================================================ # Dashboard GUI # ============================================================================ # Dark theme colors BG = "#1e1e2e" BG2 = "#282840" FG = "#cdd6f4" ACCENT = "#89b4fa" GREEN = "#a6e3a1" RED = "#f38ba8" YELLOW = "#f9e2af" SURFACE = "#313244" class RadarDashboard: """Main tkinter application: real-time radar visualization and control.""" UPDATE_INTERVAL_MS = 100 # 10 Hz display refresh # Radar parameters for physical axis labels (ADI CN0566 defaults) # Config: [sample_rate=4e6, IF=1e5, RF=9.9e9, chirps=256, BW=500e6, # ramp_time=300e-6, ...] SAMPLE_RATE = 4e6 # Hz — ADC sample rate (baseband) BANDWIDTH = 500e6 # Hz — chirp bandwidth RAMP_TIME = 300e-6 # s — chirp ramp time CENTER_FREQ = 10.5e9 # Hz — X-band center frequency NUM_CHIRPS_FRAME = 32 # chirps per Doppler frame C = 3e8 # m/s — speed of light def __init__(self, root: tk.Tk, connection: FT601Connection, recorder: DataRecorder): self.root = root self.conn = connection self.recorder = recorder self.root.title("AERIS-10 Radar Dashboard — Bring-Up Edition") self.root.geometry("1600x950") self.root.configure(bg=BG) # Frame queue (acquisition → display) self.frame_queue: queue.Queue[RadarFrame] = queue.Queue(maxsize=8) self._acq_thread: Optional[RadarAcquisition] = None # Display state self._current_frame = RadarFrame() self._waterfall = deque(maxlen=WATERFALL_DEPTH) for _ in range(WATERFALL_DEPTH): self._waterfall.append(np.zeros(NUM_RANGE_BINS)) self._frame_count = 0 self._fps_ts = time.time() self._fps = 0.0 # Stable colorscale — exponential moving average of vmax self._vmax_ema = 1000.0 self._vmax_alpha = 0.15 # smoothing factor (lower = more stable) self._build_ui() self._schedule_update() # ------------------------------------------------------------------ UI def _build_ui(self): style = ttk.Style() style.theme_use("clam") style.configure(".", background=BG, foreground=FG, fieldbackground=SURFACE) style.configure("TFrame", background=BG) style.configure("TLabel", background=BG, foreground=FG) style.configure("TButton", background=SURFACE, foreground=FG) style.configure("TLabelframe", background=BG, foreground=ACCENT) style.configure("TLabelframe.Label", background=BG, foreground=ACCENT) style.configure("Accent.TButton", background=ACCENT, foreground=BG) style.configure("TNotebook", background=BG) style.configure("TNotebook.Tab", background=SURFACE, foreground=FG, padding=[12, 4]) style.map("TNotebook.Tab", background=[("selected", ACCENT)], foreground=[("selected", BG)]) # Top bar top = ttk.Frame(self.root) top.pack(fill="x", padx=8, pady=(8, 0)) self.lbl_status = ttk.Label(top, text="DISCONNECTED", foreground=RED, font=("Menlo", 11, "bold")) self.lbl_status.pack(side="left", padx=8) self.lbl_fps = ttk.Label(top, text="0.0 fps", font=("Menlo", 10)) self.lbl_fps.pack(side="left", padx=16) self.lbl_detections = ttk.Label(top, text="Det: 0", font=("Menlo", 10)) self.lbl_detections.pack(side="left", padx=16) self.lbl_frame = ttk.Label(top, text="Frame: 0", font=("Menlo", 10)) self.lbl_frame.pack(side="left", padx=16) self.btn_connect = ttk.Button(top, text="Connect", command=self._on_connect, style="Accent.TButton") self.btn_connect.pack(side="right", padx=4) self.btn_record = ttk.Button(top, text="Record", command=self._on_record) self.btn_record.pack(side="right", padx=4) # Notebook (tabs) nb = ttk.Notebook(self.root) nb.pack(fill="both", expand=True, padx=8, pady=8) tab_display = ttk.Frame(nb) tab_control = ttk.Frame(nb) tab_log = ttk.Frame(nb) nb.add(tab_display, text=" Display ") nb.add(tab_control, text=" Control ") nb.add(tab_log, text=" Log ") self._build_display_tab(tab_display) self._build_control_tab(tab_control) self._build_log_tab(tab_log) def _build_display_tab(self, parent): # Compute physical axis limits # Range resolution: dR = c / (2 * BW) per range bin # But we decimate 1024→64 bins, so each bin spans 16 FFT bins. # Range per FFT bin = c / (2 * BW) * (Fs / FFT_SIZE) — simplified: # max_range = c * Fs / (4 * BW) for Fs-sampled baseband # range_per_bin = max_range / NUM_RANGE_BINS range_res = self.C / (2.0 * self.BANDWIDTH) # ~0.3 m per FFT bin # After decimation 1024→64, each range bin = 16 FFT bins range_per_bin = range_res * 16 max_range = range_per_bin * NUM_RANGE_BINS # Velocity resolution: dv = lambda / (2 * N_chirps * T_chirp) wavelength = self.C / self.CENTER_FREQ # Max unambiguous velocity = lambda / (4 * T_chirp) max_vel = wavelength / (4.0 * self.RAMP_TIME) vel_per_bin = 2.0 * max_vel / NUM_DOPPLER_BINS # Doppler axis: bin 0 = 0 Hz (DC), wraps at Nyquist # For display: center DC, so shift axis to [-max_vel, +max_vel) vel_lo = -max_vel vel_hi = max_vel # Matplotlib figure with 3 subplots self.fig = Figure(figsize=(14, 7), facecolor=BG) self.fig.subplots_adjust(left=0.07, right=0.98, top=0.94, bottom=0.10, wspace=0.30, hspace=0.35) # Range-Doppler heatmap self.ax_rd = self.fig.add_subplot(1, 3, (1, 2)) self.ax_rd.set_facecolor(BG2) self._rd_img = self.ax_rd.imshow( np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS)), aspect="auto", cmap="inferno", origin="lower", extent=[vel_lo, vel_hi, 0, max_range], vmin=0, vmax=1000, ) self.ax_rd.set_title("Range-Doppler Map", color=FG, fontsize=12) self.ax_rd.set_xlabel("Velocity (m/s)", color=FG) self.ax_rd.set_ylabel("Range (m)", color=FG) self.ax_rd.tick_params(colors=FG) # Save axis limits for coordinate conversions self._vel_lo = vel_lo self._vel_hi = vel_hi self._max_range = max_range self._range_per_bin = range_per_bin self._vel_per_bin = vel_per_bin # CFAR detection overlay (scatter) self._det_scatter = self.ax_rd.scatter([], [], s=30, c=GREEN, marker="x", linewidths=1.5, zorder=5, label="CFAR Det") # Waterfall plot (range profile vs time) self.ax_wf = self.fig.add_subplot(1, 3, 3) self.ax_wf.set_facecolor(BG2) wf_init = np.zeros((WATERFALL_DEPTH, NUM_RANGE_BINS)) self._wf_img = self.ax_wf.imshow( wf_init, aspect="auto", cmap="viridis", origin="lower", extent=[0, max_range, 0, WATERFALL_DEPTH], vmin=0, vmax=5000, ) self.ax_wf.set_title("Range Waterfall", color=FG, fontsize=12) self.ax_wf.set_xlabel("Range (m)", color=FG) self.ax_wf.set_ylabel("Frame", color=FG) self.ax_wf.tick_params(colors=FG) canvas = FigureCanvasTkAgg(self.fig, master=parent) canvas.draw() canvas.get_tk_widget().pack(fill="both", expand=True) self._canvas = canvas def _build_control_tab(self, parent): """Host command sender and configuration panel.""" outer = ttk.Frame(parent) outer.pack(fill="both", expand=True, padx=16, pady=16) # Left column: Quick actions left = ttk.LabelFrame(outer, text="Quick Actions", padding=12) left.grid(row=0, column=0, sticky="nsew", padx=(0, 8)) ttk.Button(left, text="Trigger Chirp (0x01)", command=lambda: self._send_cmd(0x01, 1)).pack(fill="x", pady=3) ttk.Button(left, text="Enable MTI (0x26)", command=lambda: self._send_cmd(0x26, 1)).pack(fill="x", pady=3) ttk.Button(left, text="Disable MTI (0x26)", command=lambda: self._send_cmd(0x26, 0)).pack(fill="x", pady=3) ttk.Button(left, text="Enable CFAR (0x25)", command=lambda: self._send_cmd(0x25, 1)).pack(fill="x", pady=3) ttk.Button(left, text="Disable CFAR (0x25)", command=lambda: self._send_cmd(0x25, 0)).pack(fill="x", pady=3) ttk.Button(left, text="Request Status (0xFF)", command=lambda: self._send_cmd(0xFF, 0)).pack(fill="x", pady=3) # Right column: Parameter configuration right = ttk.LabelFrame(outer, text="Parameter Configuration", padding=12) right.grid(row=0, column=1, sticky="nsew", padx=(8, 0)) self._param_vars: Dict[str, tk.StringVar] = {} params = [ ("CFAR Guard (0x21)", 0x21, "2"), ("CFAR Train (0x22)", 0x22, "8"), ("CFAR Alpha Q4.4 (0x23)", 0x23, "48"), ("CFAR Mode (0x24)", 0x24, "0"), ("Threshold (0x10)", 0x10, "500"), ("Gain Shift (0x16)", 0x16, "0"), ("DC Notch Width (0x27)", 0x27, "0"), ("Range Mode (0x20)", 0x20, "0"), ("Stream Enable (0x04)", 0x04, "7"), ] for row_idx, (label, opcode, default) in enumerate(params): ttk.Label(right, text=label).grid(row=row_idx, column=0, sticky="w", pady=2) var = tk.StringVar(value=default) self._param_vars[str(opcode)] = var ent = ttk.Entry(right, textvariable=var, width=10) ent.grid(row=row_idx, column=1, padx=8, pady=2) ttk.Button( right, text="Set", command=lambda op=opcode, v=var: self._send_cmd(op, int(v.get())) ).grid(row=row_idx, column=2, pady=2) # Custom command ttk.Separator(right, orient="horizontal").grid( row=len(params), column=0, columnspan=3, sticky="ew", pady=8) ttk.Label(right, text="Custom Opcode (hex)").grid( row=len(params) + 1, column=0, sticky="w") self._custom_op = tk.StringVar(value="01") ttk.Entry(right, textvariable=self._custom_op, width=10).grid( row=len(params) + 1, column=1, padx=8) ttk.Label(right, text="Value (dec)").grid( row=len(params) + 2, column=0, sticky="w") self._custom_val = tk.StringVar(value="0") ttk.Entry(right, textvariable=self._custom_val, width=10).grid( row=len(params) + 2, column=1, padx=8) ttk.Button(right, text="Send Custom", command=self._send_custom).grid( row=len(params) + 2, column=2, pady=2) outer.columnconfigure(0, weight=1) outer.columnconfigure(1, weight=2) outer.rowconfigure(0, weight=1) def _build_log_tab(self, parent): self.log_text = tk.Text(parent, bg=BG2, fg=FG, font=("Menlo", 10), insertbackground=FG, wrap="word") self.log_text.pack(fill="both", expand=True, padx=8, pady=8) # Redirect log handler to text widget handler = _TextHandler(self.log_text) handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")) logging.getLogger().addHandler(handler) # ------------------------------------------------------------ Actions def _on_connect(self): if self.conn.is_open: # Disconnect if self._acq_thread is not None: self._acq_thread.stop() self._acq_thread.join(timeout=2) self._acq_thread = None self.conn.close() self.lbl_status.config(text="DISCONNECTED", foreground=RED) self.btn_connect.config(text="Connect") log.info("Disconnected") return # Open connection in a background thread to avoid blocking the GUI self.lbl_status.config(text="CONNECTING...", foreground=YELLOW) self.btn_connect.config(state="disabled") self.root.update_idletasks() def _do_connect(): ok = self.conn.open() # Schedule UI update back on the main thread self.root.after(0, lambda: self._on_connect_done(ok)) threading.Thread(target=_do_connect, daemon=True).start() def _on_connect_done(self, success: bool): """Called on main thread after connection attempt completes.""" self.btn_connect.config(state="normal") if success: self.lbl_status.config(text="CONNECTED", foreground=GREEN) self.btn_connect.config(text="Disconnect") self._acq_thread = RadarAcquisition( self.conn, self.frame_queue, self.recorder) self._acq_thread.start() log.info("Connected and acquisition started") else: self.lbl_status.config(text="CONNECT FAILED", foreground=RED) self.btn_connect.config(text="Connect") def _on_record(self): if self.recorder.recording: self.recorder.stop() self.btn_record.config(text="Record") return filepath = filedialog.asksaveasfilename( defaultextension=".h5", filetypes=[("HDF5", "*.h5"), ("All", "*.*")], initialfile=f"radar_{time.strftime('%Y%m%d_%H%M%S')}.h5", ) if filepath: self.recorder.start(filepath) self.btn_record.config(text="Stop Rec") def _send_cmd(self, opcode: int, value: int): cmd = RadarProtocol.build_command(opcode, value) ok = self.conn.write(cmd) log.info(f"CMD 0x{opcode:02X} val={value} ({'OK' if ok else 'FAIL'})") def _send_custom(self): try: op = int(self._custom_op.get(), 16) val = int(self._custom_val.get()) self._send_cmd(op, val) except ValueError: log.error("Invalid custom command values") # --------------------------------------------------------- Display loop def _schedule_update(self): self._update_display() self.root.after(self.UPDATE_INTERVAL_MS, self._schedule_update) def _update_display(self): """Pull latest frame from queue and update plots.""" frame = None # Drain queue, keep latest while True: try: frame = self.frame_queue.get_nowait() except queue.Empty: break if frame is None: return self._current_frame = frame self._frame_count += 1 # FPS calculation now = time.time() dt = now - self._fps_ts if dt > 0.5: self._fps = self._frame_count / dt self._frame_count = 0 self._fps_ts = now # Update labels self.lbl_fps.config(text=f"{self._fps:.1f} fps") self.lbl_detections.config(text=f"Det: {frame.detection_count}") self.lbl_frame.config(text=f"Frame: {frame.frame_number}") # Update range-Doppler heatmap # FFT-shift Doppler axis so DC (bin 0) is in the center mag = np.fft.fftshift(frame.magnitude, axes=1) det_shifted = np.fft.fftshift(frame.detections, axes=1) # Stable colorscale via EMA smoothing of vmax frame_vmax = float(np.max(mag)) if np.max(mag) > 0 else 1.0 self._vmax_ema = (self._vmax_alpha * frame_vmax + (1.0 - self._vmax_alpha) * self._vmax_ema) stable_vmax = max(self._vmax_ema, 1.0) self._rd_img.set_data(mag) self._rd_img.set_clim(vmin=0, vmax=stable_vmax) # Update CFAR overlay — convert bin indices to physical coordinates det_coords = np.argwhere(det_shifted > 0) if len(det_coords) > 0: # det_coords[:, 0] = range bin, det_coords[:, 1] = Doppler bin range_m = (det_coords[:, 0] + 0.5) * self._range_per_bin vel_ms = self._vel_lo + (det_coords[:, 1] + 0.5) * self._vel_per_bin offsets = np.column_stack([vel_ms, range_m]) self._det_scatter.set_offsets(offsets) else: self._det_scatter.set_offsets(np.empty((0, 2))) # Update waterfall self._waterfall.append(frame.range_profile.copy()) wf_arr = np.array(list(self._waterfall)) wf_max = max(np.max(wf_arr), 1.0) self._wf_img.set_data(wf_arr) self._wf_img.set_clim(vmin=0, vmax=wf_max) self._canvas.draw_idle() class _TextHandler(logging.Handler): """Logging handler that writes to a tkinter Text widget.""" def __init__(self, text_widget: tk.Text): super().__init__() self._text = text_widget def emit(self, record): msg = self.format(record) try: self._text.after(0, self._append, msg) except Exception: pass def _append(self, msg: str): self._text.insert("end", msg + "\n") self._text.see("end") # Keep last 500 lines lines = int(self._text.index("end-1c").split(".")[0]) if lines > 500: self._text.delete("1.0", f"{lines - 500}.0") # ============================================================================ # Entry Point # ============================================================================ def main(): parser = argparse.ArgumentParser(description="AERIS-10 Radar Dashboard") parser.add_argument("--live", action="store_true", help="Use real FT601 hardware (default: mock mode)") parser.add_argument("--replay", type=str, metavar="NPY_DIR", help="Replay real data from .npy directory " "(e.g. tb/cosim/real_data/hex/)") parser.add_argument("--no-mti", action="store_true", help="With --replay, use non-MTI Doppler data") parser.add_argument("--record", action="store_true", help="Start HDF5 recording immediately") parser.add_argument("--device", type=int, default=0, help="FT601 device index (default: 0)") args = parser.parse_args() if args.replay: npy_dir = os.path.abspath(args.replay) conn = ReplayConnection(npy_dir, use_mti=not args.no_mti) mode_str = f"REPLAY ({npy_dir}, MTI={'OFF' if args.no_mti else 'ON'})" elif args.live: conn = FT601Connection(mock=False) mode_str = "LIVE" else: conn = FT601Connection(mock=True) mode_str = "MOCK" recorder = DataRecorder() root = tk.Tk() dashboard = RadarDashboard(root, conn, recorder) if args.record: filepath = os.path.join( os.getcwd(), f"radar_{time.strftime('%Y%m%d_%H%M%S')}.h5" ) recorder.start(filepath) def on_closing(): if dashboard._acq_thread is not None: dashboard._acq_thread.stop() dashboard._acq_thread.join(timeout=2) if conn.is_open: conn.close() if recorder.recording: recorder.stop() root.destroy() root.protocol("WM_DELETE_WINDOW", on_closing) log.info(f"Dashboard started (mode={mode_str})") root.mainloop() if __name__ == "__main__": main()