24b8442e40
Add SoftwareFPGA class that imports golden_reference functions to replicate the FPGA pipeline in software, enabling bit-accurate replay of raw IQ, FPGA co-sim, and HDF5 recordings through the same dashboard path as live data. New modules: software_fpga.py, replay.py (ReplayEngine + 3 loaders) Enhanced: WaveformConfig model, extract_targets_from_frame() in processing, ReplayWorker with thread-safe playback controls, dashboard replay UI with transport controls and dual-dispatch FPGA parameter routing. Removed: ReplayConnection (from radar_protocol, hardware, dashboard, tests) — replaced by the unified replay architecture. 150/150 tests pass, ruff clean.
289 lines
10 KiB
Python
289 lines
10 KiB
Python
"""
|
|
v7.replay — ReplayEngine: auto-detect format, load, and iterate RadarFrames.
|
|
|
|
Supports three data sources:
|
|
1. **FPGA co-sim directory** — pre-computed ``.npy`` files from golden_reference
|
|
2. **Raw IQ cube** ``.npy`` — complex baseband capture (e.g. ADI Phaser)
|
|
3. **HDF5 recording** ``.h5`` — frames captured by ``DataRecorder``
|
|
|
|
For raw IQ data the engine uses :class:`SoftwareFPGA` to run the full
|
|
bit-accurate signal chain, so changing FPGA control registers in the
|
|
dashboard re-processes the data.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
from enum import Enum, auto
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
import numpy as np
|
|
|
|
if TYPE_CHECKING:
|
|
from .software_fpga import SoftwareFPGA
|
|
|
|
# radar_protocol is a sibling module (not inside v7/)
|
|
import sys as _sys
|
|
|
|
_GUI_DIR = str(Path(__file__).resolve().parent.parent)
|
|
if _GUI_DIR not in _sys.path:
|
|
_sys.path.insert(0, _GUI_DIR)
|
|
from radar_protocol import RadarFrame # noqa: E402
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Lazy import — h5py is optional
|
|
try:
|
|
import h5py
|
|
|
|
HDF5_AVAILABLE = True
|
|
except ImportError:
|
|
HDF5_AVAILABLE = False
|
|
|
|
|
|
class ReplayFormat(Enum):
|
|
"""Detected input format."""
|
|
|
|
COSIM_DIR = auto()
|
|
RAW_IQ_NPY = auto()
|
|
HDF5 = auto()
|
|
|
|
|
|
# ───────────────────────────────────────────────────────────────────
|
|
# Format detection
|
|
# ───────────────────────────────────────────────────────────────────
|
|
|
|
_COSIM_REQUIRED = {"doppler_map_i.npy", "doppler_map_q.npy"}
|
|
|
|
|
|
def detect_format(path: str) -> ReplayFormat:
|
|
"""Auto-detect the replay data format from *path*.
|
|
|
|
Raises
|
|
------
|
|
ValueError
|
|
If the format cannot be determined.
|
|
"""
|
|
p = Path(path)
|
|
|
|
if p.is_dir():
|
|
children = {f.name for f in p.iterdir()}
|
|
if _COSIM_REQUIRED.issubset(children):
|
|
return ReplayFormat.COSIM_DIR
|
|
msg = f"Directory {p} does not contain required co-sim files: {_COSIM_REQUIRED - children}"
|
|
raise ValueError(msg)
|
|
|
|
if p.suffix == ".h5":
|
|
return ReplayFormat.HDF5
|
|
|
|
if p.suffix == ".npy":
|
|
return ReplayFormat.RAW_IQ_NPY
|
|
|
|
msg = f"Cannot determine replay format for: {p}"
|
|
raise ValueError(msg)
|
|
|
|
|
|
# ───────────────────────────────────────────────────────────────────
|
|
# ReplayEngine
|
|
# ───────────────────────────────────────────────────────────────────
|
|
|
|
class ReplayEngine:
|
|
"""Load replay data and serve RadarFrames on demand.
|
|
|
|
Parameters
|
|
----------
|
|
path : str
|
|
File or directory path to load.
|
|
software_fpga : SoftwareFPGA | None
|
|
Required only for ``RAW_IQ_NPY`` format. For other formats the
|
|
data is already processed and the FPGA instance is ignored.
|
|
"""
|
|
|
|
def __init__(self, path: str, software_fpga: SoftwareFPGA | None = None) -> None:
|
|
self.path = path
|
|
self.fmt = detect_format(path)
|
|
self.software_fpga = software_fpga
|
|
|
|
# Populated by _load_*
|
|
self._total_frames: int = 0
|
|
self._raw_iq: np.ndarray | None = None # for RAW_IQ_NPY
|
|
self._h5_file = None
|
|
self._h5_keys: list[str] = []
|
|
self._cosim_frame = None # single RadarFrame for co-sim
|
|
|
|
self._load()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Loading
|
|
# ------------------------------------------------------------------
|
|
|
|
def _load(self) -> None:
|
|
if self.fmt is ReplayFormat.COSIM_DIR:
|
|
self._load_cosim()
|
|
elif self.fmt is ReplayFormat.RAW_IQ_NPY:
|
|
self._load_raw_iq()
|
|
elif self.fmt is ReplayFormat.HDF5:
|
|
self._load_hdf5()
|
|
|
|
def _load_cosim(self) -> None:
|
|
"""Load FPGA co-sim directory (already-processed .npy arrays).
|
|
|
|
Prefers fullchain (MTI-enabled) files when CFAR outputs are present,
|
|
so that I/Q data is consistent with the detection mask. Falls back
|
|
to the non-MTI ``doppler_map`` files when fullchain data is absent.
|
|
"""
|
|
d = Path(self.path)
|
|
|
|
# CFAR outputs (from the MTI→Doppler→DC-notch→CFAR chain)
|
|
cfar_flags = d / "fullchain_cfar_flags.npy"
|
|
cfar_mag = d / "fullchain_cfar_mag.npy"
|
|
has_cfar = cfar_flags.exists() and cfar_mag.exists()
|
|
|
|
# MTI-consistent I/Q (same chain that produced CFAR outputs)
|
|
mti_dop_i = d / "fullchain_mti_doppler_i.npy"
|
|
mti_dop_q = d / "fullchain_mti_doppler_q.npy"
|
|
has_mti_doppler = mti_dop_i.exists() and mti_dop_q.exists()
|
|
|
|
# Choose I/Q: prefer MTI-chain when CFAR data comes from that chain
|
|
if has_cfar and has_mti_doppler:
|
|
dop_i = np.load(mti_dop_i).astype(np.int16)
|
|
dop_q = np.load(mti_dop_q).astype(np.int16)
|
|
log.info("Co-sim: using fullchain MTI+Doppler I/Q (matches CFAR chain)")
|
|
else:
|
|
dop_i = np.load(d / "doppler_map_i.npy").astype(np.int16)
|
|
dop_q = np.load(d / "doppler_map_q.npy").astype(np.int16)
|
|
log.info("Co-sim: using non-MTI doppler_map I/Q")
|
|
|
|
frame = RadarFrame()
|
|
frame.range_doppler_i = dop_i
|
|
frame.range_doppler_q = dop_q
|
|
|
|
if has_cfar:
|
|
frame.detections = np.load(cfar_flags).astype(np.uint8)
|
|
frame.magnitude = np.load(cfar_mag).astype(np.float64)
|
|
else:
|
|
frame.magnitude = np.sqrt(
|
|
dop_i.astype(np.float64) ** 2 + dop_q.astype(np.float64) ** 2
|
|
)
|
|
frame.detections = np.zeros_like(dop_i, dtype=np.uint8)
|
|
|
|
frame.range_profile = frame.magnitude[:, 0]
|
|
frame.detection_count = int(frame.detections.sum())
|
|
frame.frame_number = 0
|
|
frame.timestamp = time.time()
|
|
|
|
self._cosim_frame = frame
|
|
self._total_frames = 1
|
|
log.info("Loaded co-sim directory: %s (1 frame)", self.path)
|
|
|
|
def _load_raw_iq(self) -> None:
|
|
"""Load raw complex IQ cube (.npy)."""
|
|
data = np.load(self.path, mmap_mode="r")
|
|
if data.ndim == 2:
|
|
# (chirps, samples) — single frame
|
|
data = data[np.newaxis, ...]
|
|
if data.ndim != 3:
|
|
msg = f"Expected 3-D array (frames, chirps, samples), got shape {data.shape}"
|
|
raise ValueError(msg)
|
|
self._raw_iq = data
|
|
self._total_frames = data.shape[0]
|
|
log.info(
|
|
"Loaded raw IQ: %s, shape %s (%d frames)",
|
|
self.path,
|
|
data.shape,
|
|
self._total_frames,
|
|
)
|
|
|
|
def _load_hdf5(self) -> None:
|
|
"""Load HDF5 recording (.h5)."""
|
|
if not HDF5_AVAILABLE:
|
|
msg = "h5py is required to load HDF5 recordings"
|
|
raise ImportError(msg)
|
|
self._h5_file = h5py.File(self.path, "r")
|
|
frames_grp = self._h5_file.get("frames")
|
|
if frames_grp is None:
|
|
msg = f"HDF5 file {self.path} has no 'frames' group"
|
|
raise ValueError(msg)
|
|
self._h5_keys = sorted(frames_grp.keys())
|
|
self._total_frames = len(self._h5_keys)
|
|
log.info("Loaded HDF5: %s (%d frames)", self.path, self._total_frames)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public API
|
|
# ------------------------------------------------------------------
|
|
|
|
@property
|
|
def total_frames(self) -> int:
|
|
return self._total_frames
|
|
|
|
def get_frame(self, index: int) -> RadarFrame:
|
|
"""Return the RadarFrame at *index* (0-based).
|
|
|
|
For ``RAW_IQ_NPY`` format, this runs the SoftwareFPGA chain
|
|
on the requested frame's chirps.
|
|
"""
|
|
if index < 0 or index >= self._total_frames:
|
|
msg = f"Frame index {index} out of range [0, {self._total_frames})"
|
|
raise IndexError(msg)
|
|
|
|
if self.fmt is ReplayFormat.COSIM_DIR:
|
|
return self._get_cosim(index)
|
|
if self.fmt is ReplayFormat.RAW_IQ_NPY:
|
|
return self._get_raw_iq(index)
|
|
return self._get_hdf5(index)
|
|
|
|
def close(self) -> None:
|
|
"""Release any open file handles."""
|
|
if self._h5_file is not None:
|
|
self._h5_file.close()
|
|
self._h5_file = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# Per-format frame getters
|
|
# ------------------------------------------------------------------
|
|
|
|
def _get_cosim(self, _index: int) -> RadarFrame:
|
|
"""Co-sim: single static frame (index ignored).
|
|
|
|
Uses deepcopy so numpy arrays are not shared with the source,
|
|
preventing in-place mutation from corrupting cached data.
|
|
"""
|
|
import copy
|
|
frame = copy.deepcopy(self._cosim_frame)
|
|
frame.timestamp = time.time()
|
|
return frame
|
|
|
|
def _get_raw_iq(self, index: int) -> RadarFrame:
|
|
"""Raw IQ: quantize one frame and run through SoftwareFPGA."""
|
|
if self.software_fpga is None:
|
|
msg = "SoftwareFPGA is required for raw IQ replay"
|
|
raise RuntimeError(msg)
|
|
|
|
from .software_fpga import quantize_raw_iq
|
|
|
|
raw = self._raw_iq[index] # (chirps, samples) complex
|
|
iq_i, iq_q = quantize_raw_iq(raw[np.newaxis, ...])
|
|
return self.software_fpga.process_chirps(
|
|
iq_i, iq_q, frame_number=index, timestamp=time.time()
|
|
)
|
|
|
|
def _get_hdf5(self, index: int) -> RadarFrame:
|
|
"""HDF5: reconstruct RadarFrame from stored datasets."""
|
|
key = self._h5_keys[index]
|
|
grp = self._h5_file["frames"][key]
|
|
|
|
frame = RadarFrame()
|
|
frame.timestamp = float(grp.attrs.get("timestamp", time.time()))
|
|
frame.frame_number = int(grp.attrs.get("frame_number", index))
|
|
frame.detection_count = int(grp.attrs.get("detection_count", 0))
|
|
|
|
frame.range_doppler_i = np.array(grp["range_doppler_i"], dtype=np.int16)
|
|
frame.range_doppler_q = np.array(grp["range_doppler_q"], dtype=np.int16)
|
|
frame.magnitude = np.array(grp["magnitude"], dtype=np.float64)
|
|
frame.detections = np.array(grp["detections"], dtype=np.uint8)
|
|
frame.range_profile = np.array(grp["range_profile"], dtype=np.float64)
|
|
|
|
return frame
|