Files
PLFM_RADAR/9_Firmware/9_3_GUI/radar_dashboard.py
T
Serhii ffc89f0bbd fix(rtl,gui,cosim,formal): adapt surrounding files for dual 16-pt FFT (follow-up to PR #33)
- radar_system_top.v: DC notch now masks to dop_bin[3:0] per sub-frame so both sub-frames get their DC zeroed correctly; rename DOPPLER_FFT_SIZE → DOPPLER_FRAME_CHIRPS to avoid confusion with the per-FFT size (now 16)
- radar_dashboard.py: remove fftshift (crosses sub-frame boundary), display raw Doppler bins, remove dead velocity constants
- golden_reference.py: model dual 16-pt FFT with per-sub-frame Hamming window, update DC notch and CFAR to match RTL
- fv_doppler_processor.sby: reference xfft_16.v / fft_twiddle_16.mem, raise BMC depth to 512 and cover to 1024
- fv_radar_mode_controller.sby: raise cover depth to 600
- fv_radar_mode_controller.v: pin cfg_* to reduced constants (documented as single-config proof), fix Property 5 mode guard, strengthen Cover 1
- STALE_NOTICE.md: document that real-data hex files are stale and need regeneration with external dataset

Closes #39
2026-04-06 23:15:50 +03:00

609 lines
23 KiB
Python

#!/usr/bin/env python3
"""
AERIS-10 Radar Dashboard — Board Bring-Up Edition
===================================================
Real-time visualization and control for the AERIS-10 phased-array radar
via FT601 USB 3.0 interface.
Features:
- FT601 USB reader with packet parsing (matches usb_data_interface.v)
- Real-time range-Doppler magnitude heatmap (64x32)
- CFAR detection overlay (flagged cells highlighted)
- Range profile waterfall plot (range vs. time)
- Host command sender (opcodes 0x01-0x27, 0x30, 0xFF)
- Configuration panel for all radar parameters
- HDF5 data recording for offline analysis
- Mock mode for development/testing without hardware
Usage:
python radar_dashboard.py # Launch with mock data
python radar_dashboard.py --live # Launch with FT601 hardware
python radar_dashboard.py --record # Launch with HDF5 recording
"""
import sys
import os
import time
import queue
import logging
import argparse
import threading
from typing import Optional, Dict
from collections import deque
import numpy as np
import tkinter as tk
from tkinter import ttk, filedialog
import matplotlib
matplotlib.use("TkAgg")
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
# Import protocol layer (no GUI deps)
from radar_protocol import (
RadarProtocol, FT601Connection, ReplayConnection,
DataRecorder, RadarAcquisition,
RadarFrame, StatusResponse, Opcode,
NUM_RANGE_BINS, NUM_DOPPLER_BINS, WATERFALL_DEPTH,
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("radar_dashboard")
# ============================================================================
# Dashboard GUI
# ============================================================================
# Dark theme colors
BG = "#1e1e2e"
BG2 = "#282840"
FG = "#cdd6f4"
ACCENT = "#89b4fa"
GREEN = "#a6e3a1"
RED = "#f38ba8"
YELLOW = "#f9e2af"
SURFACE = "#313244"
class RadarDashboard:
"""Main tkinter application: real-time radar visualization and control."""
UPDATE_INTERVAL_MS = 100 # 10 Hz display refresh
# Radar parameters used for range-axis scaling.
BANDWIDTH = 500e6 # Hz — chirp bandwidth
C = 3e8 # m/s — speed of light
def __init__(self, root: tk.Tk, connection: FT601Connection,
recorder: DataRecorder):
self.root = root
self.conn = connection
self.recorder = recorder
self.root.title("AERIS-10 Radar Dashboard — Bring-Up Edition")
self.root.geometry("1600x950")
self.root.configure(bg=BG)
# Frame queue (acquisition → display)
self.frame_queue: queue.Queue[RadarFrame] = queue.Queue(maxsize=8)
self._acq_thread: Optional[RadarAcquisition] = None
# Display state
self._current_frame = RadarFrame()
self._waterfall = deque(maxlen=WATERFALL_DEPTH)
for _ in range(WATERFALL_DEPTH):
self._waterfall.append(np.zeros(NUM_RANGE_BINS))
self._frame_count = 0
self._fps_ts = time.time()
self._fps = 0.0
# Stable colorscale — exponential moving average of vmax
self._vmax_ema = 1000.0
self._vmax_alpha = 0.15 # smoothing factor (lower = more stable)
self._build_ui()
self._schedule_update()
# ------------------------------------------------------------------ UI
def _build_ui(self):
style = ttk.Style()
style.theme_use("clam")
style.configure(".", background=BG, foreground=FG, fieldbackground=SURFACE)
style.configure("TFrame", background=BG)
style.configure("TLabel", background=BG, foreground=FG)
style.configure("TButton", background=SURFACE, foreground=FG)
style.configure("TLabelframe", background=BG, foreground=ACCENT)
style.configure("TLabelframe.Label", background=BG, foreground=ACCENT)
style.configure("Accent.TButton", background=ACCENT, foreground=BG)
style.configure("TNotebook", background=BG)
style.configure("TNotebook.Tab", background=SURFACE, foreground=FG,
padding=[12, 4])
style.map("TNotebook.Tab", background=[("selected", ACCENT)],
foreground=[("selected", BG)])
# Top bar
top = ttk.Frame(self.root)
top.pack(fill="x", padx=8, pady=(8, 0))
self.lbl_status = ttk.Label(top, text="DISCONNECTED", foreground=RED,
font=("Menlo", 11, "bold"))
self.lbl_status.pack(side="left", padx=8)
self.lbl_fps = ttk.Label(top, text="0.0 fps", font=("Menlo", 10))
self.lbl_fps.pack(side="left", padx=16)
self.lbl_detections = ttk.Label(top, text="Det: 0", font=("Menlo", 10))
self.lbl_detections.pack(side="left", padx=16)
self.lbl_frame = ttk.Label(top, text="Frame: 0", font=("Menlo", 10))
self.lbl_frame.pack(side="left", padx=16)
self.btn_connect = ttk.Button(top, text="Connect",
command=self._on_connect,
style="Accent.TButton")
self.btn_connect.pack(side="right", padx=4)
self.btn_record = ttk.Button(top, text="Record", command=self._on_record)
self.btn_record.pack(side="right", padx=4)
# Notebook (tabs)
nb = ttk.Notebook(self.root)
nb.pack(fill="both", expand=True, padx=8, pady=8)
tab_display = ttk.Frame(nb)
tab_control = ttk.Frame(nb)
tab_log = ttk.Frame(nb)
nb.add(tab_display, text=" Display ")
nb.add(tab_control, text=" Control ")
nb.add(tab_log, text=" Log ")
self._build_display_tab(tab_display)
self._build_control_tab(tab_control)
self._build_log_tab(tab_log)
def _build_display_tab(self, parent):
# Compute physical axis limits
# Range resolution: dR = c / (2 * BW) per range bin
# But we decimate 1024→64 bins, so each bin spans 16 FFT bins.
# Range per FFT bin = c / (2 * BW) * (Fs / FFT_SIZE) — simplified:
# max_range = c * Fs / (4 * BW) for Fs-sampled baseband
# range_per_bin = max_range / NUM_RANGE_BINS
range_res = self.C / (2.0 * self.BANDWIDTH) # ~0.3 m per FFT bin
# After decimation 1024→64, each range bin = 16 FFT bins
range_per_bin = range_res * 16
max_range = range_per_bin * NUM_RANGE_BINS
doppler_bin_lo = 0
doppler_bin_hi = NUM_DOPPLER_BINS
# Matplotlib figure with 3 subplots
self.fig = Figure(figsize=(14, 7), facecolor=BG)
self.fig.subplots_adjust(left=0.07, right=0.98, top=0.94, bottom=0.10,
wspace=0.30, hspace=0.35)
# Range-Doppler heatmap
self.ax_rd = self.fig.add_subplot(1, 3, (1, 2))
self.ax_rd.set_facecolor(BG2)
self._rd_img = self.ax_rd.imshow(
np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS)),
aspect="auto", cmap="inferno", origin="lower",
extent=[doppler_bin_lo, doppler_bin_hi, 0, max_range],
vmin=0, vmax=1000,
)
self.ax_rd.set_title("Range-Doppler Map", color=FG, fontsize=12)
self.ax_rd.set_xlabel("Doppler Bin (0-15: long PRI, 16-31: short PRI)", color=FG)
self.ax_rd.set_ylabel("Range (m)", color=FG)
self.ax_rd.tick_params(colors=FG)
# Save axis limits for coordinate conversions
self._max_range = max_range
self._range_per_bin = range_per_bin
# CFAR detection overlay (scatter)
self._det_scatter = self.ax_rd.scatter([], [], s=30, c=GREEN,
marker="x", linewidths=1.5,
zorder=5, label="CFAR Det")
# Waterfall plot (range profile vs time)
self.ax_wf = self.fig.add_subplot(1, 3, 3)
self.ax_wf.set_facecolor(BG2)
wf_init = np.zeros((WATERFALL_DEPTH, NUM_RANGE_BINS))
self._wf_img = self.ax_wf.imshow(
wf_init, aspect="auto", cmap="viridis", origin="lower",
extent=[0, max_range, 0, WATERFALL_DEPTH],
vmin=0, vmax=5000,
)
self.ax_wf.set_title("Range Waterfall", color=FG, fontsize=12)
self.ax_wf.set_xlabel("Range (m)", color=FG)
self.ax_wf.set_ylabel("Frame", color=FG)
self.ax_wf.tick_params(colors=FG)
canvas = FigureCanvasTkAgg(self.fig, master=parent)
canvas.draw()
canvas.get_tk_widget().pack(fill="both", expand=True)
self._canvas = canvas
def _build_control_tab(self, parent):
"""Host command sender and configuration panel."""
outer = ttk.Frame(parent)
outer.pack(fill="both", expand=True, padx=16, pady=16)
# Left column: Quick actions
left = ttk.LabelFrame(outer, text="Quick Actions", padding=12)
left.grid(row=0, column=0, sticky="nsew", padx=(0, 8))
ttk.Button(left, text="Trigger Chirp (0x01)",
command=lambda: self._send_cmd(0x01, 1)).pack(fill="x", pady=3)
ttk.Button(left, text="Enable MTI (0x26)",
command=lambda: self._send_cmd(0x26, 1)).pack(fill="x", pady=3)
ttk.Button(left, text="Disable MTI (0x26)",
command=lambda: self._send_cmd(0x26, 0)).pack(fill="x", pady=3)
ttk.Button(left, text="Enable CFAR (0x25)",
command=lambda: self._send_cmd(0x25, 1)).pack(fill="x", pady=3)
ttk.Button(left, text="Disable CFAR (0x25)",
command=lambda: self._send_cmd(0x25, 0)).pack(fill="x", pady=3)
ttk.Button(left, text="Request Status (0xFF)",
command=lambda: self._send_cmd(0xFF, 0)).pack(fill="x", pady=3)
ttk.Separator(left, orient="horizontal").pack(fill="x", pady=6)
ttk.Label(left, text="FPGA Self-Test", font=("Menlo", 10, "bold")).pack(
anchor="w", pady=(2, 0))
ttk.Button(left, text="Run Self-Test (0x30)",
command=lambda: self._send_cmd(0x30, 1)).pack(fill="x", pady=3)
ttk.Button(left, text="Read Self-Test Result (0x31)",
command=lambda: self._send_cmd(0x31, 0)).pack(fill="x", pady=3)
# Self-test result display
st_frame = ttk.LabelFrame(left, text="Self-Test Results", padding=6)
st_frame.pack(fill="x", pady=(6, 0))
self._st_labels = {}
for name, default_text in [
("busy", "Busy: --"),
("flags", "Flags: -----"),
("detail", "Detail: 0x--"),
("t0", "T0 BRAM: --"),
("t1", "T1 CIC: --"),
("t2", "T2 FFT: --"),
("t3", "T3 Arith: --"),
("t4", "T4 ADC: --"),
]:
lbl = ttk.Label(st_frame, text=default_text, font=("Menlo", 9))
lbl.pack(anchor="w")
self._st_labels[name] = lbl
# Right column: Parameter configuration
right = ttk.LabelFrame(outer, text="Parameter Configuration", padding=12)
right.grid(row=0, column=1, sticky="nsew", padx=(8, 0))
self._param_vars: Dict[str, tk.StringVar] = {}
params = [
("CFAR Guard (0x21)", 0x21, "2"),
("CFAR Train (0x22)", 0x22, "8"),
("CFAR Alpha Q4.4 (0x23)", 0x23, "48"),
("CFAR Mode (0x24)", 0x24, "0"),
("Threshold (0x10)", 0x10, "500"),
("Gain Shift (0x06)", 0x06, "0"),
("DC Notch Width (0x27)", 0x27, "0"),
("Range Mode (0x20)", 0x20, "0"),
("Stream Enable (0x05)", 0x05, "7"),
]
for row_idx, (label, opcode, default) in enumerate(params):
ttk.Label(right, text=label).grid(row=row_idx, column=0,
sticky="w", pady=2)
var = tk.StringVar(value=default)
self._param_vars[str(opcode)] = var
ent = ttk.Entry(right, textvariable=var, width=10)
ent.grid(row=row_idx, column=1, padx=8, pady=2)
ttk.Button(
right, text="Set",
command=lambda op=opcode, v=var: self._send_cmd(op, int(v.get()))
).grid(row=row_idx, column=2, pady=2)
# Custom command
ttk.Separator(right, orient="horizontal").grid(
row=len(params), column=0, columnspan=3, sticky="ew", pady=8)
ttk.Label(right, text="Custom Opcode (hex)").grid(
row=len(params) + 1, column=0, sticky="w")
self._custom_op = tk.StringVar(value="01")
ttk.Entry(right, textvariable=self._custom_op, width=10).grid(
row=len(params) + 1, column=1, padx=8)
ttk.Label(right, text="Value (dec)").grid(
row=len(params) + 2, column=0, sticky="w")
self._custom_val = tk.StringVar(value="0")
ttk.Entry(right, textvariable=self._custom_val, width=10).grid(
row=len(params) + 2, column=1, padx=8)
ttk.Button(right, text="Send Custom",
command=self._send_custom).grid(
row=len(params) + 2, column=2, pady=2)
outer.columnconfigure(0, weight=1)
outer.columnconfigure(1, weight=2)
outer.rowconfigure(0, weight=1)
def _build_log_tab(self, parent):
self.log_text = tk.Text(parent, bg=BG2, fg=FG, font=("Menlo", 10),
insertbackground=FG, wrap="word")
self.log_text.pack(fill="both", expand=True, padx=8, pady=8)
# Redirect log handler to text widget
handler = _TextHandler(self.log_text)
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S"))
logging.getLogger().addHandler(handler)
# ------------------------------------------------------------ Actions
def _on_connect(self):
if self.conn.is_open:
# Disconnect
if self._acq_thread is not None:
self._acq_thread.stop()
self._acq_thread.join(timeout=2)
self._acq_thread = None
self.conn.close()
self.lbl_status.config(text="DISCONNECTED", foreground=RED)
self.btn_connect.config(text="Connect")
log.info("Disconnected")
return
# Open connection in a background thread to avoid blocking the GUI
self.lbl_status.config(text="CONNECTING...", foreground=YELLOW)
self.btn_connect.config(state="disabled")
self.root.update_idletasks()
def _do_connect():
ok = self.conn.open()
# Schedule UI update back on the main thread
self.root.after(0, lambda: self._on_connect_done(ok))
threading.Thread(target=_do_connect, daemon=True).start()
def _on_connect_done(self, success: bool):
"""Called on main thread after connection attempt completes."""
self.btn_connect.config(state="normal")
if success:
self.lbl_status.config(text="CONNECTED", foreground=GREEN)
self.btn_connect.config(text="Disconnect")
self._acq_thread = RadarAcquisition(
self.conn, self.frame_queue, self.recorder,
status_callback=self._on_status_received)
self._acq_thread.start()
log.info("Connected and acquisition started")
else:
self.lbl_status.config(text="CONNECT FAILED", foreground=RED)
self.btn_connect.config(text="Connect")
def _on_record(self):
if self.recorder.recording:
self.recorder.stop()
self.btn_record.config(text="Record")
return
filepath = filedialog.asksaveasfilename(
defaultextension=".h5",
filetypes=[("HDF5", "*.h5"), ("All", "*.*")],
initialfile=f"radar_{time.strftime('%Y%m%d_%H%M%S')}.h5",
)
if filepath:
self.recorder.start(filepath)
self.btn_record.config(text="Stop Rec")
def _send_cmd(self, opcode: int, value: int):
cmd = RadarProtocol.build_command(opcode, value)
ok = self.conn.write(cmd)
log.info(f"CMD 0x{opcode:02X} val={value} ({'OK' if ok else 'FAIL'})")
def _send_custom(self):
try:
op = int(self._custom_op.get(), 16)
val = int(self._custom_val.get())
self._send_cmd(op, val)
except ValueError:
log.error("Invalid custom command values")
def _on_status_received(self, status: StatusResponse):
"""Called from acquisition thread — schedule UI update on main thread."""
self.root.after(0, self._update_self_test_labels, status)
def _update_self_test_labels(self, status: StatusResponse):
"""Update the self-test result labels from a StatusResponse."""
if not hasattr(self, '_st_labels'):
return
flags = status.self_test_flags
detail = status.self_test_detail
busy = status.self_test_busy
busy_str = "RUNNING" if busy else "IDLE"
busy_color = YELLOW if busy else FG
self._st_labels["busy"].config(text=f"Busy: {busy_str}",
foreground=busy_color)
self._st_labels["flags"].config(text=f"Flags: {flags:05b}")
self._st_labels["detail"].config(text=f"Detail: 0x{detail:02X}")
# Individual test results (bit = 1 means PASS)
test_names = [
("t0", "T0 BRAM"),
("t1", "T1 CIC"),
("t2", "T2 FFT"),
("t3", "T3 Arith"),
("t4", "T4 ADC"),
]
for i, (key, name) in enumerate(test_names):
if busy:
result_str = "..."
color = YELLOW
elif flags & (1 << i):
result_str = "PASS"
color = GREEN
else:
result_str = "FAIL"
color = RED
self._st_labels[key].config(
text=f"{name}: {result_str}", foreground=color)
# --------------------------------------------------------- Display loop
def _schedule_update(self):
self._update_display()
self.root.after(self.UPDATE_INTERVAL_MS, self._schedule_update)
def _update_display(self):
"""Pull latest frame from queue and update plots."""
frame = None
# Drain queue, keep latest
while True:
try:
frame = self.frame_queue.get_nowait()
except queue.Empty:
break
if frame is None:
return
self._current_frame = frame
self._frame_count += 1
# FPS calculation
now = time.time()
dt = now - self._fps_ts
if dt > 0.5:
self._fps = self._frame_count / dt
self._frame_count = 0
self._fps_ts = now
# Update labels
self.lbl_fps.config(text=f"{self._fps:.1f} fps")
self.lbl_detections.config(text=f"Det: {frame.detection_count}")
self.lbl_frame.config(text=f"Frame: {frame.frame_number}")
# Update range-Doppler heatmap in raw dual-subframe bin order
mag = frame.magnitude
det_shifted = frame.detections
# Stable colorscale via EMA smoothing of vmax
frame_vmax = float(np.max(mag)) if np.max(mag) > 0 else 1.0
self._vmax_ema = (self._vmax_alpha * frame_vmax +
(1.0 - self._vmax_alpha) * self._vmax_ema)
stable_vmax = max(self._vmax_ema, 1.0)
self._rd_img.set_data(mag)
self._rd_img.set_clim(vmin=0, vmax=stable_vmax)
# Update CFAR overlay in raw Doppler-bin coordinates
det_coords = np.argwhere(det_shifted > 0)
if len(det_coords) > 0:
# det_coords[:, 0] = range bin, det_coords[:, 1] = Doppler bin
range_m = (det_coords[:, 0] + 0.5) * self._range_per_bin
doppler_bins = det_coords[:, 1] + 0.5
offsets = np.column_stack([doppler_bins, range_m])
self._det_scatter.set_offsets(offsets)
else:
self._det_scatter.set_offsets(np.empty((0, 2)))
# Update waterfall
self._waterfall.append(frame.range_profile.copy())
wf_arr = np.array(list(self._waterfall))
wf_max = max(np.max(wf_arr), 1.0)
self._wf_img.set_data(wf_arr)
self._wf_img.set_clim(vmin=0, vmax=wf_max)
self._canvas.draw_idle()
class _TextHandler(logging.Handler):
"""Logging handler that writes to a tkinter Text widget."""
def __init__(self, text_widget: tk.Text):
super().__init__()
self._text = text_widget
def emit(self, record):
msg = self.format(record)
try:
self._text.after(0, self._append, msg)
except Exception:
pass
def _append(self, msg: str):
self._text.insert("end", msg + "\n")
self._text.see("end")
# Keep last 500 lines
lines = int(self._text.index("end-1c").split(".")[0])
if lines > 500:
self._text.delete("1.0", f"{lines - 500}.0")
# ============================================================================
# Entry Point
# ============================================================================
def main():
parser = argparse.ArgumentParser(description="AERIS-10 Radar Dashboard")
parser.add_argument("--live", action="store_true",
help="Use real FT601 hardware (default: mock mode)")
parser.add_argument("--replay", type=str, metavar="NPY_DIR",
help="Replay real data from .npy directory "
"(e.g. tb/cosim/real_data/hex/)")
parser.add_argument("--no-mti", action="store_true",
help="With --replay, use non-MTI Doppler data")
parser.add_argument("--record", action="store_true",
help="Start HDF5 recording immediately")
parser.add_argument("--device", type=int, default=0,
help="FT601 device index (default: 0)")
args = parser.parse_args()
if args.replay:
npy_dir = os.path.abspath(args.replay)
conn = ReplayConnection(npy_dir, use_mti=not args.no_mti)
mode_str = f"REPLAY ({npy_dir}, MTI={'OFF' if args.no_mti else 'ON'})"
elif args.live:
conn = FT601Connection(mock=False)
mode_str = "LIVE"
else:
conn = FT601Connection(mock=True)
mode_str = "MOCK"
recorder = DataRecorder()
root = tk.Tk()
dashboard = RadarDashboard(root, conn, recorder)
if args.record:
filepath = os.path.join(
os.getcwd(),
f"radar_{time.strftime('%Y%m%d_%H%M%S')}.h5"
)
recorder.start(filepath)
def on_closing():
if dashboard._acq_thread is not None:
dashboard._acq_thread.stop()
dashboard._acq_thread.join(timeout=2)
if conn.is_open:
conn.close()
if recorder.recording:
recorder.stop()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
log.info(f"Dashboard started (mode={mode_str})")
root.mainloop()
if __name__ == "__main__":
main()