fix: resolve all ruff lint errors across V6+ GUIs, v7 module, and FPGA cosim scripts

Fixes 25 remaining manual lint errors after auto-fix pass (94 auto-fixed earlier):
- GUI_V6.py: noqa on availability imports, bare except, unused vars, F811 redefs
- GUI_V6_Demo.py: unused app variable
- v7/models.py: noqa F401 on 8 try/except availability-check imports
- FPGA cosim: unused header/status/span vars, ambiguous 'l' renamed to 'line',
  E701 while-on-one-line split, F841 padding vars annotated

Also adds v7/ module, GUI_PyQt_Map.py, and GUI_V7_PyQt.py to version control.
Expands CI lint job to cover all 21 maintained Python files (was 4).

All 58 Python tests pass. Zero ruff errors on all target files.
This commit is contained in:
Jason
2026-04-08 19:11:40 +03:00
parent 6a117dd324
commit 57de32b172
24 changed files with 5260 additions and 91 deletions
+5 -6
View File
@@ -29,7 +29,7 @@ import sys
# Add this directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from fpga_model import SignalChain, sign_extend
from fpga_model import SignalChain
# =============================================================================
@@ -107,7 +107,7 @@ def load_rtl_csv(filepath):
bb_i = []
bb_q = []
with open(filepath, 'r') as f:
header = f.readline() # Skip header
f.readline() # Skip header
for line in f:
line = line.strip()
if not line:
@@ -280,7 +280,7 @@ def compare_scenario(scenario_name):
py_i_stats = compute_signal_stats(py_i)
py_q_stats = compute_signal_stats(py_q)
print(f"\nSignal Statistics:")
print("\nSignal Statistics:")
print(f" RTL I: mean={rtl_i_stats['mean']:.1f}, rms={rtl_i_stats['rms']:.1f}, "
f"range=[{rtl_i_stats['min']}, {rtl_i_stats['max']}]")
print(f" RTL Q: mean={rtl_q_stats['mean']:.1f}, rms={rtl_q_stats['rms']:.1f}, "
@@ -352,12 +352,12 @@ def compare_scenario(scenario_name):
corr_i_aligned = compute_correlation(aligned_rtl_i, aligned_py_i)
corr_q_aligned = compute_correlation(aligned_rtl_q, aligned_py_q)
print(f"\nError Metrics (after alignment):")
print("\nError Metrics (after alignment):")
print(f" I-channel: RMS={rms_i:.2f} LSB, max={max_err_i} LSB, corr={corr_i_aligned:.6f}")
print(f" Q-channel: RMS={rms_q:.2f} LSB, max={max_err_q} LSB, corr={corr_q_aligned:.6f}")
# ---- First/last sample comparison ----
print(f"\nFirst 10 samples (after alignment):")
print("\nFirst 10 samples (after alignment):")
print(f" {'idx':>4s} {'RTL_I':>8s} {'Py_I':>8s} {'Err_I':>6s} {'RTL_Q':>8s} {'Py_Q':>8s} {'Err_Q':>6s}")
for k in range(min(10, aligned_len)):
ei = aligned_rtl_i[k] - aligned_py_i[k]
@@ -444,7 +444,6 @@ def compare_scenario(scenario_name):
print("PASS/FAIL Results:")
all_pass = True
for name, ok, detail in results:
status = "PASS" if ok else "FAIL"
mark = "[PASS]" if ok else "[FAIL]"
print(f" {mark} {name}: {detail}")
if not ok:
@@ -74,7 +74,7 @@ def load_doppler_csv(filepath):
"""
data = {}
with open(filepath, 'r') as f:
header = f.readline()
f.readline() # Skip header
for line in f:
line = line.strip()
if not line:
@@ -163,11 +163,11 @@ def compare_scenario(name, config, base_dir):
if not os.path.exists(golden_path):
print(f" ERROR: Golden CSV not found: {golden_path}")
print(f" Run: python3 gen_doppler_golden.py")
print(" Run: python3 gen_doppler_golden.py")
return False, {}
if not os.path.exists(rtl_path):
print(f" ERROR: RTL CSV not found: {rtl_path}")
print(f" Run the Verilog testbench first")
print(" Run the Verilog testbench first")
return False, {}
py_data = load_doppler_csv(golden_path)
@@ -201,7 +201,7 @@ def compare_scenario(name, config, base_dir):
else:
energy_ratio = 1.0 if rtl_energy == 0 else float('inf')
print(f"\n Global energy:")
print("\n Global energy:")
print(f" Python: {py_energy}")
print(f" RTL: {rtl_energy}")
print(f" Ratio: {energy_ratio:.4f}")
@@ -255,7 +255,7 @@ def compare_scenario(name, config, base_dir):
avg_corr_i = sum(i_correlations) / len(i_correlations)
avg_corr_q = sum(q_correlations) / len(q_correlations)
print(f"\n Per-range-bin metrics:")
print("\n Per-range-bin metrics:")
print(f" Peak Doppler bin agreement (+/-1 within sub-frame): {peak_agreements}/{RANGE_BINS} "
f"({peak_agreement_frac:.0%})")
print(f" Avg magnitude correlation: {avg_mag_corr:.4f}")
@@ -263,7 +263,7 @@ def compare_scenario(name, config, base_dir):
print(f" Avg Q-channel correlation: {avg_corr_q:.4f}")
# Show top 5 range bins by Python energy
print(f"\n Top 5 range bins by Python energy:")
print("\n Top 5 range bins by Python energy:")
top_rbins = sorted(peak_details, key=lambda x: -x['py_energy'])[:5]
for d in top_rbins:
print(f" rbin={d['rbin']:2d}: py_peak={d['py_peak']:2d}, "
@@ -291,7 +291,7 @@ def compare_scenario(name, config, base_dir):
checks.append((f'High-energy rbin avg mag_corr >= {MAG_CORR_MIN:.2f} '
f'(actual={he_mag_corr:.3f})', he_ok))
print(f"\n Pass/Fail Checks:")
print("\n Pass/Fail Checks:")
all_pass = True
for check_name, passed in checks:
status = "PASS" if passed else "FAIL"
+7 -7
View File
@@ -80,7 +80,7 @@ def load_csv(filepath):
vals_i = []
vals_q = []
with open(filepath, 'r') as f:
header = f.readline()
f.readline() # Skip header
for line in f:
line = line.strip()
if not line:
@@ -172,11 +172,11 @@ def compare_scenario(scenario_name, config, base_dir):
if not os.path.exists(golden_path):
print(f" ERROR: Golden CSV not found: {golden_path}")
print(f" Run: python3 gen_mf_cosim_golden.py")
print(" Run: python3 gen_mf_cosim_golden.py")
return False, {}
if not os.path.exists(rtl_path):
print(f" ERROR: RTL CSV not found: {rtl_path}")
print(f" Run the RTL testbench first")
print(" Run the RTL testbench first")
return False, {}
py_i, py_q = load_csv(golden_path)
@@ -205,7 +205,7 @@ def compare_scenario(scenario_name, config, base_dir):
energy_ratio = float('inf') if py_energy == 0 else 0.0
rms_ratio = float('inf') if py_rms == 0 else 0.0
print(f"\n Energy:")
print("\n Energy:")
print(f" Python total energy: {py_energy}")
print(f" RTL total energy: {rtl_energy}")
print(f" Energy ratio (RTL/Py): {energy_ratio:.4f}")
@@ -217,7 +217,7 @@ def compare_scenario(scenario_name, config, base_dir):
py_peak_bin, py_peak_mag = find_peak(py_i, py_q)
rtl_peak_bin, rtl_peak_mag = find_peak(rtl_i, rtl_q)
print(f"\n Peak location:")
print("\n Peak location:")
print(f" Python: bin={py_peak_bin}, mag={py_peak_mag}")
print(f" RTL: bin={rtl_peak_bin}, mag={rtl_peak_mag}")
@@ -242,7 +242,7 @@ def compare_scenario(scenario_name, config, base_dir):
corr_i = pearson_correlation(py_i, rtl_i)
corr_q = pearson_correlation(py_q, rtl_q)
print(f"\n Channel correlation:")
print("\n Channel correlation:")
print(f" I-channel: {corr_i:.6f}")
print(f" Q-channel: {corr_q:.6f}")
@@ -278,7 +278,7 @@ def compare_scenario(scenario_name, config, base_dir):
energy_ok))
# Print checks
print(f"\n Pass/Fail Checks:")
print("\n Pass/Fail Checks:")
all_pass = True
for name, passed in checks:
status = "PASS" if passed else "FAIL"
+1 -3
View File
@@ -19,7 +19,6 @@ Author: Phase 0.5 co-simulation suite for PLFM_RADAR
"""
import os
import struct
# =============================================================================
# Fixed-point utility functions
@@ -196,7 +195,7 @@ class NCO:
if phase_valid:
# Stage 1 NBA: phase_accum_reg <= phase_accumulator (old value)
new_phase_accum_reg = (self.phase_accumulator - ftw) & 0xFFFFFFFF # old accum before add
_new_phase_accum_reg = (self.phase_accumulator - ftw) & 0xFFFFFFFF # noqa: F841 — old accum before add (derivation reference)
# Wait - let me re-derive. The Verilog is:
# phase_accumulator <= phase_accumulator + frequency_tuning_word;
# phase_accum_reg <= phase_accumulator; // OLD value (NBA)
@@ -812,7 +811,6 @@ class FFTEngine:
# COMPUTE: LOG2N stages of butterflies
for stage in range(log2n):
half = 1 << stage
span = half << 1
tw_stride = (n >> 1) >> stage
for bfly in range(n // 2):
@@ -134,7 +134,7 @@ def main():
print("AERIS-10 Chirp .mem File Generator")
print("=" * 60)
print()
print(f"Parameters:")
print("Parameters:")
print(f" CHIRP_BW = {CHIRP_BW/1e6:.1f} MHz")
print(f" FS_SYS = {FS_SYS/1e6:.1f} MHz")
print(f" T_LONG_CHIRP = {T_LONG_CHIRP*1e6:.1f} us")
@@ -212,7 +212,7 @@ def main():
mismatches += 1
if mismatches == 0:
print(f" [PASS] Seg0 matches radar_scene.py generate_reference_chirp_q15()")
print(" [PASS] Seg0 matches radar_scene.py generate_reference_chirp_q15()")
else:
print(f" [FAIL] Seg0 has {mismatches} mismatches vs generate_reference_chirp_q15()")
return 1
@@ -225,13 +225,13 @@ def main():
# Check seg3 zero padding
seg3_i_path = os.path.join(MEM_DIR, 'long_chirp_seg3_i.mem')
with open(seg3_i_path, 'r') as f:
seg3_lines = [l.strip() for l in f if l.strip()]
nonzero_seg3 = sum(1 for l in seg3_lines if l != '0000')
seg3_lines = [line.strip() for line in f if line.strip()]
nonzero_seg3 = sum(1 for line in seg3_lines if line != '0000')
print(f" Seg3 non-zero entries: {nonzero_seg3}/{len(seg3_lines)} "
f"(expected 0 since chirp ends at sample 2999)")
if nonzero_seg3 == 0:
print(f" [PASS] Seg3 is all zeros (chirp 3000 samples < seg3 start 3072)")
print(" [PASS] Seg3 is all zeros (chirp 3000 samples < seg3 start 3072)")
else:
print(f" [WARN] Seg3 has {nonzero_seg3} non-zero entries")
@@ -18,14 +18,13 @@ Usage:
Author: Phase 0.5 Doppler co-simulation suite for PLFM_RADAR
"""
import math
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from fpga_model import (
DopplerProcessor, sign_extend, HAMMING_WINDOW
DopplerProcessor
)
from radar_scene import Target, generate_doppler_frame
@@ -121,7 +120,7 @@ def generate_scenario(name, targets, description, base_dir):
"""Generate input hex + golden output for one scenario."""
print(f"\n{'='*60}")
print(f"Scenario: {name}{description}")
print(f"Model: CLEAN (dual 16-pt FFT)")
print("Model: CLEAN (dual 16-pt FFT)")
print(f"{'='*60}")
# Generate Doppler frame (32 chirps x 64 range bins)
@@ -172,7 +171,7 @@ def generate_scenario(name, targets, description, base_dir):
write_hex_32bit(golden_hex, list(zip(flat_i, flat_q)))
# ---- Find peak per range bin ----
print(f"\n Peak Doppler bins per range bin (top 5 by magnitude):")
print("\n Peak Doppler bins per range bin (top 5 by magnitude):")
peak_info = []
for rbin in range(RANGE_BINS):
mags = [abs(doppler_i[rbin][d]) + abs(doppler_q[rbin][d])
@@ -25,8 +25,8 @@ import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from fpga_model import (
FFTEngine, FreqMatchedFilter, MatchedFilterChain,
RangeBinDecimator, sign_extend, saturate
MatchedFilterChain,
sign_extend, saturate
)
@@ -208,7 +208,6 @@ def generate_long_chirp_test():
input_buffer_i = [0] * BUFFER_SIZE
input_buffer_q = [0] * BUFFER_SIZE
buffer_write_ptr = 0
current_segment = 0
input_idx = 0
chirp_samples_collected = 0
@@ -342,8 +341,9 @@ def generate_short_chirp_test():
input_q.append(saturate(val_q, 16))
# Zero-pad to 1024 (as RTL does in ST_ZERO_PAD)
padded_i = list(input_i) + [0] * (BUFFER_SIZE - SHORT_SAMPLES)
padded_q = list(input_q) + [0] * (BUFFER_SIZE - SHORT_SAMPLES)
# Note: padding computed here for documentation; actual buffer uses buf_i/buf_q below
_padded_i = list(input_i) + [0] * (BUFFER_SIZE - SHORT_SAMPLES) # noqa: F841
_padded_q = list(input_q) + [0] * (BUFFER_SIZE - SHORT_SAMPLES) # noqa: F841
# The buffer truncation: ddc_i[17:2] + ddc_i[1]
# For data already 16-bit sign-extended to 18: result is (val >> 2) + bit1
+2 -3
View File
@@ -21,7 +21,6 @@ Author: Phase 0.5 co-simulation suite for PLFM_RADAR
import math
import os
import struct
# =============================================================================
@@ -156,7 +155,7 @@ def generate_if_chirp(n_samples, chirp_bw=CHIRP_BW, f_if=F_IF, fs=FS_ADC):
t = n / fs
# Instantaneous frequency: f_if - chirp_bw/2 + chirp_rate * t
# Phase: integral of 2*pi*f(t)*dt
f_inst = f_if - chirp_bw / 2 + chirp_rate * t
_f_inst = f_if - chirp_bw / 2 + chirp_rate * t # noqa: F841 — documents instantaneous frequency formula
phase = 2 * math.pi * (f_if - chirp_bw / 2) * t + math.pi * chirp_rate * t * t
chirp_i.append(math.cos(phase))
chirp_q.append(math.sin(phase))
@@ -668,7 +667,7 @@ def generate_all_test_vectors(output_dir=None):
f.write(f" ADC: {FS_ADC/1e6:.0f} MSPS, {ADC_BITS}-bit\n")
f.write(f" Range resolution: {RANGE_RESOLUTION:.1f} m\n")
f.write(f" Wavelength: {WAVELENGTH*1000:.2f} mm\n")
f.write(f"\n")
f.write("\n")
f.write("Scenario 1: Single target\n")
for t in targets1:
@@ -20,7 +20,6 @@ Usage:
import numpy as np
import os
import sys
import argparse
# ===========================================================================
@@ -787,7 +786,7 @@ def run_mti_canceller(decim_i, decim_q, enable=True):
if not enable:
mti_i[:] = decim_i
mti_q[:] = decim_q
print(f" Pass-through mode (MTI disabled)")
print(" Pass-through mode (MTI disabled)")
return mti_i, mti_q
for c in range(n_chirps):
@@ -803,7 +802,7 @@ def run_mti_canceller(decim_i, decim_q, enable=True):
mti_i[c, r] = saturate(diff_i, 16)
mti_q[c, r] = saturate(diff_q, 16)
print(f" Chirp 0: muted (zeros)")
print(" Chirp 0: muted (zeros)")
print(f" Chirps 1-{n_chirps-1}: I range [{mti_i[1:].min()}, {mti_i[1:].max()}], "
f"Q range [{mti_q[1:].min()}, {mti_q[1:].max()}]")
return mti_i, mti_q
@@ -839,7 +838,7 @@ def run_dc_notch(doppler_i, doppler_q, width=2):
print(f"[DC NOTCH] width={width}, {n_range} range bins x {n_doppler} Doppler bins (dual sub-frame)")
if width == 0:
print(f" Pass-through (width=0)")
print(" Pass-through (width=0)")
return notched_i, notched_q
zeroed_count = 0
@@ -1029,7 +1028,7 @@ def run_float_reference(iq_i, iq_q):
Uses the exact same RTL Hamming window coefficients (Q15) to isolate
only the FFT fixed-point quantization error.
"""
print(f"\n[FLOAT REF] Running floating-point reference pipeline")
print("\n[FLOAT REF] Running floating-point reference pipeline")
n_chirps, n_samples = iq_i.shape[0], iq_i.shape[1] if iq_i.ndim == 2 else len(iq_i)
@@ -1384,10 +1383,10 @@ def main():
cfar_detections = np.argwhere(cfar_flags)
cfar_det_list_file = os.path.join(output_dir, "fullchain_cfar_detections.txt")
with open(cfar_det_list_file, 'w') as f:
f.write(f"# AERIS-10 Full-Chain CFAR Detection List\n")
f.write("# AERIS-10 Full-Chain CFAR Detection List\n")
f.write(f"# Chain: decim -> MTI -> Doppler -> DC notch(w={DC_NOTCH_WIDTH}) -> CA-CFAR\n")
f.write(f"# CFAR: guard={CFAR_GUARD}, train={CFAR_TRAIN}, alpha=0x{CFAR_ALPHA:02X}, mode={CFAR_MODE}\n")
f.write(f"# Format: range_bin doppler_bin magnitude threshold\n")
f.write("# Format: range_bin doppler_bin magnitude threshold\n")
for det in cfar_detections:
r, d = det
f.write(f"{r} {d} {cfar_mag[r, d]} {cfar_thr[r, d]}\n")
@@ -1406,9 +1405,9 @@ def main():
# Save full-chain detection reference
fc_det_file = os.path.join(output_dir, "fullchain_detections.txt")
with open(fc_det_file, 'w') as f:
f.write(f"# AERIS-10 Full-Chain Golden Reference Detections\n")
f.write("# AERIS-10 Full-Chain Golden Reference Detections\n")
f.write(f"# Threshold: {args.threshold}\n")
f.write(f"# Format: range_bin doppler_bin magnitude\n")
f.write("# Format: range_bin doppler_bin magnitude\n")
for d in fc_detections:
rbin, dbin = d
f.write(f"{rbin} {dbin} {fc_mag[rbin, dbin]}\n")
@@ -1433,9 +1432,9 @@ def main():
# Save detection list
det_file = os.path.join(output_dir, "detections.txt")
with open(det_file, 'w') as f:
f.write(f"# AERIS-10 Golden Reference Detections\n")
f.write("# AERIS-10 Golden Reference Detections\n")
f.write(f"# Threshold: {args.threshold}\n")
f.write(f"# Format: range_bin doppler_bin magnitude\n")
f.write("# Format: range_bin doppler_bin magnitude\n")
for d in detections:
rbin, dbin = d
f.write(f"{rbin} {dbin} {mag[rbin, dbin]}\n")
@@ -1484,12 +1483,12 @@ def main():
print(f" Range FFT: {FFT_SIZE}-point → {snr_range:.1f} dB vs float")
print(f" Doppler FFT (direct): {DOPPLER_FFT_SIZE}-point Hamming → {snr_doppler:.1f} dB vs float")
print(f" Detections (direct): {len(detections)} (threshold={args.threshold})")
print(f" Full-chain decimator: 1024→64 peak detection")
print(" Full-chain decimator: 1024→64 peak detection")
print(f" Full-chain detections: {len(fc_detections)} (threshold={args.threshold})")
print(f" MTI+CFAR chain: decim → MTI → Doppler → DC notch(w={DC_NOTCH_WIDTH}) → CA-CFAR")
print(f" CFAR detections: {len(cfar_detections)} (guard={CFAR_GUARD}, train={CFAR_TRAIN}, alpha=0x{CFAR_ALPHA:02X})")
print(f" Hex stimulus files: {output_dir}/")
print(f" Ready for RTL co-simulation with Icarus Verilog")
print(" Ready for RTL co-simulation with Icarus Verilog")
# -----------------------------------------------------------------------
# Optional plots
@@ -206,7 +206,7 @@ def test_long_chirp():
expected_max_from_model = 32767 * 0.9
uses_model_scaling = max_mag > expected_max_from_model * 0.8
if uses_model_scaling:
print(f" Scaling: CONSISTENT with radar_scene.py model (0.9 * Q15)")
print(" Scaling: CONSISTENT with radar_scene.py model (0.9 * Q15)")
else:
warn(f"Magnitude ({max_mag:.0f}) is much lower than expected from Python model "
f"({expected_max_from_model:.0f}). .mem files may have unknown provenance.")
@@ -246,7 +246,7 @@ def test_long_chirp():
f_max = max(freq_estimates)
f_range = f_max - f_min
print(f"\n Instantaneous frequency analysis (post-DDC baseband):")
print("\n Instantaneous frequency analysis (post-DDC baseband):")
print(f" Start freq: {f_start/1e6:.3f} MHz")
print(f" End freq: {f_end/1e6:.3f} MHz")
print(f" Min freq: {f_min/1e6:.3f} MHz")
@@ -269,7 +269,7 @@ def test_long_chirp():
# Compare segment boundaries for overlap-save consistency
# In proper overlap-save, the chirp data should be segmented at 896-sample boundaries
# with segments being 1024-sample FFT blocks
print(f"\n Segment boundary analysis:")
print("\n Segment boundary analysis:")
for seg in range(4):
seg_i = read_mem_hex(f'long_chirp_seg{seg}_i.mem')
seg_q = read_mem_hex(f'long_chirp_seg{seg}_q.mem')
@@ -290,9 +290,9 @@ def test_long_chirp():
print(f" Seg {seg}: avg_mag={seg_avg:.1f}, max_mag={seg_max:.1f}, "
f"near-zero={zero_count}/{len(seg_mags)}")
if zero_count > 500:
print(f" -> Seg 3 mostly zeros (chirp shorter than 4096 samples)")
print(" -> Seg 3 mostly zeros (chirp shorter than 4096 samples)")
else:
print(f" -> Seg 3 has significant data throughout")
print(" -> Seg 3 has significant data throughout")
else:
print(f" Seg {seg}: avg_mag={seg_avg:.1f}, max_mag={seg_max:.1f}")
@@ -330,8 +330,10 @@ def test_short_chirp():
freq_est = []
for n in range(1, len(phases)):
dp = phases[n] - phases[n-1]
while dp > math.pi: dp -= 2 * math.pi
while dp < -math.pi: dp += 2 * math.pi
while dp > math.pi:
dp -= 2 * math.pi
while dp < -math.pi:
dp += 2 * math.pi
freq_est.append(dp * FS_SYS / (2 * math.pi))
if freq_est:
@@ -382,9 +384,9 @@ def test_chirp_vs_model():
print(f" Exact I matches: {matches}/{len(model_i)}")
if matches > len(model_i) * 0.9:
print(f" -> .mem files MATCH Python model")
print(" -> .mem files MATCH Python model")
else:
warn(f".mem files do NOT match Python model. They likely have different provenance.")
warn(".mem files do NOT match Python model. They likely have different provenance.")
# Try to detect scaling
if mem_max > 0:
ratio = model_max / mem_max
@@ -399,14 +401,16 @@ def test_chirp_vs_model():
phase_diffs = []
for mp, fp in zip(model_phases, mem_phases):
d = mp - fp
while d > math.pi: d -= 2 * math.pi
while d < -math.pi: d += 2 * math.pi
while d > math.pi:
d -= 2 * math.pi
while d < -math.pi:
d += 2 * math.pi
phase_diffs.append(d)
avg_phase_diff = sum(phase_diffs) / len(phase_diffs)
max_phase_diff = max(abs(d) for d in phase_diffs)
print(f"\n Phase comparison (shape regardless of amplitude):")
print("\n Phase comparison (shape regardless of amplitude):")
print(f" Avg phase diff: {avg_phase_diff:.4f} rad ({math.degrees(avg_phase_diff):.2f} deg)")
print(f" Max phase diff: {max_phase_diff:.4f} rad ({math.degrees(max_phase_diff):.2f} deg)")
+2 -2
View File
@@ -91,7 +91,7 @@ def generate_case(case_num, sig_i, sig_q, ref_i, ref_q, description, outdir):
peak_q_q = out_q_q[peak_bin]
# Write hex files
prefix = os.path.join(outdir, f"mf_golden")
prefix = os.path.join(outdir, "mf_golden")
write_hex_file(f"{prefix}_sig_i_case{case_num}.hex", sig_i)
write_hex_file(f"{prefix}_sig_q_case{case_num}.hex", sig_q)
write_hex_file(f"{prefix}_ref_i_case{case_num}.hex", ref_i)
@@ -233,7 +233,7 @@ def main():
f.write(f" Peak Q (float): {s['peak_q_float']:.6f}\n")
f.write(f" Peak I (quantized): {s['peak_i_quant']}\n")
f.write(f" Peak Q (quantized): {s['peak_q_quant']}\n")
f.write(f" Files:\n")
f.write(" Files:\n")
for fname in s["files"]:
f.write(f" {fname}\n")
f.write("\n")
File diff suppressed because it is too large Load Diff
+10 -14
View File
@@ -8,15 +8,11 @@ import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.patches as patches
import logging
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
from scipy import signal
from sklearn.cluster import DBSCAN
from filterpy.kalman import KalmanFilter
import crcmod
import math
import webbrowser
import tempfile
import os
@@ -30,9 +26,9 @@ except ImportError:
logging.warning("pyusb not available. USB functionality will be disabled.")
try:
from pyftdi.ftdi import Ftdi
from pyftdi.usbtools import UsbTools
from pyftdi.ftdi import FtdiError
from pyftdi.ftdi import Ftdi # noqa: F401
from pyftdi.usbtools import UsbTools # noqa: F401
from pyftdi.ftdi import FtdiError # noqa: F401
FTDI_AVAILABLE = True
except ImportError:
FTDI_AVAILABLE = False
@@ -294,7 +290,7 @@ class FT601Interface:
'device': dev,
'serial': serial
})
except Exception as e:
except Exception:
devices.append({
'description': f"FT601 USB3.0 (VID:{vid:04X}, PID:{pid:04X})",
'vendor_id': vid,
@@ -553,7 +549,7 @@ class STM32USBInterface:
'product_id': pid,
'device': dev
})
except:
except Exception:
devices.append({
'description': f"STM32 CDC (VID:{vid:04X}, PID:{pid:04X})",
'vendor_id': vid,
@@ -910,7 +906,7 @@ class RadarPacketParser:
if len(packet) < 6:
return None
sync = packet[0:2]
_sync = packet[0:2] # noqa: F841
packet_type = packet[2]
length = packet[3]
@@ -1335,8 +1331,8 @@ class RadarGUI:
logging.info("Radar system stopped")
def process_radar_data(self):
"""Process incoming radar data from FT601"""
def _process_radar_data_ft601(self):
"""Process incoming radar data from FT601 (legacy, superseded by FTDI version)."""
buffer = bytearray()
while True:
if self.running and self.ft601_interface.is_open:
@@ -1375,7 +1371,7 @@ class RadarGUI:
# This should match your packet structure
return 64 # Example: 64-byte packets
def process_gps_data(self):
def _process_gps_data_ft601(self):
"""Step 16/17: Process GPS data from STM32 via USB CDC"""
while True:
if self.running and self.stm32_usb_interface.is_open:
@@ -1716,7 +1712,7 @@ def main():
"""Main application entry point"""
try:
root = tk.Tk()
app = RadarGUI(root)
_app = RadarGUI(root) # noqa: F841 must stay alive for mainloop
root.mainloop()
except Exception as e:
logging.error(f"Application error: {e}")
+2 -5
View File
@@ -8,8 +8,6 @@ All buttons work, simulated radar data is generated in real-time
import tkinter as tk
from tkinter import ttk, messagebox
import threading
import queue
import time
import numpy as np
import matplotlib.pyplot as plt
@@ -17,10 +15,9 @@ from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import logging
from dataclasses import dataclass
from typing import List, Dict, Optional
from typing import List, Dict
import random
import json
import os
from datetime import datetime
# Configure logging
@@ -1200,7 +1197,7 @@ def main():
root = tk.Tk()
# Create application
app = RadarDemoGUI(root)
_app = RadarDemoGUI(root) # noqa: F841 — keeps reference alive
# Center window
root.update_idletasks()
+40
View File
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
PLFM Radar System GUI V7 — PyQt6 Edition
Entry point. Launches the RadarDashboard main window.
Usage:
python GUI_V7_PyQt.py
"""
import sys
import logging
from PyQt6.QtWidgets import QApplication
from PyQt6.QtGui import QFont
from v7 import RadarDashboard
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
)
logger = logging.getLogger(__name__)
def main():
app = QApplication(sys.argv)
app.setApplicationName("PLFM Radar System V7")
app.setApplicationVersion("7.0.0")
app.setFont(QFont("Segoe UI", 10))
window = RadarDashboard()
window.show()
logger.info("PLFM Radar GUI V7 started")
sys.exit(app.exec())
if __name__ == "__main__":
main()
+80
View File
@@ -0,0 +1,80 @@
"""
v7 — PLFM Radar GUI V7 (PyQt6 edition).
Re-exports all public classes and functions from sub-modules for convenient
top-level imports:
from v7 import RadarDashboard, RadarTarget, RadarSettings, ...
"""
# Models / constants
from .models import (
RadarTarget,
RadarSettings,
GPSData,
ProcessingConfig,
TileServer,
DARK_BG, DARK_FG, DARK_ACCENT, DARK_HIGHLIGHT, DARK_BORDER,
DARK_TEXT, DARK_BUTTON, DARK_BUTTON_HOVER,
DARK_TREEVIEW, DARK_TREEVIEW_ALT,
DARK_SUCCESS, DARK_WARNING, DARK_ERROR, DARK_INFO,
USB_AVAILABLE, FTDI_AVAILABLE, SCIPY_AVAILABLE,
SKLEARN_AVAILABLE, FILTERPY_AVAILABLE, CRCMOD_AVAILABLE,
)
# Hardware interfaces
from .hardware import (
FT2232HQInterface,
STM32USBInterface,
)
# Processing pipeline
from .processing import (
RadarProcessor,
RadarPacketParser,
USBPacketParser,
apply_pitch_correction,
)
# Workers and simulator
from .workers import (
RadarDataWorker,
GPSDataWorker,
TargetSimulator,
polar_to_geographic,
)
# Map widget
from .map_widget import (
MapBridge,
RadarMapWidget,
)
# Main dashboard
from .dashboard import (
RadarDashboard,
RangeDopplerCanvas,
)
__all__ = [
# models
"RadarTarget", "RadarSettings", "GPSData", "ProcessingConfig", "TileServer",
"DARK_BG", "DARK_FG", "DARK_ACCENT", "DARK_HIGHLIGHT", "DARK_BORDER",
"DARK_TEXT", "DARK_BUTTON", "DARK_BUTTON_HOVER",
"DARK_TREEVIEW", "DARK_TREEVIEW_ALT",
"DARK_SUCCESS", "DARK_WARNING", "DARK_ERROR", "DARK_INFO",
"USB_AVAILABLE", "FTDI_AVAILABLE", "SCIPY_AVAILABLE",
"SKLEARN_AVAILABLE", "FILTERPY_AVAILABLE", "CRCMOD_AVAILABLE",
# hardware
"FT2232HQInterface", "STM32USBInterface",
# processing
"RadarProcessor", "RadarPacketParser", "USBPacketParser",
"apply_pitch_correction",
# workers
"RadarDataWorker", "GPSDataWorker", "TargetSimulator",
"polar_to_geographic",
# map
"MapBridge", "RadarMapWidget",
# dashboard
"RadarDashboard", "RangeDopplerCanvas",
]
File diff suppressed because it is too large Load Diff
+307
View File
@@ -0,0 +1,307 @@
"""
v7.hardware — Hardware interface classes for the PLFM Radar GUI V7.
Provides two USB hardware interfaces:
- FT2232HQInterface (PRIMARY — USB 2.0, VID 0x0403 / PID 0x6010)
- STM32USBInterface (USB CDC for commands and GPS)
"""
import struct
import logging
from typing import List, Dict, Optional
from .models import (
USB_AVAILABLE, FTDI_AVAILABLE,
RadarSettings,
)
if USB_AVAILABLE:
import usb.core
import usb.util
if FTDI_AVAILABLE:
from pyftdi.ftdi import Ftdi
from pyftdi.usbtools import UsbTools
logger = logging.getLogger(__name__)
# =============================================================================
# FT2232HQ Interface — PRIMARY data path (USB 2.0)
# =============================================================================
class FT2232HQInterface:
"""
Interface for FT2232HQ (USB 2.0 Hi-Speed) in synchronous FIFO mode.
This is the **primary** radar data interface.
VID/PID: 0x0403 / 0x6010
"""
VID = 0x0403
PID = 0x6010
def __init__(self):
self.ftdi: Optional[object] = None
self.is_open: bool = False
# ---- enumeration -------------------------------------------------------
def list_devices(self) -> List[Dict]:
"""List available FT2232H devices using pyftdi."""
if not FTDI_AVAILABLE:
logger.warning("pyftdi not available — cannot enumerate FT2232H devices")
return []
try:
devices = []
for device_desc in UsbTools.find_all([(self.VID, self.PID)]):
devices.append({
"description": f"FT2232H Device {device_desc}",
"url": f"ftdi://{device_desc}/1",
})
return devices
except Exception as e:
logger.error(f"Error listing FT2232H devices: {e}")
return []
# ---- open / close ------------------------------------------------------
def open_device(self, device_url: str) -> bool:
"""Open FT2232H device in synchronous FIFO mode."""
if not FTDI_AVAILABLE:
logger.error("pyftdi not available — cannot open device")
return False
try:
self.ftdi = Ftdi()
self.ftdi.open_from_url(device_url)
# Synchronous FIFO mode
self.ftdi.set_bitmode(0xFF, Ftdi.BitMode.SYNCFF)
# Low-latency timer (2 ms)
self.ftdi.set_latency_timer(2)
# Purge stale data
self.ftdi.purge_buffers()
self.is_open = True
logger.info(f"FT2232H device opened: {device_url}")
return True
except Exception as e:
logger.error(f"Error opening FT2232H device: {e}")
self.ftdi = None
return False
def close(self):
"""Close FT2232H device."""
if self.ftdi and self.is_open:
try:
self.ftdi.close()
except Exception as e:
logger.error(f"Error closing FT2232H device: {e}")
finally:
self.is_open = False
self.ftdi = None
# ---- data I/O ----------------------------------------------------------
def read_data(self, bytes_to_read: int = 4096) -> Optional[bytes]:
"""Read data from FT2232H."""
if not self.is_open or self.ftdi is None:
return None
try:
data = self.ftdi.read_data(bytes_to_read)
if data:
return bytes(data)
return None
except Exception as e:
logger.error(f"Error reading from FT2232H: {e}")
return None
# =============================================================================
# STM32 USB CDC Interface — commands & GPS data
# =============================================================================
class STM32USBInterface:
"""
Interface for STM32 USB CDC (Virtual COM Port).
Used to:
- Send start flag and radar settings to the MCU
- Receive GPS data from the MCU
"""
STM32_VID_PIDS = [
(0x0483, 0x5740), # STM32 Virtual COM Port
(0x0483, 0x3748), # STM32 Discovery
(0x0483, 0x374B),
(0x0483, 0x374D),
(0x0483, 0x374E),
(0x0483, 0x3752),
]
def __init__(self):
self.device = None
self.is_open: bool = False
self.ep_in = None
self.ep_out = None
# ---- enumeration -------------------------------------------------------
def list_devices(self) -> List[Dict]:
"""List available STM32 USB CDC devices."""
if not USB_AVAILABLE:
logger.warning("pyusb not available — cannot enumerate STM32 devices")
return []
devices = []
try:
for vid, pid in self.STM32_VID_PIDS:
found = usb.core.find(find_all=True, idVendor=vid, idProduct=pid)
for dev in found:
try:
product = (usb.util.get_string(dev, dev.iProduct)
if dev.iProduct else "STM32 CDC")
serial = (usb.util.get_string(dev, dev.iSerialNumber)
if dev.iSerialNumber else "Unknown")
devices.append({
"description": f"{product} ({serial})",
"vendor_id": vid,
"product_id": pid,
"device": dev,
})
except Exception:
devices.append({
"description": f"STM32 CDC (VID:{vid:04X}, PID:{pid:04X})",
"vendor_id": vid,
"product_id": pid,
"device": dev,
})
except Exception as e:
logger.error(f"Error listing STM32 devices: {e}")
return devices
# ---- open / close ------------------------------------------------------
def open_device(self, device_info: Dict) -> bool:
"""Open STM32 USB CDC device."""
if not USB_AVAILABLE:
logger.error("pyusb not available — cannot open STM32 device")
return False
try:
self.device = device_info["device"]
if self.device.is_kernel_driver_active(0):
self.device.detach_kernel_driver(0)
self.device.set_configuration()
cfg = self.device.get_active_configuration()
intf = cfg[(0, 0)]
self.ep_out = usb.util.find_descriptor(
intf,
custom_match=lambda e: (
usb.util.endpoint_direction(e.bEndpointAddress)
== usb.util.ENDPOINT_OUT
),
)
self.ep_in = usb.util.find_descriptor(
intf,
custom_match=lambda e: (
usb.util.endpoint_direction(e.bEndpointAddress)
== usb.util.ENDPOINT_IN
),
)
if self.ep_out is None or self.ep_in is None:
logger.error("Could not find STM32 CDC endpoints")
return False
self.is_open = True
logger.info(f"STM32 USB device opened: {device_info.get('description', '')}")
return True
except Exception as e:
logger.error(f"Error opening STM32 device: {e}")
return False
def close(self):
"""Close STM32 USB device."""
if self.device and self.is_open:
try:
usb.util.dispose_resources(self.device)
except Exception as e:
logger.error(f"Error closing STM32 device: {e}")
self.is_open = False
self.device = None
self.ep_in = None
self.ep_out = None
# ---- commands ----------------------------------------------------------
def send_start_flag(self) -> bool:
"""Send start flag to STM32 (4-byte magic)."""
start_packet = bytes([23, 46, 158, 237])
logger.info("Sending start flag to STM32 via USB...")
return self._send_data(start_packet)
def send_settings(self, settings: RadarSettings) -> bool:
"""Send radar settings binary packet to STM32."""
try:
packet = self._create_settings_packet(settings)
logger.info("Sending radar settings to STM32 via USB...")
return self._send_data(packet)
except Exception as e:
logger.error(f"Error sending settings via USB: {e}")
return False
# ---- data I/O ----------------------------------------------------------
def read_data(self, size: int = 64, timeout: int = 1000) -> Optional[bytes]:
"""Read data from STM32 via USB CDC."""
if not self.is_open or self.ep_in is None:
return None
try:
data = self.ep_in.read(size, timeout=timeout)
return bytes(data)
except Exception:
# Timeout or other USB error
return None
# ---- internal helpers --------------------------------------------------
def _send_data(self, data: bytes) -> bool:
if not self.is_open or self.ep_out is None:
return False
try:
packet_size = 64
for i in range(0, len(data), packet_size):
chunk = data[i : i + packet_size]
if len(chunk) < packet_size:
chunk += b"\x00" * (packet_size - len(chunk))
self.ep_out.write(chunk)
return True
except Exception as e:
logger.error(f"Error sending data via USB: {e}")
return False
@staticmethod
def _create_settings_packet(settings: RadarSettings) -> bytes:
"""Create binary settings packet: 'SET' ... 'END'."""
packet = b"SET"
packet += struct.pack(">d", settings.system_frequency)
packet += struct.pack(">d", settings.chirp_duration_1)
packet += struct.pack(">d", settings.chirp_duration_2)
packet += struct.pack(">I", settings.chirps_per_position)
packet += struct.pack(">d", settings.freq_min)
packet += struct.pack(">d", settings.freq_max)
packet += struct.pack(">d", settings.prf1)
packet += struct.pack(">d", settings.prf2)
packet += struct.pack(">d", settings.max_distance)
packet += struct.pack(">d", settings.map_size)
packet += b"END"
return packet
+532
View File
@@ -0,0 +1,532 @@
"""
v7.map_widget — Embedded Leaflet.js map widget for the PLFM Radar GUI V7.
Classes:
- MapBridge — QObject exposed to JavaScript via QWebChannel
- RadarMapWidget — QWidget wrapping QWebEngineView with Leaflet map
The full HTML/CSS/JS for Leaflet is generated inline (no external files).
Supports: OSM, Google, Google Sat, Google Hybrid, ESRI Sat tile servers;
coverage circle, target trails, velocity-based color coding, popups, legend.
"""
import json
import logging
from typing import List
from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QFrame,
QComboBox, QCheckBox, QPushButton, QLabel,
)
from PyQt6.QtCore import Qt, pyqtSignal, pyqtSlot, QObject
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWebChannel import QWebChannel
from .models import (
GPSData, RadarTarget, TileServer,
DARK_BG, DARK_FG, DARK_ACCENT, DARK_BORDER,
DARK_TEXT, DARK_BUTTON, DARK_BUTTON_HOVER,
DARK_SUCCESS, DARK_INFO,
)
logger = logging.getLogger(__name__)
# =============================================================================
# MapBridge — Python <-> JavaScript
# =============================================================================
class MapBridge(QObject):
"""Bridge object registered with QWebChannel for JS ↔ Python calls."""
mapClicked = pyqtSignal(float, float) # lat, lon
markerClicked = pyqtSignal(int) # target_id
mapReady = pyqtSignal()
def __init__(self, parent=None):
super().__init__(parent)
self._map_ready = False
@pyqtSlot(float, float)
def onMapClick(self, lat: float, lon: float):
logger.debug(f"Map clicked: {lat}, {lon}")
self.mapClicked.emit(lat, lon)
@pyqtSlot(int)
def onMarkerClick(self, target_id: int):
logger.debug(f"Marker clicked: #{target_id}")
self.markerClicked.emit(target_id)
@pyqtSlot()
def onMapReady(self):
logger.info("Leaflet map ready")
self._map_ready = True
self.mapReady.emit()
@pyqtSlot(str)
def logFromJS(self, message: str):
logger.debug(f"[JS] {message}")
@property
def is_ready(self) -> bool:
return self._map_ready
# =============================================================================
# RadarMapWidget
# =============================================================================
class RadarMapWidget(QWidget):
"""
Embeds a Leaflet.js interactive map inside a QWebEngineView.
Public methods mirror the V6 map API:
set_radar_position(gps), set_targets(list), set_coverage_radius(r),
set_zoom(level)
"""
targetSelected = pyqtSignal(int)
def __init__(self, radar_lat: float = 41.9028, radar_lon: float = 12.4964,
parent=None):
super().__init__(parent)
# State
self._radar_position = GPSData(
latitude=radar_lat, longitude=radar_lon,
altitude=0.0, pitch=0.0, heading=0.0,
)
self._targets: List[RadarTarget] = []
self._coverage_radius = 50_000 # metres
self._tile_server = TileServer.OPENSTREETMAP
self._show_coverage = True
self._show_trails = False
# Build UI
self._setup_ui()
# Bridge + channel
self._bridge = MapBridge(self)
self._bridge.mapReady.connect(self._on_map_ready)
self._bridge.markerClicked.connect(self._on_marker_clicked)
self._channel = QWebChannel()
self._channel.registerObject("bridge", self._bridge)
self._web_view.page().setWebChannel(self._channel)
# Load the Leaflet map
self._load_map()
# ---- UI setup ----------------------------------------------------------
def _setup_ui(self):
layout = QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
# Control bar
bar = QFrame()
bar.setStyleSheet(f"background-color: {DARK_ACCENT}; border-radius: 4px;")
bar_layout = QHBoxLayout(bar)
bar_layout.setContentsMargins(8, 4, 8, 4)
# Tile selector
self._tile_combo = QComboBox()
self._tile_combo.addItem("OpenStreetMap", TileServer.OPENSTREETMAP)
self._tile_combo.addItem("Google Maps", TileServer.GOOGLE_MAPS)
self._tile_combo.addItem("Google Satellite", TileServer.GOOGLE_SATELLITE)
self._tile_combo.addItem("Google Hybrid", TileServer.GOOGLE_HYBRID)
self._tile_combo.addItem("ESRI Satellite", TileServer.ESRI_SATELLITE)
self._tile_combo.currentIndexChanged.connect(self._on_tile_changed)
self._tile_combo.setStyleSheet(f"""
QComboBox {{
background-color: {DARK_BUTTON}; color: {DARK_FG};
border: 1px solid {DARK_BORDER}; padding: 4px 8px; border-radius: 4px;
}}
""")
bar_layout.addWidget(QLabel("Tiles:"))
bar_layout.addWidget(self._tile_combo)
# Toggles
self._coverage_check = QCheckBox("Coverage")
self._coverage_check.setChecked(True)
self._coverage_check.stateChanged.connect(self._on_coverage_toggled)
bar_layout.addWidget(self._coverage_check)
self._trails_check = QCheckBox("Trails")
self._trails_check.setChecked(False)
self._trails_check.stateChanged.connect(self._on_trails_toggled)
bar_layout.addWidget(self._trails_check)
btn_style = f"""
QPushButton {{
background-color: {DARK_BUTTON}; color: {DARK_FG};
border: 1px solid {DARK_BORDER}; padding: 4px 12px; border-radius: 4px;
}}
QPushButton:hover {{ background-color: {DARK_BUTTON_HOVER}; }}
"""
center_btn = QPushButton("Center")
center_btn.clicked.connect(self._center_on_radar)
center_btn.setStyleSheet(btn_style)
bar_layout.addWidget(center_btn)
fit_btn = QPushButton("Fit All")
fit_btn.clicked.connect(self._fit_all)
fit_btn.setStyleSheet(btn_style)
bar_layout.addWidget(fit_btn)
bar_layout.addStretch()
self._status_label = QLabel("Loading map...")
self._status_label.setStyleSheet(f"color: {DARK_INFO};")
bar_layout.addWidget(self._status_label)
layout.addWidget(bar)
# Web view
self._web_view = QWebEngineView()
self._web_view.setMinimumSize(400, 300)
layout.addWidget(self._web_view, stretch=1)
# ---- HTML generation ---------------------------------------------------
def _get_map_html(self) -> str:
lat = self._radar_position.latitude
lon = self._radar_position.longitude
cov = self._coverage_radius
# Using {{ / }} for literal braces inside the f-string
return f'''<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Radar Map</title>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"
integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY=" crossorigin=""/>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"
integrity="sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo=" crossorigin=""></script>
<script src="qrc:///qtwebchannel/qwebchannel.js"></script>
<style>
* {{ margin:0; padding:0; box-sizing:border-box; }}
html, body {{ height:100%; width:100%; font-family:'Segoe UI',Arial,sans-serif; }}
#map {{ height:100%; width:100%; background-color:{DARK_BG}; }}
.leaflet-container {{ background-color:{DARK_BG} !important; }}
.leaflet-popup-content-wrapper {{
background-color:{DARK_ACCENT}; color:{DARK_FG};
border-radius:8px; box-shadow:0 4px 12px rgba(0,0,0,0.4);
}}
.leaflet-popup-tip {{ background-color:{DARK_ACCENT}; }}
.leaflet-popup-content {{ margin:12px; }}
.popup-title {{
font-size:14px; font-weight:bold; color:#4e9eff;
margin-bottom:8px; border-bottom:1px solid {DARK_BORDER}; padding-bottom:6px;
}}
.popup-row {{ display:flex; justify-content:space-between; margin:4px 0; font-size:12px; }}
.popup-label {{ color:{DARK_TEXT}; }}
.popup-value {{ color:{DARK_FG}; font-weight:500; }}
.status-approaching {{ color:#F44336; }}
.status-receding {{ color:#2196F3; }}
.status-stationary {{ color:#9E9E9E; }}
.legend {{
background-color:{DARK_ACCENT}; color:{DARK_FG};
padding:10px 14px; border-radius:6px; font-size:12px;
box-shadow:0 2px 8px rgba(0,0,0,0.3);
}}
.legend-title {{ font-weight:bold; margin-bottom:8px; color:#4e9eff; }}
.legend-item {{ display:flex; align-items:center; margin:4px 0; }}
.legend-color {{
width:14px; height:14px; border-radius:50%;
margin-right:8px; border:1px solid white;
}}
</style>
</head>
<body>
<div id="map"></div>
<script>
var map, radarMarker, coverageCircle;
var targetMarkers = {{}};
var targetTrails = {{}};
var targetTrailHistory = {{}};
var bridge = null;
var currentTileLayer = null;
var showCoverage = true;
var showTrails = false;
var maxTrailLength = 30;
var tileServers = {{
'osm': {{
url:'https://{{s}}.tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png',
attribution:'&copy; OpenStreetMap', maxZoom:19
}},
'google': {{
url:'https://mt0.google.com/vt/lyrs=m&hl=en&x={{x}}&y={{y}}&z={{z}}&s=Ga',
attribution:'&copy; Google Maps', maxZoom:22
}},
'google_sat': {{
url:'https://mt0.google.com/vt/lyrs=s&hl=en&x={{x}}&y={{y}}&z={{z}}&s=Ga',
attribution:'&copy; Google Maps', maxZoom:22
}},
'google_hybrid': {{
url:'https://mt0.google.com/vt/lyrs=y&hl=en&x={{x}}&y={{y}}&z={{z}}&s=Ga',
attribution:'&copy; Google Maps', maxZoom:22
}},
'esri_sat': {{
url:'https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{{z}}/{{y}}/{{x}}',
attribution:'&copy; Esri', maxZoom:19
}}
}};
function initMap() {{
map = L.map('map', {{ preferCanvas:true, zoomControl:true }})
.setView([{lat}, {lon}], 10);
setTileServer('osm');
var radarIcon = L.divIcon({{
className:'radar-icon',
html:'<div style="background:radial-gradient(circle,#FF5252 0%,#D32F2F 100%);'+
'width:24px;height:24px;border-radius:50%;border:3px solid white;'+
'box-shadow:0 2px 8px rgba(0,0,0,0.5);"></div>',
iconSize:[24,24], iconAnchor:[12,12]
}});
radarMarker = L.marker([{lat},{lon}], {{ icon:radarIcon, zIndexOffset:1000 }}).addTo(map);
updateRadarPopup();
coverageCircle = L.circle([{lat},{lon}], {{
radius:{cov}, color:'#FF5252', fillColor:'#FF5252',
fillOpacity:0.08, weight:2, dashArray:'8, 8'
}}).addTo(map);
addLegend();
map.on('click', function(e){{ if(bridge) bridge.onMapClick(e.latlng.lat,e.latlng.lng); }});
}}
function setTileServer(id) {{
var cfg = tileServers[id]; if(!cfg) return;
if(currentTileLayer) map.removeLayer(currentTileLayer);
currentTileLayer = L.tileLayer(cfg.url, {{ attribution:cfg.attribution, maxZoom:cfg.maxZoom }}).addTo(map);
}}
function updateRadarPopup() {{
if(!radarMarker) return;
var ll = radarMarker.getLatLng();
radarMarker.bindPopup(
'<div class="popup-title">Radar System</div>'+
'<div class="popup-row"><span class="popup-label">Lat:</span><span class="popup-value">'+ll.lat.toFixed(6)+'</span></div>'+
'<div class="popup-row"><span class="popup-label">Lon:</span><span class="popup-value">'+ll.lng.toFixed(6)+'</span></div>'+
'<div class="popup-row"><span class="popup-label">Status:</span><span class="popup-value status-approaching">Active</span></div>'
);
}}
function addLegend() {{
var legend = L.control({{ position:'bottomright' }});
legend.onAdd = function() {{
var d = L.DomUtil.create('div','legend');
d.innerHTML =
'<div class="legend-title">Target Legend</div>'+
'<div class="legend-item"><div class="legend-color" style="background:#F44336"></div>Approaching</div>'+
'<div class="legend-item"><div class="legend-color" style="background:#2196F3"></div>Receding</div>'+
'<div class="legend-item"><div class="legend-color" style="background:#9E9E9E"></div>Stationary</div>'+
'<div class="legend-item"><div class="legend-color" style="background:#FF5252"></div>Radar</div>';
return d;
}};
legend.addTo(map);
}}
function updateRadarPosition(lat,lon,alt,pitch,heading) {{
if(!radarMarker||!coverageCircle) return;
radarMarker.setLatLng([lat,lon]);
coverageCircle.setLatLng([lat,lon]);
updateRadarPopup();
}}
function updateTargets(targetsJson) {{
var targets = JSON.parse(targetsJson);
var currentIds = {{}};
targets.forEach(function(t) {{
currentIds[t.id] = true;
var lat=t.latitude, lon=t.longitude;
var color = getTargetColor(t.velocity);
var sz = Math.max(10, Math.min(20, 10+t.snr/3));
if(!targetTrailHistory[t.id]) targetTrailHistory[t.id] = [];
targetTrailHistory[t.id].push([lat,lon]);
if(targetTrailHistory[t.id].length > maxTrailLength)
targetTrailHistory[t.id].shift();
if(targetMarkers[t.id]) {{
targetMarkers[t.id].setLatLng([lat,lon]);
targetMarkers[t.id].setIcon(makeIcon(color,sz));
if(targetTrails[t.id]) {{
targetTrails[t.id].setLatLngs(targetTrailHistory[t.id]);
targetTrails[t.id].setStyle({{ color:color }});
}}
}} else {{
var marker = L.marker([lat,lon], {{ icon:makeIcon(color,sz) }}).addTo(map);
marker.on('click', (function(id){{ return function(){{ if(bridge) bridge.onMarkerClick(id); }}; }})(t.id));
targetMarkers[t.id] = marker;
if(showTrails) {{
targetTrails[t.id] = L.polyline(targetTrailHistory[t.id], {{
color:color, weight:3, opacity:0.7, lineCap:'round', lineJoin:'round'
}}).addTo(map);
}}
}}
updateTargetPopup(t);
}});
for(var id in targetMarkers) {{
if(!currentIds[id]) {{
map.removeLayer(targetMarkers[id]); delete targetMarkers[id];
if(targetTrails[id]) {{ map.removeLayer(targetTrails[id]); delete targetTrails[id]; }}
delete targetTrailHistory[id];
}}
}}
}}
function makeIcon(color,sz) {{
return L.divIcon({{
className:'target-icon',
html:'<div style="background-color:'+color+';width:'+sz+'px;height:'+sz+'px;'+
'border-radius:50%;border:2px solid white;box-shadow:0 2px 6px rgba(0,0,0,0.4);"></div>',
iconSize:[sz,sz], iconAnchor:[sz/2,sz/2]
}});
}}
function updateTargetPopup(t) {{
if(!targetMarkers[t.id]) return;
var sc = t.velocity>1?'status-approaching':(t.velocity<-1?'status-receding':'status-stationary');
var st = t.velocity>1?'Approaching':(t.velocity<-1?'Receding':'Stationary');
targetMarkers[t.id].bindPopup(
'<div class="popup-title">Target #'+t.id+'</div>'+
'<div class="popup-row"><span class="popup-label">Range:</span><span class="popup-value">'+t.range.toFixed(1)+' m</span></div>'+
'<div class="popup-row"><span class="popup-label">Velocity:</span><span class="popup-value">'+t.velocity.toFixed(1)+' m/s</span></div>'+
'<div class="popup-row"><span class="popup-label">Azimuth:</span><span class="popup-value">'+t.azimuth.toFixed(1)+'&deg;</span></div>'+
'<div class="popup-row"><span class="popup-label">Elevation:</span><span class="popup-value">'+t.elevation.toFixed(1)+'&deg;</span></div>'+
'<div class="popup-row"><span class="popup-label">SNR:</span><span class="popup-value">'+t.snr.toFixed(1)+' dB</span></div>'+
'<div class="popup-row"><span class="popup-label">Track:</span><span class="popup-value">'+t.track_id+'</span></div>'+
'<div class="popup-row"><span class="popup-label">Status:</span><span class="popup-value '+sc+'">'+st+'</span></div>'
);
}}
function getTargetColor(v) {{
if(v>50) return '#FF1744';
if(v>10) return '#FF5252';
if(v>1) return '#FF8A65';
if(v<-50) return '#1565C0';
if(v<-10) return '#2196F3';
if(v<-1) return '#64B5F6';
return '#9E9E9E';
}}
function setCoverageVisible(vis) {{
showCoverage = vis;
if(coverageCircle) {{
if(vis) coverageCircle.addTo(map); else map.removeLayer(coverageCircle);
}}
}}
function setCoverageRadius(r) {{ if(coverageCircle) coverageCircle.setRadius(r); }}
function setTrailsVisible(vis) {{
showTrails = vis;
if(vis) {{
for(var id in targetMarkers) {{
if(!targetTrails[id] && targetTrailHistory[id] && targetTrailHistory[id].length>1) {{
targetTrails[id] = L.polyline(targetTrailHistory[id], {{
color:'#4CAF50', weight:3, opacity:0.7, lineCap:'round', lineJoin:'round'
}}).addTo(map);
}} else if(targetTrails[id]) {{
targetTrails[id].addTo(map);
}}
}}
}} else {{
for(var id in targetTrails) {{ map.removeLayer(targetTrails[id]); }}
}}
}}
function centerOnRadar() {{ if(radarMarker) map.setView(radarMarker.getLatLng(), map.getZoom()); }}
function fitAllTargets() {{
var b = L.latLngBounds([]);
if(radarMarker) b.extend(radarMarker.getLatLng());
for(var id in targetMarkers) b.extend(targetMarkers[id].getLatLng());
if(b.isValid()) map.fitBounds(b, {{ padding:[50,50] }});
}}
function setZoom(lvl) {{ map.setZoom(lvl); }}
document.addEventListener('DOMContentLoaded', function() {{
new QWebChannel(qt.webChannelTransport, function(ch) {{
bridge = ch.objects.bridge;
initMap();
if(bridge) bridge.onMapReady();
}});
}});
</script>
</body>
</html>'''
# ---- load / helpers ----------------------------------------------------
def _load_map(self):
self._web_view.setHtml(self._get_map_html())
logger.info("Leaflet map HTML loaded")
def _on_map_ready(self):
self._status_label.setText(f"Map ready - {len(self._targets)} targets")
self._status_label.setStyleSheet(f"color: {DARK_SUCCESS};")
def _on_marker_clicked(self, tid: int):
self.targetSelected.emit(tid)
def _run_js(self, script: str):
self._web_view.page().runJavaScript(script)
# ---- control bar callbacks ---------------------------------------------
def _on_tile_changed(self, _index: int):
server = self._tile_combo.currentData()
if server:
self._tile_server = server
self._run_js(f"setTileServer('{server.value}')")
def _on_coverage_toggled(self, state: int):
vis = state == Qt.CheckState.Checked.value
self._show_coverage = vis
self._run_js(f"setCoverageVisible({str(vis).lower()})")
def _on_trails_toggled(self, state: int):
vis = state == Qt.CheckState.Checked.value
self._show_trails = vis
self._run_js(f"setTrailsVisible({str(vis).lower()})")
def _center_on_radar(self):
self._run_js("centerOnRadar()")
def _fit_all(self):
self._run_js("fitAllTargets()")
# ---- public API --------------------------------------------------------
def set_radar_position(self, gps: GPSData):
self._radar_position = gps
self._run_js(
f"updateRadarPosition({gps.latitude},{gps.longitude},"
f"{gps.altitude},{gps.pitch},{gps.heading})"
)
def set_targets(self, targets: List[RadarTarget]):
self._targets = targets
data = [t.to_dict() for t in targets]
js = json.dumps(data).replace("'", "\\'")
self._status_label.setText(f"{len(targets)} targets tracked")
self._run_js(f"updateTargets('{js}')")
def set_coverage_radius(self, radius_m: float):
self._coverage_radius = radius_m
self._run_js(f"setCoverageRadius({radius_m})")
def set_zoom(self, level: int):
level = max(0, min(22, level))
self._run_js(f"setZoom({level})")
+187
View File
@@ -0,0 +1,187 @@
"""
v7.models — Data classes, enums, and theme constants for the PLFM Radar GUI V7.
This module defines the core data structures used throughout the application:
- RadarTarget, RadarSettings, GPSData (dataclasses)
- TileServer (enum for map tile providers)
- Dark theme color constants
- Optional dependency availability flags
"""
import logging
from dataclasses import dataclass, asdict
from enum import Enum
# ---------------------------------------------------------------------------
# Optional dependency flags (graceful degradation)
# ---------------------------------------------------------------------------
try:
import usb.core
import usb.util # noqa: F401 — availability check
USB_AVAILABLE = True
except ImportError:
USB_AVAILABLE = False
logging.warning("pyusb not available. USB functionality will be disabled.")
try:
from pyftdi.ftdi import Ftdi # noqa: F401 — availability check
from pyftdi.usbtools import UsbTools # noqa: F401 — availability check
from pyftdi.ftdi import FtdiError # noqa: F401 — availability check
FTDI_AVAILABLE = True
except ImportError:
FTDI_AVAILABLE = False
logging.warning("pyftdi not available. FTDI functionality will be disabled.")
try:
from scipy import signal as _scipy_signal # noqa: F401 — availability check
SCIPY_AVAILABLE = True
except ImportError:
SCIPY_AVAILABLE = False
logging.warning("scipy not available. Some DSP features will be disabled.")
try:
from sklearn.cluster import DBSCAN as _DBSCAN # noqa: F401 — availability check
SKLEARN_AVAILABLE = True
except ImportError:
SKLEARN_AVAILABLE = False
logging.warning("sklearn not available. Clustering will be disabled.")
try:
from filterpy.kalman import KalmanFilter as _KalmanFilter # noqa: F401 — availability check
FILTERPY_AVAILABLE = True
except ImportError:
FILTERPY_AVAILABLE = False
logging.warning("filterpy not available. Kalman tracking will be disabled.")
try:
import crcmod as _crcmod # noqa: F401 — availability check
CRCMOD_AVAILABLE = True
except ImportError:
CRCMOD_AVAILABLE = False
logging.warning("crcmod not available. CRC validation will use fallback.")
# ---------------------------------------------------------------------------
# Dark theme color constants (shared by all modules)
# ---------------------------------------------------------------------------
DARK_BG = "#2b2b2b"
DARK_FG = "#e0e0e0"
DARK_ACCENT = "#3c3f41"
DARK_HIGHLIGHT = "#4e5254"
DARK_BORDER = "#555555"
DARK_TEXT = "#cccccc"
DARK_BUTTON = "#3c3f41"
DARK_BUTTON_HOVER = "#4e5254"
DARK_TREEVIEW = "#3c3f41"
DARK_TREEVIEW_ALT = "#404040"
DARK_SUCCESS = "#4CAF50"
DARK_WARNING = "#FFC107"
DARK_ERROR = "#F44336"
DARK_INFO = "#2196F3"
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class RadarTarget:
"""Represents a detected radar target."""
id: int
range: float # Range in meters
velocity: float # Velocity in m/s (positive = approaching)
azimuth: float # Azimuth angle in degrees
elevation: float # Elevation angle in degrees
latitude: float = 0.0
longitude: float = 0.0
snr: float = 0.0 # Signal-to-noise ratio in dB
timestamp: float = 0.0
track_id: int = -1
classification: str = "unknown"
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return asdict(self)
@dataclass
class RadarSettings:
"""Radar system configuration parameters."""
system_frequency: float = 10e9 # Hz
chirp_duration_1: float = 30e-6 # Long chirp duration (s)
chirp_duration_2: float = 0.5e-6 # Short chirp duration (s)
chirps_per_position: int = 32
freq_min: float = 10e6 # Hz
freq_max: float = 30e6 # Hz
prf1: float = 1000 # PRF 1 (Hz)
prf2: float = 2000 # PRF 2 (Hz)
max_distance: float = 50000 # Max detection range (m)
map_size: float = 50000 # Map display size (m)
coverage_radius: float = 50000 # Map coverage radius (m)
@dataclass
class GPSData:
"""GPS position and orientation data."""
latitude: float
longitude: float
altitude: float
pitch: float # Pitch angle in degrees
heading: float = 0.0 # Heading in degrees (0 = North)
timestamp: float = 0.0
def to_dict(self) -> dict:
return asdict(self)
# ---------------------------------------------------------------------------
# Tile server enum
# ---------------------------------------------------------------------------
@dataclass
class ProcessingConfig:
"""Signal processing pipeline configuration.
Controls: MTI filter, CFAR detector, DC notch removal,
windowing, detection threshold, DBSCAN clustering, and Kalman tracking.
"""
# MTI (Moving Target Indication)
mti_enabled: bool = False
mti_order: int = 2 # 1, 2, or 3
# CFAR (Constant False Alarm Rate)
cfar_enabled: bool = False
cfar_type: str = "CA-CFAR" # CA-CFAR, OS-CFAR, GO-CFAR, SO-CFAR
cfar_guard_cells: int = 2
cfar_training_cells: int = 8
cfar_threshold_factor: float = 5.0 # PFA-related scalar
# DC Notch / DC Removal
dc_notch_enabled: bool = False
# Windowing (applied before FFT)
window_type: str = "Hann" # None, Hann, Hamming, Blackman, Kaiser, Chebyshev
# Detection threshold (dB above noise floor)
detection_threshold_db: float = 12.0
# DBSCAN Clustering
clustering_enabled: bool = True
clustering_eps: float = 100.0
clustering_min_samples: int = 2
# Kalman Tracking
tracking_enabled: bool = True
# ---------------------------------------------------------------------------
# Tile server enum
# ---------------------------------------------------------------------------
class TileServer(Enum):
"""Available map tile servers."""
OPENSTREETMAP = "osm"
GOOGLE_MAPS = "google"
GOOGLE_SATELLITE = "google_sat"
GOOGLE_HYBRID = "google_hybrid"
ESRI_SATELLITE = "esri_sat"
+642
View File
@@ -0,0 +1,642 @@
"""
v7.processing — Radar signal processing, packet parsing, and GPS parsing.
Classes:
- RadarProcessor — dual-CPI fusion, multi-PRF unwrap, DBSCAN clustering,
association, Kalman tracking
- RadarPacketParser — parse raw byte streams into typed radar packets
(FIX: returns (parsed_dict, bytes_consumed) tuple)
- USBPacketParser — parse GPS text/binary frames from STM32 CDC
Bug fixes vs V6:
1. RadarPacketParser.parse_packet() now returns (dict, bytes_consumed) tuple
so the caller knows exactly how many bytes to strip from the buffer.
2. apply_pitch_correction() is a proper standalone function.
"""
import struct
import time
import logging
import math
from typing import Optional, Tuple, List, Dict
import numpy as np
from .models import (
RadarTarget, GPSData, ProcessingConfig,
SCIPY_AVAILABLE, SKLEARN_AVAILABLE, FILTERPY_AVAILABLE, CRCMOD_AVAILABLE,
)
if SKLEARN_AVAILABLE:
from sklearn.cluster import DBSCAN
if FILTERPY_AVAILABLE:
from filterpy.kalman import KalmanFilter
if CRCMOD_AVAILABLE:
import crcmod
if SCIPY_AVAILABLE:
from scipy.signal import windows as scipy_windows
logger = logging.getLogger(__name__)
# =============================================================================
# Utility: pitch correction (Bug #4 fix — was never defined in V6)
# =============================================================================
def apply_pitch_correction(raw_elevation: float, pitch: float) -> float:
"""
Apply platform pitch correction to a raw elevation angle.
Returns the corrected elevation = raw_elevation - pitch.
"""
return raw_elevation - pitch
# =============================================================================
# Radar Processor — signal-level processing & tracking pipeline
# =============================================================================
class RadarProcessor:
"""Full radar processing pipeline: fusion, clustering, association, tracking."""
def __init__(self):
self.range_doppler_map = np.zeros((1024, 32))
self.detected_targets: List[RadarTarget] = []
self.track_id_counter: int = 0
self.tracks: Dict[int, dict] = {}
self.frame_count: int = 0
self.config = ProcessingConfig()
# MTI state: store previous frames for cancellation
self._mti_history: List[np.ndarray] = []
# ---- Configuration -----------------------------------------------------
def set_config(self, config: ProcessingConfig):
"""Update the processing configuration and reset MTI history if needed."""
old_order = self.config.mti_order
self.config = config
if config.mti_order != old_order:
self._mti_history.clear()
# ---- Windowing ----------------------------------------------------------
@staticmethod
def apply_window(data: np.ndarray, window_type: str) -> np.ndarray:
"""Apply a window function along each column (slow-time dimension).
*data* shape: (range_bins, doppler_bins). Window is applied along
axis-1 (Doppler / slow-time).
"""
if window_type == "None" or not window_type:
return data
n = data.shape[1]
if n < 2:
return data
if SCIPY_AVAILABLE:
wtype = window_type.lower()
if wtype == "hann":
w = scipy_windows.hann(n, sym=False)
elif wtype == "hamming":
w = scipy_windows.hamming(n, sym=False)
elif wtype == "blackman":
w = scipy_windows.blackman(n)
elif wtype == "kaiser":
w = scipy_windows.kaiser(n, beta=14)
elif wtype == "chebyshev":
w = scipy_windows.chebwin(n, at=80)
else:
w = np.ones(n)
else:
# Fallback: numpy Hann
wtype = window_type.lower()
if wtype == "hann":
w = np.hanning(n)
elif wtype == "hamming":
w = np.hamming(n)
elif wtype == "blackman":
w = np.blackman(n)
else:
w = np.ones(n)
return data * w[np.newaxis, :]
# ---- DC Notch (zero-Doppler removal) ------------------------------------
@staticmethod
def dc_notch(data: np.ndarray) -> np.ndarray:
"""Remove the DC (zero-Doppler) component by subtracting the
mean along the slow-time axis for each range bin."""
return data - np.mean(data, axis=1, keepdims=True)
# ---- MTI (Moving Target Indication) -------------------------------------
def mti_filter(self, frame: np.ndarray) -> np.ndarray:
"""Apply MTI cancellation of order 1, 2, or 3.
Order-1: y[n] = x[n] - x[n-1]
Order-2: y[n] = x[n] - 2*x[n-1] + x[n-2]
Order-3: y[n] = x[n] - 3*x[n-1] + 3*x[n-2] - x[n-3]
The internal history buffer stores up to 3 previous frames.
"""
order = self.config.mti_order
self._mti_history.append(frame.copy())
# Trim history to order + 1 frames
max_len = order + 1
if len(self._mti_history) > max_len:
self._mti_history = self._mti_history[-max_len:]
if len(self._mti_history) < order + 1:
# Not enough history yet — return zeros (suppress output)
return np.zeros_like(frame)
h = self._mti_history
if order == 1:
return h[-1] - h[-2]
elif order == 2:
return h[-1] - 2.0 * h[-2] + h[-3]
elif order == 3:
return h[-1] - 3.0 * h[-2] + 3.0 * h[-3] - h[-4]
else:
return h[-1] - h[-2]
# ---- CFAR (Constant False Alarm Rate) -----------------------------------
@staticmethod
def cfar_1d(signal_vec: np.ndarray, guard: int, train: int,
threshold_factor: float, cfar_type: str = "CA-CFAR") -> np.ndarray:
"""1-D CFAR detector.
Parameters
----------
signal_vec : 1-D array (power in linear scale)
guard : number of guard cells on each side
train : number of training cells on each side
threshold_factor : multiplier on estimated noise level
cfar_type : CA-CFAR, OS-CFAR, GO-CFAR, or SO-CFAR
Returns
-------
detections : boolean array, True where target detected
"""
n = len(signal_vec)
detections = np.zeros(n, dtype=bool)
half = guard + train
for i in range(half, n - half):
# Leading training cells
lead = signal_vec[i - half: i - guard]
# Lagging training cells
lag = signal_vec[i + guard + 1: i + half + 1]
if cfar_type == "CA-CFAR":
noise = (np.sum(lead) + np.sum(lag)) / (2 * train)
elif cfar_type == "GO-CFAR":
noise = max(np.mean(lead), np.mean(lag))
elif cfar_type == "SO-CFAR":
noise = min(np.mean(lead), np.mean(lag))
elif cfar_type == "OS-CFAR":
all_train = np.concatenate([lead, lag])
all_train.sort()
k = int(0.75 * len(all_train)) # 75th percentile
noise = all_train[min(k, len(all_train) - 1)]
else:
noise = (np.sum(lead) + np.sum(lag)) / (2 * train)
threshold = noise * threshold_factor
if signal_vec[i] > threshold:
detections[i] = True
return detections
def cfar_2d(self, rdm: np.ndarray) -> np.ndarray:
"""Apply 1-D CFAR along each range bin (across Doppler dimension).
Returns a boolean mask of the same shape as *rdm*.
"""
cfg = self.config
mask = np.zeros_like(rdm, dtype=bool)
for r in range(rdm.shape[0]):
row = rdm[r, :]
if row.max() > 0:
mask[r, :] = self.cfar_1d(
row, cfg.cfar_guard_cells, cfg.cfar_training_cells,
cfg.cfar_threshold_factor, cfg.cfar_type,
)
return mask
# ---- Full processing pipeline -------------------------------------------
def process_frame(self, raw_frame: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Run the full signal processing chain on a Range x Doppler frame.
Parameters
----------
raw_frame : 2-D array (range_bins x doppler_bins), complex or real
Returns
-------
(processed_rdm, detection_mask)
processed_rdm — processed Range-Doppler map (power, linear)
detection_mask — boolean mask of CFAR / threshold detections
"""
cfg = self.config
data = raw_frame.astype(np.float64)
# 1. DC Notch
if cfg.dc_notch_enabled:
data = self.dc_notch(data)
# 2. Windowing (before FFT — applied along slow-time axis)
if cfg.window_type and cfg.window_type != "None":
data = self.apply_window(data, cfg.window_type)
# 3. MTI
if cfg.mti_enabled:
data = self.mti_filter(data)
# 4. Power (magnitude squared)
power = np.abs(data) ** 2
power = np.maximum(power, 1e-20) # avoid log(0)
# 5. CFAR detection or simple threshold
if cfg.cfar_enabled:
detection_mask = self.cfar_2d(power)
else:
# Simple threshold: convert dB threshold to linear
power_db = 10.0 * np.log10(power)
noise_floor = np.median(power_db)
detection_mask = power_db > (noise_floor + cfg.detection_threshold_db)
# Update stored RDM
self.range_doppler_map = power
self.frame_count += 1
return power, detection_mask
# ---- Dual-CPI fusion ---------------------------------------------------
@staticmethod
def dual_cpi_fusion(range_profiles_1: np.ndarray,
range_profiles_2: np.ndarray) -> np.ndarray:
"""Dual-CPI fusion for better detection."""
return np.mean(range_profiles_1, axis=0) + np.mean(range_profiles_2, axis=0)
# ---- Multi-PRF velocity unwrapping -------------------------------------
def multi_prf_unwrap(self, doppler_measurements, prf1: float, prf2: float):
"""Multi-PRF velocity unwrapping (Chinese Remainder Theorem)."""
lam = 3e8 / 10e9
v_max1 = prf1 * lam / 2
v_max2 = prf2 * lam / 2
unwrapped = []
for doppler in doppler_measurements:
v1 = doppler * lam / 2
v2 = doppler * lam / 2
velocity = self._solve_chinese_remainder(v1, v2, v_max1, v_max2)
unwrapped.append(velocity)
return unwrapped
@staticmethod
def _solve_chinese_remainder(v1, v2, max1, max2):
for k in range(-5, 6):
candidate = v1 + k * max1
if abs(candidate - v2) < max2 / 2:
return candidate
return v1
# ---- DBSCAN clustering -------------------------------------------------
@staticmethod
def clustering(detections: List[RadarTarget],
eps: float = 100, min_samples: int = 2) -> list:
"""DBSCAN clustering of detections (requires sklearn)."""
if not SKLEARN_AVAILABLE or len(detections) == 0:
return []
points = np.array([[d.range, d.velocity] for d in detections])
labels = DBSCAN(eps=eps, min_samples=min_samples).fit(points).labels_
clusters = []
for label in set(labels):
if label == -1:
continue
cluster_points = points[labels == label]
clusters.append({
"center": np.mean(cluster_points, axis=0),
"points": cluster_points,
"size": len(cluster_points),
})
return clusters
# ---- Association -------------------------------------------------------
def association(self, detections: List[RadarTarget],
clusters: list) -> List[RadarTarget]:
"""Associate detections to existing tracks (nearest-neighbour)."""
associated = []
for det in detections:
best_track = None
min_dist = float("inf")
for tid, track in self.tracks.items():
dist = math.sqrt(
(det.range - track["state"][0]) ** 2
+ (det.velocity - track["state"][2]) ** 2
)
if dist < min_dist and dist < 500:
min_dist = dist
best_track = tid
if best_track is not None:
det.track_id = best_track
else:
det.track_id = self.track_id_counter
self.track_id_counter += 1
associated.append(det)
return associated
# ---- Kalman tracking ---------------------------------------------------
def tracking(self, associated_detections: List[RadarTarget]):
"""Kalman filter tracking (requires filterpy)."""
if not FILTERPY_AVAILABLE:
return
now = time.time()
for det in associated_detections:
if det.track_id not in self.tracks:
kf = KalmanFilter(dim_x=4, dim_z=2)
kf.x = np.array([det.range, 0, det.velocity, 0])
kf.F = np.array([
[1, 1, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, 1],
[0, 0, 0, 1],
])
kf.H = np.array([
[1, 0, 0, 0],
[0, 0, 1, 0],
])
kf.P *= 1000
kf.R = np.diag([10, 1])
kf.Q = np.eye(4) * 0.1
self.tracks[det.track_id] = {
"filter": kf,
"state": kf.x,
"last_update": now,
"hits": 1,
}
else:
track = self.tracks[det.track_id]
track["filter"].predict()
track["filter"].update([det.range, det.velocity])
track["state"] = track["filter"].x
track["last_update"] = now
track["hits"] += 1
# Prune stale tracks (> 5 s without update)
stale = [tid for tid, t in self.tracks.items()
if now - t["last_update"] > 5.0]
for tid in stale:
del self.tracks[tid]
# =============================================================================
# Radar Packet Parser
# =============================================================================
class RadarPacketParser:
"""
Parse binary radar packets from the raw byte stream.
Packet format:
[Sync 2][Type 1][Length 1][Payload N][CRC16 2]
Sync pattern: 0xA5 0xC3
Bug fix vs V6:
parse_packet() now returns ``(parsed_dict, bytes_consumed)`` so the
caller can correctly advance the read pointer in the buffer.
"""
SYNC = b"\xA5\xC3"
def __init__(self):
if CRCMOD_AVAILABLE:
self.crc16_func = crcmod.mkCrcFun(
0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000
)
else:
self.crc16_func = None
# ---- main entry point --------------------------------------------------
def parse_packet(self, data: bytes) -> Optional[Tuple[dict, int]]:
"""
Attempt to parse one radar packet from *data*.
Returns
-------
(parsed_dict, bytes_consumed) on success, or None if no valid packet.
"""
if len(data) < 6:
return None
idx = data.find(self.SYNC)
if idx == -1:
return None
pkt = data[idx:]
if len(pkt) < 6:
return None
pkt_type = pkt[2]
length = pkt[3]
total_len = 4 + length + 2 # sync(2) + type(1) + len(1) + payload + crc(2)
if len(pkt) < total_len:
return None
payload = pkt[4 : 4 + length]
crc_received = struct.unpack("<H", pkt[4 + length : 4 + length + 2])[0]
# CRC check
if self.crc16_func is not None:
crc_calc = self.crc16_func(pkt[0 : 4 + length])
if crc_calc != crc_received:
logger.warning(
f"CRC mismatch: got {crc_received:04X}, calc {crc_calc:04X}"
)
return None
# Bytes consumed = offset to sync + total packet length
consumed = idx + total_len
parsed = None
if pkt_type == 0x01:
parsed = self._parse_range(payload)
elif pkt_type == 0x02:
parsed = self._parse_doppler(payload)
elif pkt_type == 0x03:
parsed = self._parse_detection(payload)
else:
logger.warning(f"Unknown packet type: {pkt_type:02X}")
if parsed is None:
return None
return (parsed, consumed)
# ---- sub-parsers -------------------------------------------------------
@staticmethod
def _parse_range(payload: bytes) -> Optional[dict]:
if len(payload) < 12:
return None
try:
range_val = struct.unpack(">I", payload[0:4])[0]
elevation = payload[4] & 0x1F
azimuth = payload[5] & 0x3F
chirp = payload[6] & 0x1F
return {
"type": "range",
"range": range_val,
"elevation": elevation,
"azimuth": azimuth,
"chirp": chirp,
"timestamp": time.time(),
}
except Exception as e:
logger.error(f"Error parsing range packet: {e}")
return None
@staticmethod
def _parse_doppler(payload: bytes) -> Optional[dict]:
if len(payload) < 12:
return None
try:
real = struct.unpack(">h", payload[0:2])[0]
imag = struct.unpack(">h", payload[2:4])[0]
elevation = payload[4] & 0x1F
azimuth = payload[5] & 0x3F
chirp = payload[6] & 0x1F
return {
"type": "doppler",
"doppler_real": real,
"doppler_imag": imag,
"elevation": elevation,
"azimuth": azimuth,
"chirp": chirp,
"timestamp": time.time(),
}
except Exception as e:
logger.error(f"Error parsing doppler packet: {e}")
return None
@staticmethod
def _parse_detection(payload: bytes) -> Optional[dict]:
if len(payload) < 8:
return None
try:
detected = (payload[0] & 0x01) != 0
elevation = payload[1] & 0x1F
azimuth = payload[2] & 0x3F
chirp = payload[3] & 0x1F
return {
"type": "detection",
"detected": detected,
"elevation": elevation,
"azimuth": azimuth,
"chirp": chirp,
"timestamp": time.time(),
}
except Exception as e:
logger.error(f"Error parsing detection packet: {e}")
return None
# =============================================================================
# USB / GPS Packet Parser
# =============================================================================
class USBPacketParser:
"""
Parse GPS (and general) data arriving from the STM32 via USB CDC.
Supports:
- Text format: ``GPS:lat,lon,alt,pitch\\r\\n``
- Binary format: ``GPSB`` header, 30 bytes total
"""
def __init__(self):
if CRCMOD_AVAILABLE:
self.crc16_func = crcmod.mkCrcFun(
0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000
)
else:
self.crc16_func = None
def parse_gps_data(self, data: bytes) -> Optional[GPSData]:
"""Attempt to parse GPS data from a raw USB CDC frame."""
if not data:
return None
try:
# Text format: "GPS:lat,lon,alt,pitch\r\n"
text = data.decode("utf-8", errors="ignore").strip()
if text.startswith("GPS:"):
parts = text.split(":")[1].split(",")
if len(parts) >= 4:
return GPSData(
latitude=float(parts[0]),
longitude=float(parts[1]),
altitude=float(parts[2]),
pitch=float(parts[3]),
timestamp=time.time(),
)
# Binary format: [GPSB 4][lat 8][lon 8][alt 4][pitch 4][CRC 2] = 30 bytes
if len(data) >= 30 and data[0:4] == b"GPSB":
return self._parse_binary_gps(data)
except Exception as e:
logger.error(f"Error parsing GPS data: {e}")
return None
@staticmethod
def _parse_binary_gps(data: bytes) -> Optional[GPSData]:
"""Parse 30-byte binary GPS frame."""
try:
if len(data) < 30:
return None
# Simple checksum CRC
crc_rcv = (data[28] << 8) | data[29]
crc_calc = sum(data[0:28]) & 0xFFFF
if crc_rcv != crc_calc:
logger.warning("GPS binary CRC mismatch")
return None
lat = struct.unpack(">d", data[4:12])[0]
lon = struct.unpack(">d", data[12:20])[0]
alt = struct.unpack(">f", data[20:24])[0]
pitch = struct.unpack(">f", data[24:28])[0]
return GPSData(
latitude=lat,
longitude=lon,
altitude=alt,
pitch=pitch,
timestamp=time.time(),
)
except Exception as e:
logger.error(f"Error parsing binary GPS: {e}")
return None
+389
View File
@@ -0,0 +1,389 @@
"""
v7.workers — QThread-based workers and demo target simulator.
Classes:
- RadarDataWorker — reads from FT2232HQ, parses packets,
emits signals with processed data.
- GPSDataWorker — reads GPS frames from STM32 CDC, emits GPSData signals.
- TargetSimulator — QTimer-based demo target generator (from GUI_PyQt_Map.py).
"""
import math
import time
import random
import logging
from typing import List
from PyQt6.QtCore import QThread, QObject, QTimer, pyqtSignal
from .models import RadarTarget, RadarSettings, GPSData
from .hardware import FT2232HQInterface, STM32USBInterface
from .processing import (
RadarProcessor, RadarPacketParser, USBPacketParser,
apply_pitch_correction,
)
logger = logging.getLogger(__name__)
# =============================================================================
# Utility: polar → geographic
# =============================================================================
def polar_to_geographic(
radar_lat: float,
radar_lon: float,
range_m: float,
azimuth_deg: float,
) -> tuple:
"""
Convert polar coordinates (range, azimuth) relative to radar
to geographic (latitude, longitude).
azimuth_deg: 0 = North, clockwise.
Returns (lat, lon).
"""
R = 6_371_000 # Earth radius in meters
lat1 = math.radians(radar_lat)
lon1 = math.radians(radar_lon)
bearing = math.radians(azimuth_deg)
lat2 = math.asin(
math.sin(lat1) * math.cos(range_m / R)
+ math.cos(lat1) * math.sin(range_m / R) * math.cos(bearing)
)
lon2 = lon1 + math.atan2(
math.sin(bearing) * math.sin(range_m / R) * math.cos(lat1),
math.cos(range_m / R) - math.sin(lat1) * math.sin(lat2),
)
return (math.degrees(lat2), math.degrees(lon2))
# =============================================================================
# Radar Data Worker (QThread)
# =============================================================================
class RadarDataWorker(QThread):
"""
Background worker that continuously reads radar data from the primary
FT2232HQ interface, parses packets, runs the processing pipeline, and
emits signals with results.
Signals:
packetReceived(dict) — a single parsed packet dict
targetsUpdated(list) — list of RadarTarget after processing
errorOccurred(str) — error message
statsUpdated(dict) — packet/byte counters
"""
packetReceived = pyqtSignal(dict)
targetsUpdated = pyqtSignal(list)
errorOccurred = pyqtSignal(str)
statsUpdated = pyqtSignal(dict)
def __init__(
self,
ft2232hq: FT2232HQInterface,
processor: RadarProcessor,
packet_parser: RadarPacketParser,
settings: RadarSettings,
gps_data_ref: GPSData,
parent=None,
):
super().__init__(parent)
self._ft2232hq = ft2232hq
self._processor = processor
self._parser = packet_parser
self._settings = settings
self._gps = gps_data_ref
self._running = False
# Counters
self._packet_count = 0
self._byte_count = 0
self._error_count = 0
def stop(self):
self._running = False
def run(self):
"""Main loop: read → parse → process → emit."""
self._running = True
buffer = bytearray()
while self._running:
# Use FT2232HQ interface
iface = None
if self._ft2232hq and self._ft2232hq.is_open:
iface = self._ft2232hq
if iface is None:
self.msleep(100)
continue
try:
data = iface.read_data(4096)
if data:
buffer.extend(data)
self._byte_count += len(data)
# Parse as many packets as possible
while len(buffer) >= 6:
result = self._parser.parse_packet(bytes(buffer))
if result is None:
# No valid packet at current position — skip one byte
if len(buffer) > 1:
buffer = buffer[1:]
else:
break
continue
pkt, consumed = result
buffer = buffer[consumed:]
self._packet_count += 1
# Process the packet
self._process_packet(pkt)
self.packetReceived.emit(pkt)
# Emit stats periodically
self.statsUpdated.emit({
"packets": self._packet_count,
"bytes": self._byte_count,
"errors": self._error_count,
"active_tracks": len(self._processor.tracks),
"targets": len(self._processor.detected_targets),
})
else:
self.msleep(10)
except Exception as e:
self._error_count += 1
self.errorOccurred.emit(str(e))
logger.error(f"RadarDataWorker error: {e}")
self.msleep(100)
# ---- internal packet handling ------------------------------------------
def _process_packet(self, pkt: dict):
"""Route a parsed packet through the processing pipeline."""
try:
if pkt["type"] == "range":
range_m = pkt["range"] * 0.1
raw_elev = pkt["elevation"]
corr_elev = apply_pitch_correction(raw_elev, self._gps.pitch)
target = RadarTarget(
id=pkt["chirp"],
range=range_m,
velocity=0,
azimuth=pkt["azimuth"],
elevation=corr_elev,
snr=20.0,
timestamp=pkt["timestamp"],
)
self._update_rdm(target)
elif pkt["type"] == "doppler":
lam = 3e8 / self._settings.system_frequency
velocity = (pkt["doppler_real"] / 32767.0) * (
self._settings.prf1 * lam / 2
)
self._update_velocity(pkt, velocity)
elif pkt["type"] == "detection":
if pkt["detected"]:
raw_elev = pkt["elevation"]
corr_elev = apply_pitch_correction(raw_elev, self._gps.pitch)
logger.info(
f"CFAR Detection: raw={raw_elev}, corr={corr_elev:.1f}, "
f"pitch={self._gps.pitch:.1f}"
)
except Exception as e:
logger.error(f"Error processing packet: {e}")
def _update_rdm(self, target: RadarTarget):
range_bin = min(int(target.range / 50), 1023)
doppler_bin = min(abs(int(target.velocity)), 31)
self._processor.range_doppler_map[range_bin, doppler_bin] += 1
self._processor.detected_targets.append(target)
if len(self._processor.detected_targets) > 100:
self._processor.detected_targets = self._processor.detected_targets[-100:]
def _update_velocity(self, pkt: dict, velocity: float):
for t in self._processor.detected_targets:
if (t.azimuth == pkt["azimuth"]
and t.elevation == pkt["elevation"]
and t.id == pkt["chirp"]):
t.velocity = velocity
break
# =============================================================================
# GPS Data Worker (QThread)
# =============================================================================
class GPSDataWorker(QThread):
"""
Background worker that reads GPS frames from the STM32 USB CDC interface
and emits parsed GPSData objects.
Signals:
gpsReceived(GPSData)
errorOccurred(str)
"""
gpsReceived = pyqtSignal(object) # GPSData
errorOccurred = pyqtSignal(str)
def __init__(
self,
stm32: STM32USBInterface,
usb_parser: USBPacketParser,
parent=None,
):
super().__init__(parent)
self._stm32 = stm32
self._parser = usb_parser
self._running = False
self._gps_count = 0
@property
def gps_count(self) -> int:
return self._gps_count
def stop(self):
self._running = False
def run(self):
self._running = True
while self._running:
if not (self._stm32 and self._stm32.is_open):
self.msleep(100)
continue
try:
data = self._stm32.read_data(64, timeout=100)
if data:
gps = self._parser.parse_gps_data(data)
if gps:
self._gps_count += 1
self.gpsReceived.emit(gps)
except Exception as e:
self.errorOccurred.emit(str(e))
logger.error(f"GPSDataWorker error: {e}")
self.msleep(100)
# =============================================================================
# Target Simulator (Demo Mode) — QTimer-based
# =============================================================================
class TargetSimulator(QObject):
"""
Generates simulated radar targets for demo/testing.
Uses a QTimer on the main thread (or whichever thread owns this object).
Emits ``targetsUpdated`` with a list[RadarTarget] on each tick.
"""
targetsUpdated = pyqtSignal(list)
def __init__(self, radar_position: GPSData, parent=None):
super().__init__(parent)
self._radar_pos = radar_position
self._targets: List[RadarTarget] = []
self._next_id = 1
self._timer = QTimer(self)
self._timer.timeout.connect(self._tick)
self._initialize_targets(8)
# ---- public API --------------------------------------------------------
def start(self, interval_ms: int = 500):
self._timer.start(interval_ms)
def stop(self):
self._timer.stop()
def set_radar_position(self, gps: GPSData):
self._radar_pos = gps
def add_random_target(self):
self._add_random_target()
# ---- internals ---------------------------------------------------------
def _initialize_targets(self, count: int):
for _ in range(count):
self._add_random_target()
def _add_random_target(self):
range_m = random.uniform(5000, 40000)
azimuth = random.uniform(0, 360)
velocity = random.uniform(-100, 100)
elevation = random.uniform(-5, 45)
lat, lon = polar_to_geographic(
self._radar_pos.latitude,
self._radar_pos.longitude,
range_m,
azimuth,
)
target = RadarTarget(
id=self._next_id,
range=range_m,
velocity=velocity,
azimuth=azimuth,
elevation=elevation,
latitude=lat,
longitude=lon,
snr=random.uniform(10, 35),
timestamp=time.time(),
track_id=self._next_id,
classification=random.choice(["aircraft", "drone", "bird", "unknown"]),
)
self._next_id += 1
self._targets.append(target)
def _tick(self):
"""Update all simulated targets and emit."""
updated: List[RadarTarget] = []
for t in self._targets:
new_range = t.range - t.velocity * 0.5
if new_range < 500 or new_range > 50000:
continue # target exits coverage — drop it
new_vel = max(-150, min(150, t.velocity + random.uniform(-2, 2)))
new_az = (t.azimuth + random.uniform(-0.5, 0.5)) % 360
lat, lon = polar_to_geographic(
self._radar_pos.latitude,
self._radar_pos.longitude,
new_range,
new_az,
)
updated.append(RadarTarget(
id=t.id,
range=new_range,
velocity=new_vel,
azimuth=new_az,
elevation=t.elevation + random.uniform(-0.1, 0.1),
latitude=lat,
longitude=lon,
snr=t.snr + random.uniform(-1, 1),
timestamp=time.time(),
track_id=t.track_id,
classification=t.classification,
))
# Maintain a reasonable target count
if len(updated) < 5 or (random.random() < 0.05 and len(updated) < 15):
self._add_random_target()
updated.append(self._targets[-1])
self._targets = updated
self.targetsUpdated.emit(updated)