feat: rename Tkinter dashboard to GUI_V65_Tk, add replay/demo/targets parity

Rename radar_dashboard.py -> GUI_V65_Tk.py and add core feature parity
with the v7 PyQt dashboard while keeping Tkinter as the framework:

Replay mode:
- _ReplayController with threading.Event-based play/pause/stop
- Reuses v7.ReplayEngine and v7.SoftwareFPGA for all 3 input formats
- Dual dispatch routes FPGA control opcodes to SoftwareFPGA during
  raw IQ replay; non-routable opcodes show user-visible status message
- Seek slider with re-emit guard, speed combo, loop checkbox
- close() properly releases engine file handles on stop/reload

Demo mode:
- DemoTarget kinematics scaled to physical range grid (~307m max)
- DemoSimulator generates synthetic RadarFrames with Gaussian blobs
- Targets table (ttk.Treeview) updates from demo target list

Mode exclusion (bidirectional):
- Connect stops active demo/replay before starting acquisition
- Replay load stops previous controller and demo before loading
- Demo start stops active replay; refuses if live-connected
- --live/--replay/--demo in mutually exclusive CLI arg group

Bug fixes:
- seek() now increments past emitted frame to prevent re-emit on resume
- Failed replay load nulls controller ref to prevent dangling state

Tests: 17 new tests for DemoTarget, DemoSimulator, _ReplayController
CI: all 4 jobs pass (167+21+25+29 = 242 tests)
This commit is contained in:
Jason
2026-04-14 22:54:00 +05:45
parent 24b8442e40
commit 34ecaf360b
5 changed files with 859 additions and 21 deletions
+1 -1
View File
@@ -46,7 +46,7 @@ jobs:
- name: Unit tests - name: Unit tests
run: > run: >
uv run pytest uv run pytest
9_Firmware/9_3_GUI/test_radar_dashboard.py 9_Firmware/9_3_GUI/test_GUI_V65_Tk.py
9_Firmware/9_3_GUI/test_v7.py 9_Firmware/9_3_GUI/test_v7.py
-v --tb=short -v --tb=short
@@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
AERIS-10 Radar Dashboard AERIS-10 Radar Dashboard (Tkinter)
=================================================== ===================================================
Real-time visualization and control for the AERIS-10 phased-array radar Real-time visualization and control for the AERIS-10 phased-array radar
via FT2232H USB 2.0 interface. via FT2232H USB 2.0 interface.
@@ -14,22 +14,33 @@ Features:
0x01-0x04, 0x10-0x16, 0x20-0x27, 0x30-0x31, 0xFF) 0x01-0x04, 0x10-0x16, 0x20-0x27, 0x30-0x31, 0xFF)
- Configuration panel for all radar parameters - Configuration panel for all radar parameters
- HDF5 data recording for offline analysis - HDF5 data recording for offline analysis
- Replay mode (co-sim dirs, raw IQ .npy, HDF5) with transport controls
- Demo mode with synthetic moving targets
- Detected targets table
- Dual dispatch: FPGA controls route to SoftwareFPGA during replay
- Mock mode for development/testing without hardware - Mock mode for development/testing without hardware
Usage: Usage:
python radar_dashboard.py # Launch with mock data python GUI_V65_Tk.py # Launch with mock data
python radar_dashboard.py --live # Launch with FT2232H hardware python GUI_V65_Tk.py --live # Launch with FT2232H hardware
python radar_dashboard.py --record # Launch with HDF5 recording python GUI_V65_Tk.py --record # Launch with HDF5 recording
python GUI_V65_Tk.py --replay path/to/data # Auto-load replay
python GUI_V65_Tk.py --demo # Start in demo mode
""" """
import os import os
import math
import time import time
import copy
import queue import queue
import random
import logging import logging
import argparse import argparse
import threading import threading
import contextlib import contextlib
from collections import deque from collections import deque
from pathlib import Path
from typing import ClassVar
import numpy as np import numpy as np
@@ -54,7 +65,7 @@ logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s", format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S", datefmt="%H:%M:%S",
) )
log = logging.getLogger("radar_dashboard") log = logging.getLogger("GUI_V65_Tk")
@@ -73,6 +84,296 @@ YELLOW = "#f9e2af"
SURFACE = "#313244" SURFACE = "#313244"
# ============================================================================
# Demo Target Simulator (Tkinter timer-based)
# ============================================================================
class DemoTarget:
"""Single simulated target with kinematics."""
__slots__ = ("azimuth", "classification", "id", "range_m", "snr", "velocity")
# Physical range grid: 64 bins x ~4.8 m/bin = ~307 m max
_RANGE_PER_BIN: float = (3e8 / (2 * 500e6)) * 16 # ~4.8 m
_MAX_RANGE: float = _RANGE_PER_BIN * NUM_RANGE_BINS # ~307 m
def __init__(self, tid: int):
self.id = tid
self.range_m = random.uniform(20, self._MAX_RANGE - 20)
self.velocity = random.uniform(-10, 10)
self.azimuth = random.uniform(0, 360)
self.snr = random.uniform(10, 35)
self.classification = random.choice(
["aircraft", "drone", "bird", "unknown"])
def step(self) -> bool:
"""Advance one tick. Return False if target exits coverage."""
self.range_m -= self.velocity * 0.1
if self.range_m < 5 or self.range_m > self._MAX_RANGE:
return False
self.velocity = max(-20, min(20, self.velocity + random.uniform(-1, 1)))
self.azimuth = (self.azimuth + random.uniform(-0.5, 0.5)) % 360
self.snr = max(0, min(50, self.snr + random.uniform(-1, 1)))
return True
class DemoSimulator:
"""Timer-driven demo target generator for the Tkinter dashboard.
Produces synthetic ``RadarFrame`` objects and a target list each tick,
pushing them into the dashboard's ``frame_queue`` and ``_ui_queue``.
"""
def __init__(self, frame_queue: queue.Queue, ui_queue: queue.Queue,
root: tk.Tk, interval_ms: int = 500):
self._frame_queue = frame_queue
self._ui_queue = ui_queue
self._root = root
self._interval_ms = interval_ms
self._targets: list[DemoTarget] = []
self._next_id = 1
self._frame_number = 0
self._after_id: str | None = None
# Seed initial targets
for _ in range(8):
self._add_target()
def start(self):
self._tick()
def stop(self):
if self._after_id is not None:
self._root.after_cancel(self._after_id)
self._after_id = None
def add_random_target(self):
self._add_target()
def _add_target(self):
t = DemoTarget(self._next_id)
self._next_id += 1
self._targets.append(t)
def _tick(self):
updated: list[DemoTarget] = [t for t in self._targets if t.step()]
if len(updated) < 5 or (random.random() < 0.05 and len(updated) < 15):
self._add_target()
updated.append(self._targets[-1])
self._targets = updated
# Synthesize a RadarFrame with Gaussian blobs for each target
frame = self._make_frame(updated)
with contextlib.suppress(queue.Full):
self._frame_queue.put_nowait(frame)
# Post target info for the detected-targets treeview
target_dicts = [
{"id": t.id, "range_m": t.range_m, "velocity": t.velocity,
"azimuth": t.azimuth, "snr": t.snr, "class": t.classification}
for t in updated
]
self._ui_queue.put(("demo_targets", target_dicts))
self._after_id = self._root.after(self._interval_ms, self._tick)
def _make_frame(self, targets: list[DemoTarget]) -> RadarFrame:
"""Build a synthetic RadarFrame from target list."""
mag = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.float64)
det = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint8)
# Range/Doppler scaling (approximate)
range_per_bin = (3e8 / (2 * 500e6)) * 16 # ~4.8 m/bin
max_range = range_per_bin * NUM_RANGE_BINS
vel_per_bin = 1.484 # m/s per Doppler bin (from WaveformConfig)
for t in targets:
if t.range_m > max_range or t.range_m < 0:
continue
r_bin = int(t.range_m / range_per_bin)
d_bin = int((t.velocity / vel_per_bin) + NUM_DOPPLER_BINS / 2)
r_bin = max(0, min(NUM_RANGE_BINS - 1, r_bin))
d_bin = max(0, min(NUM_DOPPLER_BINS - 1, d_bin))
# Gaussian-ish blob
amplitude = 500 + t.snr * 200
for dr in range(-2, 3):
for dd in range(-1, 2):
ri = r_bin + dr
di = d_bin + dd
if 0 <= ri < NUM_RANGE_BINS and 0 <= di < NUM_DOPPLER_BINS:
w = math.exp(-0.5 * (dr**2 + dd**2))
mag[ri, di] += amplitude * w
if w > 0.5:
det[ri, di] = 1
rd_i = (mag * 0.5).astype(np.int16)
rd_q = np.zeros_like(rd_i)
rp = mag.max(axis=1)
self._frame_number += 1
return RadarFrame(
timestamp=time.time(),
range_doppler_i=rd_i,
range_doppler_q=rd_q,
magnitude=mag,
detections=det,
range_profile=rp,
detection_count=int(det.sum()),
frame_number=self._frame_number,
)
# ============================================================================
# Replay Controller (threading-based, reuses v7.ReplayEngine)
# ============================================================================
class _ReplayController:
"""Manages replay playback in a background thread for the Tkinter dashboard.
Imports ``ReplayEngine`` and ``SoftwareFPGA`` from ``v7`` lazily so
they are only required when replay is actually used.
"""
# Speed multiplier → frame interval in seconds
SPEED_MAP: ClassVar[dict[str, float]] = {
"0.25x": 0.400,
"0.5x": 0.200,
"1x": 0.100,
"2x": 0.050,
"5x": 0.020,
"10x": 0.010,
}
def __init__(self, frame_queue: queue.Queue, ui_queue: queue.Queue):
self._frame_queue = frame_queue
self._ui_queue = ui_queue
self._engine = None # lazy
self._software_fpga = None # lazy
self._thread: threading.Thread | None = None
self._play_event = threading.Event()
self._stop_event = threading.Event()
self._lock = threading.Lock()
self._current_index = 0
self._last_emitted_index = -1
self._loop = False
self._frame_interval = 0.100 # 1x speed
def load(self, path: str) -> int:
"""Load replay data from path. Returns total frames or raises."""
from v7.replay import ReplayEngine, ReplayFormat, detect_format
from v7.software_fpga import SoftwareFPGA
fmt = detect_format(path)
if fmt == ReplayFormat.RAW_IQ_NPY:
self._software_fpga = SoftwareFPGA()
self._engine = ReplayEngine(path, software_fpga=self._software_fpga)
else:
self._engine = ReplayEngine(path)
self._current_index = 0
self._last_emitted_index = -1
self._stop_event.clear()
self._play_event.clear()
return self._engine.total_frames
@property
def total_frames(self) -> int:
return self._engine.total_frames if self._engine else 0
@property
def current_index(self) -> int:
return self._last_emitted_index if self._last_emitted_index >= 0 else 0
@property
def is_playing(self) -> bool:
return self._play_event.is_set()
@property
def software_fpga(self):
return self._software_fpga
def set_speed(self, label: str):
self._frame_interval = self.SPEED_MAP.get(label, 0.100)
def set_loop(self, loop: bool):
self._loop = loop
def play(self):
self._play_event.set()
with self._lock:
if self._current_index >= self.total_frames:
self._current_index = 0
self._ui_queue.put(("replay_state", "playing"))
if self._thread is None or not self._thread.is_alive():
self._stop_event.clear()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def pause(self):
self._play_event.clear()
self._ui_queue.put(("replay_state", "paused"))
def stop(self):
self._stop_event.set()
self._play_event.set() # unblock wait so thread exits promptly
with self._lock:
self._current_index = 0
self._last_emitted_index = -1
if self._thread is not None:
self._thread.join(timeout=2)
self._thread = None
self._play_event.clear()
self._ui_queue.put(("replay_state", "stopped"))
def close(self):
"""Stop playback and release underlying engine resources."""
self.stop()
if self._engine is not None:
self._engine.close()
self._engine = None
self._software_fpga = None
def seek(self, index: int):
with self._lock:
self._current_index = max(0, min(index, self.total_frames - 1))
self._emit_frame()
self._last_emitted_index = self._current_index
# Advance past the emitted frame so _run doesn't re-emit it
self._current_index += 1
def _run(self):
while not self._stop_event.is_set():
# Block until play or stop is signalled — no busy-sleep
self._play_event.wait()
if self._stop_event.is_set():
break
with self._lock:
if self._current_index >= self.total_frames:
if self._loop:
self._current_index = 0
else:
self._play_event.clear()
self._ui_queue.put(("replay_state", "paused"))
continue
self._emit_frame()
self._last_emitted_index = self._current_index
idx = self._current_index
self._current_index += 1
self._ui_queue.put(("replay_index", (idx, self.total_frames)))
time.sleep(self._frame_interval)
def _emit_frame(self):
"""Get current frame and push to queue. Must be called with lock held."""
if self._engine is None:
return
frame = self._engine.get_frame(self._current_index)
if frame is not None:
frame = copy.deepcopy(frame)
with contextlib.suppress(queue.Full):
self._frame_queue.put_nowait(frame)
class RadarDashboard: class RadarDashboard:
"""Main tkinter application: real-time radar visualization and control.""" """Main tkinter application: real-time radar visualization and control."""
@@ -93,7 +394,7 @@ class RadarDashboard:
self.root.geometry("1600x950") self.root.geometry("1600x950")
self.root.configure(bg=BG) self.root.configure(bg=BG)
# Frame queue (acquisition → display) # Frame queue (acquisition / replay / demo → display)
self.frame_queue: queue.Queue[RadarFrame] = queue.Queue(maxsize=8) self.frame_queue: queue.Queue[RadarFrame] = queue.Queue(maxsize=8)
self._acq_thread: RadarAcquisition | None = None self._acq_thread: RadarAcquisition | None = None
@@ -126,6 +427,17 @@ class RadarDashboard:
self._agc_last_redraw: float = 0.0 # throttle chart redraws self._agc_last_redraw: float = 0.0 # throttle chart redraws
self._AGC_REDRAW_INTERVAL: float = 0.5 # seconds between redraws self._AGC_REDRAW_INTERVAL: float = 0.5 # seconds between redraws
# Replay state
self._replay_ctrl: _ReplayController | None = None
self._replay_active = False
# Demo state
self._demo_sim: DemoSimulator | None = None
self._demo_active = False
# Detected targets (from demo or replay host-DSP)
self._detected_targets: list[dict] = []
self._build_ui() self._build_ui()
self._schedule_update() self._schedule_update()
@@ -171,30 +483,33 @@ class RadarDashboard:
self.btn_record = ttk.Button(top, text="Record", command=self._on_record) self.btn_record = ttk.Button(top, text="Record", command=self._on_record)
self.btn_record.pack(side="right", padx=4) self.btn_record.pack(side="right", padx=4)
self.btn_demo = ttk.Button(top, text="Start Demo",
command=self._toggle_demo)
self.btn_demo.pack(side="right", padx=4)
# -- Tabbed notebook layout -- # -- Tabbed notebook layout --
nb = ttk.Notebook(self.root) nb = ttk.Notebook(self.root)
nb.pack(fill="both", expand=True, padx=8, pady=8) nb.pack(fill="both", expand=True, padx=8, pady=8)
tab_display = ttk.Frame(nb) tab_display = ttk.Frame(nb)
tab_control = ttk.Frame(nb) tab_control = ttk.Frame(nb)
tab_replay = ttk.Frame(nb)
tab_agc = ttk.Frame(nb) tab_agc = ttk.Frame(nb)
tab_log = ttk.Frame(nb) tab_log = ttk.Frame(nb)
nb.add(tab_display, text=" Display ") nb.add(tab_display, text=" Display ")
nb.add(tab_control, text=" Control ") nb.add(tab_control, text=" Control ")
nb.add(tab_replay, text=" Replay ")
nb.add(tab_agc, text=" AGC Monitor ") nb.add(tab_agc, text=" AGC Monitor ")
nb.add(tab_log, text=" Log ") nb.add(tab_log, text=" Log ")
self._build_display_tab(tab_display) self._build_display_tab(tab_display)
self._build_control_tab(tab_control) self._build_control_tab(tab_control)
self._build_replay_tab(tab_replay)
self._build_agc_tab(tab_agc) self._build_agc_tab(tab_agc)
self._build_log_tab(tab_log) self._build_log_tab(tab_log)
def _build_display_tab(self, parent): def _build_display_tab(self, parent):
# Compute physical axis limits # Compute physical axis limits
# Range resolution: dR = c / (2 * BW) per range bin
# But we decimate 1024→64 bins, so each bin spans 16 FFT bins.
# Range resolution derivation: c/(2*BW) gives ~0.3 m per FFT bin.
# After 1024-to-64 decimation each displayed range bin spans 16 FFT bins.
range_res = self.C / (2.0 * self.BANDWIDTH) # ~0.3 m per FFT bin range_res = self.C / (2.0 * self.BANDWIDTH) # ~0.3 m per FFT bin
# After decimation 1024→64, each range bin = 16 FFT bins # After decimation 1024→64, each range bin = 16 FFT bins
range_per_bin = range_res * 16 range_per_bin = range_res * 16
@@ -203,8 +518,12 @@ class RadarDashboard:
doppler_bin_lo = 0 doppler_bin_lo = 0
doppler_bin_hi = NUM_DOPPLER_BINS doppler_bin_hi = NUM_DOPPLER_BINS
# Top pane: plots
plot_frame = ttk.Frame(parent)
plot_frame.pack(fill="both", expand=True)
# Matplotlib figure with 3 subplots # Matplotlib figure with 3 subplots
self.fig = Figure(figsize=(14, 7), facecolor=BG) self.fig = Figure(figsize=(14, 5), facecolor=BG)
self.fig.subplots_adjust(left=0.07, right=0.98, top=0.94, bottom=0.10, self.fig.subplots_adjust(left=0.07, right=0.98, top=0.94, bottom=0.10,
wspace=0.30, hspace=0.35) wspace=0.30, hspace=0.35)
@@ -245,11 +564,35 @@ class RadarDashboard:
self.ax_wf.set_ylabel("Frame", color=FG) self.ax_wf.set_ylabel("Frame", color=FG)
self.ax_wf.tick_params(colors=FG) self.ax_wf.tick_params(colors=FG)
canvas = FigureCanvasTkAgg(self.fig, master=parent) canvas = FigureCanvasTkAgg(self.fig, master=plot_frame)
canvas.draw() canvas.draw()
canvas.get_tk_widget().pack(fill="both", expand=True) canvas.get_tk_widget().pack(fill="both", expand=True)
self._canvas = canvas self._canvas = canvas
# Bottom pane: detected targets table
tgt_frame = ttk.LabelFrame(parent, text="Detected Targets", padding=4)
tgt_frame.pack(fill="x", padx=8, pady=(0, 4))
cols = ("id", "range_m", "velocity", "azimuth", "snr", "class")
self._tgt_tree = ttk.Treeview(
tgt_frame, columns=cols, show="headings", height=5)
for col, heading, width in [
("id", "ID", 50),
("range_m", "Range (m)", 100),
("velocity", "Vel (m/s)", 90),
("azimuth", "Az (deg)", 90),
("snr", "SNR (dB)", 80),
("class", "Class", 100),
]:
self._tgt_tree.heading(col, text=heading)
self._tgt_tree.column(col, width=width, anchor="center")
scrollbar = ttk.Scrollbar(
tgt_frame, orient="vertical", command=self._tgt_tree.yview)
self._tgt_tree.configure(yscrollcommand=scrollbar.set)
self._tgt_tree.pack(side="left", fill="x", expand=True)
scrollbar.pack(side="right", fill="y")
def _build_control_tab(self, parent): def _build_control_tab(self, parent):
"""Host command sender — organized by FPGA register groups. """Host command sender — organized by FPGA register groups.
@@ -492,6 +835,86 @@ class RadarDashboard:
var.set(str(clamped)) var.set(str(clamped))
self._send_cmd(opcode, clamped) self._send_cmd(opcode, clamped)
def _build_replay_tab(self, parent):
"""Replay tab — load file, transport controls, seek slider."""
# File selection
file_frame = ttk.LabelFrame(parent, text="Replay Source", padding=10)
file_frame.pack(fill="x", padx=8, pady=(8, 4))
self._replay_path_var = tk.StringVar(value="(none)")
ttk.Label(file_frame, textvariable=self._replay_path_var,
font=("Menlo", 9)).pack(side="left", fill="x", expand=True)
ttk.Button(file_frame, text="Browse File...",
command=self._replay_browse_file).pack(side="right", padx=(4, 0))
ttk.Button(file_frame, text="Browse Dir...",
command=self._replay_browse_dir).pack(side="right", padx=(4, 0))
# Transport controls
ctrl_frame = ttk.LabelFrame(parent, text="Transport", padding=10)
ctrl_frame.pack(fill="x", padx=8, pady=4)
btn_row = ttk.Frame(ctrl_frame)
btn_row.pack(fill="x", pady=(0, 6))
self._rp_play_btn = ttk.Button(
btn_row, text="Play", command=self._replay_play, state="disabled")
self._rp_play_btn.pack(side="left", padx=2)
self._rp_pause_btn = ttk.Button(
btn_row, text="Pause", command=self._replay_pause, state="disabled")
self._rp_pause_btn.pack(side="left", padx=2)
self._rp_stop_btn = ttk.Button(
btn_row, text="Stop", command=self._replay_stop, state="disabled")
self._rp_stop_btn.pack(side="left", padx=2)
# Speed selector
ttk.Label(btn_row, text="Speed:").pack(side="left", padx=(16, 4))
self._rp_speed_var = tk.StringVar(value="1x")
speed_combo = ttk.Combobox(
btn_row, textvariable=self._rp_speed_var,
values=list(_ReplayController.SPEED_MAP.keys()),
state="readonly", width=6)
speed_combo.pack(side="left", padx=2)
speed_combo.bind("<<ComboboxSelected>>", self._replay_speed_changed)
# Loop checkbox
self._rp_loop_var = tk.BooleanVar(value=False)
ttk.Checkbutton(btn_row, text="Loop",
variable=self._rp_loop_var,
command=self._replay_loop_changed).pack(side="left", padx=8)
# Seek slider
slider_row = ttk.Frame(ctrl_frame)
slider_row.pack(fill="x")
self._rp_slider = tk.Scale(
slider_row, from_=0, to=0, orient="horizontal",
bg=SURFACE, fg=FG, highlightthickness=0,
troughcolor=BG2, command=self._replay_seek)
self._rp_slider.pack(side="left", fill="x", expand=True)
self._rp_frame_label = ttk.Label(
slider_row, text="0 / 0", font=("Menlo", 10))
self._rp_frame_label.pack(side="right", padx=8)
# Status
self._rp_status_label = ttk.Label(
parent, text="No replay loaded", font=("Menlo", 10))
self._rp_status_label.pack(padx=8, pady=4, anchor="w")
# Info frame for FPGA controls during replay
info = ttk.LabelFrame(parent, text="Replay FPGA Controls", padding=10)
info.pack(fill="x", padx=8, pady=4)
ttk.Label(
info,
text=("When replaying Raw IQ data, FPGA Control tab "
"parameters are routed to the SoftwareFPGA.\n"
"Changes take effect on the next frame."),
font=("Menlo", 9), foreground=ACCENT,
).pack(anchor="w")
def _build_agc_tab(self, parent): def _build_agc_tab(self, parent):
"""AGC Monitor tab — real-time strip charts for gain, peak, and saturation.""" """AGC Monitor tab — real-time strip charts for gain, peak, and saturation."""
# Top row: AGC status badge + saturation indicator # Top row: AGC status badge + saturation indicator
@@ -602,6 +1025,12 @@ class RadarDashboard:
log.info("Disconnected") log.info("Disconnected")
return return
# Stop any active demo or replay before going live
if self._demo_active:
self._stop_demo()
if self._replay_active:
self._replay_stop()
# Open connection in a background thread to avoid blocking the GUI # Open connection in a background thread to avoid blocking the GUI
self.lbl_status.config(text="CONNECTING...", foreground=YELLOW) self.lbl_status.config(text="CONNECTING...", foreground=YELLOW)
self.btn_connect.config(state="disabled") self.btn_connect.config(state="disabled")
@@ -644,7 +1073,37 @@ class RadarDashboard:
self.recorder.start(filepath) self.recorder.start(filepath)
self.btn_record.config(text="Stop Rec") self.btn_record.config(text="Stop Rec")
# Opcode → SoftwareFPGA setter method name for dual dispatch during replay
_SFPGA_SETTER_NAMES: ClassVar[dict[int, str]] = {
0x03: "set_detect_threshold",
0x16: "set_gain_shift",
0x21: "set_cfar_guard",
0x22: "set_cfar_train",
0x23: "set_cfar_alpha",
0x24: "set_cfar_mode",
0x25: "set_cfar_enable",
0x26: "set_mti_enable",
0x27: "set_dc_notch_width",
0x28: "set_agc_enable",
}
def _send_cmd(self, opcode: int, value: int): def _send_cmd(self, opcode: int, value: int):
"""Send command — routes to SoftwareFPGA when replaying raw IQ."""
if (self._replay_active and self._replay_ctrl is not None
and self._replay_ctrl.software_fpga is not None):
sfpga = self._replay_ctrl.software_fpga
setter_name = self._SFPGA_SETTER_NAMES.get(opcode)
if setter_name is not None:
getattr(sfpga, setter_name)(value)
log.info(
f"SoftwareFPGA 0x{opcode:02X} val={value}")
return
log.warning(
f"Opcode 0x{opcode:02X} not routable in replay mode")
self._ui_queue.put(
("status_msg",
f"Opcode 0x{opcode:02X} is hardware-only (ignored in replay)"))
return
cmd = RadarProtocol.build_command(opcode, value) cmd = RadarProtocol.build_command(opcode, value)
ok = self.conn.write(cmd) ok = self.conn.write(cmd)
log.info(f"CMD 0x{opcode:02X} val={value} ({'OK' if ok else 'FAIL'})") log.info(f"CMD 0x{opcode:02X} val={value} ({'OK' if ok else 'FAIL'})")
@@ -657,6 +1116,133 @@ class RadarDashboard:
except ValueError: except ValueError:
log.error("Invalid custom command values") log.error("Invalid custom command values")
# -------------------------------------------------------- Replay actions
def _replay_browse_file(self):
path = filedialog.askopenfilename(
title="Select replay file",
filetypes=[
("NumPy files", "*.npy"),
("HDF5 files", "*.h5"),
("All files", "*.*"),
],
)
if path:
self._replay_load(path)
def _replay_browse_dir(self):
path = filedialog.askdirectory(title="Select co-sim directory")
if path:
self._replay_load(path)
def _replay_load(self, path: str):
"""Load replay data and enable transport controls."""
# Stop any running mode
if self._demo_active:
self._stop_demo()
# Safely shutdown and disable UI controls before loading the new file
if self._replay_active or self._replay_ctrl is not None:
self._replay_stop()
if self._acq_thread is not None:
if self.conn.is_open:
self._on_connect() # disconnect
else:
# Connection dropped unexpectedly — just clean up the thread
self._acq_thread.stop()
self._acq_thread.join(timeout=2)
self._acq_thread = None
try:
self._replay_ctrl = _ReplayController(
self.frame_queue, self._ui_queue)
total = self._replay_ctrl.load(path)
except Exception as exc: # noqa: BLE001
log.error(f"Failed to load replay: {exc}")
self._rp_status_label.config(
text=f"Load failed: {exc}", foreground=RED)
self._replay_ctrl = None
return
short_path = Path(path).name
self._replay_path_var.set(short_path)
self._rp_slider.config(to=max(0, total - 1))
self._rp_frame_label.config(text=f"0 / {total}")
self._rp_status_label.config(
text=f"Loaded: {total} frames from {short_path}",
foreground=GREEN)
# Enable transport buttons
for btn in (self._rp_play_btn, self._rp_pause_btn, self._rp_stop_btn):
btn.config(state="normal")
self._replay_active = True
self.lbl_status.config(text="REPLAY", foreground=ACCENT)
log.info(f"Replay loaded: {total} frames from {path}")
def _replay_play(self):
if self._replay_ctrl:
self._replay_ctrl.play()
def _replay_pause(self):
if self._replay_ctrl:
self._replay_ctrl.pause()
def _replay_stop(self):
if self._replay_ctrl:
self._replay_ctrl.close()
self._replay_ctrl = None
self._replay_active = False
self.lbl_status.config(text="DISCONNECTED", foreground=RED)
self._rp_slider.set(0)
self._rp_frame_label.config(text="0 / 0")
for btn in (self._rp_play_btn, self._rp_pause_btn, self._rp_stop_btn):
btn.config(state="disabled")
def _replay_seek(self, value):
if (self._replay_ctrl and self._replay_active
and not self._replay_ctrl.is_playing):
self._replay_ctrl.seek(int(value))
def _replay_speed_changed(self, _event=None):
if self._replay_ctrl:
self._replay_ctrl.set_speed(self._rp_speed_var.get())
def _replay_loop_changed(self):
if self._replay_ctrl:
self._replay_ctrl.set_loop(self._rp_loop_var.get())
# ---------------------------------------------------------- Demo actions
def _toggle_demo(self):
if self._demo_active:
self._stop_demo()
else:
self._start_demo()
def _start_demo(self):
"""Start demo mode with synthetic targets."""
# Mutual exclusion
if self._replay_active:
self._replay_stop()
if self._acq_thread is not None:
log.warning("Cannot start demo while radar is connected")
return
self._demo_sim = DemoSimulator(
self.frame_queue, self._ui_queue, self.root, interval_ms=500)
self._demo_sim.start()
self._demo_active = True
self.lbl_status.config(text="DEMO", foreground=YELLOW)
self.btn_demo.config(text="Stop Demo")
log.info("Demo mode started")
def _stop_demo(self):
if self._demo_sim is not None:
self._demo_sim.stop()
self._demo_sim = None
self._demo_active = False
self.lbl_status.config(text="DISCONNECTED", foreground=RED)
self.btn_demo.config(text="Start Demo")
log.info("Demo mode stopped")
def _on_status_received(self, status: StatusResponse): def _on_status_received(self, status: StatusResponse):
"""Called from acquisition thread — post to UI queue for main thread.""" """Called from acquisition thread — post to UI queue for main thread."""
self._ui_queue.put(("status", status)) self._ui_queue.put(("status", status))
@@ -804,6 +1390,46 @@ class RadarDashboard:
self._update_self_test_labels(payload) self._update_self_test_labels(payload)
elif tag == "log": elif tag == "log":
self._log_handler_append(payload) self._log_handler_append(payload)
elif tag == "replay_state":
self._on_replay_state(payload)
elif tag == "replay_index":
self._on_replay_index(*payload)
elif tag == "demo_targets":
self._on_demo_targets(payload)
elif tag == "status_msg":
self.lbl_status.config(text=str(payload), foreground=YELLOW)
def _on_replay_state(self, state: str):
if state == "playing":
self._rp_status_label.config(text="Playing", foreground=GREEN)
elif state == "paused":
self._rp_status_label.config(text="Paused", foreground=YELLOW)
elif state == "stopped":
self._rp_status_label.config(text="Stopped", foreground=FG)
def _on_replay_index(self, index: int, total: int):
self._rp_frame_label.config(text=f"{index} / {total}")
self._rp_slider.set(index)
def _on_demo_targets(self, targets: list[dict]):
"""Update the detected targets treeview from demo data."""
self._update_targets_table(targets)
def _update_targets_table(self, targets: list[dict]):
"""Refresh the detected targets treeview."""
# Clear existing rows
for item in self._tgt_tree.get_children():
self._tgt_tree.delete(item)
# Insert new rows
for t in targets:
self._tgt_tree.insert("", "end", values=(
t.get("id", ""),
f"{t.get('range_m', 0):.0f}",
f"{t.get('velocity', 0):.1f}",
f"{t.get('azimuth', 0):.1f}",
f"{t.get('snr', 0):.1f}",
t.get("class", ""),
))
def _log_handler_append(self, msg: str): def _log_handler_append(self, msg: str):
"""Append a log message to the log Text widget (main thread only).""" """Append a log message to the log Text widget (main thread only)."""
@@ -902,12 +1528,17 @@ class _TextHandler(logging.Handler):
def main(): def main():
parser = argparse.ArgumentParser(description="AERIS-10 Radar Dashboard") parser = argparse.ArgumentParser(description="AERIS-10 Radar Dashboard")
parser.add_argument("--live", action="store_true",
help="Use real FT2232H hardware (default: mock mode)")
parser.add_argument("--record", action="store_true", parser.add_argument("--record", action="store_true",
help="Start HDF5 recording immediately") help="Start HDF5 recording immediately")
parser.add_argument("--device", type=int, default=0, parser.add_argument("--device", type=int, default=0,
help="FT2232H device index (default: 0)") help="FT2232H device index (default: 0)")
mode_group = parser.add_mutually_exclusive_group()
mode_group.add_argument("--live", action="store_true",
help="Use real FT2232H hardware (default: mock mode)")
mode_group.add_argument("--replay", type=str, default=None,
help="Auto-load replay file or directory on startup")
mode_group.add_argument("--demo", action="store_true",
help="Start in demo mode with synthetic targets")
args = parser.parse_args() args = parser.parse_args()
if args.live: if args.live:
@@ -930,7 +1561,19 @@ def main():
) )
recorder.start(filepath) recorder.start(filepath)
if args.replay:
dashboard._replay_load(args.replay)
if args.demo:
dashboard._start_demo()
def on_closing(): def on_closing():
# Stop demo if active
if dashboard._demo_active:
dashboard._stop_demo()
# Stop replay if active
if dashboard._replay_ctrl is not None:
dashboard._replay_ctrl.close()
if dashboard._acq_thread is not None: if dashboard._acq_thread is not None:
dashboard._acq_thread.stop() dashboard._acq_thread.stop()
dashboard._acq_thread.join(timeout=2) dashboard._acq_thread.join(timeout=2)
+1 -1
View File
@@ -8,6 +8,6 @@ GUI_V5 ==> Added Mercury Color
GUI_V6 ==> Added USB3 FT601 support GUI_V6 ==> Added USB3 FT601 support
radar_dashboard ==> Board bring-up dashboard (FT2232H reader, real-time R-D heatmap, CFAR overlay, waterfall, host commands, HDF5 recording) GUI_V65_Tk ==> Board bring-up dashboard (FT2232H reader, real-time R-D heatmap, CFAR overlay, waterfall, host commands, HDF5 recording, replay, demo mode)
radar_protocol ==> Protocol layer (packet parsing, command building, FT2232H connection, data recorder, acquisition thread) radar_protocol ==> Protocol layer (packet parsing, command building, FT2232H connection, data recorder, acquisition thread)
smoke_test ==> Board bring-up smoke test host script (triggers FPGA self-test via opcode 0x30) smoke_test ==> Board bring-up smoke test host script (triggers FPGA self-test via opcode 0x30)
@@ -3,8 +3,8 @@
Tests for AERIS-10 Radar Dashboard protocol parsing, command building, Tests for AERIS-10 Radar Dashboard protocol parsing, command building,
data recording, and acquisition logic. data recording, and acquisition logic.
Run: python -m pytest test_radar_dashboard.py -v Run: python -m pytest test_GUI_V65_Tk.py -v
or: python test_radar_dashboard.py or: python test_GUI_V65_Tk.py
""" """
import struct import struct
@@ -22,6 +22,7 @@ from radar_protocol import (
NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_RANGE_BINS, NUM_DOPPLER_BINS,
DATA_PACKET_SIZE, DATA_PACKET_SIZE,
) )
from GUI_V65_Tk import DemoTarget, DemoSimulator, _ReplayController
class TestRadarProtocol(unittest.TestCase): class TestRadarProtocol(unittest.TestCase):
@@ -719,5 +720,199 @@ class TestAGCVisualizationHistory(unittest.TestCase):
self.assertAlmostEqual(max(200 * 1.5, 5), 300.0) self.assertAlmostEqual(max(200 * 1.5, 5), 300.0)
# =====================================================================
# Tests for DemoTarget, DemoSimulator, and _ReplayController
# =====================================================================
class TestDemoTarget(unittest.TestCase):
"""Unit tests for DemoTarget kinematics."""
def test_initial_values_in_range(self):
t = DemoTarget(1)
self.assertEqual(t.id, 1)
self.assertGreaterEqual(t.range_m, 20)
self.assertLessEqual(t.range_m, DemoTarget._MAX_RANGE)
self.assertIn(t.classification, ["aircraft", "drone", "bird", "unknown"])
def test_step_returns_true_in_normal_range(self):
t = DemoTarget(2)
t.range_m = 150.0
t.velocity = 0.0
self.assertTrue(t.step())
def test_step_returns_false_when_out_of_range_high(self):
t = DemoTarget(3)
t.range_m = DemoTarget._MAX_RANGE + 1
t.velocity = -1.0 # moving away
self.assertFalse(t.step())
def test_step_returns_false_when_out_of_range_low(self):
t = DemoTarget(4)
t.range_m = 2.0
t.velocity = 1.0 # moving closer
self.assertFalse(t.step())
def test_velocity_clamped(self):
t = DemoTarget(5)
t.velocity = 19.0
t.range_m = 150.0
# Step many times — velocity should stay within [-20, 20]
for _ in range(100):
t.range_m = 150.0 # keep in range
t.step()
self.assertGreaterEqual(t.velocity, -20)
self.assertLessEqual(t.velocity, 20)
def test_snr_clamped(self):
t = DemoTarget(6)
t.snr = 49.5
t.range_m = 150.0
for _ in range(100):
t.range_m = 150.0
t.step()
self.assertGreaterEqual(t.snr, 0)
self.assertLessEqual(t.snr, 50)
class TestDemoSimulatorNoTk(unittest.TestCase):
"""Test DemoSimulator logic without a real Tk event loop.
We replace ``root.after`` with a mock to avoid needing a display.
"""
def _make_simulator(self):
from unittest.mock import MagicMock
fq = queue.Queue(maxsize=100)
uq = queue.Queue(maxsize=100)
mock_root = MagicMock()
# root.after(ms, fn) should return an id (str)
mock_root.after.return_value = "mock_after_id"
sim = DemoSimulator(fq, uq, mock_root, interval_ms=100)
return sim, fq, uq, mock_root
def test_initial_targets_created(self):
sim, _fq, _uq, _root = self._make_simulator()
# Should seed 8 initial targets
self.assertEqual(len(sim._targets), 8)
def test_tick_produces_frame_and_targets(self):
sim, fq, uq, _root = self._make_simulator()
sim._tick()
# Should have a frame
self.assertFalse(fq.empty())
frame = fq.get_nowait()
self.assertIsInstance(frame, RadarFrame)
self.assertEqual(frame.frame_number, 1)
# Should have demo_targets in ui_queue
tag, payload = uq.get_nowait()
self.assertEqual(tag, "demo_targets")
self.assertIsInstance(payload, list)
def test_tick_produces_nonzero_detections(self):
"""Demo targets should actually render into the range-Doppler grid."""
sim, fq, _uq, _root = self._make_simulator()
sim._tick()
frame = fq.get_nowait()
# At least some targets should produce magnitude > 0 and detections
self.assertGreater(frame.magnitude.sum(), 0,
"Demo targets should render into range-Doppler grid")
self.assertGreater(frame.detection_count, 0,
"Demo targets should produce detections")
def test_stop_cancels_after(self):
sim, _fq, _uq, mock_root = self._make_simulator()
sim._tick() # sets _after_id
sim.stop()
mock_root.after_cancel.assert_called_once_with("mock_after_id")
self.assertIsNone(sim._after_id)
class TestReplayController(unittest.TestCase):
"""Unit tests for _ReplayController (no GUI required)."""
def test_initial_state(self):
fq = queue.Queue()
uq = queue.Queue()
ctrl = _ReplayController(fq, uq)
self.assertEqual(ctrl.total_frames, 0)
self.assertEqual(ctrl.current_index, 0)
self.assertFalse(ctrl.is_playing)
self.assertIsNone(ctrl.software_fpga)
def test_set_speed(self):
ctrl = _ReplayController(queue.Queue(), queue.Queue())
ctrl.set_speed("2x")
self.assertAlmostEqual(ctrl._frame_interval, 0.050)
def test_set_speed_unknown_falls_back(self):
ctrl = _ReplayController(queue.Queue(), queue.Queue())
ctrl.set_speed("99x")
self.assertAlmostEqual(ctrl._frame_interval, 0.100)
def test_set_loop(self):
ctrl = _ReplayController(queue.Queue(), queue.Queue())
ctrl.set_loop(True)
self.assertTrue(ctrl._loop)
ctrl.set_loop(False)
self.assertFalse(ctrl._loop)
def test_seek_increments_past_emitted(self):
"""After seek(), _current_index should be one past the seeked frame."""
fq = queue.Queue(maxsize=100)
uq = queue.Queue(maxsize=100)
ctrl = _ReplayController(fq, uq)
# Manually set engine to a mock to allow seek
from unittest.mock import MagicMock
mock_engine = MagicMock()
mock_engine.total_frames = 10
mock_engine.get_frame.return_value = RadarFrame()
ctrl._engine = mock_engine
ctrl.seek(5)
# _current_index should be 6 (past the emitted frame)
self.assertEqual(ctrl._current_index, 6)
self.assertEqual(ctrl._last_emitted_index, 5)
# Frame should be in the queue
self.assertFalse(fq.empty())
def test_seek_clamps_to_bounds(self):
from unittest.mock import MagicMock
fq = queue.Queue(maxsize=100)
uq = queue.Queue(maxsize=100)
ctrl = _ReplayController(fq, uq)
mock_engine = MagicMock()
mock_engine.total_frames = 5
mock_engine.get_frame.return_value = RadarFrame()
ctrl._engine = mock_engine
ctrl.seek(100)
# Should clamp to last frame (index 4), then _current_index = 5
self.assertEqual(ctrl._last_emitted_index, 4)
self.assertEqual(ctrl._current_index, 5)
ctrl.seek(-10)
# Should clamp to 0, then _current_index = 1
self.assertEqual(ctrl._last_emitted_index, 0)
self.assertEqual(ctrl._current_index, 1)
def test_close_releases_engine(self):
from unittest.mock import MagicMock
fq = queue.Queue(maxsize=100)
uq = queue.Queue(maxsize=100)
ctrl = _ReplayController(fq, uq)
mock_engine = MagicMock()
mock_engine.total_frames = 5
mock_engine.get_frame.return_value = RadarFrame()
ctrl._engine = mock_engine
ctrl.close()
mock_engine.close.assert_called_once()
self.assertIsNone(ctrl._engine)
self.assertIsNone(ctrl.software_fpga)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main(verbosity=2) unittest.main(verbosity=2)
+3 -3
View File
@@ -78,9 +78,9 @@ Every test binary must exit 0.
```bash ```bash
cd 9_Firmware/9_3_GUI cd 9_Firmware/9_3_GUI
python3 -m pytest test_radar_dashboard.py -v python3 -m pytest test_GUI_V65_Tk.py -v
# or without pytest: # or without pytest:
python3 -m unittest test_radar_dashboard -v python3 -m unittest test_GUI_V65_Tk -v
``` ```
57+ protocol and rendering tests. The `test_record_and_stop` test 57+ protocol and rendering tests. The `test_record_and_stop` test
@@ -130,7 +130,7 @@ Before pushing, confirm:
1. `bash run_regression.sh` — all phases pass 1. `bash run_regression.sh` — all phases pass
2. `make all` (MCU tests) — 20/20 pass 2. `make all` (MCU tests) — 20/20 pass
3. `python3 -m unittest test_radar_dashboard -v` — all pass 3. `python3 -m unittest test_GUI_V65_Tk -v` — all pass
4. `python3 validate_mem_files.py` — all checks pass 4. `python3 validate_mem_files.py` — all checks pass
5. `python3 compare.py dc && python3 compare_doppler.py stationary && python3 compare_mf.py all` 5. `python3 compare.py dc && python3 compare_doppler.py stationary && python3 compare_mf.py all`
6. `git diff --check` — no whitespace issues 6. `git diff --check` — no whitespace issues