Files
PLFM_RADAR/9_Firmware/9_3_GUI/GUI_V65_Tk.py
T
Jason 76cfc71b19 fix(gui): align radar parameters to FPGA truth (radar_scene.py)
- Bandwidth 500 MHz -> 20 MHz, sample rate 4 MHz -> 100 MHz (DDC output)
- Range formula: deramped FMCW -> matched-filter c/(2*Fs)*decimation
- Velocity formula: use PRI (167 us) and chirps_per_subframe (16)
- Carrier frequency: 10.525 GHz -> 10.5 GHz per radar_scene.py
- Range per bin: 4.8 m -> 24 m, max range: 307 m -> 1536 m
- Fix simulator target spawn range to match new coverage (50-1400 m)
- Remove dead BANDWIDTH constant, add SAMPLE_RATE to V65 Tk
- All 174 tests pass, ruff clean
2026-04-16 21:35:01 +05:45

1627 lines
64 KiB
Python

#!/usr/bin/env python3
"""
AERIS-10 Radar Dashboard (Tkinter)
===================================================
Real-time visualization and control for the AERIS-10 phased-array radar
via FT2232H USB 2.0 interface.
Features:
- FT2232H USB reader with packet parsing (matches usb_data_interface_ft2232h.v)
- Real-time range-Doppler magnitude heatmap (64x32)
- CFAR detection overlay (flagged cells highlighted)
- Range profile waterfall plot (range vs. time)
- Host command sender (opcodes per radar_system_top.v:
0x01-0x04, 0x10-0x16, 0x20-0x27, 0x30-0x31, 0xFF)
- Configuration panel for all radar parameters
- HDF5 data recording for offline analysis
- Replay mode (co-sim dirs, raw IQ .npy, HDF5) with transport controls
- Demo mode with synthetic moving targets
- Detected targets table
- Dual dispatch: FPGA controls route to SoftwareFPGA during replay
- Mock mode for development/testing without hardware
Usage:
python GUI_V65_Tk.py # Launch with mock data
python GUI_V65_Tk.py --live # Launch with FT2232H hardware
python GUI_V65_Tk.py --record # Launch with HDF5 recording
python GUI_V65_Tk.py --replay path/to/data # Auto-load replay
python GUI_V65_Tk.py --demo # Start in demo mode
"""
import os
import math
import time
import copy
import queue
import random
import logging
import argparse
import threading
import contextlib
from collections import deque
from pathlib import Path
from typing import ClassVar
import numpy as np
try:
import tkinter as tk
from tkinter import ttk, filedialog
import matplotlib
matplotlib.use("TkAgg")
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
_HAS_GUI = True
except (ModuleNotFoundError, ImportError):
_HAS_GUI = False
# Import protocol layer (no GUI deps)
from radar_protocol import (
RadarProtocol, FT2232HConnection, FT601Connection,
DataRecorder, RadarAcquisition,
RadarFrame, StatusResponse,
NUM_RANGE_BINS, NUM_DOPPLER_BINS, WATERFALL_DEPTH,
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("GUI_V65_Tk")
# ============================================================================
# Dashboard GUI
# ============================================================================
# Dark theme colors
BG = "#1e1e2e"
BG2 = "#282840"
FG = "#cdd6f4"
ACCENT = "#89b4fa"
GREEN = "#a6e3a1"
RED = "#f38ba8"
YELLOW = "#f9e2af"
SURFACE = "#313244"
# ============================================================================
# Demo Target Simulator (Tkinter timer-based)
# ============================================================================
class DemoTarget:
"""Single simulated target with kinematics."""
__slots__ = ("azimuth", "classification", "id", "range_m", "snr", "velocity")
# Physical range grid: 64 bins x ~24 m/bin = ~1536 m max
# Bin spacing = c / (2 * Fs) * decimation, where Fs = 100 MHz DDC output.
_RANGE_PER_BIN: float = (3e8 / (2 * 100e6)) * 16 # ~24 m
_MAX_RANGE: float = _RANGE_PER_BIN * NUM_RANGE_BINS # ~1536 m
def __init__(self, tid: int):
self.id = tid
self.range_m = random.uniform(20, self._MAX_RANGE - 20)
self.velocity = random.uniform(-10, 10)
self.azimuth = random.uniform(0, 360)
self.snr = random.uniform(10, 35)
self.classification = random.choice(
["aircraft", "drone", "bird", "unknown"])
def step(self) -> bool:
"""Advance one tick. Return False if target exits coverage."""
self.range_m -= self.velocity * 0.1
if self.range_m < 5 or self.range_m > self._MAX_RANGE:
return False
self.velocity = max(-20, min(20, self.velocity + random.uniform(-1, 1)))
self.azimuth = (self.azimuth + random.uniform(-0.5, 0.5)) % 360
self.snr = max(0, min(50, self.snr + random.uniform(-1, 1)))
return True
class DemoSimulator:
"""Timer-driven demo target generator for the Tkinter dashboard.
Produces synthetic ``RadarFrame`` objects and a target list each tick,
pushing them into the dashboard's ``frame_queue`` and ``_ui_queue``.
"""
def __init__(self, frame_queue: queue.Queue, ui_queue: queue.Queue,
root: tk.Tk, interval_ms: int = 500):
self._frame_queue = frame_queue
self._ui_queue = ui_queue
self._root = root
self._interval_ms = interval_ms
self._targets: list[DemoTarget] = []
self._next_id = 1
self._frame_number = 0
self._after_id: str | None = None
# Seed initial targets
for _ in range(8):
self._add_target()
def start(self):
self._tick()
def stop(self):
if self._after_id is not None:
self._root.after_cancel(self._after_id)
self._after_id = None
def add_random_target(self):
self._add_target()
def _add_target(self):
t = DemoTarget(self._next_id)
self._next_id += 1
self._targets.append(t)
def _tick(self):
updated: list[DemoTarget] = [t for t in self._targets if t.step()]
if len(updated) < 5 or (random.random() < 0.05 and len(updated) < 15):
self._add_target()
updated.append(self._targets[-1])
self._targets = updated
# Synthesize a RadarFrame with Gaussian blobs for each target
frame = self._make_frame(updated)
with contextlib.suppress(queue.Full):
self._frame_queue.put_nowait(frame)
# Post target info for the detected-targets treeview
target_dicts = [
{"id": t.id, "range_m": t.range_m, "velocity": t.velocity,
"azimuth": t.azimuth, "snr": t.snr, "class": t.classification}
for t in updated
]
self._ui_queue.put(("demo_targets", target_dicts))
self._after_id = self._root.after(self._interval_ms, self._tick)
def _make_frame(self, targets: list[DemoTarget]) -> RadarFrame:
"""Build a synthetic RadarFrame from target list."""
mag = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.float64)
det = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint8)
# Range/Doppler scaling: bin spacing = c/(2*Fs)*decimation
range_per_bin = (3e8 / (2 * 100e6)) * 16 # ~24 m/bin
max_range = range_per_bin * NUM_RANGE_BINS
vel_per_bin = 5.34 # m/s per Doppler bin (radar_scene.py: lam/(2*16*PRI))
for t in targets:
if t.range_m > max_range or t.range_m < 0:
continue
r_bin = int(t.range_m / range_per_bin)
d_bin = int((t.velocity / vel_per_bin) + NUM_DOPPLER_BINS / 2)
r_bin = max(0, min(NUM_RANGE_BINS - 1, r_bin))
d_bin = max(0, min(NUM_DOPPLER_BINS - 1, d_bin))
# Gaussian-ish blob
amplitude = 500 + t.snr * 200
for dr in range(-2, 3):
for dd in range(-1, 2):
ri = r_bin + dr
di = d_bin + dd
if 0 <= ri < NUM_RANGE_BINS and 0 <= di < NUM_DOPPLER_BINS:
w = math.exp(-0.5 * (dr**2 + dd**2))
mag[ri, di] += amplitude * w
if w > 0.5:
det[ri, di] = 1
rd_i = (mag * 0.5).astype(np.int16)
rd_q = np.zeros_like(rd_i)
rp = mag.max(axis=1)
self._frame_number += 1
return RadarFrame(
timestamp=time.time(),
range_doppler_i=rd_i,
range_doppler_q=rd_q,
magnitude=mag,
detections=det,
range_profile=rp,
detection_count=int(det.sum()),
frame_number=self._frame_number,
)
# ============================================================================
# Replay Controller (threading-based, reuses v7.ReplayEngine)
# ============================================================================
class _ReplayController:
"""Manages replay playback in a background thread for the Tkinter dashboard.
Imports ``ReplayEngine`` and ``SoftwareFPGA`` from ``v7`` lazily so
they are only required when replay is actually used.
"""
# Speed multiplier → frame interval in seconds
SPEED_MAP: ClassVar[dict[str, float]] = {
"0.25x": 0.400,
"0.5x": 0.200,
"1x": 0.100,
"2x": 0.050,
"5x": 0.020,
"10x": 0.010,
}
def __init__(self, frame_queue: queue.Queue, ui_queue: queue.Queue):
self._frame_queue = frame_queue
self._ui_queue = ui_queue
self._engine = None # lazy
self._software_fpga = None # lazy
self._thread: threading.Thread | None = None
self._play_event = threading.Event()
self._stop_event = threading.Event()
self._lock = threading.Lock()
self._current_index = 0
self._last_emitted_index = -1
self._loop = False
self._frame_interval = 0.100 # 1x speed
def load(self, path: str) -> int:
"""Load replay data from path. Returns total frames or raises."""
from v7.replay import ReplayEngine, ReplayFormat, detect_format
from v7.software_fpga import SoftwareFPGA
fmt = detect_format(path)
if fmt == ReplayFormat.RAW_IQ_NPY:
self._software_fpga = SoftwareFPGA()
self._engine = ReplayEngine(path, software_fpga=self._software_fpga)
else:
self._engine = ReplayEngine(path)
self._current_index = 0
self._last_emitted_index = -1
self._stop_event.clear()
self._play_event.clear()
return self._engine.total_frames
@property
def total_frames(self) -> int:
return self._engine.total_frames if self._engine else 0
@property
def current_index(self) -> int:
return self._last_emitted_index if self._last_emitted_index >= 0 else 0
@property
def is_playing(self) -> bool:
return self._play_event.is_set()
@property
def software_fpga(self):
return self._software_fpga
def set_speed(self, label: str):
self._frame_interval = self.SPEED_MAP.get(label, 0.100)
def set_loop(self, loop: bool):
self._loop = loop
def play(self):
self._play_event.set()
with self._lock:
if self._current_index >= self.total_frames:
self._current_index = 0
self._ui_queue.put(("replay_state", "playing"))
if self._thread is None or not self._thread.is_alive():
self._stop_event.clear()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def pause(self):
self._play_event.clear()
self._ui_queue.put(("replay_state", "paused"))
def stop(self):
self._stop_event.set()
self._play_event.set() # unblock wait so thread exits promptly
with self._lock:
self._current_index = 0
self._last_emitted_index = -1
if self._thread is not None:
self._thread.join(timeout=2)
self._thread = None
self._play_event.clear()
self._ui_queue.put(("replay_state", "stopped"))
def close(self):
"""Stop playback and release underlying engine resources."""
self.stop()
if self._engine is not None:
self._engine.close()
self._engine = None
self._software_fpga = None
def seek(self, index: int):
with self._lock:
self._current_index = max(0, min(index, self.total_frames - 1))
self._emit_frame()
self._last_emitted_index = self._current_index
# Advance past the emitted frame so _run doesn't re-emit it
self._current_index += 1
def _run(self):
while not self._stop_event.is_set():
# Block until play or stop is signalled — no busy-sleep
self._play_event.wait()
if self._stop_event.is_set():
break
with self._lock:
if self._current_index >= self.total_frames:
if self._loop:
self._current_index = 0
else:
self._play_event.clear()
self._ui_queue.put(("replay_state", "paused"))
continue
self._emit_frame()
self._last_emitted_index = self._current_index
idx = self._current_index
self._current_index += 1
self._ui_queue.put(("replay_index", (idx, self.total_frames)))
time.sleep(self._frame_interval)
def _emit_frame(self):
"""Get current frame and push to queue. Must be called with lock held."""
if self._engine is None:
return
frame = self._engine.get_frame(self._current_index)
if frame is not None:
frame = copy.deepcopy(frame)
with contextlib.suppress(queue.Full):
self._frame_queue.put_nowait(frame)
class RadarDashboard:
"""Main tkinter application: real-time radar visualization and control."""
UPDATE_INTERVAL_MS = 100 # 10 Hz display refresh
# Radar parameters used for range-axis scaling.
SAMPLE_RATE = 100e6 # Hz — DDC output I/Q rate (matched filter input)
C = 3e8 # m/s — speed of light
def __init__(self, root: tk.Tk, mock: bool,
recorder: DataRecorder, device_index: int = 0):
self.root = root
self._mock = mock
self.conn: FT2232HConnection | FT601Connection | None = None
self.recorder = recorder
self.device_index = device_index
self.root.title("AERIS-10 Radar Dashboard")
self.root.geometry("1600x950")
self.root.configure(bg=BG)
# Frame queue (acquisition / replay / demo → display)
self.frame_queue: queue.Queue[RadarFrame] = queue.Queue(maxsize=8)
self._acq_thread: RadarAcquisition | None = None
# Thread-safe UI message queue — avoids calling root.after() from
# background threads which crashes Python 3.12 (GIL state corruption).
# Entries are (tag, payload) tuples drained by _schedule_update().
self._ui_queue: queue.Queue[tuple[str, object]] = queue.Queue()
# Display state
self._current_frame = RadarFrame()
self._waterfall = deque(maxlen=WATERFALL_DEPTH)
for _ in range(WATERFALL_DEPTH):
self._waterfall.append(np.zeros(NUM_RANGE_BINS))
self._frame_count = 0
self._fps_ts = time.time()
self._fps = 0.0
# Stable colorscale — exponential moving average of vmax
self._vmax_ema = 1000.0
self._vmax_alpha = 0.15 # smoothing factor (lower = more stable)
# AGC visualization history (ring buffers, ~60s at 10 Hz)
self._agc_history_len = 256
self._agc_gain_history: deque[int] = deque(maxlen=self._agc_history_len)
self._agc_peak_history: deque[int] = deque(maxlen=self._agc_history_len)
self._agc_sat_history: deque[int] = deque(maxlen=self._agc_history_len)
self._agc_time_history: deque[float] = deque(maxlen=self._agc_history_len)
self._agc_t0: float = time.time()
self._agc_last_redraw: float = 0.0 # throttle chart redraws
self._AGC_REDRAW_INTERVAL: float = 0.5 # seconds between redraws
# Replay state
self._replay_ctrl: _ReplayController | None = None
self._replay_active = False
# Demo state
self._demo_sim: DemoSimulator | None = None
self._demo_active = False
# Detected targets (from demo or replay host-DSP)
self._detected_targets: list[dict] = []
self._build_ui()
self._schedule_update()
# ------------------------------------------------------------------ UI
def _build_ui(self):
style = ttk.Style()
style.theme_use("clam")
style.configure(".", background=BG, foreground=FG, fieldbackground=SURFACE)
style.configure("TFrame", background=BG)
style.configure("TLabel", background=BG, foreground=FG)
style.configure("TButton", background=SURFACE, foreground=FG)
style.configure("TLabelframe", background=BG, foreground=ACCENT)
style.configure("TLabelframe.Label", background=BG, foreground=ACCENT)
style.configure("Accent.TButton", background=ACCENT, foreground=BG)
style.configure("TNotebook", background=BG)
style.configure("TNotebook.Tab", background=SURFACE, foreground=FG,
padding=[12, 4])
style.map("TNotebook.Tab", background=[("selected", ACCENT)],
foreground=[("selected", BG)])
# Top bar
top = ttk.Frame(self.root)
top.pack(fill="x", padx=8, pady=(8, 0))
self.lbl_status = ttk.Label(top, text="DISCONNECTED", foreground=RED,
font=("Menlo", 11, "bold"))
self.lbl_status.pack(side="left", padx=8)
self.lbl_fps = ttk.Label(top, text="0.0 fps", font=("Menlo", 10))
self.lbl_fps.pack(side="left", padx=16)
self.lbl_detections = ttk.Label(top, text="Det: 0", font=("Menlo", 10))
self.lbl_detections.pack(side="left", padx=16)
self.lbl_frame = ttk.Label(top, text="Frame: 0", font=("Menlo", 10))
self.lbl_frame.pack(side="left", padx=16)
self.btn_connect = ttk.Button(top, text="Connect",
command=self._on_connect,
style="Accent.TButton")
self.btn_connect.pack(side="right", padx=4)
# USB Interface selector (production FT2232H / premium FT601)
self._usb_iface_var = tk.StringVar(value="FT2232H (Production)")
self.cmb_usb_iface = ttk.Combobox(
top, textvariable=self._usb_iface_var,
values=["FT2232H (Production)", "FT601 (Premium)"],
state="readonly", width=20,
)
self.cmb_usb_iface.pack(side="right", padx=4)
ttk.Label(top, text="USB:", font=("Menlo", 10)).pack(side="right")
self.btn_record = ttk.Button(top, text="Record", command=self._on_record)
self.btn_record.pack(side="right", padx=4)
self.btn_demo = ttk.Button(top, text="Start Demo",
command=self._toggle_demo)
self.btn_demo.pack(side="right", padx=4)
# -- Tabbed notebook layout --
nb = ttk.Notebook(self.root)
nb.pack(fill="both", expand=True, padx=8, pady=8)
tab_display = ttk.Frame(nb)
tab_control = ttk.Frame(nb)
tab_replay = ttk.Frame(nb)
tab_agc = ttk.Frame(nb)
tab_log = ttk.Frame(nb)
nb.add(tab_display, text=" Display ")
nb.add(tab_control, text=" Control ")
nb.add(tab_replay, text=" Replay ")
nb.add(tab_agc, text=" AGC Monitor ")
nb.add(tab_log, text=" Log ")
self._build_display_tab(tab_display)
self._build_control_tab(tab_control)
self._build_replay_tab(tab_replay)
self._build_agc_tab(tab_agc)
self._build_log_tab(tab_log)
def _build_display_tab(self, parent):
# Compute physical axis limits
# Bin spacing = c / (2 * Fs_ddc) for matched-filter processing.
range_per_bin = self.C / (2.0 * self.SAMPLE_RATE) * 16 # ~24 m
max_range = range_per_bin * NUM_RANGE_BINS
doppler_bin_lo = 0
doppler_bin_hi = NUM_DOPPLER_BINS
# Top pane: plots
plot_frame = ttk.Frame(parent)
plot_frame.pack(fill="both", expand=True)
# Matplotlib figure with 3 subplots
self.fig = Figure(figsize=(14, 5), facecolor=BG)
self.fig.subplots_adjust(left=0.07, right=0.98, top=0.94, bottom=0.10,
wspace=0.30, hspace=0.35)
# Range-Doppler heatmap
self.ax_rd = self.fig.add_subplot(1, 3, (1, 2))
self.ax_rd.set_facecolor(BG2)
self._rd_img = self.ax_rd.imshow(
np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS)),
aspect="auto", cmap="inferno", origin="lower",
extent=[doppler_bin_lo, doppler_bin_hi, 0, max_range],
vmin=0, vmax=1000,
)
self.ax_rd.set_title("Range-Doppler Map", color=FG, fontsize=12)
self.ax_rd.set_xlabel("Doppler Bin (0-15: long PRI, 16-31: short PRI)", color=FG)
self.ax_rd.set_ylabel("Range (m)", color=FG)
self.ax_rd.tick_params(colors=FG)
# Save axis limits for coordinate conversions
self._max_range = max_range
self._range_per_bin = range_per_bin
# CFAR detection overlay (scatter)
self._det_scatter = self.ax_rd.scatter([], [], s=30, c=GREEN,
marker="x", linewidths=1.5,
zorder=5, label="CFAR Det")
# Waterfall plot (range profile vs time)
self.ax_wf = self.fig.add_subplot(1, 3, 3)
self.ax_wf.set_facecolor(BG2)
wf_init = np.zeros((WATERFALL_DEPTH, NUM_RANGE_BINS))
self._wf_img = self.ax_wf.imshow(
wf_init, aspect="auto", cmap="viridis", origin="lower",
extent=[0, max_range, 0, WATERFALL_DEPTH],
vmin=0, vmax=5000,
)
self.ax_wf.set_title("Range Waterfall", color=FG, fontsize=12)
self.ax_wf.set_xlabel("Range (m)", color=FG)
self.ax_wf.set_ylabel("Frame", color=FG)
self.ax_wf.tick_params(colors=FG)
canvas = FigureCanvasTkAgg(self.fig, master=plot_frame)
canvas.draw()
canvas.get_tk_widget().pack(fill="both", expand=True)
self._canvas = canvas
# Bottom pane: detected targets table
tgt_frame = ttk.LabelFrame(parent, text="Detected Targets", padding=4)
tgt_frame.pack(fill="x", padx=8, pady=(0, 4))
cols = ("id", "range_m", "velocity", "azimuth", "snr", "class")
self._tgt_tree = ttk.Treeview(
tgt_frame, columns=cols, show="headings", height=5)
for col, heading, width in [
("id", "ID", 50),
("range_m", "Range (m)", 100),
("velocity", "Vel (m/s)", 90),
("azimuth", "Az (deg)", 90),
("snr", "SNR (dB)", 80),
("class", "Class", 100),
]:
self._tgt_tree.heading(col, text=heading)
self._tgt_tree.column(col, width=width, anchor="center")
scrollbar = ttk.Scrollbar(
tgt_frame, orient="vertical", command=self._tgt_tree.yview)
self._tgt_tree.configure(yscrollcommand=scrollbar.set)
self._tgt_tree.pack(side="left", fill="x", expand=True)
scrollbar.pack(side="right", fill="y")
def _build_control_tab(self, parent):
"""Host command sender — organized by FPGA register groups.
Layout: scrollable canvas with three columns:
Left: Quick Actions + Diagnostics (self-test)
Center: Waveform Timing + Signal Processing
Right: Detection (CFAR) + Custom Command
"""
# Scrollable wrapper for small screens
canvas = tk.Canvas(parent, bg=BG, highlightthickness=0)
scrollbar = ttk.Scrollbar(parent, orient="vertical", command=canvas.yview)
outer = ttk.Frame(canvas)
outer.bind("<Configure>",
lambda _e: canvas.configure(scrollregion=canvas.bbox("all")))
canvas.create_window((0, 0), window=outer, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
canvas.pack(side="left", fill="both", expand=True, padx=8, pady=8)
scrollbar.pack(side="right", fill="y")
self._param_vars: dict[str, tk.StringVar] = {}
# ── Left column: Quick Actions + Diagnostics ──────────────────
left = ttk.Frame(outer)
left.grid(row=0, column=0, sticky="nsew", padx=(0, 6))
# -- Radar Operation --
grp_op = ttk.LabelFrame(left, text="Radar Operation", padding=10)
grp_op.pack(fill="x", pady=(0, 8))
ttk.Button(grp_op, text="Radar Mode On",
command=lambda: self._send_cmd(0x01, 1)).pack(fill="x", pady=2)
ttk.Button(grp_op, text="Radar Mode Off",
command=lambda: self._send_cmd(0x01, 0)).pack(fill="x", pady=2)
ttk.Button(grp_op, text="Trigger Chirp",
command=lambda: self._send_cmd(0x02, 1)).pack(fill="x", pady=2)
# Stream Control (3-bit mask)
sc_row = ttk.Frame(grp_op)
sc_row.pack(fill="x", pady=2)
ttk.Label(sc_row, text="Stream Control").pack(side="left")
var_sc = tk.StringVar(value="7")
self._param_vars["4"] = var_sc
ttk.Entry(sc_row, textvariable=var_sc, width=6).pack(side="left", padx=6)
ttk.Label(sc_row, text="0-7", foreground=ACCENT,
font=("Menlo", 9)).pack(side="left")
ttk.Button(sc_row, text="Set",
command=lambda: self._send_validated(
0x04, var_sc, bits=3)).pack(side="right")
ttk.Button(grp_op, text="Request Status",
command=lambda: self._send_cmd(0xFF, 0)).pack(fill="x", pady=2)
# -- Signal Processing --
grp_sp = ttk.LabelFrame(left, text="Signal Processing", padding=10)
grp_sp.pack(fill="x", pady=(0, 8))
sp_params = [
# Format: label, opcode, default, bits, hint
("Detect Threshold", 0x03, "10000", 16, "0-65535"),
("Gain Shift", 0x16, "0", 4, "0-15, dir+shift"),
("MTI Enable", 0x26, "0", 1, "0=off, 1=on"),
("DC Notch Width", 0x27, "0", 3, "0-7 bins"),
]
for label, opcode, default, bits, hint in sp_params:
self._add_param_row(grp_sp, label, opcode, default, bits, hint)
# MTI quick toggle
mti_row = ttk.Frame(grp_sp)
mti_row.pack(fill="x", pady=2)
ttk.Button(mti_row, text="Enable MTI",
command=lambda: self._send_cmd(0x26, 1)).pack(
side="left", expand=True, fill="x", padx=(0, 2))
ttk.Button(mti_row, text="Disable MTI",
command=lambda: self._send_cmd(0x26, 0)).pack(
side="left", expand=True, fill="x", padx=(2, 0))
# -- Diagnostics --
grp_diag = ttk.LabelFrame(left, text="Diagnostics", padding=10)
grp_diag.pack(fill="x", pady=(0, 8))
ttk.Button(grp_diag, text="Run Self-Test",
command=lambda: self._send_cmd(0x30, 1)).pack(fill="x", pady=2)
ttk.Button(grp_diag, text="Read Self-Test Result",
command=lambda: self._send_cmd(0x31, 0)).pack(fill="x", pady=2)
st_frame = ttk.LabelFrame(grp_diag, text="Self-Test Results", padding=6)
st_frame.pack(fill="x", pady=(4, 0))
self._st_labels = {}
for name, default_text in [
("busy", "Busy: --"),
("flags", "Flags: -----"),
("detail", "Detail: 0x--"),
("t0", "T0 BRAM: --"),
("t1", "T1 CIC: --"),
("t2", "T2 FFT: --"),
("t3", "T3 Arith: --"),
("t4", "T4 ADC: --"),
]:
lbl = ttk.Label(st_frame, text=default_text, font=("Menlo", 9))
lbl.pack(anchor="w")
self._st_labels[name] = lbl
# ── Center column: Waveform Timing ────────────────────────────
center = ttk.Frame(outer)
center.grid(row=0, column=1, sticky="nsew", padx=6)
grp_wf = ttk.LabelFrame(center, text="Waveform Timing", padding=10)
grp_wf.pack(fill="x", pady=(0, 8))
wf_params = [
("Long Chirp Cycles", 0x10, "3000", 16, "0-65535, rst=3000"),
("Long Listen Cycles", 0x11, "13700", 16, "0-65535, rst=13700"),
("Guard Cycles", 0x12, "17540", 16, "0-65535, rst=17540"),
("Short Chirp Cycles", 0x13, "50", 16, "0-65535, rst=50"),
("Short Listen Cycles", 0x14, "17450", 16, "0-65535, rst=17450"),
("Chirps Per Elevation", 0x15, "32", 6, "1-32, clamped"),
]
for label, opcode, default, bits, hint in wf_params:
self._add_param_row(grp_wf, label, opcode, default, bits, hint)
# ── Right column: Detection (CFAR) + Custom ───────────────────
right = ttk.Frame(outer)
right.grid(row=0, column=2, sticky="nsew", padx=(6, 0))
grp_cfar = ttk.LabelFrame(right, text="Detection (CFAR)", padding=10)
grp_cfar.pack(fill="x", pady=(0, 8))
cfar_params = [
("CFAR Enable", 0x25, "0", 1, "0=off, 1=on"),
("CFAR Guard Cells", 0x21, "2", 4, "0-15, rst=2"),
("CFAR Train Cells", 0x22, "8", 5, "1-31, rst=8"),
("CFAR Alpha (Q4.4)", 0x23, "48", 8, "0-255, rst=0x30=3.0"),
("CFAR Mode", 0x24, "0", 2, "0=CA 1=GO 2=SO"),
]
for label, opcode, default, bits, hint in cfar_params:
self._add_param_row(grp_cfar, label, opcode, default, bits, hint)
# CFAR quick toggle
cfar_row = ttk.Frame(grp_cfar)
cfar_row.pack(fill="x", pady=2)
ttk.Button(cfar_row, text="Enable CFAR",
command=lambda: self._send_cmd(0x25, 1)).pack(
side="left", expand=True, fill="x", padx=(0, 2))
ttk.Button(cfar_row, text="Disable CFAR",
command=lambda: self._send_cmd(0x25, 0)).pack(
side="left", expand=True, fill="x", padx=(2, 0))
# ── AGC (Automatic Gain Control) ──────────────────────────────
grp_agc = ttk.LabelFrame(right, text="AGC (Auto Gain)", padding=10)
grp_agc.pack(fill="x", pady=(0, 8))
agc_params = [
("AGC Enable", 0x28, "0", 1, "0=manual, 1=auto"),
("AGC Target", 0x29, "200", 8, "0-255, peak target"),
("AGC Attack", 0x2A, "1", 4, "0-15, atten step"),
("AGC Decay", 0x2B, "1", 4, "0-15, gain-up step"),
("AGC Holdoff", 0x2C, "4", 4, "0-15, frames"),
]
for label, opcode, default, bits, hint in agc_params:
self._add_param_row(grp_agc, label, opcode, default, bits, hint)
# AGC quick toggle
agc_row = ttk.Frame(grp_agc)
agc_row.pack(fill="x", pady=2)
ttk.Button(agc_row, text="Enable AGC",
command=lambda: self._send_cmd(0x28, 1)).pack(
side="left", expand=True, fill="x", padx=(0, 2))
ttk.Button(agc_row, text="Disable AGC",
command=lambda: self._send_cmd(0x28, 0)).pack(
side="left", expand=True, fill="x", padx=(2, 0))
# AGC status readback labels
agc_st = ttk.LabelFrame(grp_agc, text="AGC Status", padding=6)
agc_st.pack(fill="x", pady=(4, 0))
self._agc_labels = {}
for name, default_text in [
("enable", "AGC: --"),
("gain", "Gain: --"),
("peak", "Peak: --"),
("sat", "Sat Count: --"),
]:
lbl = ttk.Label(agc_st, text=default_text, font=("Menlo", 9))
lbl.pack(anchor="w")
self._agc_labels[name] = lbl
# ── Custom Command (advanced / debug) ─────────────────────────
grp_cust = ttk.LabelFrame(right, text="Custom Command", padding=10)
grp_cust.pack(fill="x", pady=(0, 8))
r0 = ttk.Frame(grp_cust)
r0.pack(fill="x", pady=2)
ttk.Label(r0, text="Opcode (hex)").pack(side="left")
self._custom_op = tk.StringVar(value="01")
ttk.Entry(r0, textvariable=self._custom_op, width=8).pack(
side="left", padx=6)
r1 = ttk.Frame(grp_cust)
r1.pack(fill="x", pady=2)
ttk.Label(r1, text="Value (dec)").pack(side="left")
self._custom_val = tk.StringVar(value="0")
ttk.Entry(r1, textvariable=self._custom_val, width=8).pack(
side="left", padx=6)
ttk.Button(grp_cust, text="Send",
command=self._send_custom).pack(fill="x", pady=2)
# Column weights
outer.columnconfigure(0, weight=1)
outer.columnconfigure(1, weight=1)
outer.columnconfigure(2, weight=1)
outer.rowconfigure(0, weight=1)
def _add_param_row(self, parent, label: str, opcode: int,
default: str, bits: int, hint: str):
"""Add a single parameter row: label, entry, hint, Set button with validation."""
row = ttk.Frame(parent)
row.pack(fill="x", pady=2)
ttk.Label(row, text=label).pack(side="left")
var = tk.StringVar(value=default)
self._param_vars[str(opcode)] = var
ttk.Entry(row, textvariable=var, width=8).pack(side="left", padx=6)
ttk.Label(row, text=hint, foreground=ACCENT,
font=("Menlo", 9)).pack(side="left")
ttk.Button(row, text="Set",
command=lambda: self._send_validated(
opcode, var, bits=bits)).pack(side="right")
def _send_validated(self, opcode: int, var: tk.StringVar, bits: int):
"""Parse, clamp to bit-width, send command, and update the entry."""
try:
raw = int(var.get())
except ValueError:
log.error(f"Invalid value for opcode 0x{opcode:02X}: {var.get()!r}")
return
max_val = (1 << bits) - 1
clamped = max(0, min(raw, max_val))
if clamped != raw:
log.warning(f"Value {raw} clamped to {clamped} "
f"({bits}-bit max={max_val}) for opcode 0x{opcode:02X}")
var.set(str(clamped))
self._send_cmd(opcode, clamped)
def _build_replay_tab(self, parent):
"""Replay tab — load file, transport controls, seek slider."""
# File selection
file_frame = ttk.LabelFrame(parent, text="Replay Source", padding=10)
file_frame.pack(fill="x", padx=8, pady=(8, 4))
self._replay_path_var = tk.StringVar(value="(none)")
ttk.Label(file_frame, textvariable=self._replay_path_var,
font=("Menlo", 9)).pack(side="left", fill="x", expand=True)
ttk.Button(file_frame, text="Browse File...",
command=self._replay_browse_file).pack(side="right", padx=(4, 0))
ttk.Button(file_frame, text="Browse Dir...",
command=self._replay_browse_dir).pack(side="right", padx=(4, 0))
# Transport controls
ctrl_frame = ttk.LabelFrame(parent, text="Transport", padding=10)
ctrl_frame.pack(fill="x", padx=8, pady=4)
btn_row = ttk.Frame(ctrl_frame)
btn_row.pack(fill="x", pady=(0, 6))
self._rp_play_btn = ttk.Button(
btn_row, text="Play", command=self._replay_play, state="disabled")
self._rp_play_btn.pack(side="left", padx=2)
self._rp_pause_btn = ttk.Button(
btn_row, text="Pause", command=self._replay_pause, state="disabled")
self._rp_pause_btn.pack(side="left", padx=2)
self._rp_stop_btn = ttk.Button(
btn_row, text="Stop", command=self._replay_stop, state="disabled")
self._rp_stop_btn.pack(side="left", padx=2)
# Speed selector
ttk.Label(btn_row, text="Speed:").pack(side="left", padx=(16, 4))
self._rp_speed_var = tk.StringVar(value="1x")
speed_combo = ttk.Combobox(
btn_row, textvariable=self._rp_speed_var,
values=list(_ReplayController.SPEED_MAP.keys()),
state="readonly", width=6)
speed_combo.pack(side="left", padx=2)
speed_combo.bind("<<ComboboxSelected>>", self._replay_speed_changed)
# Loop checkbox
self._rp_loop_var = tk.BooleanVar(value=False)
ttk.Checkbutton(btn_row, text="Loop",
variable=self._rp_loop_var,
command=self._replay_loop_changed).pack(side="left", padx=8)
# Seek slider
slider_row = ttk.Frame(ctrl_frame)
slider_row.pack(fill="x")
self._rp_slider = tk.Scale(
slider_row, from_=0, to=0, orient="horizontal",
bg=SURFACE, fg=FG, highlightthickness=0,
troughcolor=BG2, command=self._replay_seek)
self._rp_slider.pack(side="left", fill="x", expand=True)
self._rp_frame_label = ttk.Label(
slider_row, text="0 / 0", font=("Menlo", 10))
self._rp_frame_label.pack(side="right", padx=8)
# Status
self._rp_status_label = ttk.Label(
parent, text="No replay loaded", font=("Menlo", 10))
self._rp_status_label.pack(padx=8, pady=4, anchor="w")
# Info frame for FPGA controls during replay
info = ttk.LabelFrame(parent, text="Replay FPGA Controls", padding=10)
info.pack(fill="x", padx=8, pady=4)
ttk.Label(
info,
text=("When replaying Raw IQ data, FPGA Control tab "
"parameters are routed to the SoftwareFPGA.\n"
"Changes take effect on the next frame."),
font=("Menlo", 9), foreground=ACCENT,
).pack(anchor="w")
def _build_agc_tab(self, parent):
"""AGC Monitor tab — real-time strip charts for gain, peak, and saturation."""
# Top row: AGC status badge + saturation indicator
top = ttk.Frame(parent)
top.pack(fill="x", padx=8, pady=(8, 0))
self._agc_badge = ttk.Label(
top, text="AGC: --", font=("Menlo", 14, "bold"), foreground=FG)
self._agc_badge.pack(side="left", padx=(0, 24))
self._agc_sat_badge = ttk.Label(
top, text="Saturation: 0", font=("Menlo", 12), foreground=GREEN)
self._agc_sat_badge.pack(side="left", padx=(0, 24))
self._agc_gain_value = ttk.Label(
top, text="Gain: --", font=("Menlo", 12), foreground=ACCENT)
self._agc_gain_value.pack(side="left", padx=(0, 24))
self._agc_peak_value = ttk.Label(
top, text="Peak: --", font=("Menlo", 12), foreground=ACCENT)
self._agc_peak_value.pack(side="left")
# Matplotlib figure with 3 stacked subplots sharing x-axis (time)
self._agc_fig = Figure(figsize=(14, 7), facecolor=BG)
self._agc_fig.subplots_adjust(
left=0.07, right=0.98, top=0.95, bottom=0.08,
hspace=0.30)
# Subplot 1: FPGA inner-loop gain (4-bit, 0-15)
self._ax_gain = self._agc_fig.add_subplot(3, 1, 1)
self._ax_gain.set_facecolor(BG2)
self._ax_gain.set_title("FPGA AGC Gain (inner loop)", color=FG, fontsize=10)
self._ax_gain.set_ylabel("Gain Level", color=FG)
self._ax_gain.set_ylim(-0.5, 15.5)
self._ax_gain.tick_params(colors=FG)
self._ax_gain.set_xlim(0, self._agc_history_len)
self._gain_line, = self._ax_gain.plot(
[], [], color=ACCENT, linewidth=1.5, label="Gain")
self._ax_gain.axhline(y=0, color=RED, linewidth=0.5, alpha=0.5, linestyle="--")
self._ax_gain.axhline(y=15, color=RED, linewidth=0.5, alpha=0.5, linestyle="--")
for spine in self._ax_gain.spines.values():
spine.set_color(SURFACE)
# Subplot 2: Peak magnitude (8-bit, 0-255)
self._ax_peak = self._agc_fig.add_subplot(3, 1, 2)
self._ax_peak.set_facecolor(BG2)
self._ax_peak.set_title("Peak Magnitude", color=FG, fontsize=10)
self._ax_peak.set_ylabel("Peak (8-bit)", color=FG)
self._ax_peak.set_ylim(-5, 260)
self._ax_peak.tick_params(colors=FG)
self._ax_peak.set_xlim(0, self._agc_history_len)
self._peak_line, = self._ax_peak.plot(
[], [], color=YELLOW, linewidth=1.5, label="Peak")
# AGC target reference line (default 200)
self._agc_target_line = self._ax_peak.axhline(
y=200, color=GREEN, linewidth=1.0, alpha=0.7, linestyle="--",
label="Target (200)")
self._ax_peak.legend(loc="upper right", fontsize=8,
facecolor=BG2, edgecolor=SURFACE,
labelcolor=FG)
for spine in self._ax_peak.spines.values():
spine.set_color(SURFACE)
# Subplot 3: Saturation count (8-bit, 0-255) as bar-style fill
self._ax_sat = self._agc_fig.add_subplot(3, 1, 3)
self._ax_sat.set_facecolor(BG2)
self._ax_sat.set_title("Saturation Count", color=FG, fontsize=10)
self._ax_sat.set_ylabel("Sat Count", color=FG)
self._ax_sat.set_xlabel("Sample Index", color=FG)
self._ax_sat.set_ylim(-1, 40)
self._ax_sat.tick_params(colors=FG)
self._ax_sat.set_xlim(0, self._agc_history_len)
self._sat_fill = self._ax_sat.fill_between(
[], [], color=RED, alpha=0.6, label="Saturation")
self._sat_line, = self._ax_sat.plot(
[], [], color=RED, linewidth=1.0)
self._ax_sat.axhline(y=0, color=GREEN, linewidth=0.5, alpha=0.5, linestyle="--")
for spine in self._ax_sat.spines.values():
spine.set_color(SURFACE)
agc_canvas = FigureCanvasTkAgg(self._agc_fig, master=parent)
agc_canvas.draw()
agc_canvas.get_tk_widget().pack(fill="both", expand=True)
self._agc_canvas = agc_canvas
def _build_log_tab(self, parent):
self.log_text = tk.Text(parent, bg=BG2, fg=FG, font=("Menlo", 10),
insertbackground=FG, wrap="word")
self.log_text.pack(fill="both", expand=True, padx=8, pady=8)
# Redirect log handler to text widget (via UI queue for thread safety)
handler = _TextHandler(self._ui_queue)
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S"))
logging.getLogger().addHandler(handler)
# ------------------------------------------------------------ Actions
def _on_connect(self):
if self.conn is not None and self.conn.is_open:
# Disconnect
if self._acq_thread is not None:
self._acq_thread.stop()
self._acq_thread.join(timeout=2)
self._acq_thread = None
self.conn.close()
self.conn = None
self.lbl_status.config(text="DISCONNECTED", foreground=RED)
self.btn_connect.config(text="Connect")
self.cmb_usb_iface.config(state="readonly")
log.info("Disconnected")
return
# Stop any active demo or replay before going live
if self._demo_active:
self._stop_demo()
if self._replay_active:
self._replay_stop()
# Create connection based on USB Interface selector
iface = self._usb_iface_var.get()
if "FT601" in iface:
self.conn = FT601Connection(mock=self._mock)
else:
self.conn = FT2232HConnection(mock=self._mock)
# Disable interface selector while connecting/connected
self.cmb_usb_iface.config(state="disabled")
# Open connection in a background thread to avoid blocking the GUI
self.lbl_status.config(text="CONNECTING...", foreground=YELLOW)
self.btn_connect.config(state="disabled")
self.root.update_idletasks()
def _do_connect():
ok = self.conn.open(self.device_index)
# Post result to UI queue (drained by _schedule_update)
self._ui_queue.put(("connect", ok))
threading.Thread(target=_do_connect, daemon=True).start()
def _on_connect_done(self, success: bool):
"""Called on main thread after connection attempt completes."""
self.btn_connect.config(state="normal")
if success:
self.lbl_status.config(text="CONNECTED", foreground=GREEN)
self.btn_connect.config(text="Disconnect")
self._acq_thread = RadarAcquisition(
self.conn, self.frame_queue, self.recorder,
status_callback=self._on_status_received)
self._acq_thread.start()
log.info("Connected and acquisition started")
else:
self.lbl_status.config(text="CONNECT FAILED", foreground=RED)
self.btn_connect.config(text="Connect")
self.cmb_usb_iface.config(state="readonly")
self.conn = None
def _on_record(self):
if self.recorder.recording:
self.recorder.stop()
self.btn_record.config(text="Record")
return
filepath = filedialog.asksaveasfilename(
defaultextension=".h5",
filetypes=[("HDF5", "*.h5"), ("All", "*.*")],
initialfile=f"radar_{time.strftime('%Y%m%d_%H%M%S')}.h5",
)
if filepath:
self.recorder.start(filepath)
self.btn_record.config(text="Stop Rec")
# Opcode → SoftwareFPGA setter method name for dual dispatch during replay
_SFPGA_SETTER_NAMES: ClassVar[dict[int, str]] = {
0x03: "set_detect_threshold",
0x16: "set_gain_shift",
0x21: "set_cfar_guard",
0x22: "set_cfar_train",
0x23: "set_cfar_alpha",
0x24: "set_cfar_mode",
0x25: "set_cfar_enable",
0x26: "set_mti_enable",
0x27: "set_dc_notch_width",
0x28: "set_agc_enable",
}
def _send_cmd(self, opcode: int, value: int):
"""Send command — routes to SoftwareFPGA when replaying raw IQ."""
if (self._replay_active and self._replay_ctrl is not None
and self._replay_ctrl.software_fpga is not None):
sfpga = self._replay_ctrl.software_fpga
setter_name = self._SFPGA_SETTER_NAMES.get(opcode)
if setter_name is not None:
getattr(sfpga, setter_name)(value)
log.info(
f"SoftwareFPGA 0x{opcode:02X} val={value}")
return
log.warning(
f"Opcode 0x{opcode:02X} not routable in replay mode")
self._ui_queue.put(
("status_msg",
f"Opcode 0x{opcode:02X} is hardware-only (ignored in replay)"))
return
cmd = RadarProtocol.build_command(opcode, value)
if self.conn is None:
log.warning("No connection — command not sent")
return
ok = self.conn.write(cmd)
log.info(f"CMD 0x{opcode:02X} val={value} ({'OK' if ok else 'FAIL'})")
def _send_custom(self):
try:
op = int(self._custom_op.get(), 16)
val = int(self._custom_val.get())
self._send_cmd(op, val)
except ValueError:
log.error("Invalid custom command values")
# -------------------------------------------------------- Replay actions
def _replay_browse_file(self):
path = filedialog.askopenfilename(
title="Select replay file",
filetypes=[
("NumPy files", "*.npy"),
("HDF5 files", "*.h5"),
("All files", "*.*"),
],
)
if path:
self._replay_load(path)
def _replay_browse_dir(self):
path = filedialog.askdirectory(title="Select co-sim directory")
if path:
self._replay_load(path)
def _replay_load(self, path: str):
"""Load replay data and enable transport controls."""
# Stop any running mode
if self._demo_active:
self._stop_demo()
# Safely shutdown and disable UI controls before loading the new file
if self._replay_active or self._replay_ctrl is not None:
self._replay_stop()
if self._acq_thread is not None:
if self.conn is not None and self.conn.is_open:
self._on_connect() # disconnect
else:
# Connection dropped unexpectedly — just clean up the thread
self._acq_thread.stop()
self._acq_thread.join(timeout=2)
self._acq_thread = None
try:
self._replay_ctrl = _ReplayController(
self.frame_queue, self._ui_queue)
total = self._replay_ctrl.load(path)
except Exception as exc: # noqa: BLE001
log.error(f"Failed to load replay: {exc}")
self._rp_status_label.config(
text=f"Load failed: {exc}", foreground=RED)
self._replay_ctrl = None
return
short_path = Path(path).name
self._replay_path_var.set(short_path)
self._rp_slider.config(to=max(0, total - 1))
self._rp_frame_label.config(text=f"0 / {total}")
self._rp_status_label.config(
text=f"Loaded: {total} frames from {short_path}",
foreground=GREEN)
# Enable transport buttons
for btn in (self._rp_play_btn, self._rp_pause_btn, self._rp_stop_btn):
btn.config(state="normal")
self._replay_active = True
self.lbl_status.config(text="REPLAY", foreground=ACCENT)
log.info(f"Replay loaded: {total} frames from {path}")
def _replay_play(self):
if self._replay_ctrl:
self._replay_ctrl.play()
def _replay_pause(self):
if self._replay_ctrl:
self._replay_ctrl.pause()
def _replay_stop(self):
if self._replay_ctrl:
self._replay_ctrl.close()
self._replay_ctrl = None
self._replay_active = False
self.lbl_status.config(text="DISCONNECTED", foreground=RED)
self._rp_slider.set(0)
self._rp_frame_label.config(text="0 / 0")
for btn in (self._rp_play_btn, self._rp_pause_btn, self._rp_stop_btn):
btn.config(state="disabled")
def _replay_seek(self, value):
if (self._replay_ctrl and self._replay_active
and not self._replay_ctrl.is_playing):
self._replay_ctrl.seek(int(value))
def _replay_speed_changed(self, _event=None):
if self._replay_ctrl:
self._replay_ctrl.set_speed(self._rp_speed_var.get())
def _replay_loop_changed(self):
if self._replay_ctrl:
self._replay_ctrl.set_loop(self._rp_loop_var.get())
# ---------------------------------------------------------- Demo actions
def _toggle_demo(self):
if self._demo_active:
self._stop_demo()
else:
self._start_demo()
def _start_demo(self):
"""Start demo mode with synthetic targets."""
# Mutual exclusion
if self._replay_active:
self._replay_stop()
if self._acq_thread is not None:
log.warning("Cannot start demo while radar is connected")
return
self._demo_sim = DemoSimulator(
self.frame_queue, self._ui_queue, self.root, interval_ms=500)
self._demo_sim.start()
self._demo_active = True
self.lbl_status.config(text="DEMO", foreground=YELLOW)
self.btn_demo.config(text="Stop Demo")
log.info("Demo mode started")
def _stop_demo(self):
if self._demo_sim is not None:
self._demo_sim.stop()
self._demo_sim = None
self._demo_active = False
self.lbl_status.config(text="DISCONNECTED", foreground=RED)
self.btn_demo.config(text="Start Demo")
log.info("Demo mode stopped")
def _on_status_received(self, status: StatusResponse):
"""Called from acquisition thread — post to UI queue for main thread."""
self._ui_queue.put(("status", status))
def _update_self_test_labels(self, status: StatusResponse):
"""Update the self-test result labels and AGC status from a StatusResponse."""
if not hasattr(self, '_st_labels'):
return
flags = status.self_test_flags
detail = status.self_test_detail
busy = status.self_test_busy
busy_str = "RUNNING" if busy else "IDLE"
busy_color = YELLOW if busy else FG
self._st_labels["busy"].config(text=f"Busy: {busy_str}",
foreground=busy_color)
self._st_labels["flags"].config(text=f"Flags: {flags:05b}")
self._st_labels["detail"].config(text=f"Detail: 0x{detail:02X}")
# Individual test results (bit = 1 means PASS)
test_names = [
("t0", "T0 BRAM"),
("t1", "T1 CIC"),
("t2", "T2 FFT"),
("t3", "T3 Arith"),
("t4", "T4 ADC"),
]
for i, (key, name) in enumerate(test_names):
if busy:
result_str = "..."
color = YELLOW
elif flags & (1 << i):
result_str = "PASS"
color = GREEN
else:
result_str = "FAIL"
color = RED
self._st_labels[key].config(
text=f"{name}: {result_str}", foreground=color)
# AGC status readback
if hasattr(self, '_agc_labels'):
agc_str = "AUTO" if status.agc_enable else "MANUAL"
agc_color = GREEN if status.agc_enable else FG
self._agc_labels["enable"].config(
text=f"AGC: {agc_str}", foreground=agc_color)
self._agc_labels["gain"].config(
text=f"Gain: {status.agc_current_gain}")
self._agc_labels["peak"].config(
text=f"Peak: {status.agc_peak_magnitude}")
sat_color = RED if status.agc_saturation_count > 0 else FG
self._agc_labels["sat"].config(
text=f"Sat Count: {status.agc_saturation_count}",
foreground=sat_color)
# AGC visualization update
self._update_agc_visualization(status)
def _update_agc_visualization(self, status: StatusResponse):
"""Push AGC metrics into ring buffers and redraw strip charts.
Data is always accumulated (cheap), but matplotlib redraws are
throttled to ``_AGC_REDRAW_INTERVAL`` seconds to avoid saturating
the GUI event-loop when status packets arrive at 20 Hz.
"""
if not hasattr(self, '_agc_canvas'):
return
# Append to ring buffers (always — this is O(1))
self._agc_gain_history.append(status.agc_current_gain)
self._agc_peak_history.append(status.agc_peak_magnitude)
self._agc_sat_history.append(status.agc_saturation_count)
# Update indicator labels (cheap Tk config calls)
mode_str = "AUTO" if status.agc_enable else "MANUAL"
mode_color = GREEN if status.agc_enable else FG
self._agc_badge.config(text=f"AGC: {mode_str}", foreground=mode_color)
self._agc_gain_value.config(
text=f"Gain: {status.agc_current_gain}")
self._agc_peak_value.config(
text=f"Peak: {status.agc_peak_magnitude}")
total_sat = sum(self._agc_sat_history)
if total_sat > 10:
sat_color = RED
elif total_sat > 0:
sat_color = YELLOW
else:
sat_color = GREEN
self._agc_sat_badge.config(
text=f"Saturation: {total_sat}", foreground=sat_color)
# ---- Throttle matplotlib redraws ---------------------------------
now = time.monotonic()
if now - self._agc_last_redraw < self._AGC_REDRAW_INTERVAL:
return
self._agc_last_redraw = now
n = len(self._agc_gain_history)
xs = list(range(n))
# Update line plots
gain_data = list(self._agc_gain_history)
peak_data = list(self._agc_peak_history)
sat_data = list(self._agc_sat_history)
self._gain_line.set_data(xs, gain_data)
self._peak_line.set_data(xs, peak_data)
# Saturation: redraw as filled area
self._sat_line.set_data(xs, sat_data)
if self._sat_fill is not None:
self._sat_fill.remove()
self._sat_fill = self._ax_sat.fill_between(
xs, sat_data, color=RED, alpha=0.4)
# Auto-scale saturation Y axis to data
max_sat = max(sat_data) if sat_data else 0
self._ax_sat.set_ylim(-1, max(max_sat * 1.5, 5))
# Scroll X axis to keep latest data visible
if n >= self._agc_history_len:
self._ax_gain.set_xlim(0, n)
self._ax_peak.set_xlim(0, n)
self._ax_sat.set_xlim(0, n)
self._agc_canvas.draw_idle()
# --------------------------------------------------------- Display loop
def _schedule_update(self):
self._drain_ui_queue()
self._update_display()
self.root.after(self.UPDATE_INTERVAL_MS, self._schedule_update)
def _drain_ui_queue(self):
"""Process all pending cross-thread messages on the main thread."""
while True:
try:
tag, payload = self._ui_queue.get_nowait()
except queue.Empty:
break
if tag == "connect":
self._on_connect_done(payload)
elif tag == "status":
self._update_self_test_labels(payload)
elif tag == "log":
self._log_handler_append(payload)
elif tag == "replay_state":
self._on_replay_state(payload)
elif tag == "replay_index":
self._on_replay_index(*payload)
elif tag == "demo_targets":
self._on_demo_targets(payload)
elif tag == "status_msg":
self.lbl_status.config(text=str(payload), foreground=YELLOW)
def _on_replay_state(self, state: str):
if state == "playing":
self._rp_status_label.config(text="Playing", foreground=GREEN)
elif state == "paused":
self._rp_status_label.config(text="Paused", foreground=YELLOW)
elif state == "stopped":
self._rp_status_label.config(text="Stopped", foreground=FG)
def _on_replay_index(self, index: int, total: int):
self._rp_frame_label.config(text=f"{index} / {total}")
self._rp_slider.set(index)
def _on_demo_targets(self, targets: list[dict]):
"""Update the detected targets treeview from demo data."""
self._update_targets_table(targets)
def _update_targets_table(self, targets: list[dict]):
"""Refresh the detected targets treeview."""
# Clear existing rows
for item in self._tgt_tree.get_children():
self._tgt_tree.delete(item)
# Insert new rows
for t in targets:
self._tgt_tree.insert("", "end", values=(
t.get("id", ""),
f"{t.get('range_m', 0):.0f}",
f"{t.get('velocity', 0):.1f}",
f"{t.get('azimuth', 0):.1f}",
f"{t.get('snr', 0):.1f}",
t.get("class", ""),
))
def _log_handler_append(self, msg: str):
"""Append a log message to the log Text widget (main thread only)."""
with contextlib.suppress(Exception):
self.log_text.insert("end", msg + "\n")
self.log_text.see("end")
# Keep last 500 lines
lines = int(self.log_text.index("end-1c").split(".")[0])
if lines > 500:
self.log_text.delete("1.0", f"{lines - 500}.0")
def _update_display(self):
"""Pull latest frame from queue and update plots."""
frame = None
# Drain queue, keep latest
while True:
try:
frame = self.frame_queue.get_nowait()
except queue.Empty:
break
if frame is None:
return
self._current_frame = frame
self._frame_count += 1
# FPS calculation
now = time.time()
dt = now - self._fps_ts
if dt > 0.5:
self._fps = self._frame_count / dt
self._frame_count = 0
self._fps_ts = now
# Update labels
self.lbl_fps.config(text=f"{self._fps:.1f} fps")
self.lbl_detections.config(text=f"Det: {frame.detection_count}")
self.lbl_frame.config(text=f"Frame: {frame.frame_number}")
# Update range-Doppler heatmap in raw dual-subframe bin order
mag = frame.magnitude
det_shifted = frame.detections
# Stable colorscale via EMA smoothing of vmax
frame_vmax = float(np.max(mag)) if np.max(mag) > 0 else 1.0
self._vmax_ema = (self._vmax_alpha * frame_vmax +
(1.0 - self._vmax_alpha) * self._vmax_ema)
stable_vmax = max(self._vmax_ema, 1.0)
self._rd_img.set_data(mag)
self._rd_img.set_clim(vmin=0, vmax=stable_vmax)
# Update CFAR overlay in raw Doppler-bin coordinates
det_coords = np.argwhere(det_shifted > 0)
if len(det_coords) > 0:
# det_coords[:, 0] = range bin, det_coords[:, 1] = Doppler bin
range_m = (det_coords[:, 0] + 0.5) * self._range_per_bin
doppler_bins = det_coords[:, 1] + 0.5
offsets = np.column_stack([doppler_bins, range_m])
self._det_scatter.set_offsets(offsets)
else:
self._det_scatter.set_offsets(np.empty((0, 2)))
# Update waterfall
self._waterfall.append(frame.range_profile.copy())
wf_arr = np.array(list(self._waterfall))
wf_max = max(np.max(wf_arr), 1.0)
self._wf_img.set_data(wf_arr)
self._wf_img.set_clim(vmin=0, vmax=wf_max)
self._canvas.draw_idle()
class _TextHandler(logging.Handler):
"""Logging handler that posts messages to a queue for main-thread append.
Using widget.after() from background threads crashes Python 3.12 due to
GIL state corruption. Instead we post to the dashboard's _ui_queue and
let _drain_ui_queue() append on the main thread.
"""
def __init__(self, ui_queue: queue.Queue[tuple[str, object]]):
super().__init__()
self._ui_queue = ui_queue
def emit(self, record):
msg = self.format(record)
with contextlib.suppress(Exception):
self._ui_queue.put(("log", msg))
# ============================================================================
# Entry Point
# ============================================================================
def main():
parser = argparse.ArgumentParser(description="AERIS-10 Radar Dashboard")
parser.add_argument("--record", action="store_true",
help="Start HDF5 recording immediately")
parser.add_argument("--device", type=int, default=0,
help="FT2232H device index (default: 0)")
mode_group = parser.add_mutually_exclusive_group()
mode_group.add_argument("--live", action="store_true",
help="Use real FT2232H hardware (default: mock mode)")
mode_group.add_argument("--replay", type=str, default=None,
help="Auto-load replay file or directory on startup")
mode_group.add_argument("--demo", action="store_true",
help="Start in demo mode with synthetic targets")
args = parser.parse_args()
if args.live:
mock = False
mode_str = "LIVE"
else:
mock = True
mode_str = "MOCK"
recorder = DataRecorder()
root = tk.Tk()
dashboard = RadarDashboard(root, mock, recorder, device_index=args.device)
if args.record:
filepath = os.path.join(
os.getcwd(),
f"radar_{time.strftime('%Y%m%d_%H%M%S')}.h5"
)
recorder.start(filepath)
if args.replay:
dashboard._replay_load(args.replay)
if args.demo:
dashboard._start_demo()
def on_closing():
# Stop demo if active
if dashboard._demo_active:
dashboard._stop_demo()
# Stop replay if active
if dashboard._replay_ctrl is not None:
dashboard._replay_ctrl.close()
if dashboard._acq_thread is not None:
dashboard._acq_thread.stop()
dashboard._acq_thread.join(timeout=2)
if dashboard.conn is not None and dashboard.conn.is_open:
dashboard.conn.close()
if recorder.recording:
recorder.stop()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
log.info(f"Dashboard started (mode={mode_str})")
root.mainloop()
if __name__ == "__main__":
main()