feat: AGC phase 7 — AGC Monitor visualization tab with throttled redraws

Add AGC Monitor tab to both tkinter and PyQt6 dashboards with:
- Real-time strip charts: gain history, peak magnitude, saturation count
- Color-coded indicator labels (green/yellow/red thresholds)
- Ring buffer architecture (deque maxlen=256, ~60s at 10 Hz)
- Fill-between saturation area with auto-scaling Y axis
- Throttled matplotlib redraws (500ms interval via time.monotonic)
  to prevent GUI hang from 20 Hz mock-mode status packets

Tests: 82 dashboard + 38 v7 = 120 total, all passing. Ruff: clean.
This commit is contained in:
Jason
2026-04-13 20:42:01 +05:45
parent 666527fa7d
commit 3ef6416e3f
4 changed files with 548 additions and 4 deletions
+171
View File
@@ -111,6 +111,16 @@ class RadarDashboard:
self._vmax_ema = 1000.0
self._vmax_alpha = 0.15 # smoothing factor (lower = more stable)
# AGC visualization history (ring buffers, ~60s at 10 Hz)
self._agc_history_len = 256
self._agc_gain_history: deque[int] = deque(maxlen=self._agc_history_len)
self._agc_peak_history: deque[int] = deque(maxlen=self._agc_history_len)
self._agc_sat_history: deque[int] = deque(maxlen=self._agc_history_len)
self._agc_time_history: deque[float] = deque(maxlen=self._agc_history_len)
self._agc_t0: float = time.time()
self._agc_last_redraw: float = 0.0 # throttle chart redraws
self._AGC_REDRAW_INTERVAL: float = 0.5 # seconds between redraws
self._build_ui()
self._schedule_update()
@@ -162,13 +172,16 @@ class RadarDashboard:
tab_display = ttk.Frame(nb)
tab_control = ttk.Frame(nb)
tab_agc = ttk.Frame(nb)
tab_log = ttk.Frame(nb)
nb.add(tab_display, text=" Display ")
nb.add(tab_control, text=" Control ")
nb.add(tab_agc, text=" AGC Monitor ")
nb.add(tab_log, text=" Log ")
self._build_display_tab(tab_display)
self._build_control_tab(tab_control)
self._build_agc_tab(tab_agc)
self._build_log_tab(tab_log)
def _build_display_tab(self, parent):
@@ -474,6 +487,91 @@ class RadarDashboard:
var.set(str(clamped))
self._send_cmd(opcode, clamped)
def _build_agc_tab(self, parent):
"""AGC Monitor tab — real-time strip charts for gain, peak, and saturation."""
# Top row: AGC status badge + saturation indicator
top = ttk.Frame(parent)
top.pack(fill="x", padx=8, pady=(8, 0))
self._agc_badge = ttk.Label(
top, text="AGC: --", font=("Menlo", 14, "bold"), foreground=FG)
self._agc_badge.pack(side="left", padx=(0, 24))
self._agc_sat_badge = ttk.Label(
top, text="Saturation: 0", font=("Menlo", 12), foreground=GREEN)
self._agc_sat_badge.pack(side="left", padx=(0, 24))
self._agc_gain_value = ttk.Label(
top, text="Gain: --", font=("Menlo", 12), foreground=ACCENT)
self._agc_gain_value.pack(side="left", padx=(0, 24))
self._agc_peak_value = ttk.Label(
top, text="Peak: --", font=("Menlo", 12), foreground=ACCENT)
self._agc_peak_value.pack(side="left")
# Matplotlib figure with 3 stacked subplots sharing x-axis (time)
self._agc_fig = Figure(figsize=(14, 7), facecolor=BG)
self._agc_fig.subplots_adjust(
left=0.07, right=0.98, top=0.95, bottom=0.08,
hspace=0.30)
# Subplot 1: FPGA inner-loop gain (4-bit, 0-15)
self._ax_gain = self._agc_fig.add_subplot(3, 1, 1)
self._ax_gain.set_facecolor(BG2)
self._ax_gain.set_title("FPGA AGC Gain (inner loop)", color=FG, fontsize=10)
self._ax_gain.set_ylabel("Gain Level", color=FG)
self._ax_gain.set_ylim(-0.5, 15.5)
self._ax_gain.tick_params(colors=FG)
self._ax_gain.set_xlim(0, self._agc_history_len)
self._gain_line, = self._ax_gain.plot(
[], [], color=ACCENT, linewidth=1.5, label="Gain")
self._ax_gain.axhline(y=0, color=RED, linewidth=0.5, alpha=0.5, linestyle="--")
self._ax_gain.axhline(y=15, color=RED, linewidth=0.5, alpha=0.5, linestyle="--")
for spine in self._ax_gain.spines.values():
spine.set_color(SURFACE)
# Subplot 2: Peak magnitude (8-bit, 0-255)
self._ax_peak = self._agc_fig.add_subplot(3, 1, 2)
self._ax_peak.set_facecolor(BG2)
self._ax_peak.set_title("Peak Magnitude", color=FG, fontsize=10)
self._ax_peak.set_ylabel("Peak (8-bit)", color=FG)
self._ax_peak.set_ylim(-5, 260)
self._ax_peak.tick_params(colors=FG)
self._ax_peak.set_xlim(0, self._agc_history_len)
self._peak_line, = self._ax_peak.plot(
[], [], color=YELLOW, linewidth=1.5, label="Peak")
# AGC target reference line (default 200)
self._agc_target_line = self._ax_peak.axhline(
y=200, color=GREEN, linewidth=1.0, alpha=0.7, linestyle="--",
label="Target (200)")
self._ax_peak.legend(loc="upper right", fontsize=8,
facecolor=BG2, edgecolor=SURFACE,
labelcolor=FG)
for spine in self._ax_peak.spines.values():
spine.set_color(SURFACE)
# Subplot 3: Saturation count (8-bit, 0-255) as bar-style fill
self._ax_sat = self._agc_fig.add_subplot(3, 1, 3)
self._ax_sat.set_facecolor(BG2)
self._ax_sat.set_title("Saturation Count", color=FG, fontsize=10)
self._ax_sat.set_ylabel("Sat Count", color=FG)
self._ax_sat.set_xlabel("Sample Index", color=FG)
self._ax_sat.set_ylim(-1, 40)
self._ax_sat.tick_params(colors=FG)
self._ax_sat.set_xlim(0, self._agc_history_len)
self._sat_fill = self._ax_sat.fill_between(
[], [], color=RED, alpha=0.6, label="Saturation")
self._sat_line, = self._ax_sat.plot(
[], [], color=RED, linewidth=1.0)
self._ax_sat.axhline(y=0, color=GREEN, linewidth=0.5, alpha=0.5, linestyle="--")
for spine in self._ax_sat.spines.values():
spine.set_color(SURFACE)
agc_canvas = FigureCanvasTkAgg(self._agc_fig, master=parent)
agc_canvas.draw()
agc_canvas.get_tk_widget().pack(fill="both", expand=True)
self._agc_canvas = agc_canvas
def _build_log_tab(self, parent):
self.log_text = tk.Text(parent, bg=BG2, fg=FG, font=("Menlo", 10),
insertbackground=FG, wrap="word")
@@ -609,6 +707,79 @@ class RadarDashboard:
text=f"Sat Count: {status.agc_saturation_count}",
foreground=sat_color)
# AGC visualization update
self._update_agc_visualization(status)
def _update_agc_visualization(self, status: StatusResponse):
"""Push AGC metrics into ring buffers and redraw strip charts.
Data is always accumulated (cheap), but matplotlib redraws are
throttled to ``_AGC_REDRAW_INTERVAL`` seconds to avoid saturating
the GUI event-loop when status packets arrive at 20 Hz.
"""
if not hasattr(self, '_agc_canvas'):
return
# Append to ring buffers (always — this is O(1))
self._agc_gain_history.append(status.agc_current_gain)
self._agc_peak_history.append(status.agc_peak_magnitude)
self._agc_sat_history.append(status.agc_saturation_count)
# Update indicator labels (cheap Tk config calls)
mode_str = "AUTO" if status.agc_enable else "MANUAL"
mode_color = GREEN if status.agc_enable else FG
self._agc_badge.config(text=f"AGC: {mode_str}", foreground=mode_color)
self._agc_current_gain_lbl.config(
text=f"Current Gain: {status.agc_current_gain}")
self._agc_current_peak_lbl.config(
text=f"Peak Mag: {status.agc_peak_magnitude}")
total_sat = sum(self._agc_sat_history)
if total_sat > 10:
sat_color = RED
elif total_sat > 0:
sat_color = YELLOW
else:
sat_color = GREEN
self._agc_sat_total_lbl.config(
text=f"Total Saturations: {total_sat}", foreground=sat_color)
# ---- Throttle matplotlib redraws ---------------------------------
now = time.monotonic()
if now - self._agc_last_redraw < self._AGC_REDRAW_INTERVAL:
return
self._agc_last_redraw = now
n = len(self._agc_gain_history)
xs = list(range(n))
# Update line plots
gain_data = list(self._agc_gain_history)
peak_data = list(self._agc_peak_history)
sat_data = list(self._agc_sat_history)
self._gain_line.set_data(xs, gain_data)
self._peak_line.set_data(xs, peak_data)
# Saturation: redraw as filled area
self._sat_line.set_data(xs, sat_data)
if self._sat_fill is not None:
self._sat_fill.remove()
self._sat_fill = self._ax_sat.fill_between(
xs, sat_data, color=RED, alpha=0.4)
# Auto-scale saturation Y axis to data
max_sat = max(sat_data) if sat_data else 0
self._ax_sat.set_ylim(-1, max(max_sat * 1.5, 5))
# Scroll X axis to keep latest data visible
if n >= self._agc_history_len:
self._ax_gain.set_xlim(0, n)
self._ax_peak.set_xlim(0, n)
self._ax_sat.set_xlim(0, n)
self._agc_canvas.draw_idle()
# --------------------------------------------------------- Display loop
def _schedule_update(self):
self._update_display()
+106
View File
@@ -840,5 +840,111 @@ class TestAGCStatusResponseDefaults(unittest.TestCase):
self.assertEqual(sr.agc_enable, 1)
# =============================================================================
# AGC Visualization — ring buffer / data model tests
# =============================================================================
class TestAGCVisualizationHistory(unittest.TestCase):
"""Test the AGC visualization ring buffer logic (no GUI required)."""
def _make_deque(self, maxlen=256):
from collections import deque
return deque(maxlen=maxlen)
def test_ring_buffer_maxlen(self):
"""Ring buffer should evict oldest when full."""
d = self._make_deque(maxlen=4)
for i in range(6):
d.append(i)
self.assertEqual(list(d), [2, 3, 4, 5])
self.assertEqual(len(d), 4)
def test_gain_history_accumulation(self):
"""Gain values accumulate correctly in a deque."""
gain_hist = self._make_deque(maxlen=256)
statuses = [
StatusResponse(agc_current_gain=g)
for g in [0, 3, 7, 15, 8, 2]
]
for st in statuses:
gain_hist.append(st.agc_current_gain)
self.assertEqual(list(gain_hist), [0, 3, 7, 15, 8, 2])
def test_peak_history_accumulation(self):
"""Peak magnitude values accumulate correctly."""
peak_hist = self._make_deque(maxlen=256)
for p in [0, 50, 200, 255, 128]:
peak_hist.append(p)
self.assertEqual(list(peak_hist), [0, 50, 200, 255, 128])
def test_saturation_total_computation(self):
"""Sum of saturation ring buffer gives running total."""
sat_hist = self._make_deque(maxlen=256)
for s in [0, 0, 5, 0, 12, 3]:
sat_hist.append(s)
self.assertEqual(sum(sat_hist), 20)
def test_saturation_color_thresholds(self):
"""Color logic: green=0, yellow=1-10, red>10."""
def sat_color(total):
if total > 10:
return "red"
if total > 0:
return "yellow"
return "green"
self.assertEqual(sat_color(0), "green")
self.assertEqual(sat_color(1), "yellow")
self.assertEqual(sat_color(10), "yellow")
self.assertEqual(sat_color(11), "red")
self.assertEqual(sat_color(255), "red")
def test_ring_buffer_eviction_preserves_latest(self):
"""After overflow, only the most recent values remain."""
d = self._make_deque(maxlen=8)
for i in range(20):
d.append(i)
self.assertEqual(list(d), [12, 13, 14, 15, 16, 17, 18, 19])
def test_empty_history_safe(self):
"""Empty ring buffer should be safe for max/sum."""
d = self._make_deque(maxlen=256)
self.assertEqual(sum(d), 0)
self.assertEqual(len(d), 0)
# max() on empty would raise — test the guard pattern used in viz code
max_sat = max(d) if d else 0
self.assertEqual(max_sat, 0)
def test_agc_mode_string(self):
"""AGC mode display string from enable flag."""
self.assertEqual(
"AUTO" if StatusResponse(agc_enable=1).agc_enable else "MANUAL",
"AUTO")
self.assertEqual(
"AUTO" if StatusResponse(agc_enable=0).agc_enable else "MANUAL",
"MANUAL")
def test_xlim_scroll_logic(self):
"""X-axis scroll: when n >= history_len, xlim should expand."""
history_len = 8
d = self._make_deque(maxlen=history_len)
for i in range(10):
d.append(i)
n = len(d)
# After 10 pushes into maxlen=8, n=8
self.assertEqual(n, history_len)
# xlim should be (0, n) for static or (n-history_len, n) for scrolling
self.assertEqual(max(0, n - history_len), 0)
self.assertEqual(n, 8)
def test_sat_autoscale_ylim(self):
"""Saturation y-axis auto-scale: max(max_sat * 1.5, 5)."""
# No saturation
self.assertEqual(max(0 * 1.5, 5), 5)
# Some saturation
self.assertAlmostEqual(max(10 * 1.5, 5), 15.0)
# High saturation
self.assertAlmostEqual(max(200 * 1.5, 5), 300.0)
if __name__ == "__main__":
unittest.main(verbosity=2)
+67
View File
@@ -334,6 +334,73 @@ class TestV7Init(unittest.TestCase):
self.assertTrue(hasattr(v7, name), f"v7 missing export: {name}")
# =============================================================================
# Test: AGC Visualization data model
# =============================================================================
class TestAGCVisualizationV7(unittest.TestCase):
"""AGC visualization ring buffer and data model tests (no Qt required)."""
def _make_deque(self, maxlen=256):
from collections import deque
return deque(maxlen=maxlen)
def test_ring_buffer_basics(self):
d = self._make_deque(maxlen=4)
for i in range(6):
d.append(i)
self.assertEqual(list(d), [2, 3, 4, 5])
def test_gain_range_4bit(self):
"""AGC gain is 4-bit (0-15)."""
from radar_protocol import StatusResponse
for g in [0, 7, 15]:
sr = StatusResponse(agc_current_gain=g)
self.assertEqual(sr.agc_current_gain, g)
def test_peak_range_8bit(self):
"""Peak magnitude is 8-bit (0-255)."""
from radar_protocol import StatusResponse
for p in [0, 128, 255]:
sr = StatusResponse(agc_peak_magnitude=p)
self.assertEqual(sr.agc_peak_magnitude, p)
def test_saturation_accumulation(self):
"""Saturation ring buffer sum tracks total events."""
sat = self._make_deque(maxlen=256)
for s in [0, 5, 0, 10, 3]:
sat.append(s)
self.assertEqual(sum(sat), 18)
def test_mode_label_logic(self):
"""AGC mode string from enable field."""
from radar_protocol import StatusResponse
self.assertEqual(
"AUTO" if StatusResponse(agc_enable=1).agc_enable else "MANUAL",
"AUTO")
self.assertEqual(
"AUTO" if StatusResponse(agc_enable=0).agc_enable else "MANUAL",
"MANUAL")
def test_history_len_default(self):
"""Default history length should be 256."""
d = self._make_deque(maxlen=256)
self.assertEqual(d.maxlen, 256)
def test_color_thresholds(self):
"""Saturation color: green=0, warning=1-10, error>10."""
from v7.models import DARK_SUCCESS, DARK_WARNING, DARK_ERROR
def pick_color(total):
if total > 10:
return DARK_ERROR
if total > 0:
return DARK_WARNING
return DARK_SUCCESS
self.assertEqual(pick_color(0), DARK_SUCCESS)
self.assertEqual(pick_color(5), DARK_WARNING)
self.assertEqual(pick_color(11), DARK_ERROR)
# =============================================================================
# Helper: lazy import of v7.models
# =============================================================================
+204 -4
View File
@@ -1,15 +1,16 @@
"""
v7.dashboard — Main application window for the PLFM Radar GUI V7.
RadarDashboard is a QMainWindow with five tabs:
RadarDashboard is a QMainWindow with six tabs:
1. Main View — Range-Doppler matplotlib canvas (64x32), device combos,
Start/Stop, targets table
2. Map View — Embedded Leaflet map + sidebar
3. FPGA Control — Full FPGA register control panel (all 27 opcodes incl. AGC,
bit-width validation, grouped layout matching production)
4. Diagnostics — Connection indicators, packet stats, dependency status,
4. AGC Monitor — Real-time AGC strip charts (gain, peak magnitude, saturation)
5. Diagnostics — Connection indicators, packet stats, dependency status,
self-test results, log viewer
5. Settings — Host-side DSP parameters + About section
6. Settings — Host-side DSP parameters + About section
Uses production radar_protocol.py for all FPGA communication:
- FT2232HConnection for real hardware
@@ -23,6 +24,7 @@ commands sent over FT2232H.
import time
import logging
from collections import deque
import numpy as np
@@ -153,6 +155,14 @@ class RadarDashboard(QMainWindow):
# FPGA control parameter widgets
self._param_spins: dict = {} # opcode_hex -> QSpinBox
# AGC visualization history (ring buffers)
self._agc_history_len = 256
self._agc_gain_history: deque[int] = deque(maxlen=self._agc_history_len)
self._agc_peak_history: deque[int] = deque(maxlen=self._agc_history_len)
self._agc_sat_history: deque[int] = deque(maxlen=self._agc_history_len)
self._agc_last_redraw: float = 0.0 # throttle chart redraws
self._AGC_REDRAW_INTERVAL: float = 0.5 # seconds between redraws
# ---- Build UI ------------------------------------------------------
self._apply_dark_theme()
self._setup_ui()
@@ -306,6 +316,7 @@ class RadarDashboard(QMainWindow):
self._create_main_tab()
self._create_map_tab()
self._create_fpga_control_tab()
self._create_agc_monitor_tab()
self._create_diagnostics_tab()
self._create_settings_tab()
@@ -783,7 +794,122 @@ class RadarDashboard(QMainWindow):
parent_layout.addLayout(row)
# -----------------------------------------------------------------
# TAB 4: Diagnostics
# TAB 4: AGC Monitor
# -----------------------------------------------------------------
def _create_agc_monitor_tab(self):
"""AGC Monitor — real-time strip charts for FPGA inner-loop AGC."""
tab = QWidget()
layout = QVBoxLayout(tab)
layout.setContentsMargins(8, 8, 8, 8)
# ---- Top indicator row ---------------------------------------------
indicator = QFrame()
indicator.setStyleSheet(
f"background-color: {DARK_ACCENT}; border-radius: 4px;")
ind_layout = QHBoxLayout(indicator)
ind_layout.setContentsMargins(12, 8, 12, 8)
self._agc_mode_lbl = QLabel("AGC: --")
self._agc_mode_lbl.setStyleSheet(
f"color: {DARK_FG}; font-size: 16px; font-weight: bold;")
ind_layout.addWidget(self._agc_mode_lbl)
self._agc_gain_lbl = QLabel("Gain: --")
self._agc_gain_lbl.setStyleSheet(
f"color: {DARK_INFO}; font-size: 14px;")
ind_layout.addWidget(self._agc_gain_lbl)
self._agc_peak_lbl = QLabel("Peak: --")
self._agc_peak_lbl.setStyleSheet(
f"color: {DARK_INFO}; font-size: 14px;")
ind_layout.addWidget(self._agc_peak_lbl)
self._agc_sat_total_lbl = QLabel("Total Saturations: 0")
self._agc_sat_total_lbl.setStyleSheet(
f"color: {DARK_SUCCESS}; font-size: 14px; font-weight: bold;")
ind_layout.addWidget(self._agc_sat_total_lbl)
ind_layout.addStretch()
layout.addWidget(indicator)
# ---- Matplotlib figure with 3 subplots -----------------------------
agc_fig = Figure(figsize=(12, 7), facecolor=DARK_BG)
agc_fig.subplots_adjust(
left=0.07, right=0.96, top=0.95, bottom=0.07,
hspace=0.32)
# Subplot 1: Gain history (4-bit, 0-15)
self._agc_ax_gain = agc_fig.add_subplot(3, 1, 1)
self._agc_ax_gain.set_facecolor(DARK_ACCENT)
self._agc_ax_gain.set_ylabel("Gain Code", color=DARK_FG, fontsize=10)
self._agc_ax_gain.set_title(
"FPGA Inner-Loop Gain (4-bit)", color=DARK_FG, fontsize=11)
self._agc_ax_gain.set_ylim(-0.5, 15.5)
self._agc_ax_gain.tick_params(colors=DARK_FG, labelsize=9)
self._agc_ax_gain.set_xlim(0, self._agc_history_len)
for spine in self._agc_ax_gain.spines.values():
spine.set_color(DARK_BORDER)
self._agc_gain_line, = self._agc_ax_gain.plot(
[], [], color="#89b4fa", linewidth=1.5, label="Gain")
self._agc_ax_gain.axhline(y=7.5, color=DARK_WARNING, linestyle="--",
linewidth=0.8, alpha=0.5, label="Midpoint")
self._agc_ax_gain.legend(
loc="upper right", fontsize=8,
facecolor=DARK_ACCENT, edgecolor=DARK_BORDER,
labelcolor=DARK_FG)
# Subplot 2: Peak magnitude (8-bit, 0-255)
self._agc_ax_peak = agc_fig.add_subplot(
3, 1, 2, sharex=self._agc_ax_gain)
self._agc_ax_peak.set_facecolor(DARK_ACCENT)
self._agc_ax_peak.set_ylabel("Peak Mag", color=DARK_FG, fontsize=10)
self._agc_ax_peak.set_title(
"ADC Peak Magnitude (8-bit)", color=DARK_FG, fontsize=11)
self._agc_ax_peak.set_ylim(-5, 260)
self._agc_ax_peak.tick_params(colors=DARK_FG, labelsize=9)
for spine in self._agc_ax_peak.spines.values():
spine.set_color(DARK_BORDER)
self._agc_peak_line, = self._agc_ax_peak.plot(
[], [], color=DARK_SUCCESS, linewidth=1.5, label="Peak")
self._agc_ax_peak.axhline(y=200, color=DARK_WARNING, linestyle="--",
linewidth=0.8, alpha=0.5,
label="Target (200)")
self._agc_ax_peak.axhspan(240, 255, alpha=0.15, color=DARK_ERROR,
label="Sat Zone")
self._agc_ax_peak.legend(
loc="upper right", fontsize=8,
facecolor=DARK_ACCENT, edgecolor=DARK_BORDER,
labelcolor=DARK_FG)
# Subplot 3: Saturation count per update (8-bit, 0-255)
self._agc_ax_sat = agc_fig.add_subplot(
3, 1, 3, sharex=self._agc_ax_gain)
self._agc_ax_sat.set_facecolor(DARK_ACCENT)
self._agc_ax_sat.set_ylabel("Sat Count", color=DARK_FG, fontsize=10)
self._agc_ax_sat.set_xlabel(
"Sample (newest right)", color=DARK_FG, fontsize=10)
self._agc_ax_sat.set_title(
"Saturation Events per Update", color=DARK_FG, fontsize=11)
self._agc_ax_sat.set_ylim(-1, 10)
self._agc_ax_sat.tick_params(colors=DARK_FG, labelsize=9)
for spine in self._agc_ax_sat.spines.values():
spine.set_color(DARK_BORDER)
self._agc_sat_line, = self._agc_ax_sat.plot(
[], [], color=DARK_ERROR, linewidth=1.0)
self._agc_sat_fill_artist = None
self._agc_ax_sat.legend(
loc="upper right", fontsize=8,
facecolor=DARK_ACCENT, edgecolor=DARK_BORDER,
labelcolor=DARK_FG)
self._agc_canvas = FigureCanvasQTAgg(agc_fig)
layout.addWidget(self._agc_canvas, stretch=1)
self._tabs.addTab(tab, "AGC Monitor")
# -----------------------------------------------------------------
# TAB 5: Diagnostics
# -----------------------------------------------------------------
def _create_diagnostics_tab(self):
@@ -1335,6 +1461,80 @@ class RadarDashboard(QMainWindow):
self._agc_labels["sat"].setText(
f"Sat Count: {st.agc_saturation_count}")
# AGC Monitor tab visualization
self._update_agc_visualization(st)
def _update_agc_visualization(self, st: StatusResponse):
"""Push AGC metrics into ring buffers and redraw AGC Monitor charts.
Data is always accumulated (cheap), but matplotlib redraws are
throttled to ``_AGC_REDRAW_INTERVAL`` seconds to avoid saturating
the GUI event-loop when status packets arrive at 20 Hz.
"""
if not hasattr(self, '_agc_canvas'):
return
# Push data into ring buffers (always — O(1))
self._agc_gain_history.append(st.agc_current_gain)
self._agc_peak_history.append(st.agc_peak_magnitude)
self._agc_sat_history.append(st.agc_saturation_count)
# Update indicator labels (cheap Qt calls)
agc_str = "AUTO" if st.agc_enable else "MANUAL"
agc_color = DARK_SUCCESS if st.agc_enable else DARK_INFO
self._agc_mode_lbl.setStyleSheet(
f"color: {agc_color}; font-size: 16px; font-weight: bold;")
self._agc_mode_lbl.setText(f"AGC: {agc_str}")
self._agc_gain_lbl.setText(f"Gain: {st.agc_current_gain}")
self._agc_peak_lbl.setText(f"Peak: {st.agc_peak_magnitude}")
total_sat = sum(self._agc_sat_history)
if total_sat > 10:
sat_color = DARK_ERROR
elif total_sat > 0:
sat_color = DARK_WARNING
else:
sat_color = DARK_SUCCESS
self._agc_sat_total_lbl.setStyleSheet(
f"color: {sat_color}; font-size: 14px; font-weight: bold;")
self._agc_sat_total_lbl.setText(f"Total Saturations: {total_sat}")
# ---- Throttle matplotlib redraws ---------------------------------
now = time.monotonic()
if now - self._agc_last_redraw < self._AGC_REDRAW_INTERVAL:
return
self._agc_last_redraw = now
n = len(self._agc_gain_history)
xs = list(range(n))
# Update line plots
gain_data = list(self._agc_gain_history)
peak_data = list(self._agc_peak_history)
sat_data = list(self._agc_sat_history)
self._agc_gain_line.set_data(xs, gain_data)
self._agc_peak_line.set_data(xs, peak_data)
self._agc_sat_line.set_data(xs, sat_data)
# Update saturation fill
if self._agc_sat_fill_artist is not None:
self._agc_sat_fill_artist.remove()
if n > 0:
self._agc_sat_fill_artist = self._agc_ax_sat.fill_between(
xs, sat_data, color=DARK_ERROR, alpha=0.4)
else:
self._agc_sat_fill_artist = None
# Auto-scale saturation y-axis
max_sat = max(sat_data) if sat_data else 1
self._agc_ax_sat.set_ylim(-1, max(max_sat * 1.3, 5))
# Scroll x-axis
self._agc_ax_gain.set_xlim(max(0, n - self._agc_history_len), n)
self._agc_canvas.draw_idle()
# =====================================================================
# Position / coverage callbacks (map sidebar)
# =====================================================================