diff --git a/.github/workflows/ci-tests.yml b/.github/workflows/ci-tests.yml index 33db606..be7637a 100644 --- a/.github/workflows/ci-tests.yml +++ b/.github/workflows/ci-tests.yml @@ -46,7 +46,7 @@ jobs: - name: Unit tests run: > uv run pytest - 9_Firmware/9_3_GUI/test_radar_dashboard.py + 9_Firmware/9_3_GUI/test_GUI_V65_Tk.py 9_Firmware/9_3_GUI/test_v7.py -v --tb=short diff --git a/9_Firmware/9_3_GUI/radar_dashboard.py b/9_Firmware/9_3_GUI/GUI_V65_Tk.py similarity index 60% rename from 9_Firmware/9_3_GUI/radar_dashboard.py rename to 9_Firmware/9_3_GUI/GUI_V65_Tk.py index 3d6988c..6755534 100644 --- a/9_Firmware/9_3_GUI/radar_dashboard.py +++ b/9_Firmware/9_3_GUI/GUI_V65_Tk.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -AERIS-10 Radar Dashboard +AERIS-10 Radar Dashboard (Tkinter) =================================================== Real-time visualization and control for the AERIS-10 phased-array radar via FT2232H USB 2.0 interface. @@ -14,22 +14,33 @@ Features: 0x01-0x04, 0x10-0x16, 0x20-0x27, 0x30-0x31, 0xFF) - Configuration panel for all radar parameters - HDF5 data recording for offline analysis + - Replay mode (co-sim dirs, raw IQ .npy, HDF5) with transport controls + - Demo mode with synthetic moving targets + - Detected targets table + - Dual dispatch: FPGA controls route to SoftwareFPGA during replay - Mock mode for development/testing without hardware Usage: - python radar_dashboard.py # Launch with mock data - python radar_dashboard.py --live # Launch with FT2232H hardware - python radar_dashboard.py --record # Launch with HDF5 recording + python GUI_V65_Tk.py # Launch with mock data + python GUI_V65_Tk.py --live # Launch with FT2232H hardware + python GUI_V65_Tk.py --record # Launch with HDF5 recording + python GUI_V65_Tk.py --replay path/to/data # Auto-load replay + python GUI_V65_Tk.py --demo # Start in demo mode """ import os +import math import time +import copy import queue +import random import logging import argparse import threading import contextlib from collections import deque +from pathlib import Path +from typing import ClassVar import numpy as np @@ -54,7 +65,7 @@ logging.basicConfig( format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) -log = logging.getLogger("radar_dashboard") +log = logging.getLogger("GUI_V65_Tk") @@ -73,6 +84,296 @@ YELLOW = "#f9e2af" SURFACE = "#313244" +# ============================================================================ +# Demo Target Simulator (Tkinter timer-based) +# ============================================================================ + +class DemoTarget: + """Single simulated target with kinematics.""" + + __slots__ = ("azimuth", "classification", "id", "range_m", "snr", "velocity") + + # Physical range grid: 64 bins x ~4.8 m/bin = ~307 m max + _RANGE_PER_BIN: float = (3e8 / (2 * 500e6)) * 16 # ~4.8 m + _MAX_RANGE: float = _RANGE_PER_BIN * NUM_RANGE_BINS # ~307 m + + def __init__(self, tid: int): + self.id = tid + self.range_m = random.uniform(20, self._MAX_RANGE - 20) + self.velocity = random.uniform(-10, 10) + self.azimuth = random.uniform(0, 360) + self.snr = random.uniform(10, 35) + self.classification = random.choice( + ["aircraft", "drone", "bird", "unknown"]) + + def step(self) -> bool: + """Advance one tick. Return False if target exits coverage.""" + self.range_m -= self.velocity * 0.1 + if self.range_m < 5 or self.range_m > self._MAX_RANGE: + return False + self.velocity = max(-20, min(20, self.velocity + random.uniform(-1, 1))) + self.azimuth = (self.azimuth + random.uniform(-0.5, 0.5)) % 360 + self.snr = max(0, min(50, self.snr + random.uniform(-1, 1))) + return True + + +class DemoSimulator: + """Timer-driven demo target generator for the Tkinter dashboard. + + Produces synthetic ``RadarFrame`` objects and a target list each tick, + pushing them into the dashboard's ``frame_queue`` and ``_ui_queue``. + """ + + def __init__(self, frame_queue: queue.Queue, ui_queue: queue.Queue, + root: tk.Tk, interval_ms: int = 500): + self._frame_queue = frame_queue + self._ui_queue = ui_queue + self._root = root + self._interval_ms = interval_ms + self._targets: list[DemoTarget] = [] + self._next_id = 1 + self._frame_number = 0 + self._after_id: str | None = None + + # Seed initial targets + for _ in range(8): + self._add_target() + + def start(self): + self._tick() + + def stop(self): + if self._after_id is not None: + self._root.after_cancel(self._after_id) + self._after_id = None + + def add_random_target(self): + self._add_target() + + def _add_target(self): + t = DemoTarget(self._next_id) + self._next_id += 1 + self._targets.append(t) + + def _tick(self): + updated: list[DemoTarget] = [t for t in self._targets if t.step()] + if len(updated) < 5 or (random.random() < 0.05 and len(updated) < 15): + self._add_target() + updated.append(self._targets[-1]) + self._targets = updated + + # Synthesize a RadarFrame with Gaussian blobs for each target + frame = self._make_frame(updated) + with contextlib.suppress(queue.Full): + self._frame_queue.put_nowait(frame) + + # Post target info for the detected-targets treeview + target_dicts = [ + {"id": t.id, "range_m": t.range_m, "velocity": t.velocity, + "azimuth": t.azimuth, "snr": t.snr, "class": t.classification} + for t in updated + ] + self._ui_queue.put(("demo_targets", target_dicts)) + + self._after_id = self._root.after(self._interval_ms, self._tick) + + def _make_frame(self, targets: list[DemoTarget]) -> RadarFrame: + """Build a synthetic RadarFrame from target list.""" + mag = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.float64) + det = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint8) + + # Range/Doppler scaling (approximate) + range_per_bin = (3e8 / (2 * 500e6)) * 16 # ~4.8 m/bin + max_range = range_per_bin * NUM_RANGE_BINS + vel_per_bin = 1.484 # m/s per Doppler bin (from WaveformConfig) + + for t in targets: + if t.range_m > max_range or t.range_m < 0: + continue + r_bin = int(t.range_m / range_per_bin) + d_bin = int((t.velocity / vel_per_bin) + NUM_DOPPLER_BINS / 2) + r_bin = max(0, min(NUM_RANGE_BINS - 1, r_bin)) + d_bin = max(0, min(NUM_DOPPLER_BINS - 1, d_bin)) + + # Gaussian-ish blob + amplitude = 500 + t.snr * 200 + for dr in range(-2, 3): + for dd in range(-1, 2): + ri = r_bin + dr + di = d_bin + dd + if 0 <= ri < NUM_RANGE_BINS and 0 <= di < NUM_DOPPLER_BINS: + w = math.exp(-0.5 * (dr**2 + dd**2)) + mag[ri, di] += amplitude * w + if w > 0.5: + det[ri, di] = 1 + + rd_i = (mag * 0.5).astype(np.int16) + rd_q = np.zeros_like(rd_i) + rp = mag.max(axis=1) + + self._frame_number += 1 + return RadarFrame( + timestamp=time.time(), + range_doppler_i=rd_i, + range_doppler_q=rd_q, + magnitude=mag, + detections=det, + range_profile=rp, + detection_count=int(det.sum()), + frame_number=self._frame_number, + ) + + +# ============================================================================ +# Replay Controller (threading-based, reuses v7.ReplayEngine) +# ============================================================================ + +class _ReplayController: + """Manages replay playback in a background thread for the Tkinter dashboard. + + Imports ``ReplayEngine`` and ``SoftwareFPGA`` from ``v7`` lazily so + they are only required when replay is actually used. + """ + + # Speed multiplier → frame interval in seconds + SPEED_MAP: ClassVar[dict[str, float]] = { + "0.25x": 0.400, + "0.5x": 0.200, + "1x": 0.100, + "2x": 0.050, + "5x": 0.020, + "10x": 0.010, + } + + def __init__(self, frame_queue: queue.Queue, ui_queue: queue.Queue): + self._frame_queue = frame_queue + self._ui_queue = ui_queue + self._engine = None # lazy + self._software_fpga = None # lazy + self._thread: threading.Thread | None = None + self._play_event = threading.Event() + self._stop_event = threading.Event() + self._lock = threading.Lock() + self._current_index = 0 + self._last_emitted_index = -1 + self._loop = False + self._frame_interval = 0.100 # 1x speed + + def load(self, path: str) -> int: + """Load replay data from path. Returns total frames or raises.""" + from v7.replay import ReplayEngine, ReplayFormat, detect_format + from v7.software_fpga import SoftwareFPGA + + fmt = detect_format(path) + if fmt == ReplayFormat.RAW_IQ_NPY: + self._software_fpga = SoftwareFPGA() + self._engine = ReplayEngine(path, software_fpga=self._software_fpga) + else: + self._engine = ReplayEngine(path) + + self._current_index = 0 + self._last_emitted_index = -1 + self._stop_event.clear() + self._play_event.clear() + return self._engine.total_frames + + @property + def total_frames(self) -> int: + return self._engine.total_frames if self._engine else 0 + + @property + def current_index(self) -> int: + return self._last_emitted_index if self._last_emitted_index >= 0 else 0 + + @property + def is_playing(self) -> bool: + return self._play_event.is_set() + + @property + def software_fpga(self): + return self._software_fpga + + def set_speed(self, label: str): + self._frame_interval = self.SPEED_MAP.get(label, 0.100) + + def set_loop(self, loop: bool): + self._loop = loop + + def play(self): + self._play_event.set() + with self._lock: + if self._current_index >= self.total_frames: + self._current_index = 0 + self._ui_queue.put(("replay_state", "playing")) + if self._thread is None or not self._thread.is_alive(): + self._stop_event.clear() + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + + def pause(self): + self._play_event.clear() + self._ui_queue.put(("replay_state", "paused")) + + def stop(self): + self._stop_event.set() + self._play_event.set() # unblock wait so thread exits promptly + with self._lock: + self._current_index = 0 + self._last_emitted_index = -1 + if self._thread is not None: + self._thread.join(timeout=2) + self._thread = None + self._play_event.clear() + self._ui_queue.put(("replay_state", "stopped")) + + def close(self): + """Stop playback and release underlying engine resources.""" + self.stop() + if self._engine is not None: + self._engine.close() + self._engine = None + self._software_fpga = None + + def seek(self, index: int): + with self._lock: + self._current_index = max(0, min(index, self.total_frames - 1)) + self._emit_frame() + self._last_emitted_index = self._current_index + # Advance past the emitted frame so _run doesn't re-emit it + self._current_index += 1 + + def _run(self): + while not self._stop_event.is_set(): + # Block until play or stop is signalled — no busy-sleep + self._play_event.wait() + if self._stop_event.is_set(): + break + with self._lock: + if self._current_index >= self.total_frames: + if self._loop: + self._current_index = 0 + else: + self._play_event.clear() + self._ui_queue.put(("replay_state", "paused")) + continue + self._emit_frame() + self._last_emitted_index = self._current_index + idx = self._current_index + self._current_index += 1 + self._ui_queue.put(("replay_index", (idx, self.total_frames))) + time.sleep(self._frame_interval) + + def _emit_frame(self): + """Get current frame and push to queue. Must be called with lock held.""" + if self._engine is None: + return + frame = self._engine.get_frame(self._current_index) + if frame is not None: + frame = copy.deepcopy(frame) + with contextlib.suppress(queue.Full): + self._frame_queue.put_nowait(frame) + + class RadarDashboard: """Main tkinter application: real-time radar visualization and control.""" @@ -93,7 +394,7 @@ class RadarDashboard: self.root.geometry("1600x950") self.root.configure(bg=BG) - # Frame queue (acquisition → display) + # Frame queue (acquisition / replay / demo → display) self.frame_queue: queue.Queue[RadarFrame] = queue.Queue(maxsize=8) self._acq_thread: RadarAcquisition | None = None @@ -126,6 +427,17 @@ class RadarDashboard: self._agc_last_redraw: float = 0.0 # throttle chart redraws self._AGC_REDRAW_INTERVAL: float = 0.5 # seconds between redraws + # Replay state + self._replay_ctrl: _ReplayController | None = None + self._replay_active = False + + # Demo state + self._demo_sim: DemoSimulator | None = None + self._demo_active = False + + # Detected targets (from demo or replay host-DSP) + self._detected_targets: list[dict] = [] + self._build_ui() self._schedule_update() @@ -171,30 +483,33 @@ class RadarDashboard: self.btn_record = ttk.Button(top, text="Record", command=self._on_record) self.btn_record.pack(side="right", padx=4) + self.btn_demo = ttk.Button(top, text="Start Demo", + command=self._toggle_demo) + self.btn_demo.pack(side="right", padx=4) + # -- Tabbed notebook layout -- nb = ttk.Notebook(self.root) nb.pack(fill="both", expand=True, padx=8, pady=8) tab_display = ttk.Frame(nb) tab_control = ttk.Frame(nb) + tab_replay = ttk.Frame(nb) tab_agc = ttk.Frame(nb) tab_log = ttk.Frame(nb) nb.add(tab_display, text=" Display ") nb.add(tab_control, text=" Control ") + nb.add(tab_replay, text=" Replay ") nb.add(tab_agc, text=" AGC Monitor ") nb.add(tab_log, text=" Log ") self._build_display_tab(tab_display) self._build_control_tab(tab_control) + self._build_replay_tab(tab_replay) self._build_agc_tab(tab_agc) self._build_log_tab(tab_log) def _build_display_tab(self, parent): # Compute physical axis limits - # Range resolution: dR = c / (2 * BW) per range bin - # But we decimate 1024→64 bins, so each bin spans 16 FFT bins. - # Range resolution derivation: c/(2*BW) gives ~0.3 m per FFT bin. - # After 1024-to-64 decimation each displayed range bin spans 16 FFT bins. range_res = self.C / (2.0 * self.BANDWIDTH) # ~0.3 m per FFT bin # After decimation 1024→64, each range bin = 16 FFT bins range_per_bin = range_res * 16 @@ -203,8 +518,12 @@ class RadarDashboard: doppler_bin_lo = 0 doppler_bin_hi = NUM_DOPPLER_BINS + # Top pane: plots + plot_frame = ttk.Frame(parent) + plot_frame.pack(fill="both", expand=True) + # Matplotlib figure with 3 subplots - self.fig = Figure(figsize=(14, 7), facecolor=BG) + self.fig = Figure(figsize=(14, 5), facecolor=BG) self.fig.subplots_adjust(left=0.07, right=0.98, top=0.94, bottom=0.10, wspace=0.30, hspace=0.35) @@ -245,11 +564,35 @@ class RadarDashboard: self.ax_wf.set_ylabel("Frame", color=FG) self.ax_wf.tick_params(colors=FG) - canvas = FigureCanvasTkAgg(self.fig, master=parent) + canvas = FigureCanvasTkAgg(self.fig, master=plot_frame) canvas.draw() canvas.get_tk_widget().pack(fill="both", expand=True) self._canvas = canvas + # Bottom pane: detected targets table + tgt_frame = ttk.LabelFrame(parent, text="Detected Targets", padding=4) + tgt_frame.pack(fill="x", padx=8, pady=(0, 4)) + + cols = ("id", "range_m", "velocity", "azimuth", "snr", "class") + self._tgt_tree = ttk.Treeview( + tgt_frame, columns=cols, show="headings", height=5) + for col, heading, width in [ + ("id", "ID", 50), + ("range_m", "Range (m)", 100), + ("velocity", "Vel (m/s)", 90), + ("azimuth", "Az (deg)", 90), + ("snr", "SNR (dB)", 80), + ("class", "Class", 100), + ]: + self._tgt_tree.heading(col, text=heading) + self._tgt_tree.column(col, width=width, anchor="center") + + scrollbar = ttk.Scrollbar( + tgt_frame, orient="vertical", command=self._tgt_tree.yview) + self._tgt_tree.configure(yscrollcommand=scrollbar.set) + self._tgt_tree.pack(side="left", fill="x", expand=True) + scrollbar.pack(side="right", fill="y") + def _build_control_tab(self, parent): """Host command sender — organized by FPGA register groups. @@ -492,6 +835,86 @@ class RadarDashboard: var.set(str(clamped)) self._send_cmd(opcode, clamped) + def _build_replay_tab(self, parent): + """Replay tab — load file, transport controls, seek slider.""" + # File selection + file_frame = ttk.LabelFrame(parent, text="Replay Source", padding=10) + file_frame.pack(fill="x", padx=8, pady=(8, 4)) + + self._replay_path_var = tk.StringVar(value="(none)") + ttk.Label(file_frame, textvariable=self._replay_path_var, + font=("Menlo", 9)).pack(side="left", fill="x", expand=True) + + ttk.Button(file_frame, text="Browse File...", + command=self._replay_browse_file).pack(side="right", padx=(4, 0)) + ttk.Button(file_frame, text="Browse Dir...", + command=self._replay_browse_dir).pack(side="right", padx=(4, 0)) + + # Transport controls + ctrl_frame = ttk.LabelFrame(parent, text="Transport", padding=10) + ctrl_frame.pack(fill="x", padx=8, pady=4) + + btn_row = ttk.Frame(ctrl_frame) + btn_row.pack(fill="x", pady=(0, 6)) + + self._rp_play_btn = ttk.Button( + btn_row, text="Play", command=self._replay_play, state="disabled") + self._rp_play_btn.pack(side="left", padx=2) + + self._rp_pause_btn = ttk.Button( + btn_row, text="Pause", command=self._replay_pause, state="disabled") + self._rp_pause_btn.pack(side="left", padx=2) + + self._rp_stop_btn = ttk.Button( + btn_row, text="Stop", command=self._replay_stop, state="disabled") + self._rp_stop_btn.pack(side="left", padx=2) + + # Speed selector + ttk.Label(btn_row, text="Speed:").pack(side="left", padx=(16, 4)) + self._rp_speed_var = tk.StringVar(value="1x") + speed_combo = ttk.Combobox( + btn_row, textvariable=self._rp_speed_var, + values=list(_ReplayController.SPEED_MAP.keys()), + state="readonly", width=6) + speed_combo.pack(side="left", padx=2) + speed_combo.bind("<>", self._replay_speed_changed) + + # Loop checkbox + self._rp_loop_var = tk.BooleanVar(value=False) + ttk.Checkbutton(btn_row, text="Loop", + variable=self._rp_loop_var, + command=self._replay_loop_changed).pack(side="left", padx=8) + + # Seek slider + slider_row = ttk.Frame(ctrl_frame) + slider_row.pack(fill="x") + + self._rp_slider = tk.Scale( + slider_row, from_=0, to=0, orient="horizontal", + bg=SURFACE, fg=FG, highlightthickness=0, + troughcolor=BG2, command=self._replay_seek) + self._rp_slider.pack(side="left", fill="x", expand=True) + + self._rp_frame_label = ttk.Label( + slider_row, text="0 / 0", font=("Menlo", 10)) + self._rp_frame_label.pack(side="right", padx=8) + + # Status + self._rp_status_label = ttk.Label( + parent, text="No replay loaded", font=("Menlo", 10)) + self._rp_status_label.pack(padx=8, pady=4, anchor="w") + + # Info frame for FPGA controls during replay + info = ttk.LabelFrame(parent, text="Replay FPGA Controls", padding=10) + info.pack(fill="x", padx=8, pady=4) + ttk.Label( + info, + text=("When replaying Raw IQ data, FPGA Control tab " + "parameters are routed to the SoftwareFPGA.\n" + "Changes take effect on the next frame."), + font=("Menlo", 9), foreground=ACCENT, + ).pack(anchor="w") + def _build_agc_tab(self, parent): """AGC Monitor tab — real-time strip charts for gain, peak, and saturation.""" # Top row: AGC status badge + saturation indicator @@ -602,6 +1025,12 @@ class RadarDashboard: log.info("Disconnected") return + # Stop any active demo or replay before going live + if self._demo_active: + self._stop_demo() + if self._replay_active: + self._replay_stop() + # Open connection in a background thread to avoid blocking the GUI self.lbl_status.config(text="CONNECTING...", foreground=YELLOW) self.btn_connect.config(state="disabled") @@ -644,7 +1073,37 @@ class RadarDashboard: self.recorder.start(filepath) self.btn_record.config(text="Stop Rec") + # Opcode → SoftwareFPGA setter method name for dual dispatch during replay + _SFPGA_SETTER_NAMES: ClassVar[dict[int, str]] = { + 0x03: "set_detect_threshold", + 0x16: "set_gain_shift", + 0x21: "set_cfar_guard", + 0x22: "set_cfar_train", + 0x23: "set_cfar_alpha", + 0x24: "set_cfar_mode", + 0x25: "set_cfar_enable", + 0x26: "set_mti_enable", + 0x27: "set_dc_notch_width", + 0x28: "set_agc_enable", + } + def _send_cmd(self, opcode: int, value: int): + """Send command — routes to SoftwareFPGA when replaying raw IQ.""" + if (self._replay_active and self._replay_ctrl is not None + and self._replay_ctrl.software_fpga is not None): + sfpga = self._replay_ctrl.software_fpga + setter_name = self._SFPGA_SETTER_NAMES.get(opcode) + if setter_name is not None: + getattr(sfpga, setter_name)(value) + log.info( + f"SoftwareFPGA 0x{opcode:02X} val={value}") + return + log.warning( + f"Opcode 0x{opcode:02X} not routable in replay mode") + self._ui_queue.put( + ("status_msg", + f"Opcode 0x{opcode:02X} is hardware-only (ignored in replay)")) + return cmd = RadarProtocol.build_command(opcode, value) ok = self.conn.write(cmd) log.info(f"CMD 0x{opcode:02X} val={value} ({'OK' if ok else 'FAIL'})") @@ -657,6 +1116,133 @@ class RadarDashboard: except ValueError: log.error("Invalid custom command values") + # -------------------------------------------------------- Replay actions + def _replay_browse_file(self): + path = filedialog.askopenfilename( + title="Select replay file", + filetypes=[ + ("NumPy files", "*.npy"), + ("HDF5 files", "*.h5"), + ("All files", "*.*"), + ], + ) + if path: + self._replay_load(path) + + def _replay_browse_dir(self): + path = filedialog.askdirectory(title="Select co-sim directory") + if path: + self._replay_load(path) + + def _replay_load(self, path: str): + """Load replay data and enable transport controls.""" + # Stop any running mode + if self._demo_active: + self._stop_demo() + # Safely shutdown and disable UI controls before loading the new file + if self._replay_active or self._replay_ctrl is not None: + self._replay_stop() + if self._acq_thread is not None: + if self.conn.is_open: + self._on_connect() # disconnect + else: + # Connection dropped unexpectedly — just clean up the thread + self._acq_thread.stop() + self._acq_thread.join(timeout=2) + self._acq_thread = None + + try: + self._replay_ctrl = _ReplayController( + self.frame_queue, self._ui_queue) + total = self._replay_ctrl.load(path) + except Exception as exc: # noqa: BLE001 + log.error(f"Failed to load replay: {exc}") + self._rp_status_label.config( + text=f"Load failed: {exc}", foreground=RED) + self._replay_ctrl = None + return + + short_path = Path(path).name + self._replay_path_var.set(short_path) + self._rp_slider.config(to=max(0, total - 1)) + self._rp_frame_label.config(text=f"0 / {total}") + self._rp_status_label.config( + text=f"Loaded: {total} frames from {short_path}", + foreground=GREEN) + + # Enable transport buttons + for btn in (self._rp_play_btn, self._rp_pause_btn, self._rp_stop_btn): + btn.config(state="normal") + + self._replay_active = True + self.lbl_status.config(text="REPLAY", foreground=ACCENT) + log.info(f"Replay loaded: {total} frames from {path}") + + def _replay_play(self): + if self._replay_ctrl: + self._replay_ctrl.play() + + def _replay_pause(self): + if self._replay_ctrl: + self._replay_ctrl.pause() + + def _replay_stop(self): + if self._replay_ctrl: + self._replay_ctrl.close() + self._replay_ctrl = None + self._replay_active = False + self.lbl_status.config(text="DISCONNECTED", foreground=RED) + self._rp_slider.set(0) + self._rp_frame_label.config(text="0 / 0") + for btn in (self._rp_play_btn, self._rp_pause_btn, self._rp_stop_btn): + btn.config(state="disabled") + + def _replay_seek(self, value): + if (self._replay_ctrl and self._replay_active + and not self._replay_ctrl.is_playing): + self._replay_ctrl.seek(int(value)) + + def _replay_speed_changed(self, _event=None): + if self._replay_ctrl: + self._replay_ctrl.set_speed(self._rp_speed_var.get()) + + def _replay_loop_changed(self): + if self._replay_ctrl: + self._replay_ctrl.set_loop(self._rp_loop_var.get()) + + # ---------------------------------------------------------- Demo actions + def _toggle_demo(self): + if self._demo_active: + self._stop_demo() + else: + self._start_demo() + + def _start_demo(self): + """Start demo mode with synthetic targets.""" + # Mutual exclusion + if self._replay_active: + self._replay_stop() + if self._acq_thread is not None: + log.warning("Cannot start demo while radar is connected") + return + + self._demo_sim = DemoSimulator( + self.frame_queue, self._ui_queue, self.root, interval_ms=500) + self._demo_sim.start() + self._demo_active = True + self.lbl_status.config(text="DEMO", foreground=YELLOW) + self.btn_demo.config(text="Stop Demo") + log.info("Demo mode started") + + def _stop_demo(self): + if self._demo_sim is not None: + self._demo_sim.stop() + self._demo_sim = None + self._demo_active = False + self.lbl_status.config(text="DISCONNECTED", foreground=RED) + self.btn_demo.config(text="Start Demo") + log.info("Demo mode stopped") + def _on_status_received(self, status: StatusResponse): """Called from acquisition thread — post to UI queue for main thread.""" self._ui_queue.put(("status", status)) @@ -804,6 +1390,46 @@ class RadarDashboard: self._update_self_test_labels(payload) elif tag == "log": self._log_handler_append(payload) + elif tag == "replay_state": + self._on_replay_state(payload) + elif tag == "replay_index": + self._on_replay_index(*payload) + elif tag == "demo_targets": + self._on_demo_targets(payload) + elif tag == "status_msg": + self.lbl_status.config(text=str(payload), foreground=YELLOW) + + def _on_replay_state(self, state: str): + if state == "playing": + self._rp_status_label.config(text="Playing", foreground=GREEN) + elif state == "paused": + self._rp_status_label.config(text="Paused", foreground=YELLOW) + elif state == "stopped": + self._rp_status_label.config(text="Stopped", foreground=FG) + + def _on_replay_index(self, index: int, total: int): + self._rp_frame_label.config(text=f"{index} / {total}") + self._rp_slider.set(index) + + def _on_demo_targets(self, targets: list[dict]): + """Update the detected targets treeview from demo data.""" + self._update_targets_table(targets) + + def _update_targets_table(self, targets: list[dict]): + """Refresh the detected targets treeview.""" + # Clear existing rows + for item in self._tgt_tree.get_children(): + self._tgt_tree.delete(item) + # Insert new rows + for t in targets: + self._tgt_tree.insert("", "end", values=( + t.get("id", ""), + f"{t.get('range_m', 0):.0f}", + f"{t.get('velocity', 0):.1f}", + f"{t.get('azimuth', 0):.1f}", + f"{t.get('snr', 0):.1f}", + t.get("class", ""), + )) def _log_handler_append(self, msg: str): """Append a log message to the log Text widget (main thread only).""" @@ -902,12 +1528,17 @@ class _TextHandler(logging.Handler): def main(): parser = argparse.ArgumentParser(description="AERIS-10 Radar Dashboard") - parser.add_argument("--live", action="store_true", - help="Use real FT2232H hardware (default: mock mode)") parser.add_argument("--record", action="store_true", help="Start HDF5 recording immediately") parser.add_argument("--device", type=int, default=0, help="FT2232H device index (default: 0)") + mode_group = parser.add_mutually_exclusive_group() + mode_group.add_argument("--live", action="store_true", + help="Use real FT2232H hardware (default: mock mode)") + mode_group.add_argument("--replay", type=str, default=None, + help="Auto-load replay file or directory on startup") + mode_group.add_argument("--demo", action="store_true", + help="Start in demo mode with synthetic targets") args = parser.parse_args() if args.live: @@ -930,7 +1561,19 @@ def main(): ) recorder.start(filepath) + if args.replay: + dashboard._replay_load(args.replay) + + if args.demo: + dashboard._start_demo() + def on_closing(): + # Stop demo if active + if dashboard._demo_active: + dashboard._stop_demo() + # Stop replay if active + if dashboard._replay_ctrl is not None: + dashboard._replay_ctrl.close() if dashboard._acq_thread is not None: dashboard._acq_thread.stop() dashboard._acq_thread.join(timeout=2) diff --git a/9_Firmware/9_3_GUI/GUI_versions.txt b/9_Firmware/9_3_GUI/GUI_versions.txt index c424412..5ed1fa2 100644 --- a/9_Firmware/9_3_GUI/GUI_versions.txt +++ b/9_Firmware/9_3_GUI/GUI_versions.txt @@ -8,6 +8,6 @@ GUI_V5 ==> Added Mercury Color GUI_V6 ==> Added USB3 FT601 support -radar_dashboard ==> Board bring-up dashboard (FT2232H reader, real-time R-D heatmap, CFAR overlay, waterfall, host commands, HDF5 recording) +GUI_V65_Tk ==> Board bring-up dashboard (FT2232H reader, real-time R-D heatmap, CFAR overlay, waterfall, host commands, HDF5 recording, replay, demo mode) radar_protocol ==> Protocol layer (packet parsing, command building, FT2232H connection, data recorder, acquisition thread) smoke_test ==> Board bring-up smoke test host script (triggers FPGA self-test via opcode 0x30) diff --git a/9_Firmware/9_3_GUI/test_radar_dashboard.py b/9_Firmware/9_3_GUI/test_GUI_V65_Tk.py similarity index 80% rename from 9_Firmware/9_3_GUI/test_radar_dashboard.py rename to 9_Firmware/9_3_GUI/test_GUI_V65_Tk.py index f94b85a..de5f18f 100644 --- a/9_Firmware/9_3_GUI/test_radar_dashboard.py +++ b/9_Firmware/9_3_GUI/test_GUI_V65_Tk.py @@ -3,8 +3,8 @@ Tests for AERIS-10 Radar Dashboard protocol parsing, command building, data recording, and acquisition logic. -Run: python -m pytest test_radar_dashboard.py -v - or: python test_radar_dashboard.py +Run: python -m pytest test_GUI_V65_Tk.py -v + or: python test_GUI_V65_Tk.py """ import struct @@ -22,6 +22,7 @@ from radar_protocol import ( NUM_RANGE_BINS, NUM_DOPPLER_BINS, DATA_PACKET_SIZE, ) +from GUI_V65_Tk import DemoTarget, DemoSimulator, _ReplayController class TestRadarProtocol(unittest.TestCase): @@ -719,5 +720,199 @@ class TestAGCVisualizationHistory(unittest.TestCase): self.assertAlmostEqual(max(200 * 1.5, 5), 300.0) +# ===================================================================== +# Tests for DemoTarget, DemoSimulator, and _ReplayController +# ===================================================================== + + +class TestDemoTarget(unittest.TestCase): + """Unit tests for DemoTarget kinematics.""" + + def test_initial_values_in_range(self): + t = DemoTarget(1) + self.assertEqual(t.id, 1) + self.assertGreaterEqual(t.range_m, 20) + self.assertLessEqual(t.range_m, DemoTarget._MAX_RANGE) + self.assertIn(t.classification, ["aircraft", "drone", "bird", "unknown"]) + + def test_step_returns_true_in_normal_range(self): + t = DemoTarget(2) + t.range_m = 150.0 + t.velocity = 0.0 + self.assertTrue(t.step()) + + def test_step_returns_false_when_out_of_range_high(self): + t = DemoTarget(3) + t.range_m = DemoTarget._MAX_RANGE + 1 + t.velocity = -1.0 # moving away + self.assertFalse(t.step()) + + def test_step_returns_false_when_out_of_range_low(self): + t = DemoTarget(4) + t.range_m = 2.0 + t.velocity = 1.0 # moving closer + self.assertFalse(t.step()) + + def test_velocity_clamped(self): + t = DemoTarget(5) + t.velocity = 19.0 + t.range_m = 150.0 + # Step many times — velocity should stay within [-20, 20] + for _ in range(100): + t.range_m = 150.0 # keep in range + t.step() + self.assertGreaterEqual(t.velocity, -20) + self.assertLessEqual(t.velocity, 20) + + def test_snr_clamped(self): + t = DemoTarget(6) + t.snr = 49.5 + t.range_m = 150.0 + for _ in range(100): + t.range_m = 150.0 + t.step() + self.assertGreaterEqual(t.snr, 0) + self.assertLessEqual(t.snr, 50) + + +class TestDemoSimulatorNoTk(unittest.TestCase): + """Test DemoSimulator logic without a real Tk event loop. + + We replace ``root.after`` with a mock to avoid needing a display. + """ + + def _make_simulator(self): + from unittest.mock import MagicMock + + fq = queue.Queue(maxsize=100) + uq = queue.Queue(maxsize=100) + mock_root = MagicMock() + # root.after(ms, fn) should return an id (str) + mock_root.after.return_value = "mock_after_id" + sim = DemoSimulator(fq, uq, mock_root, interval_ms=100) + return sim, fq, uq, mock_root + + def test_initial_targets_created(self): + sim, _fq, _uq, _root = self._make_simulator() + # Should seed 8 initial targets + self.assertEqual(len(sim._targets), 8) + + def test_tick_produces_frame_and_targets(self): + sim, fq, uq, _root = self._make_simulator() + sim._tick() + # Should have a frame + self.assertFalse(fq.empty()) + frame = fq.get_nowait() + self.assertIsInstance(frame, RadarFrame) + self.assertEqual(frame.frame_number, 1) + # Should have demo_targets in ui_queue + tag, payload = uq.get_nowait() + self.assertEqual(tag, "demo_targets") + self.assertIsInstance(payload, list) + + def test_tick_produces_nonzero_detections(self): + """Demo targets should actually render into the range-Doppler grid.""" + sim, fq, _uq, _root = self._make_simulator() + sim._tick() + frame = fq.get_nowait() + # At least some targets should produce magnitude > 0 and detections + self.assertGreater(frame.magnitude.sum(), 0, + "Demo targets should render into range-Doppler grid") + self.assertGreater(frame.detection_count, 0, + "Demo targets should produce detections") + + def test_stop_cancels_after(self): + sim, _fq, _uq, mock_root = self._make_simulator() + sim._tick() # sets _after_id + sim.stop() + mock_root.after_cancel.assert_called_once_with("mock_after_id") + self.assertIsNone(sim._after_id) + + +class TestReplayController(unittest.TestCase): + """Unit tests for _ReplayController (no GUI required).""" + + def test_initial_state(self): + fq = queue.Queue() + uq = queue.Queue() + ctrl = _ReplayController(fq, uq) + self.assertEqual(ctrl.total_frames, 0) + self.assertEqual(ctrl.current_index, 0) + self.assertFalse(ctrl.is_playing) + self.assertIsNone(ctrl.software_fpga) + + def test_set_speed(self): + ctrl = _ReplayController(queue.Queue(), queue.Queue()) + ctrl.set_speed("2x") + self.assertAlmostEqual(ctrl._frame_interval, 0.050) + + def test_set_speed_unknown_falls_back(self): + ctrl = _ReplayController(queue.Queue(), queue.Queue()) + ctrl.set_speed("99x") + self.assertAlmostEqual(ctrl._frame_interval, 0.100) + + def test_set_loop(self): + ctrl = _ReplayController(queue.Queue(), queue.Queue()) + ctrl.set_loop(True) + self.assertTrue(ctrl._loop) + ctrl.set_loop(False) + self.assertFalse(ctrl._loop) + + def test_seek_increments_past_emitted(self): + """After seek(), _current_index should be one past the seeked frame.""" + fq = queue.Queue(maxsize=100) + uq = queue.Queue(maxsize=100) + ctrl = _ReplayController(fq, uq) + # Manually set engine to a mock to allow seek + from unittest.mock import MagicMock + mock_engine = MagicMock() + mock_engine.total_frames = 10 + mock_engine.get_frame.return_value = RadarFrame() + ctrl._engine = mock_engine + ctrl.seek(5) + # _current_index should be 6 (past the emitted frame) + self.assertEqual(ctrl._current_index, 6) + self.assertEqual(ctrl._last_emitted_index, 5) + # Frame should be in the queue + self.assertFalse(fq.empty()) + + def test_seek_clamps_to_bounds(self): + from unittest.mock import MagicMock + + fq = queue.Queue(maxsize=100) + uq = queue.Queue(maxsize=100) + ctrl = _ReplayController(fq, uq) + mock_engine = MagicMock() + mock_engine.total_frames = 5 + mock_engine.get_frame.return_value = RadarFrame() + ctrl._engine = mock_engine + + ctrl.seek(100) + # Should clamp to last frame (index 4), then _current_index = 5 + self.assertEqual(ctrl._last_emitted_index, 4) + self.assertEqual(ctrl._current_index, 5) + + ctrl.seek(-10) + # Should clamp to 0, then _current_index = 1 + self.assertEqual(ctrl._last_emitted_index, 0) + self.assertEqual(ctrl._current_index, 1) + + def test_close_releases_engine(self): + from unittest.mock import MagicMock + + fq = queue.Queue(maxsize=100) + uq = queue.Queue(maxsize=100) + ctrl = _ReplayController(fq, uq) + mock_engine = MagicMock() + mock_engine.total_frames = 5 + mock_engine.get_frame.return_value = RadarFrame() + ctrl._engine = mock_engine + + ctrl.close() + mock_engine.close.assert_called_once() + self.assertIsNone(ctrl._engine) + self.assertIsNone(ctrl.software_fpga) + + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4f97c55..6b34008 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -78,9 +78,9 @@ Every test binary must exit 0. ```bash cd 9_Firmware/9_3_GUI -python3 -m pytest test_radar_dashboard.py -v +python3 -m pytest test_GUI_V65_Tk.py -v # or without pytest: -python3 -m unittest test_radar_dashboard -v +python3 -m unittest test_GUI_V65_Tk -v ``` 57+ protocol and rendering tests. The `test_record_and_stop` test @@ -130,7 +130,7 @@ Before pushing, confirm: 1. `bash run_regression.sh` — all phases pass 2. `make all` (MCU tests) — 20/20 pass -3. `python3 -m unittest test_radar_dashboard -v` — all pass +3. `python3 -m unittest test_GUI_V65_Tk -v` — all pass 4. `python3 validate_mem_files.py` — all checks pass 5. `python3 compare.py dc && python3 compare_doppler.py stationary && python3 compare_mf.py all` 6. `git diff --check` — no whitespace issues