Merge remote-tracking branch 'upstream/develop'

This commit is contained in:
Serhii
2026-04-08 01:10:48 +03:00
8 changed files with 283 additions and 382 deletions
+71
View File
@@ -0,0 +1,71 @@
name: AERIS-10 CI
on:
pull_request:
branches: [main, develop]
push:
branches: [main, develop]
jobs:
# ===========================================================================
# Job 1: Python Host Software Tests (58 tests)
# radar_protocol, radar_dashboard, FT2232H connection, replay, opcodes, e2e
# ===========================================================================
python-tests:
name: Python Dashboard Tests (58)
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest numpy h5py
- name: Run test suite
run: python -m pytest 9_Firmware/9_3_GUI/test_radar_dashboard.py -v --tb=short
# ===========================================================================
# Job 2: MCU Firmware Unit Tests (20 tests)
# Bug regression (15) + Gap-3 safety tests (5)
# ===========================================================================
mcu-tests:
name: MCU Firmware Tests (20)
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Install build tools
run: sudo apt-get update && sudo apt-get install -y build-essential
- name: Build and run MCU tests
working-directory: 9_Firmware/9_1_Microcontroller/tests
run: make test
# ===========================================================================
# Job 3: FPGA RTL Regression (23 testbenches + lint)
# Phase 0: Vivado-style lint, Phase 1-4: unit + integration + e2e
# ===========================================================================
fpga-regression:
name: FPGA Regression (23 TBs + lint)
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Install Icarus Verilog
run: sudo apt-get update && sudo apt-get install -y iverilog
- name: Run full FPGA regression
working-directory: 9_Firmware/9_2_FPGA
run: bash run_regression.sh
+135 -25
View File
@@ -4,19 +4,19 @@
| File | Device | Package | Purpose | | File | Device | Package | Purpose |
|------|--------|---------|---------| |------|--------|---------|---------|
| `xc7a50t_ftg256.xdc` | XC7A50T-2FTG256I | FTG256 (256-ball BGA) | Upstream author's board (copy of `cntrt.xdc`) | | `xc7a50t_ftg256.xdc` | XC7A50T-2FTG256I | FTG256 (256-ball BGA) | 50T production board |
| `xc7a200t_fbg484.xdc` | XC7A200T-2FBG484I | FBG484 (484-ball BGA) | Production board (new PCB design) | | `xc7a200t_fbg484.xdc` | XC7A200T-2FBG484I | FBG484 (484-ball BGA) | 200T premium dev board |
| `te0712_te0701_minimal.xdc` | XC7A200T-2FBG484I | FBG484 (484-ball BGA) | Trenz dev split target (minimal clock/reset + LEDs/status) | | `te0712_te0701_minimal.xdc` | XC7A200T-2FBG484I | FBG484 (484-ball BGA) | Trenz dev split target (minimal clock/reset + LEDs/status) |
| `te0713_te0701_minimal.xdc` | XC7A200T-2FBG484C | FBG484 (484-ball BGA) | Trenz alternate SoM target (minimal clock + FMC status outputs) | | `te0713_te0701_minimal.xdc` | XC7A200T-2FBG484C | FBG484 (484-ball BGA) | Trenz alternate SoM target (minimal clock + FMC status outputs) |
## Why Four Files ## Why Four Files
The upstream prototype uses a smaller XC7A50T in an FTG256 package. The production The 50T production board uses an XC7A50T in an FTG256 package. The 200T premium
AERIS-10 radar migrates to the XC7A200T for more logic, BRAM, and DSP resources. dev board uses an XC7A200T for more logic, BRAM, and DSP resources. The two
The two devices have completely different packages and pin names, so each needs its devices have completely different packages and pin names, so each needs its own
own constraint file. constraint file.
The Trenz TE0712/TE0701 path uses the same FPGA part as production but different board The Trenz TE0712/TE0701 path uses the same FPGA part as the 200T but different board
pinout and peripherals. The dev target is split into its own top wrapper pinout and peripherals. The dev target is split into its own top wrapper
(`radar_system_top_te0712_dev.v`) and minimal constraints file to avoid accidental mixing (`radar_system_top_te0712_dev.v`) and minimal constraints file to avoid accidental mixing
of production pin assignments during bring-up. of production pin assignments during bring-up.
@@ -25,9 +25,83 @@ The Trenz TE0713/TE0701 path supports situations where TE0712 lead time is prohi
TE0713 uses XC7A200T-2FBG484C (commercial temp grade) and requires separate clock mapping, TE0713 uses XC7A200T-2FBG484C (commercial temp grade) and requires separate clock mapping,
so it has its own dev top and XDC. so it has its own dev top and XDC.
## USB Interface Architecture (USB_MODE)
The radar system supports two USB data interfaces, selected at **compile time** via
the `USB_MODE` parameter in `radar_system_top.v`:
| USB_MODE | Interface | Bus Width | Speed | Board Target |
|----------|-----------|-----------|-------|--------------|
| 0 (default) | FT601 (USB 3.0) | 32-bit | 100 MHz | 200T premium dev board |
| 1 | FT2232H (USB 2.0) | 8-bit | 60 MHz | 50T production board |
### How USB_MODE Works
`radar_system_top.v` contains a Verilog `generate` block that instantiates exactly
one USB interface module based on the `USB_MODE` parameter:
```
generate
if (USB_MODE == 0) begin : gen_ft601
usb_data_interface usb_inst (...) // FT601, 32-bit
// FT2232H ports tied off to inactive
end else begin : gen_ft2232h
usb_data_interface_ft2232h usb_inst (...) // FT2232H, 8-bit
// FT601 ports tied off to inactive
end
endgenerate
```
Both interfaces share the same internal radar data bus and host command interface.
The unused interface's I/O pins are tied to safe inactive states (active-low
signals high, active-high signals low, bidirectional buses high-Z).
### How USB_MODE Is Passed Per Board Target
The parameter is set via a **wrapper module** that overrides the default:
- **50T production**: `radar_system_top_50t.v` instantiates the core with
`.USB_MODE(1)` and maps the FT2232H's 60 MHz `CLKOUT` to the shared
`ft601_clk_in` port. FT601 inputs are tied inactive; outputs go to `_nc` wires.
```verilog
// In radar_system_top_50t.v:
radar_system_top #(
.USB_MODE(1)
) u_core ( ... );
```
- **200T dev board**: `radar_system_top` is used directly as the top module.
`USB_MODE` defaults to `0` (FT601). No wrapper needed.
### RTL Files by USB Interface
| File | Purpose |
|------|---------|
| `usb_data_interface.v` | FT601 USB 3.0 module (32-bit, USB_MODE=0) |
| `usb_data_interface_ft2232h.v` | FT2232H USB 2.0 module (8-bit, USB_MODE=1) |
| `radar_system_top.v` | Core module with USB_MODE generate block |
| `radar_system_top_50t.v` | 50T wrapper: sets USB_MODE=1, ties off FT601 |
### FT2232H Pin Map (50T, Bank 35, VCCO=3.3V)
All connections are direct between U6 (FT2232HQ) and U42 (XC7A50T). Only
Channel A is used (245 Synchronous FIFO mode). Channel B is unconnected.
| Signal | FT2232H Pin | FPGA Ball | Direction |
|--------|-------------|-----------|-----------|
| FT_D[7:0] | ADBUS[7:0] | K1,J3,H3,G4,F2,D1,C3,C1 | Bidirectional |
| FT_RXF# | ACBUS0 | A2 | Input (FIFO not empty) |
| FT_TXE# | ACBUS1 | B2 | Input (FIFO not full) |
| FT_RD# | ACBUS2 | A3 | Output (read strobe) |
| FT_WR# | ACBUS3 | A4 | Output (write strobe) |
| FT_SIWUA | ACBUS4 | A5 | Output (send immediate) |
| FT_CLKOUT | ACBUS5 | C4 (MRCC) | Input (60 MHz clock) |
| FT_OE# | ACBUS6 | B7 | Output (bus direction) |
## Bank Voltage Assignments ## Bank Voltage Assignments
### XC7A50T-FTG256 (Upstream) ### XC7A50T-FTG256 (50T Production)
| Bank | VCCO | Signals | | Bank | VCCO | Signals |
|------|------|---------| |------|------|---------|
@@ -35,9 +109,9 @@ so it has its own dev top and XDC.
| 14 | 3.3V | ADC LVDS (LVDS_33), SPI flash | | 14 | 3.3V | ADC LVDS (LVDS_33), SPI flash |
| 15 | 3.3V | DAC, clocks, STM32 3.3V SPI, DIG bus | | 15 | 3.3V | DAC, clocks, STM32 3.3V SPI, DIG bus |
| 34 | 1.8V | ADAR1000 control, SPI 1.8V side | | 34 | 1.8V | ADAR1000 control, SPI 1.8V side |
| 35 | 3.3V | Unused (no signal connections) | | 35 | 3.3V | FT2232H USB 2.0 (8-bit data + control, 15 signals) |
### XC7A200T-FBG484 (Production) ### XC7A200T-FBG484 (200T Premium Dev Board)
| Bank | VCCO | Used/Avail | Signals | | Bank | VCCO | Used/Avail | Signals |
|------|------|------------|---------| |------|------|------------|---------|
@@ -50,15 +124,43 @@ so it has its own dev top and XDC.
## Signal Differences Between Targets ## Signal Differences Between Targets
| Signal | Upstream (FTG256) | Production (FBG484) | | Signal | 50T Production (FTG256) | 200T Dev (FBG484) |
|--------|-------------------|---------------------| |--------|-------------------------|-------------------|
| FT601 USB | Unwired (chip placed, no nets) | Fully wired, Bank 16 | | USB interface | FT2232H USB 2.0 (8-bit, Bank 35) | FT601 USB 3.0 (32-bit, Bank 16) |
| USB_MODE | 1 (via `radar_system_top_50t` wrapper) | 0 (default in `radar_system_top`) |
| USB clock | 60 MHz from FT2232H CLKOUT | 100 MHz from FT601 |
| `dac_clk` | Not connected (DAC clocked by AD9523 directly) | Routed, FPGA drives DAC | | `dac_clk` | Not connected (DAC clocked by AD9523 directly) | Routed, FPGA drives DAC |
| `ft601_be` width | `[1:0]` in upstream RTL | `[3:0]` (RTL updated) | | `ft601_be` width | N/A (FT601 unused, tied off) | `[3:0]` (RTL updated) |
| ADC LVDS standard | LVDS_33 (3.3V bank) | LVDS_25 (2.5V bank, better quality) | | ADC LVDS standard | LVDS_33 (3.3V bank) | LVDS_25 (2.5V bank, better quality) |
| Status/debug outputs | No physical pins (commented out) | All routed to Banks 35 + 13 | | Status/debug outputs | No physical pins (commented out) | All routed to Banks 35 + 13 |
## How to Select in Vivado ## How to Build
### Quick Reference
```bash
# From the FPGA source directory (9_Firmware/9_2_FPGA):
# 50T production build (FT2232H, USB_MODE=1):
vivado -mode batch -source scripts/50t/build_50t.tcl 2>&1 | tee build_50t/vivado.log
# 200T dev build (FT601, USB_MODE=0):
vivado -mode batch -source scripts/200t/build_200t.tcl \
-log build/build.log -journal build/build.jou
```
The build scripts automatically select the correct top module and constraints:
| Build Script | Top Module | Constraints | USB_MODE |
|--------------|------------|-------------|----------|
| `scripts/50t/build_50t.tcl` | `radar_system_top_50t` | `xc7a50t_ftg256.xdc` | 1 (FT2232H) |
| `scripts/200t/build_200t.tcl` | `radar_system_top` | `xc7a200t_fbg484.xdc` | 0 (FT601) |
You do NOT need to set `USB_MODE` manually. The top module selection handles it:
- `radar_system_top_50t` forces `USB_MODE=1` internally
- `radar_system_top` defaults to `USB_MODE=0`
## How to Select Constraints in Vivado
In the Vivado project, only one target XDC should be active at a time: In the Vivado project, only one target XDC should be active at a time:
@@ -85,12 +187,12 @@ read_xdc constraints/te0713_te0701_minimal.xdc
## Top Modules by Target ## Top Modules by Target
| Target | Top module | Notes | | Target | Top module | USB_MODE | USB Interface | Notes |
|--------|------------|-------| |--------|------------|----------|---------------|-------|
| Upstream FTG256 | `radar_system_top` | Legacy board support | | 50T Production (FTG256) | `radar_system_top_50t` | 1 | FT2232H (8-bit) | Wrapper sets USB_MODE=1, ties off FT601 |
| Production FBG484 | `radar_system_top` | Main AERIS-10 board | | 200T Dev (FBG484) | `radar_system_top` | 0 (default) | FT601 (32-bit) | No wrapper needed |
| Trenz TE0712/TE0701 | `radar_system_top_te0712_dev` | Minimal bring-up wrapper while pinout/peripherals are migrated | | Trenz TE0712/TE0701 | `radar_system_top_te0712_dev` | 0 (default) | FT601 (32-bit) | Minimal bring-up wrapper |
| Trenz TE0713/TE0701 | `radar_system_top_te0713_dev` | Alternate SoM wrapper (TE0713 clock mapping) | | Trenz TE0713/TE0701 | `radar_system_top_te0713_dev` | 0 (default) | FT601 (32-bit) | Alternate SoM wrapper |
## Trenz Split Status ## Trenz Split Status
@@ -142,11 +244,19 @@ TE0713 outputs:
## Notes ## Notes
- The production XDC pin assignments are **recommended** for the new PCB. - **USB_MODE is compile-time only.** You cannot switch USB interfaces at runtime.
Each board target has exactly one USB chip physically connected.
- The 50T production build must use `radar_system_top_50t` as top module. Using
`radar_system_top` directly will default to FT601 (USB_MODE=0), which has no
physical connection on the 50T board.
- The 200T XDC pin assignments are **recommended** for the new PCB.
The PCB designer should follow this allocation. The PCB designer should follow this allocation.
- Bank 16 (FT601) is fully utilized at 50/50 pins. No room for expansion - Bank 16 on the 200T (FT601) is fully utilized at 50/50 pins. No room for expansion
on that bank. on that bank.
- Bank 35 (status/debug) is also at capacity (50/50). Additional debug - Bank 35 on the 200T (status/debug) is also at capacity (50/50). Additional debug
signals should use Bank 13 spare pins (18 remaining). signals should use Bank 13 spare pins (18 remaining).
- Bank 35 on the 50T is used for FT2232H (15 signals). Remaining pins are available
for future expansion.
- Clock inputs are placed on MRCC (Multi-Region Clock Capable) pins to - Clock inputs are placed on MRCC (Multi-Region Clock Capable) pins to
ensure proper clock tree access. ensure proper clock tree access. The FT2232H CLKOUT (60 MHz) is on
pin C4 (`IO_L12N_T1_MRCC_35`).
+2 -2
View File
@@ -8,6 +8,6 @@ GUI_V5 ==> Added Mercury Color
GUI_V6 ==> Added USB3 FT601 support GUI_V6 ==> Added USB3 FT601 support
radar_dashboard ==> Board bring-up dashboard (FT601 reader, real-time R-D heatmap, CFAR overlay, waterfall, host commands, HDF5 recording) radar_dashboard ==> Board bring-up dashboard (FT2232H reader, real-time R-D heatmap, CFAR overlay, waterfall, host commands, HDF5 recording)
radar_protocol ==> Protocol layer (packet parsing, command building, FT601 connection, data recorder, acquisition thread) radar_protocol ==> Protocol layer (packet parsing, command building, FT2232H connection, data recorder, acquisition thread)
smoke_test ==> Board bring-up smoke test host script (triggers FPGA self-test via opcode 0x30) smoke_test ==> Board bring-up smoke test host script (triggers FPGA self-test via opcode 0x30)
+9 -9
View File
@@ -3,10 +3,10 @@
AERIS-10 Radar Dashboard — Board Bring-Up Edition AERIS-10 Radar Dashboard — Board Bring-Up Edition
=================================================== ===================================================
Real-time visualization and control for the AERIS-10 phased-array radar Real-time visualization and control for the AERIS-10 phased-array radar
via FT601 USB 3.0 interface. via FT2232H USB 2.0 interface.
Features: Features:
- FT601 USB reader with packet parsing (matches usb_data_interface.v) - FT2232H USB reader with packet parsing (matches usb_data_interface_ft2232h.v)
- Real-time range-Doppler magnitude heatmap (64x32) - Real-time range-Doppler magnitude heatmap (64x32)
- CFAR detection overlay (flagged cells highlighted) - CFAR detection overlay (flagged cells highlighted)
- Range profile waterfall plot (range vs. time) - Range profile waterfall plot (range vs. time)
@@ -17,7 +17,7 @@ Features:
Usage: Usage:
python radar_dashboard.py # Launch with mock data python radar_dashboard.py # Launch with mock data
python radar_dashboard.py --live # Launch with FT601 hardware python radar_dashboard.py --live # Launch with FT2232H hardware
python radar_dashboard.py --record # Launch with HDF5 recording python radar_dashboard.py --record # Launch with HDF5 recording
""" """
@@ -43,7 +43,7 @@ from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
# Import protocol layer (no GUI deps) # Import protocol layer (no GUI deps)
from radar_protocol import ( from radar_protocol import (
RadarProtocol, FT601Connection, ReplayConnection, RadarProtocol, FT2232HConnection, ReplayConnection,
DataRecorder, RadarAcquisition, DataRecorder, RadarAcquisition,
RadarFrame, StatusResponse, Opcode, RadarFrame, StatusResponse, Opcode,
NUM_RANGE_BINS, NUM_DOPPLER_BINS, WATERFALL_DEPTH, NUM_RANGE_BINS, NUM_DOPPLER_BINS, WATERFALL_DEPTH,
@@ -82,7 +82,7 @@ class RadarDashboard:
BANDWIDTH = 500e6 # Hz — chirp bandwidth BANDWIDTH = 500e6 # Hz — chirp bandwidth
C = 3e8 # m/s — speed of light C = 3e8 # m/s — speed of light
def __init__(self, root: tk.Tk, connection: FT601Connection, def __init__(self, root: tk.Tk, connection: FT2232HConnection,
recorder: DataRecorder): recorder: DataRecorder):
self.root = root self.root = root
self.conn = connection self.conn = connection
@@ -552,7 +552,7 @@ class _TextHandler(logging.Handler):
def main(): def main():
parser = argparse.ArgumentParser(description="AERIS-10 Radar Dashboard") parser = argparse.ArgumentParser(description="AERIS-10 Radar Dashboard")
parser.add_argument("--live", action="store_true", parser.add_argument("--live", action="store_true",
help="Use real FT601 hardware (default: mock mode)") help="Use real FT2232H hardware (default: mock mode)")
parser.add_argument("--replay", type=str, metavar="NPY_DIR", parser.add_argument("--replay", type=str, metavar="NPY_DIR",
help="Replay real data from .npy directory " help="Replay real data from .npy directory "
"(e.g. tb/cosim/real_data/hex/)") "(e.g. tb/cosim/real_data/hex/)")
@@ -561,7 +561,7 @@ def main():
parser.add_argument("--record", action="store_true", parser.add_argument("--record", action="store_true",
help="Start HDF5 recording immediately") help="Start HDF5 recording immediately")
parser.add_argument("--device", type=int, default=0, parser.add_argument("--device", type=int, default=0,
help="FT601 device index (default: 0)") help="FT2232H device index (default: 0)")
args = parser.parse_args() args = parser.parse_args()
if args.replay: if args.replay:
@@ -569,10 +569,10 @@ def main():
conn = ReplayConnection(npy_dir, use_mti=not args.no_mti) conn = ReplayConnection(npy_dir, use_mti=not args.no_mti)
mode_str = f"REPLAY ({npy_dir}, MTI={'OFF' if args.no_mti else 'ON'})" mode_str = f"REPLAY ({npy_dir}, MTI={'OFF' if args.no_mti else 'ON'})"
elif args.live: elif args.live:
conn = FT601Connection(mock=False) conn = FT2232HConnection(mock=False)
mode_str = "LIVE" mode_str = "LIVE"
else: else:
conn = FT601Connection(mock=True) conn = FT2232HConnection(mock=True)
mode_str = "MOCK" mode_str = "MOCK"
recorder = DataRecorder() recorder = DataRecorder()
+25 -291
View File
@@ -5,21 +5,12 @@ AERIS-10 Radar Protocol Layer
Pure-logic module for USB packet parsing and command building. Pure-logic module for USB packet parsing and command building.
No GUI dependencies — safe to import from tests and headless scripts. No GUI dependencies — safe to import from tests and headless scripts.
Supports two USB interfaces: USB Interface: FT2232H USB 2.0 (8-bit, 50T production board) via pyftdi
- FT601 USB 3.0 (32-bit, 200T dev board) via ftd3xx
- FT2232H USB 2.0 (8-bit, 50T production board) via pyftdi
USB Packet Protocol (FT601, 35-byte): USB Packet Protocol (11-byte):
TX (FPGA→Host):
Data packet: [0xAA] [range 4×32b] [doppler 4×32b] [det 1B] [0x55]
Status packet: [0xBB] [status 6×32b] [0x55]
RX (Host→FPGA):
Command word: {opcode[31:24], addr[23:16], value[15:0]}
USB Packet Protocol (FT2232H, 11-byte compact):
TX (FPGA→Host): TX (FPGA→Host):
Data packet: [0xAA] [range_q 2B] [range_i 2B] [dop_re 2B] [dop_im 2B] [det 1B] [0x55] Data packet: [0xAA] [range_q 2B] [range_i 2B] [dop_re 2B] [dop_im 2B] [det 1B] [0x55]
Status packet: [0xBB] [status 6×32b] [0x55] (same 26-byte format) Status packet: [0xBB] [status 6×32b] [0x55]
RX (Host→FPGA): RX (Host→FPGA):
Command: 4 bytes received sequentially {opcode, addr, value_hi, value_lo} Command: 4 bytes received sequentially {opcode, addr, value_hi, value_lo}
""" """
@@ -48,9 +39,8 @@ FOOTER_BYTE = 0x55
STATUS_HEADER_BYTE = 0xBB STATUS_HEADER_BYTE = 0xBB
# Packet sizes # Packet sizes
DATA_PACKET_SIZE_FT601 = 35 # FT601: 1 + 16 + 16 + 1 + 1 DATA_PACKET_SIZE = 11 # 1 + 4 + 2 + 2 + 1 + 1
DATA_PACKET_SIZE_FT2232H = 11 # FT2232H: 1 + 4 + 2 + 2 + 1 + 1 STATUS_PACKET_SIZE = 26 # 1 + 24 + 1
STATUS_PACKET_SIZE = 26 # Same for both: 1 + 24 + 1
NUM_RANGE_BINS = 64 NUM_RANGE_BINS = 64
NUM_DOPPLER_BINS = 32 NUM_DOPPLER_BINS = 32
@@ -148,7 +138,7 @@ class RadarProtocol:
def build_command(opcode: int, value: int, addr: int = 0) -> bytes: def build_command(opcode: int, value: int, addr: int = 0) -> bytes:
""" """
Build a 32-bit command word: {opcode[31:24], addr[23:16], value[15:0]}. Build a 32-bit command word: {opcode[31:24], addr[23:16], value[15:0]}.
Returns 4 bytes, big-endian (MSB first as FT601 expects). Returns 4 bytes, big-endian (MSB first).
""" """
word = ((opcode & 0xFF) << 24) | ((addr & 0xFF) << 16) | (value & 0xFFFF) word = ((opcode & 0xFF) << 24) | ((addr & 0xFF) << 16) | (value & 0xFFFF)
return struct.pack(">I", word) return struct.pack(">I", word)
@@ -156,70 +146,11 @@ class RadarProtocol:
@staticmethod @staticmethod
def parse_data_packet(raw: bytes) -> Optional[Dict[str, Any]]: def parse_data_packet(raw: bytes) -> Optional[Dict[str, Any]]:
""" """
Parse a single data packet from the FPGA byte stream. Parse an 11-byte data packet from the FT2232H byte stream.
Returns dict with keys: 'range_i', 'range_q', 'doppler_i', 'doppler_q', Returns dict with keys: 'range_i', 'range_q', 'doppler_i', 'doppler_q',
'detection', or None if invalid. 'detection', or None if invalid.
Packet format (all streams enabled): Packet format (11 bytes):
[0xAA] [range 4×4B] [doppler 4×4B] [det 1B] [0x55]
= 1 + 16 + 16 + 1 + 1 = 35 bytes
With byte-enables, the FT601 delivers only valid bytes.
Header/footer/detection use BE=0001 → 1 byte each.
Range/doppler use BE=1111 → 4 bytes each × 4 transfers.
In practice, the range data word 0 contains the full 32-bit value
{range_q[15:0], range_i[15:0]}. Words 13 are shifted copies.
Similarly, doppler word 0 = {doppler_real, doppler_imag}.
"""
if len(raw) < 3:
return None
if raw[0] != HEADER_BYTE:
return None
result = {}
pos = 1
# Range data: 4 × 4 bytes, only word 0 matters
if pos + 16 <= len(raw):
range_word0 = struct.unpack_from(">I", raw, pos)[0]
result["range_i"] = _to_signed16(range_word0 & 0xFFFF)
result["range_q"] = _to_signed16((range_word0 >> 16) & 0xFFFF)
pos += 16
else:
return None
# Doppler data: 4 × 4 bytes, only word 0 matters
# Word 0 layout: {doppler_real[31:16], doppler_imag[15:0]}
if pos + 16 <= len(raw):
dop_word0 = struct.unpack_from(">I", raw, pos)[0]
result["doppler_q"] = _to_signed16(dop_word0 & 0xFFFF)
result["doppler_i"] = _to_signed16((dop_word0 >> 16) & 0xFFFF)
pos += 16
else:
return None
# Detection: 1 byte
if pos + 1 <= len(raw):
result["detection"] = raw[pos] & 0x01
pos += 1
else:
return None
# Footer
if pos < len(raw) and raw[pos] == FOOTER_BYTE:
pos += 1
return result
@staticmethod
def parse_data_packet_compact(raw: bytes) -> Optional[Dict[str, Any]]:
"""
Parse a compact 11-byte data packet from the FT2232H byte stream.
Returns dict with keys: 'range_i', 'range_q', 'doppler_i', 'doppler_q',
'detection', or None if invalid.
Compact packet format (FT2232H, 11 bytes):
Byte 0: 0xAA (header) Byte 0: 0xAA (header)
Bytes 1-2: range_q[15:0] MSB first Bytes 1-2: range_q[15:0] MSB first
Bytes 3-4: range_i[15:0] MSB first Bytes 3-4: range_i[15:0] MSB first
@@ -228,7 +159,7 @@ class RadarProtocol:
Byte 9: {7'b0, cfar_detection} Byte 9: {7'b0, cfar_detection}
Byte 10: 0x55 (footer) Byte 10: 0x55 (footer)
""" """
if len(raw) < DATA_PACKET_SIZE_FT2232H: if len(raw) < DATA_PACKET_SIZE:
return None return None
if raw[0] != HEADER_BYTE: if raw[0] != HEADER_BYTE:
return None return None
@@ -292,23 +223,16 @@ class RadarProtocol:
return sr return sr
@staticmethod @staticmethod
def find_packet_boundaries(buf: bytes, def find_packet_boundaries(buf: bytes) -> List[Tuple[int, int, str]]:
compact: bool = False) -> List[Tuple[int, int, str]]:
""" """
Scan buffer for packet start markers (0xAA data, 0xBB status). Scan buffer for packet start markers (0xAA data, 0xBB status).
Returns list of (start_idx, expected_end_idx, packet_type). Returns list of (start_idx, expected_end_idx, packet_type).
Args:
buf: Raw byte buffer from USB read.
compact: If True, use 11-byte compact packets (FT2232H).
If False, use 35-byte packets (FT601, default).
""" """
data_size = DATA_PACKET_SIZE_FT2232H if compact else DATA_PACKET_SIZE_FT601
packets = [] packets = []
i = 0 i = 0
while i < len(buf): while i < len(buf):
if buf[i] == HEADER_BYTE: if buf[i] == HEADER_BYTE:
end = i + data_size end = i + DATA_PACKET_SIZE
if end <= len(buf): if end <= len(buf):
packets.append((i, end, "data")) packets.append((i, end, "data"))
i = end i = end
@@ -327,151 +251,6 @@ class RadarProtocol:
return packets return packets
# ============================================================================
# FT601 USB Connection
# ============================================================================
# Optional ftd3xx import
try:
import ftd3xx
FTD3XX_AVAILABLE = True
except ImportError:
FTD3XX_AVAILABLE = False
class FT601Connection:
"""
FT601 USB 3.0 FIFO bridge communication.
Supports ftd3xx (native D3XX) or mock mode.
"""
def __init__(self, mock: bool = True):
self._mock = mock
self._device = None
self._lock = threading.Lock()
self.is_open = False
# Mock state
self._mock_frame_num = 0
self._mock_rng = np.random.RandomState(42)
def open(self, device_index: int = 0) -> bool:
if self._mock:
self.is_open = True
log.info("FT601 mock device opened (no hardware)")
return True
if not FTD3XX_AVAILABLE:
log.error("ftd3xx not installed — cannot open real FT601 device")
return False
try:
self._device = ftd3xx.create(device_index, ftd3xx.CONFIGURATION_CHANNEL_0)
if self._device is None:
log.error("ftd3xx.create returned None")
return False
self.is_open = True
log.info(f"FT601 device {device_index} opened")
return True
except Exception as e:
log.error(f"FT601 open failed: {e}")
return False
def close(self):
if self._device is not None:
try:
self._device.close()
except Exception:
pass
self._device = None
self.is_open = False
def read(self, size: int = 4096) -> Optional[bytes]:
"""Read raw bytes from FT601. Returns None on error/timeout."""
if not self.is_open:
return None
if self._mock:
return self._mock_read(size)
with self._lock:
try:
buf = self._device.readPipe(0x82, size, raw=True)
return bytes(buf) if buf else None
except Exception as e:
log.error(f"FT601 read error: {e}")
return None
def write(self, data: bytes) -> bool:
"""Write raw bytes to FT601."""
if not self.is_open:
return False
if self._mock:
log.info(f"FT601 mock write: {data.hex()}")
return True
with self._lock:
try:
self._device.writePipe(0x02, data, len(data))
return True
except Exception as e:
log.error(f"FT601 write error: {e}")
return False
def _mock_read(self, size: int) -> bytes:
"""
Generate synthetic radar data packets for testing.
Simulates a batch of packets with a target near range bin 20, Doppler bin 8.
"""
time.sleep(0.05) # Simulate USB latency
self._mock_frame_num += 1
buf = bytearray()
num_packets = min(32, size // 35)
for _ in range(num_packets):
rbin = self._mock_rng.randint(0, NUM_RANGE_BINS)
dbin = self._mock_rng.randint(0, NUM_DOPPLER_BINS)
# Simulate range profile with a target at bin ~20 and noise
range_i = int(self._mock_rng.normal(0, 100))
range_q = int(self._mock_rng.normal(0, 100))
if abs(rbin - 20) < 3:
range_i += 5000
range_q += 3000
# Simulate Doppler with target at Doppler bin ~8
dop_i = int(self._mock_rng.normal(0, 50))
dop_q = int(self._mock_rng.normal(0, 50))
if abs(rbin - 20) < 3 and abs(dbin - 8) < 2:
dop_i += 8000
dop_q += 4000
detection = 1 if (abs(rbin - 20) < 2 and abs(dbin - 8) < 2) else 0
# Build packet
pkt = bytearray()
pkt.append(HEADER_BYTE)
rword = (((range_q & 0xFFFF) << 16) | (range_i & 0xFFFF)) & 0xFFFFFFFF
pkt += struct.pack(">I", rword)
pkt += struct.pack(">I", ((rword << 8) & 0xFFFFFFFF))
pkt += struct.pack(">I", ((rword << 16) & 0xFFFFFFFF))
pkt += struct.pack(">I", ((rword << 24) & 0xFFFFFFFF))
dword = (((dop_i & 0xFFFF) << 16) | (dop_q & 0xFFFF)) & 0xFFFFFFFF
pkt += struct.pack(">I", dword)
pkt += struct.pack(">I", ((dword << 8) & 0xFFFFFFFF))
pkt += struct.pack(">I", ((dword << 16) & 0xFFFFFFFF))
pkt += struct.pack(">I", ((dword << 24) & 0xFFFFFFFF))
pkt.append(detection & 0x01)
pkt.append(FOOTER_BYTE)
buf += pkt
return bytes(buf)
# ============================================================================ # ============================================================================
# FT2232H USB 2.0 Connection (pyftdi, 245 Synchronous FIFO) # FT2232H USB 2.0 Connection (pyftdi, 245 Synchronous FIFO)
# ============================================================================ # ============================================================================
@@ -576,13 +355,14 @@ class FT2232HConnection:
def _mock_read(self, size: int) -> bytes: def _mock_read(self, size: int) -> bytes:
""" """
Generate synthetic compact radar data packets (11-byte) for testing. Generate synthetic compact radar data packets (11-byte) for testing.
Same target simulation as FT601 mock but using compact format. Generate synthetic 11-byte radar data packets for testing.
Simulates a batch of packets with a target near range bin 20, Doppler bin 8.
""" """
time.sleep(0.05) # Simulate USB latency time.sleep(0.05)
self._mock_frame_num += 1 self._mock_frame_num += 1
buf = bytearray() buf = bytearray()
num_packets = min(32, size // DATA_PACKET_SIZE_FT2232H) num_packets = min(32, size // DATA_PACKET_SIZE)
for _ in range(num_packets): for _ in range(num_packets):
rbin = self._mock_rng.randint(0, NUM_RANGE_BINS) rbin = self._mock_rng.randint(0, NUM_RANGE_BINS)
dbin = self._mock_rng.randint(0, NUM_DOPPLER_BINS) dbin = self._mock_rng.randint(0, NUM_DOPPLER_BINS)
@@ -780,11 +560,10 @@ class ReplayConnection:
""" """
def __init__(self, npy_dir: str, use_mti: bool = True, def __init__(self, npy_dir: str, use_mti: bool = True,
replay_fps: float = 5.0, compact: bool = False): replay_fps: float = 5.0):
self._npy_dir = npy_dir self._npy_dir = npy_dir
self._use_mti = use_mti self._use_mti = use_mti
self._replay_fps = max(replay_fps, 0.1) self._replay_fps = max(replay_fps, 0.1)
self._compact = compact # True = FT2232H 11-byte packets
self._lock = threading.Lock() self._lock = threading.Lock()
self.is_open = False self.is_open = False
self._packets: bytes = b"" self._packets: bytes = b""
@@ -958,8 +737,7 @@ class ReplayConnection:
det = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=bool) det = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=bool)
det_count = int(det.sum()) det_count = int(det.sum())
pkt_fmt = "compact" if self._compact else "FT601" log.info(f"Replay: rebuilt {NUM_CELLS} packets ("
log.info(f"Replay: rebuilt {NUM_CELLS} packets ({pkt_fmt}, "
f"MTI={'ON' if self._mti_enable else 'OFF'}, " f"MTI={'ON' if self._mti_enable else 'OFF'}, "
f"DC_notch={self._dc_notch_width}, " f"DC_notch={self._dc_notch_width}, "
f"CFAR={'ON' if self._cfar_enable else 'OFF'} " f"CFAR={'ON' if self._cfar_enable else 'OFF'} "
@@ -970,14 +748,11 @@ class ReplayConnection:
range_i = self._range_i_vec range_i = self._range_i_vec
range_q = self._range_q_vec range_q = self._range_q_vec
if self._compact: return self._build_packets_data(range_i, range_q, dop_i, dop_q, det)
return self._build_packets_compact(range_i, range_q, dop_i, dop_q, det)
else:
return self._build_packets_ft601(range_i, range_q, dop_i, dop_q, det)
def _build_packets_compact(self, range_i, range_q, dop_i, dop_q, det) -> bytes: def _build_packets_data(self, range_i, range_q, dop_i, dop_q, det) -> bytes:
"""Build compact 11-byte packets for FT2232H interface.""" """Build 11-byte data packets for FT2232H interface."""
buf = bytearray(NUM_CELLS * DATA_PACKET_SIZE_FT2232H) buf = bytearray(NUM_CELLS * DATA_PACKET_SIZE)
pos = 0 pos = 0
for rbin in range(NUM_RANGE_BINS): for rbin in range(NUM_RANGE_BINS):
ri = int(np.clip(range_i[rbin], -32768, 32767)) ri = int(np.clip(range_i[rbin], -32768, 32767))
@@ -999,40 +774,6 @@ class ReplayConnection:
return bytes(buf) return bytes(buf)
def _build_packets_ft601(self, range_i, range_q, dop_i, dop_q, det) -> bytes:
"""Build 35-byte packets for FT601 interface."""
buf = bytearray(NUM_CELLS * DATA_PACKET_SIZE_FT601)
pos = 0
for rbin in range(NUM_RANGE_BINS):
ri = int(np.clip(range_i[rbin], -32768, 32767)) & 0xFFFF
rq = int(np.clip(range_q[rbin], -32768, 32767)) & 0xFFFF
rword = ((rq << 16) | ri) & 0xFFFFFFFF
rw0 = struct.pack(">I", rword)
rw1 = struct.pack(">I", (rword << 8) & 0xFFFFFFFF)
rw2 = struct.pack(">I", (rword << 16) & 0xFFFFFFFF)
rw3 = struct.pack(">I", (rword << 24) & 0xFFFFFFFF)
for dbin in range(NUM_DOPPLER_BINS):
di = int(np.clip(dop_i[rbin, dbin], -32768, 32767)) & 0xFFFF
dq = int(np.clip(dop_q[rbin, dbin], -32768, 32767)) & 0xFFFF
d = 1 if det[rbin, dbin] else 0
dword = ((di << 16) | dq) & 0xFFFFFFFF
buf[pos] = HEADER_BYTE
pos += 1
buf[pos:pos+4] = rw0; pos += 4
buf[pos:pos+4] = rw1; pos += 4
buf[pos:pos+4] = rw2; pos += 4
buf[pos:pos+4] = rw3; pos += 4
buf[pos:pos+4] = struct.pack(">I", dword); pos += 4
buf[pos:pos+4] = struct.pack(">I", (dword << 8) & 0xFFFFFFFF); pos += 4
buf[pos:pos+4] = struct.pack(">I", (dword << 16) & 0xFFFFFFFF); pos += 4
buf[pos:pos+4] = struct.pack(">I", (dword << 24) & 0xFFFFFFFF); pos += 4
buf[pos] = d; pos += 1
buf[pos] = FOOTER_BYTE; pos += 1
return bytes(buf)
# ============================================================================ # ============================================================================
# Data Recorder (HDF5) # Data Recorder (HDF5)
@@ -1112,20 +853,18 @@ class DataRecorder:
class RadarAcquisition(threading.Thread): class RadarAcquisition(threading.Thread):
""" """
Background thread: reads from USB (FT601 or FT2232H), parses packets, Background thread: reads from USB (FT2232H), parses 11-byte packets,
assembles frames, and pushes complete frames to the display queue. assembles frames, and pushes complete frames to the display queue.
""" """
def __init__(self, connection, frame_queue: queue.Queue, def __init__(self, connection, frame_queue: queue.Queue,
recorder: Optional[DataRecorder] = None, recorder: Optional[DataRecorder] = None,
status_callback=None, status_callback=None):
compact: bool = False):
super().__init__(daemon=True) super().__init__(daemon=True)
self.conn = connection self.conn = connection
self.frame_queue = frame_queue self.frame_queue = frame_queue
self.recorder = recorder self.recorder = recorder
self._status_callback = status_callback self._status_callback = status_callback
self._compact = compact # True for FT2232H 11-byte packets
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._frame = RadarFrame() self._frame = RadarFrame()
self._sample_idx = 0 self._sample_idx = 0
@@ -1135,21 +874,16 @@ class RadarAcquisition(threading.Thread):
self._stop_event.set() self._stop_event.set()
def run(self): def run(self):
log.info(f"Acquisition thread started (compact={self._compact})") log.info("Acquisition thread started")
while not self._stop_event.is_set(): while not self._stop_event.is_set():
raw = self.conn.read(4096) raw = self.conn.read(4096)
if raw is None or len(raw) == 0: if raw is None or len(raw) == 0:
time.sleep(0.01) time.sleep(0.01)
continue continue
packets = RadarProtocol.find_packet_boundaries( packets = RadarProtocol.find_packet_boundaries(raw)
raw, compact=self._compact)
for start, end, ptype in packets: for start, end, ptype in packets:
if ptype == "data": if ptype == "data":
if self._compact:
parsed = RadarProtocol.parse_data_packet_compact(
raw[start:end])
else:
parsed = RadarProtocol.parse_data_packet( parsed = RadarProtocol.parse_data_packet(
raw[start:end]) raw[start:end])
if parsed is not None: if parsed is not None:
@@ -5,5 +5,5 @@ numpy>=1.24
matplotlib>=3.7 matplotlib>=3.7
h5py>=3.8 h5py>=3.8
# FT601 USB 3.0 driver (install from FTDI website if not on PyPI) # FT2232H USB 2.0 driver (pyftdi — pure Python, pip-installable)
# ftd3xx # Optional: only needed for --live mode with real hardware # pyftdi>=0.54 # Optional: only needed for --live mode with real hardware
+7 -7
View File
@@ -8,7 +8,7 @@ optionally captures raw ADC samples for offline analysis.
Usage: Usage:
python smoke_test.py # Mock mode (no hardware) python smoke_test.py # Mock mode (no hardware)
python smoke_test.py --live # Real FT601 hardware python smoke_test.py --live # Real FT2232H hardware
python smoke_test.py --live --adc-dump adc_raw.npy # Capture ADC data python smoke_test.py --live --adc-dump adc_raw.npy # Capture ADC data
Self-Test Subsystems: Self-Test Subsystems:
@@ -35,7 +35,7 @@ import numpy as np
# Add parent directory for radar_protocol import # Add parent directory for radar_protocol import
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from radar_protocol import RadarProtocol, FT601Connection from radar_protocol import RadarProtocol, FT2232HConnection
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -67,7 +67,7 @@ TEST_NAMES = {
class SmokeTest: class SmokeTest:
"""Host-side smoke test controller.""" """Host-side smoke test controller."""
def __init__(self, connection: FT601Connection, adc_dump_path: str = None): def __init__(self, connection: FT2232HConnection, adc_dump_path: str = None):
self.conn = connection self.conn = connection
self.adc_dump_path = adc_dump_path self.adc_dump_path = adc_dump_path
self._adc_samples = [] self._adc_samples = []
@@ -85,7 +85,7 @@ class SmokeTest:
# Step 1: Connect # Step 1: Connect
if not self.conn.is_open: if not self.conn.is_open:
if not self.conn.open(): if not self.conn.open():
log.error("Failed to open FT601 connection") log.error("Failed to open FT2232H connection")
return False return False
# Step 2: Send self-test trigger (opcode 0x30) # Step 2: Send self-test trigger (opcode 0x30)
@@ -205,15 +205,15 @@ class SmokeTest:
def main(): def main():
parser = argparse.ArgumentParser(description="AERIS-10 Board Smoke Test") parser = argparse.ArgumentParser(description="AERIS-10 Board Smoke Test")
parser.add_argument("--live", action="store_true", parser.add_argument("--live", action="store_true",
help="Use real FT601 hardware (default: mock)") help="Use real FT2232H hardware (default: mock)")
parser.add_argument("--device", type=int, default=0, parser.add_argument("--device", type=int, default=0,
help="FT601 device index") help="FT2232H device index")
parser.add_argument("--adc-dump", type=str, default=None, parser.add_argument("--adc-dump", type=str, default=None,
help="Save raw ADC samples to .npy file") help="Save raw ADC samples to .npy file")
args = parser.parse_args() args = parser.parse_args()
mock_mode = not args.live mock_mode = not args.live
conn = FT601Connection(mock=mock_mode) conn = FT2232HConnection(mock=mock_mode)
tester = SmokeTest(conn, adc_dump_path=args.adc_dump) tester = SmokeTest(conn, adc_dump_path=args.adc_dump)
success = tester.run() success = tester.run()
+30 -44
View File
@@ -16,10 +16,11 @@ import unittest
import numpy as np import numpy as np
from radar_protocol import ( from radar_protocol import (
RadarProtocol, FT601Connection, DataRecorder, RadarAcquisition, RadarProtocol, FT2232HConnection, DataRecorder, RadarAcquisition,
RadarFrame, StatusResponse, Opcode, RadarFrame, StatusResponse, Opcode,
HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE, HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE,
NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS, NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS,
DATA_PACKET_SIZE,
_HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES, _HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES,
) )
@@ -72,23 +73,13 @@ class TestRadarProtocol(unittest.TestCase):
# ---------------------------------------------------------------- # ----------------------------------------------------------------
def _make_data_packet(self, range_i=100, range_q=200, def _make_data_packet(self, range_i=100, range_q=200,
dop_i=300, dop_q=400, detection=0): dop_i=300, dop_q=400, detection=0):
"""Build a synthetic 35-byte data packet matching FPGA format.""" """Build a synthetic 11-byte data packet matching FT2232H format."""
pkt = bytearray() pkt = bytearray()
pkt.append(HEADER_BYTE) pkt.append(HEADER_BYTE)
pkt += struct.pack(">h", range_q & 0xFFFF if range_q >= 0 else range_q)
# Range: word 0 = {range_q[15:0], range_i[15:0]} pkt += struct.pack(">h", range_i & 0xFFFF if range_i >= 0 else range_i)
rword = (((range_q & 0xFFFF) << 16) | (range_i & 0xFFFF)) & 0xFFFFFFFF pkt += struct.pack(">h", dop_i & 0xFFFF if dop_i >= 0 else dop_i)
pkt += struct.pack(">I", rword) pkt += struct.pack(">h", dop_q & 0xFFFF if dop_q >= 0 else dop_q)
# Words 1-3: shifted copies (don't matter for parsing)
for shift in [8, 16, 24]:
pkt += struct.pack(">I", ((rword << shift) & 0xFFFFFFFF))
# Doppler: word 0 = {dop_i[15:0], dop_q[15:0]}
dword = (((dop_i & 0xFFFF) << 16) | (dop_q & 0xFFFF)) & 0xFFFFFFFF
pkt += struct.pack(">I", dword)
for shift in [8, 16, 24]:
pkt += struct.pack(">I", ((dword << shift) & 0xFFFFFFFF))
pkt.append(detection & 0x01) pkt.append(detection & 0x01)
pkt.append(FOOTER_BYTE) pkt.append(FOOTER_BYTE)
return bytes(pkt) return bytes(pkt)
@@ -265,23 +256,23 @@ class TestRadarProtocol(unittest.TestCase):
def test_find_boundaries_truncated(self): def test_find_boundaries_truncated(self):
"""Truncated packet should not be returned.""" """Truncated packet should not be returned."""
data_pkt = self._make_data_packet() data_pkt = self._make_data_packet()
buf = data_pkt[:20] # truncated buf = data_pkt[:6] # truncated (less than 11-byte packet size)
boundaries = RadarProtocol.find_packet_boundaries(buf) boundaries = RadarProtocol.find_packet_boundaries(buf)
self.assertEqual(len(boundaries), 0) self.assertEqual(len(boundaries), 0)
class TestFT601Connection(unittest.TestCase): class TestFT2232HConnection(unittest.TestCase):
"""Test mock FT601 connection.""" """Test mock FT2232H connection."""
def test_mock_open_close(self): def test_mock_open_close(self):
conn = FT601Connection(mock=True) conn = FT2232HConnection(mock=True)
self.assertTrue(conn.open()) self.assertTrue(conn.open())
self.assertTrue(conn.is_open) self.assertTrue(conn.is_open)
conn.close() conn.close()
self.assertFalse(conn.is_open) self.assertFalse(conn.is_open)
def test_mock_read_returns_data(self): def test_mock_read_returns_data(self):
conn = FT601Connection(mock=True) conn = FT2232HConnection(mock=True)
conn.open() conn.open()
data = conn.read(4096) data = conn.read(4096)
self.assertIsNotNone(data) self.assertIsNotNone(data)
@@ -290,7 +281,7 @@ class TestFT601Connection(unittest.TestCase):
def test_mock_read_contains_valid_packets(self): def test_mock_read_contains_valid_packets(self):
"""Mock data should contain parseable data packets.""" """Mock data should contain parseable data packets."""
conn = FT601Connection(mock=True) conn = FT2232HConnection(mock=True)
conn.open() conn.open()
raw = conn.read(4096) raw = conn.read(4096)
packets = RadarProtocol.find_packet_boundaries(raw) packets = RadarProtocol.find_packet_boundaries(raw)
@@ -302,18 +293,18 @@ class TestFT601Connection(unittest.TestCase):
conn.close() conn.close()
def test_mock_write(self): def test_mock_write(self):
conn = FT601Connection(mock=True) conn = FT2232HConnection(mock=True)
conn.open() conn.open()
cmd = RadarProtocol.build_command(0x01, 1) cmd = RadarProtocol.build_command(0x01, 1)
self.assertTrue(conn.write(cmd)) self.assertTrue(conn.write(cmd))
conn.close() conn.close()
def test_read_when_closed(self): def test_read_when_closed(self):
conn = FT601Connection(mock=True) conn = FT2232HConnection(mock=True)
self.assertIsNone(conn.read()) self.assertIsNone(conn.read())
def test_write_when_closed(self): def test_write_when_closed(self):
conn = FT601Connection(mock=True) conn = FT2232HConnection(mock=True)
self.assertFalse(conn.write(b"\x00\x00\x00\x00")) self.assertFalse(conn.write(b"\x00\x00\x00\x00"))
@@ -365,7 +356,7 @@ class TestRadarAcquisition(unittest.TestCase):
"""Test acquisition thread with mock connection.""" """Test acquisition thread with mock connection."""
def test_acquisition_produces_frames(self): def test_acquisition_produces_frames(self):
conn = FT601Connection(mock=True) conn = FT2232HConnection(mock=True)
conn.open() conn.open()
fq = queue.Queue(maxsize=16) fq = queue.Queue(maxsize=16)
acq = RadarAcquisition(conn, fq) acq = RadarAcquisition(conn, fq)
@@ -392,7 +383,7 @@ class TestRadarAcquisition(unittest.TestCase):
# If no frame arrived in timeout, that's still OK for a fast CI run # If no frame arrived in timeout, that's still OK for a fast CI run
def test_acquisition_stop(self): def test_acquisition_stop(self):
conn = FT601Connection(mock=True) conn = FT2232HConnection(mock=True)
conn.open() conn.open()
fq = queue.Queue(maxsize=4) fq = queue.Queue(maxsize=4)
acq = RadarAcquisition(conn, fq) acq = RadarAcquisition(conn, fq)
@@ -438,25 +429,20 @@ class TestEndToEnd(unittest.TestCase):
self.assertEqual(word & 0xFFFF, 42) self.assertEqual(word & 0xFFFF, 42)
def test_data_packet_roundtrip(self): def test_data_packet_roundtrip(self):
"""Build a data packet, parse it, verify values match.""" """Build an 11-byte data packet, parse it, verify values match."""
# Build packet manually ri, rq, di, dq = 1234, -5678, 9012, -3456
pkt = bytearray() pkt = bytearray()
pkt.append(HEADER_BYTE) pkt.append(HEADER_BYTE)
pkt += struct.pack(">h", rq)
ri, rq, di, dq = 1234, -5678, 9012, -3456 pkt += struct.pack(">h", ri)
rword = (((rq & 0xFFFF) << 16) | (ri & 0xFFFF)) & 0xFFFFFFFF pkt += struct.pack(">h", di)
pkt += struct.pack(">I", rword) pkt += struct.pack(">h", dq)
for s in [8, 16, 24]:
pkt += struct.pack(">I", (rword << s) & 0xFFFFFFFF)
dword = (((di & 0xFFFF) << 16) | (dq & 0xFFFF)) & 0xFFFFFFFF
pkt += struct.pack(">I", dword)
for s in [8, 16, 24]:
pkt += struct.pack(">I", (dword << s) & 0xFFFFFFFF)
pkt.append(1) pkt.append(1)
pkt.append(FOOTER_BYTE) pkt.append(FOOTER_BYTE)
self.assertEqual(len(pkt), DATA_PACKET_SIZE)
result = RadarProtocol.parse_data_packet(bytes(pkt)) result = RadarProtocol.parse_data_packet(bytes(pkt))
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEqual(result["range_i"], ri) self.assertEqual(result["range_i"], ri)
@@ -497,8 +483,8 @@ class TestReplayConnection(unittest.TestCase):
from radar_protocol import ReplayConnection from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=True) conn = ReplayConnection(self.NPY_DIR, use_mti=True)
conn.open() conn.open()
# Each packet is 35 bytes, total = 2048 * 35 # Each packet is 11 bytes, total = 2048 * 11
expected_bytes = NUM_CELLS * 35 expected_bytes = NUM_CELLS * DATA_PACKET_SIZE
self.assertEqual(conn._frame_len, expected_bytes) self.assertEqual(conn._frame_len, expected_bytes)
conn.close() conn.close()
@@ -548,7 +534,7 @@ class TestReplayConnection(unittest.TestCase):
from radar_protocol import ReplayConnection from radar_protocol import ReplayConnection
conn = ReplayConnection(self.NPY_DIR, use_mti=False) conn = ReplayConnection(self.NPY_DIR, use_mti=False)
conn.open() conn.open()
self.assertEqual(conn._frame_len, NUM_CELLS * 35) self.assertEqual(conn._frame_len, NUM_CELLS * DATA_PACKET_SIZE)
# No-MTI with DC notch=2 and default CFAR → 0 detections # No-MTI with DC notch=2 and default CFAR → 0 detections
raw = conn._packets raw = conn._packets
boundaries = RadarProtocol.find_packet_boundaries(raw) boundaries = RadarProtocol.find_packet_boundaries(raw)