Add files via upload
This commit is contained in:
@@ -0,0 +1,46 @@
|
||||
import numpy as np
|
||||
|
||||
# Constants
|
||||
MATRIX_SIZE = 83 # 83x83 matrix
|
||||
BUFFER_SIZE = MATRIX_SIZE * MATRIX_SIZE # 6889 elements
|
||||
AZIMUTH_RANGE = (-41.8, 41.8) # Azimuth range
|
||||
ELEVATION_RANGE = (41.8, -41.8) # Elevation range
|
||||
|
||||
def generate_angle_grid():
|
||||
"""Generate azimuth and elevation matrices."""
|
||||
azimuth_values = np.linspace(AZIMUTH_RANGE[0], AZIMUTH_RANGE[1], MATRIX_SIZE)
|
||||
elevation_values = np.linspace(ELEVATION_RANGE[0], ELEVATION_RANGE[1], MATRIX_SIZE)
|
||||
|
||||
azimuth_matrix, elevation_matrix = np.meshgrid(azimuth_values, elevation_values)
|
||||
return azimuth_matrix, elevation_matrix
|
||||
|
||||
def process_radar_data(buffer):
|
||||
"""Convert buffer into an 83x83 matrix, find max value position, and get azimuth/elevation."""
|
||||
if len(buffer) != BUFFER_SIZE:
|
||||
raise ValueError(f"Invalid buffer size! Expected {BUFFER_SIZE}, got {len(buffer)}")
|
||||
|
||||
# Reshape buffer into [83][83] matrix
|
||||
matrix = np.array(buffer).reshape((MATRIX_SIZE, MATRIX_SIZE))
|
||||
|
||||
# Find position of the max value
|
||||
max_index = np.unravel_index(np.argmax(matrix), matrix.shape)
|
||||
|
||||
# Generate azimuth and elevation mapping
|
||||
azimuth_matrix, elevation_matrix = generate_angle_grid()
|
||||
|
||||
# Get azimuth and elevation for max value position
|
||||
max_azimuth = azimuth_matrix[max_index]
|
||||
max_elevation = elevation_matrix[max_index]
|
||||
|
||||
return matrix, max_index, max_azimuth, max_elevation
|
||||
|
||||
# Example: Simulated buffer with random values (Replace with actual RADAR data)
|
||||
np.random.seed(42) # For reproducibility
|
||||
buffer = np.random.rand(BUFFER_SIZE) # Simulated intensity values
|
||||
|
||||
# Process the buffer
|
||||
matrix, max_pos, max_azimuth, max_elevation = process_radar_data(buffer)
|
||||
|
||||
# Output results
|
||||
print(f"Max value found at matrix position: {max_pos}")
|
||||
print(f"Estimated Target Angles -> Azimuth: {max_azimuth:.2f}°, Elevation: {max_elevation:.2f}°")
|
||||
@@ -0,0 +1,7 @@
|
||||
import usb.core
|
||||
import usb.backend.libusb1
|
||||
|
||||
backend = usb.backend.libusb1.get_backend(find_library=lambda x: "C:/Windows/System32/libusb-1.0.dll")
|
||||
devices = usb.core.find(find_all=True, backend=backend)
|
||||
|
||||
print("USB devices found:", list(devices))
|
||||
@@ -0,0 +1,66 @@
|
||||
from pyftdi.ftdi import Ftdi
|
||||
from pyftdi.usbtools import UsbTools
|
||||
import time
|
||||
|
||||
# FT2232HQ Configuration
|
||||
VID = 0x0403 # FTDI Vendor ID
|
||||
PID = 0x6010 # FT2232HQ Product ID
|
||||
INTERFACE = 1 # Interface A (1) or B (2)
|
||||
|
||||
# Buffer to store received data
|
||||
buffer = bytearray()
|
||||
|
||||
def initialize_ft2232hq(ftdi):
|
||||
"""
|
||||
Initialize the FT2232HQ for synchronous FIFO mode.
|
||||
"""
|
||||
# Reset the FT2232HQ
|
||||
ftdi.set_bitmode(0x00, Ftdi.BitMode.RESET)
|
||||
|
||||
# Configure the FT2232HQ for synchronous FIFO mode
|
||||
ftdi.set_bitmode(0x00, Ftdi.BitMode.SYNCFF)
|
||||
|
||||
# Set the clock frequency (60 MHz for 480 Mbps)
|
||||
#ftdi.set_frequency(60000000)
|
||||
|
||||
# Configure GPIO pins (if needed)
|
||||
# Example: Set all pins as outputs
|
||||
ftdi.set_bitmode(0xFF, Ftdi.BitMode.BITBANG)
|
||||
|
||||
# Enable synchronous FIFO mode
|
||||
ftdi.write_data(bytes([0x00])) # Dummy write to activate FIFO mode
|
||||
|
||||
print("FT2232HQ initialized in synchronous FIFO mode.")
|
||||
|
||||
def receive_data():
|
||||
try:
|
||||
# Initialize the FTDI device
|
||||
ftdi = Ftdi()
|
||||
ftdi.open(vendor=VID, product=PID, interface=INTERFACE)
|
||||
|
||||
# Initialize the FT2232HQ for synchronous FIFO mode
|
||||
initialize_ft2232hq(ftdi)
|
||||
|
||||
# Receive data
|
||||
while True:
|
||||
# Read data from the FIFO
|
||||
data = ftdi.read_data(4096) # Read up to 4096 bytes at a time
|
||||
if data:
|
||||
buffer.extend(data) # Append data to the buffer
|
||||
print(f"Received {len(data)} bytes. Total buffer size: {len(buffer)} bytes")
|
||||
else:
|
||||
print("No data received.")
|
||||
break
|
||||
|
||||
# Add a small delay to avoid overwhelming the CPU
|
||||
time.sleep(0.001)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
finally:
|
||||
# Close the FTDI device
|
||||
ftdi.close()
|
||||
print("FT2232HQ closed.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
receive_data()
|
||||
@@ -0,0 +1,25 @@
|
||||
import numpy as np
|
||||
|
||||
# Parameters
|
||||
f0 = 1e6 # Start frequency (Hz)
|
||||
f1 = 15e6 # End frequency (Hz)
|
||||
fs = 120e6 # Sampling frequency (Hz)
|
||||
T = 1e-6 # Time duration (s)
|
||||
N = int(fs * T) # Number of samples
|
||||
|
||||
# Frequency slope
|
||||
k = (f1 - f0) / T
|
||||
|
||||
# Generate time array
|
||||
t = np.arange(N) / fs
|
||||
|
||||
# Calculate phase
|
||||
phase = 2 * np.pi * (f0 * t + 0.5 * k * t**2)
|
||||
|
||||
# Generate sine wave and convert to 8-bit values
|
||||
waveform_LUT = np.uint8(128 + 127 * np.sin(phase))
|
||||
|
||||
# Print the LUT in Verilog format
|
||||
print("waveform_LUT[0] = 8'h{:02X};".format(waveform_LUT[0]))
|
||||
for i in range(1, N):
|
||||
print("waveform_LUT[{}] = 8'h{:02X};".format(i, waveform_LUT[i]))
|
||||
@@ -0,0 +1,177 @@
|
||||
# %%
|
||||
# Copyright (C) 2024 Analog Devices, Inc.
|
||||
#
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
# are permitted provided that the following conditions are met:
|
||||
# - Redistributions of source code must retain the above copyright
|
||||
# notice, this list of conditions and the following disclaimer.
|
||||
# - Redistributions in binary form must reproduce the above copyright
|
||||
# notice, this list of conditions and the following disclaimer in
|
||||
# the documentation and/or other materials provided with the
|
||||
# distribution.
|
||||
# - Neither the name of Analog Devices, Inc. nor the names of its
|
||||
# contributors may be used to endorse or promote products derived
|
||||
# from this software without specific prior written permission.
|
||||
# - The use of this software may or may not infringe the patent rights
|
||||
# of one or more patent holders. This license does not release you
|
||||
# from the requirement that you obtain separate licenses from these
|
||||
# patent holders to use this software.
|
||||
# - Use of the software either in source or binary form, must be run
|
||||
# on or directly connected to an Analog Devices Inc. component.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY ANALOG DEVICES "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
|
||||
# INCLUDING, BUT NOT LIMITED TO, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A
|
||||
# PARTICULAR PURPOSE ARE DISCLAIMED.
|
||||
#
|
||||
# IN NO EVENT SHALL ANALOG DEVICES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
||||
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, INTELLECTUAL PROPERTY
|
||||
# RIGHTS, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
|
||||
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
|
||||
# Imports
|
||||
import time
|
||||
import matplotlib
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
plt.close('all')
|
||||
|
||||
|
||||
all_data = np.load(f)
|
||||
|
||||
MTI_filter = '2pulse' # choices are none, 2pulse, or 3pulse
|
||||
max_range = 300
|
||||
min_scale = 4
|
||||
max_scale = 300
|
||||
|
||||
|
||||
# %%
|
||||
""" Calculate and print summary of ramp parameters
|
||||
"""
|
||||
sample_rate = 60e6
|
||||
signal_freq = 30e6
|
||||
output_freq = 10.5e9
|
||||
num_chirps = 256
|
||||
chirp_BW = 25e6
|
||||
ramp_time_s = 0.25e-6
|
||||
frame_length_ms = 0.5e-6 # each chirp is spaced this far apart
|
||||
num_samples = len(all_data[0][0])
|
||||
|
||||
PRI = frame_length_ms / 1e3
|
||||
PRF = 1 / PRI
|
||||
|
||||
# Split into frames
|
||||
N_frame = int(PRI * float(sample_rate))
|
||||
|
||||
# Obtain range-FFT x-axis
|
||||
c = 3e8
|
||||
wavelength = c / output_freq
|
||||
slope = chirp_BW / ramp_time_s
|
||||
freq = np.linspace(-sample_rate / 2, sample_rate / 2, N_frame)
|
||||
dist = (freq - signal_freq) * c / (2 * slope)
|
||||
|
||||
# Resolutions
|
||||
R_res = c / (2 * chirp_BW)
|
||||
v_res = wavelength / (2 * num_chirps * PRI)
|
||||
|
||||
# Doppler spectrum limits
|
||||
max_doppler_freq = PRF / 2
|
||||
max_doppler_vel = max_doppler_freq * wavelength / 2
|
||||
|
||||
print("sample_rate = ", sample_rate/1e6, "MHz, ramp_time = ", int(ramp_time_s*(1e6)), "us, num_chirps = ", num_chirps, ", PRI = ", frame_length_ms, " ms")
|
||||
|
||||
|
||||
# %%
|
||||
# Function to process data
|
||||
def pulse_canceller(radar_data):
|
||||
global num_chirps, num_samples
|
||||
rx_chirps = []
|
||||
rx_chirps = radar_data
|
||||
# create 2 pulse canceller MTI array
|
||||
Chirp2P = np.empty([num_chirps, num_samples])*1j
|
||||
for chirp in range(num_chirps-1):
|
||||
chirpI = rx_chirps[chirp,:]
|
||||
chirpI1 = rx_chirps[chirp+1,:]
|
||||
chirp_correlation = np.correlate(chirpI, chirpI1, 'valid')
|
||||
angle_diff = np.angle(chirp_correlation, deg=False) # returns radians
|
||||
Chirp2P[chirp,:] = (chirpI1 - chirpI * np.exp(-1j*angle_diff[0]))
|
||||
# create 3 pulse canceller MTI array
|
||||
Chirp3P = np.empty([num_chirps, num_samples])*1j
|
||||
for chirp in range(num_chirps-2):
|
||||
chirpI = Chirp2P[chirp,:]
|
||||
chirpI1 = Chirp2P[chirp+1,:]
|
||||
Chirp3P[chirp,:] = chirpI1 - chirpI
|
||||
return Chirp2P, Chirp3P
|
||||
|
||||
def freq_process(data):
|
||||
rx_chirps_fft = np.fft.fftshift(abs(np.fft.fft2(data)))
|
||||
range_doppler_data = np.log10(rx_chirps_fft).T
|
||||
# or this is the longer way to do the fft2 function:
|
||||
# rx_chirps_fft = np.fft.fft(data)
|
||||
# rx_chirps_fft = np.fft.fft(rx_chirps_fft.T).T
|
||||
# rx_chirps_fft = np.fft.fftshift(abs(rx_chirps_fft))
|
||||
range_doppler_data = np.log10(rx_chirps_fft).T
|
||||
num_good = len(range_doppler_data[:,0])
|
||||
center_delete = 0 # delete ground clutter velocity bins around 0 m/s
|
||||
if center_delete != 0:
|
||||
for g in range(center_delete):
|
||||
end_bin = int(num_chirps/2+center_delete/2)
|
||||
range_doppler_data[:,(end_bin-center_delete+g)] = np.zeros(num_good)
|
||||
range_delete = 0 # delete the zero range bins (these are Tx to Rx leakage)
|
||||
if range_delete != 0:
|
||||
for r in range(range_delete):
|
||||
start_bin = int(len(range_doppler_data)/2)
|
||||
range_doppler_data[start_bin+r, :] = np.zeros(num_chirps)
|
||||
range_doppler_data = np.clip(range_doppler_data, min_scale, max_scale) # clip the data to control the max spectrogram scale
|
||||
return range_doppler_data
|
||||
|
||||
# %%
|
||||
# Plot range doppler data, loop through at the end of the data set
|
||||
cmn = ''
|
||||
i = 0
|
||||
raw_data = freq_process(all_data[i])
|
||||
i=int((i+1) % len(all_data))
|
||||
range_doppler_fig, ax = plt.subplots(1, figsize=(7,7))
|
||||
extent = [-max_doppler_vel, max_doppler_vel, dist.min(), dist.max()]
|
||||
cmaps = ['inferno', 'plasma']
|
||||
cmn = cmaps[0]
|
||||
ax.set_xlim([-12, 12])
|
||||
ax.set_ylim([0, max_range])
|
||||
ax.set_yticks(np.arange(0, max_range, 10))
|
||||
ax.set_ylabel('Range [m]')
|
||||
ax.set_title('Range Doppler Spectrum')
|
||||
ax.set_xlabel('Velocity [m/s]')
|
||||
range_doppler = ax.imshow(raw_data, aspect='auto', extent=extent, origin='lower', cmap=matplotlib.colormaps.get_cmap(cmn))
|
||||
|
||||
print("CTRL + c to stop the loop")
|
||||
step_thru_plots = False
|
||||
if step_thru_plots == True:
|
||||
print("Press Enter key to adance to next frame")
|
||||
print("Press 0 then Enter to go back one frame")
|
||||
try:
|
||||
while True:
|
||||
if MTI_filter != 'none':
|
||||
Chirp2P, Chirp3P = pulse_canceller(all_data[i])
|
||||
if MTI_filter == '3pulse':
|
||||
freq_process_data = freq_process(Chirp3P)
|
||||
else:
|
||||
freq_process_data = freq_process(Chirp2P)
|
||||
else:
|
||||
freq_process_data = freq_process(all_data[i])
|
||||
range_doppler.set_data(freq_process_data)
|
||||
plt.show(block=False)
|
||||
plt.pause(0.1)
|
||||
if step_thru_plots == True:
|
||||
val = input()
|
||||
if val == '0':
|
||||
i=int((i-1) % len(all_data))
|
||||
else:
|
||||
i=int((i+1) % len(all_data))
|
||||
else:
|
||||
i=int((i+1) % len(all_data))
|
||||
except KeyboardInterrupt: # press ctrl-c to stop the loop
|
||||
pass
|
||||
@@ -0,0 +1,306 @@
|
||||
# %% Imports
|
||||
import time
|
||||
import logging
|
||||
import matplotlib
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from pyftdi.ftdi import Ftdi
|
||||
from pyftdi.usbtools import UsbTools
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
|
||||
|
||||
# %% FT2232HQ Configuration
|
||||
VID = 0x0403 # FTDI Vendor ID
|
||||
PID = 0x6010 # FT2232HQ Product ID
|
||||
INTERFACE = 0 # Interface A (0) or B (1)
|
||||
|
||||
# Buffer to store received data
|
||||
buffer = bytearray()
|
||||
buf_size = 6889
|
||||
|
||||
all_data =[]
|
||||
|
||||
# Constants for angle estimation
|
||||
MATRIX_SIZE = 83 # 83x83 matrix
|
||||
BUFFER_SIZE = MATRIX_SIZE * MATRIX_SIZE # 6889 elements
|
||||
AZIMUTH_RANGE = (-41.8, 41.8) # Azimuth range
|
||||
ELEVATION_RANGE = (41.8, -41.8) # Elevation range
|
||||
REFRESH_RATE = 0.5 # Refresh rate in seconds (adjust as needed)
|
||||
CFAR_GUARD_CELLS = 2 # Number of guard cells around CUT
|
||||
CFAR_TRAINING_CELLS = 5 # Number of training cells around CUT
|
||||
CFAR_THRESHOLD_FACTOR = 3 # CFAR scaling factor
|
||||
|
||||
# Radar parameters
|
||||
sample_rate = 60e6
|
||||
signal_freq = 30e6
|
||||
output_freq = 10.5e9
|
||||
num_chirps = 256
|
||||
chirp_BW = 25e6
|
||||
ramp_time_s = 0.25e-6
|
||||
frame_length_ms = 0.5e-3 # each chirp is spaced this far apart
|
||||
c = 3e8 # Speed of light
|
||||
wavelength = c / output_freq
|
||||
slope = chirp_BW / ramp_time_s
|
||||
PRI = frame_length_ms / 1e3
|
||||
PRF = 1 / PRI
|
||||
N_frame = int(PRI * float(sample_rate))
|
||||
freq = np.linspace(-sample_rate / 2, sample_rate / 2, N_frame)
|
||||
dist = (freq - signal_freq) * c / (2 * slope)
|
||||
R_res = c / (2 * chirp_BW)
|
||||
v_res = wavelength / (2 * num_chirps * PRI)
|
||||
max_doppler_freq = PRF / 2
|
||||
max_doppler_vel = max_doppler_freq * wavelength / 2
|
||||
|
||||
# Plotting parameters
|
||||
MTI_filter = '2pulse' # choices are none, 2pulse, or 3pulse
|
||||
max_range = 300
|
||||
min_scale = 4
|
||||
max_scale = 300
|
||||
|
||||
|
||||
# %% Initialize FT2232HQ
|
||||
def initialize_ft2232hq(ftdi):
|
||||
"""
|
||||
Initialize the FT2232HQ for synchronous FIFO mode.
|
||||
"""
|
||||
try:
|
||||
# Open the FTDI device in FT245 FIFO mode (Interface A)
|
||||
ftdi.open(vendor=VID, product=PID, interface=INTERFACE)
|
||||
|
||||
# Reset the FT2232HQ
|
||||
ftdi.set_bitmode(0x00, Ftdi.BitMode.RESET)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Configure the FT2232HQ for synchronous FIFO mode
|
||||
ftdi.set_bitmode(0x00, Ftdi.BitMode.SYNCFF)
|
||||
time.sleep(0.1)
|
||||
# Set USB transfer latency timer (reduce to improve real-time data)
|
||||
ftdi.write_data_set_chunksize(BUFFER_SIZE)
|
||||
ftdi.read_data_set_chunksize(BUFFER_SIZE)
|
||||
ftdi.set_latency_timer(2) # 2ms latency timer
|
||||
ftdi.purge_buffers()
|
||||
|
||||
|
||||
|
||||
# Enable synchronous FIFO mode
|
||||
ftdi.write_data(bytes([0x00])) # Dummy write to activate FIFO mode
|
||||
|
||||
logging.info("FT2232HQ initialized in synchronous FIFO mode.")
|
||||
return ftdi
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize FT2232HQ: {e}")
|
||||
raise
|
||||
|
||||
# %% Read data from FTDI
|
||||
def read_data_from_ftdi(ftdi, num_bytes=BUFFER_SIZE):
|
||||
"""
|
||||
Read data from FT2232H FIFO and store it in a buffer.
|
||||
"""
|
||||
buffer = bytearray()
|
||||
try:
|
||||
while len(buffer) < num_bytes:
|
||||
data = ftdi.read_data(num_bytes - len(buffer))
|
||||
if data:
|
||||
buffer.extend(data)
|
||||
time.sleep(0.001) # Small delay to avoid high CPU usage
|
||||
except Exception as e:
|
||||
logging.error(f"Read error: {e}")
|
||||
raise
|
||||
|
||||
logging.info(f"Read {len(buffer)} bytes from FT2232H")
|
||||
return buffer
|
||||
|
||||
# %% Generate angle grid
|
||||
def generate_angle_grid():
|
||||
"""
|
||||
Generate azimuth and elevation matrices.
|
||||
"""
|
||||
azimuth_values = np.linspace(AZIMUTH_RANGE[0], AZIMUTH_RANGE[1], MATRIX_SIZE)
|
||||
elevation_values = np.linspace(ELEVATION_RANGE[0], ELEVATION_RANGE[1], MATRIX_SIZE)
|
||||
azimuth_matrix, elevation_matrix = np.meshgrid(azimuth_values, elevation_values)
|
||||
return azimuth_matrix, elevation_matrix
|
||||
|
||||
# %% CFAR Detection
|
||||
def ca_cfar(matrix):
|
||||
"""
|
||||
Perform CFAR detection on the matrix using vectorized operations.
|
||||
"""
|
||||
detected_targets = np.zeros_like(matrix)
|
||||
for i in range(CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS, MATRIX_SIZE - CFAR_TRAINING_CELLS - CFAR_GUARD_CELLS):
|
||||
for j in range(CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS, MATRIX_SIZE - CFAR_TRAINING_CELLS - CFAR_GUARD_CELLS):
|
||||
# Define CFAR window
|
||||
window = matrix[i - CFAR_TRAINING_CELLS - CFAR_GUARD_CELLS : i + CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS + 1,
|
||||
j - CFAR_TRAINING_CELLS - CFAR_GUARD_CELLS : j + CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS + 1]
|
||||
guard_area = matrix[i - CFAR_GUARD_CELLS : i + CFAR_GUARD_CELLS + 1,
|
||||
j - CFAR_GUARD_CELLS : j + CFAR_GUARD_CELLS + 1]
|
||||
training_cells = np.setdiff1d(window, guard_area)
|
||||
noise_level = np.mean(training_cells)
|
||||
threshold = CFAR_THRESHOLD_FACTOR * noise_level
|
||||
if matrix[i, j] > threshold:
|
||||
detected_targets[i, j] = matrix[i, j]
|
||||
return detected_targets
|
||||
|
||||
# %% Process radar data
|
||||
def process_radar_data(buffer):
|
||||
"""
|
||||
Convert buffer into an 83x83 matrix, apply CFAR, and find target angles.
|
||||
"""
|
||||
if len(buffer) != BUFFER_SIZE:
|
||||
raise ValueError(f"Invalid buffer size! Expected {BUFFER_SIZE}, got {len(buffer)}")
|
||||
|
||||
matrix = np.array(buffer).reshape((MATRIX_SIZE, MATRIX_SIZE))
|
||||
detected_targets = ca_cfar(matrix)
|
||||
max_index = np.unravel_index(np.argmax(detected_targets), matrix.shape)
|
||||
azimuth_matrix, elevation_matrix = generate_angle_grid()
|
||||
max_azimuth = azimuth_matrix[max_index]
|
||||
max_elevation = elevation_matrix[max_index]
|
||||
return detected_targets, max_index, max_azimuth, max_elevation
|
||||
|
||||
# %% Plot target position
|
||||
def plot_target_position(ax, azimuth, elevation):
|
||||
"""
|
||||
Plot detected targets with CFAR and refresh display.
|
||||
"""
|
||||
ax.clear()
|
||||
ax.set_xlim(AZIMUTH_RANGE)
|
||||
ax.set_ylim(ELEVATION_RANGE)
|
||||
ax.set_xlabel("Azimuth (°)")
|
||||
ax.set_ylabel("Elevation (°)")
|
||||
ax.set_title("RADAR Target Detection (CFAR)")
|
||||
ax.scatter(azimuth, elevation, color='red', s=100, label="Detected Target")
|
||||
ax.legend()
|
||||
plt.draw()
|
||||
plt.pause(0.01)
|
||||
|
||||
# %% Pulse Canceller
|
||||
def pulse_canceller(radar_data):
|
||||
"""
|
||||
Apply 2-pulse or 3-pulse MTI filtering to radar data.
|
||||
"""
|
||||
global num_chirps, num_samples
|
||||
rx_chirps = radar_data
|
||||
Chirp2P = np.empty([num_chirps, num_samples], dtype=complex)
|
||||
for chirp in range(num_chirps - 1):
|
||||
chirpI = rx_chirps[chirp, :]
|
||||
chirpI1 = rx_chirps[chirp + 1, :]
|
||||
chirp_correlation = np.correlate(chirpI, chirpI1, 'valid')
|
||||
angle_diff = np.angle(chirp_correlation, deg=False) # returns radians
|
||||
Chirp2P[chirp, :] = (chirpI1 - chirpI * np.exp(-1j * angle_diff[0]))
|
||||
|
||||
Chirp3P = np.empty([num_chirps, num_samples], dtype=complex)
|
||||
for chirp in range(num_chirps - 2):
|
||||
chirpI = Chirp2P[chirp, :]
|
||||
chirpI1 = Chirp2P[chirp + 1, :]
|
||||
Chirp3P[chirp, :] = chirpI1 - chirpI
|
||||
|
||||
return Chirp2P, Chirp3P
|
||||
|
||||
# %% Frequency Processing
|
||||
def freq_process(data):
|
||||
"""
|
||||
Process radar data to generate range-Doppler spectrum.
|
||||
"""
|
||||
rx_chirps_fft = np.fft.fftshift(abs(np.fft.fft2(data)))
|
||||
range_doppler_data = np.log10(rx_chirps_fft).T
|
||||
num_good = len(range_doppler_data[:, 0])
|
||||
|
||||
# Delete ground clutter velocity bins around 0 m/s
|
||||
center_delete = 0
|
||||
if center_delete != 0:
|
||||
for g in range(center_delete):
|
||||
end_bin = int(num_chirps / 2 + center_delete / 2)
|
||||
range_doppler_data[:, (end_bin - center_delete + g)] = np.zeros(num_good)
|
||||
|
||||
# Delete the zero range bins (Tx to Rx leakage)
|
||||
range_delete = 0
|
||||
if range_delete != 0:
|
||||
for r in range(range_delete):
|
||||
start_bin = int(len(range_doppler_data) / 2)
|
||||
range_doppler_data[start_bin + r, :] = np.zeros(num_chirps)
|
||||
|
||||
# Clip the data to control the max spectrogram scale
|
||||
range_doppler_data = np.clip(range_doppler_data, min_scale, max_scale)
|
||||
return range_doppler_data
|
||||
|
||||
# Plot range doppler data, loop through at the end of the data set
|
||||
cmn = ''
|
||||
i = 0
|
||||
# Initialize FTDI device
|
||||
ftdi = Ftdi()
|
||||
initialize_ft2232hq(ftdi)
|
||||
|
||||
buffer = read_data_from_ftdi(ftdi, num_bytes=BUFFER_SIZE)
|
||||
all_data = buffer
|
||||
raw_data = freq_process(all_data[i])
|
||||
i=int((i+1) % len(all_data))
|
||||
range_doppler_fig, ax = plt.subplots(1, figsize=(7,7))
|
||||
extent = [-max_doppler_vel, max_doppler_vel, dist.min(), dist.max()]
|
||||
cmaps = ['inferno', 'plasma']
|
||||
cmn = cmaps[0]
|
||||
ax.set_xlim([-12, 12])
|
||||
ax.set_ylim([0, max_range])
|
||||
ax.set_yticks(np.arange(0, max_range, 10))
|
||||
ax.set_ylabel('Range [m]')
|
||||
ax.set_title('Range Doppler Spectrum')
|
||||
ax.set_xlabel('Velocity [m/s]')
|
||||
range_doppler = ax.imshow(raw_data, aspect='auto', extent=extent, origin='lower', cmap=matplotlib.colormaps.get_cmap(cmn))
|
||||
|
||||
print("CTRL + c to stop the loop")
|
||||
step_thru_plots = False
|
||||
if step_thru_plots == True:
|
||||
print("Press Enter key to adance to next frame")
|
||||
print("Press 0 then Enter to go back one frame")
|
||||
|
||||
|
||||
# %% Main loop
|
||||
def main():
|
||||
try:
|
||||
# Initialize plot
|
||||
plt.ion()
|
||||
fig, ax = plt.subplots(figsize=(6, 6))
|
||||
|
||||
while True:
|
||||
# Read data from FTDI
|
||||
buffer = read_data_from_ftdi(ftdi, num_bytes=BUFFER_SIZE)
|
||||
all_data = buffer
|
||||
detected_targets, max_pos, max_azimuth, max_elevation = process_radar_data(buffer)
|
||||
|
||||
# Print detected target angles
|
||||
logging.info(f"Detected Target at {max_pos} -> Azimuth: {max_azimuth:.2f}°, Elevation: {max_elevation:.2f}°")
|
||||
|
||||
# Update plot
|
||||
plot_target_position(ax, max_azimuth, max_elevation)
|
||||
time.sleep(REFRESH_RATE)
|
||||
|
||||
if MTI_filter != 'none':
|
||||
Chirp2P, Chirp3P = pulse_canceller(all_data[i])
|
||||
if MTI_filter == '3pulse':
|
||||
freq_process_data = freq_process(Chirp3P)
|
||||
else:
|
||||
freq_process_data = freq_process(Chirp2P)
|
||||
else:
|
||||
freq_process_data = freq_process(all_data[i])
|
||||
range_doppler.set_data(freq_process_data)
|
||||
plt.show(block=False)
|
||||
plt.pause(0.1)
|
||||
if step_thru_plots == True:
|
||||
val = input()
|
||||
if val == '0':
|
||||
i=int((i-1) % len(all_data))
|
||||
else:
|
||||
i=int((i+1) % len(all_data))
|
||||
else:
|
||||
i=int((i+1) % len(all_data))
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Process interrupted by user.")
|
||||
except Exception as e:
|
||||
logging.error(f"An error occurred: {e}")
|
||||
finally:
|
||||
ftdi.close()
|
||||
logging.info("FTDI device closed.")
|
||||
|
||||
# %% Run the script
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,342 @@
|
||||
# %%
|
||||
# Copyright (C) 2024 Analog Devices, Inc.
|
||||
#
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
# are permitted provided that the following conditions are met:
|
||||
# - Redistributions of source code must retain the above copyright
|
||||
# notice, this list of conditions and the following disclaimer.
|
||||
# - Redistributions in binary form must reproduce the above copyright
|
||||
# notice, this list of conditions and the following disclaimer in
|
||||
# the documentation and/or other materials provided with the
|
||||
# distribution.
|
||||
# - Neither the name of Analog Devices, Inc. nor the names of its
|
||||
# contributors may be used to endorse or promote products derived
|
||||
# from this software without specific prior written permission.
|
||||
# - The use of this software may or may not infringe the patent rights
|
||||
# of one or more patent holders. This license does not release you
|
||||
# from the requirement that you obtain separate licenses from these
|
||||
# patent holders to use this software.
|
||||
# - Use of the software either in source or binary form, must be run
|
||||
# on or directly connected to an Analog Devices Inc. component.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY ANALOG DEVICES "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
|
||||
# INCLUDING, BUT NOT LIMITED TO, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A
|
||||
# PARTICULAR PURPOSE ARE DISCLAIMED.
|
||||
#
|
||||
# IN NO EVENT SHALL ANALOG DEVICES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
||||
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, INTELLECTUAL PROPERTY
|
||||
# RIGHTS, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
|
||||
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
|
||||
# Imports
|
||||
import time
|
||||
import matplotlib
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
|
||||
from pyftdi.ftdi import Ftdi
|
||||
from pyftdi.usbtools import UsbTools
|
||||
|
||||
|
||||
# FT2232HQ Configuration
|
||||
|
||||
VID = 0x0403 # FTDI Vendor ID
|
||||
PID = 0x6010 # FT2232HQ Product ID
|
||||
INTERFACE = 1 # Interface A (1) or B (2)
|
||||
|
||||
# Buffer to store received data
|
||||
buffer = bytearray()
|
||||
buf_size = 6889
|
||||
|
||||
# Constants for angle estimation
|
||||
MATRIX_SIZE = 83 # 83x83 matrix
|
||||
BUFFER_SIZE = MATRIX_SIZE * MATRIX_SIZE # 6889 elements
|
||||
AZIMUTH_RANGE = (-41.8, 41.8) # Azimuth range
|
||||
ELEVATION_RANGE = (41.8, -41.8) # Elevation range
|
||||
REFRESH_RATE = 0.5 # Refresh rate in seconds (adjust as needed)
|
||||
CFAR_GUARD_CELLS = 2 # Number of guard cells around CUT
|
||||
CFAR_TRAINING_CELLS = 5 # Number of training cells around CUT
|
||||
CFAR_THRESHOLD_FACTOR = 3 # CFAR scaling factor
|
||||
|
||||
plt.close('all')
|
||||
|
||||
MTI_filter = '2pulse' # choices are none, 2pulse, or 3pulse
|
||||
max_range = 300
|
||||
min_scale = 4
|
||||
max_scale = 300
|
||||
|
||||
|
||||
# %%
|
||||
""" Calculate and print summary of ramp parameters
|
||||
"""
|
||||
sample_rate = 60e6
|
||||
signal_freq = 30e6
|
||||
output_freq = 10.5e9
|
||||
num_chirps = 256
|
||||
chirp_BW = 25e6
|
||||
ramp_time_s = 0.25e-6
|
||||
frame_length_ms = 0.5e-6 # each chirp is spaced this far apart
|
||||
num_samples = len(all_data[0][0])
|
||||
|
||||
PRI = frame_length_ms / 1e3
|
||||
PRF = 1 / PRI
|
||||
|
||||
# Split into frames
|
||||
N_frame = int(PRI * float(sample_rate))
|
||||
|
||||
# Obtain range-FFT x-axis
|
||||
c = 3e8
|
||||
wavelength = c / output_freq
|
||||
slope = chirp_BW / ramp_time_s
|
||||
freq = np.linspace(-sample_rate / 2, sample_rate / 2, N_frame)
|
||||
dist = (freq - signal_freq) * c / (2 * slope)
|
||||
|
||||
# Resolutions
|
||||
R_res = c / (2 * chirp_BW)
|
||||
v_res = wavelength / (2 * num_chirps * PRI)
|
||||
|
||||
# Doppler spectrum limits
|
||||
max_doppler_freq = PRF / 2
|
||||
max_doppler_vel = max_doppler_freq * wavelength / 2
|
||||
|
||||
print("sample_rate = ", sample_rate/1e6, "MHz, ramp_time = ", int(ramp_time_s*(1e6)), "us, num_chirps = ", num_chirps, ", PRI = ", frame_length_ms, " ms")
|
||||
|
||||
def initialize_ft2232hq(ftdi):
|
||||
"""
|
||||
Initialize the FT2232HQ for synchronous FIFO mode.
|
||||
"""
|
||||
# Open the FTDI device in FT245 FIFO mode (Interface A)
|
||||
ftdi.open(VID, PID, INTERFACE)
|
||||
# Reset the FT2232HQ
|
||||
ftdi.set_bitmode(0x00, Ftdi.BitMode.RESET)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Configure the FT2232HQ for synchronous FIFO mode
|
||||
ftdi.set_bitmode(0x00, Ftdi.BitMode.SYNCFF)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Set the clock frequency (60 MHz for 480 Mbps)
|
||||
ftdi.set_frequency(60000000)
|
||||
|
||||
# Configure GPIO pins (if needed)
|
||||
# Example: Set all pins as outputs
|
||||
ftdi.set_bitmode(0xFF, Ftdi.BitMode.BITBANG)
|
||||
|
||||
# Enable synchronous FIFO mode
|
||||
ftdi.write_data(bytes([0x00])) # Dummy write to activate FIFO mode
|
||||
|
||||
print("FT2232HQ initialized in synchronous FIFO mode.")
|
||||
|
||||
def read_data_from_ftdi(ftdi, num_bytes=BUFFER_SIZE):
|
||||
"""Read data from FT2232H FIFO and store it in a buffer."""
|
||||
buffer = bytearray()
|
||||
|
||||
try:
|
||||
while len(buffer) < num_bytes:
|
||||
data = ftdi.read_data(num_bytes - len(buffer))
|
||||
if data:
|
||||
buffer.extend(data)
|
||||
time.sleep(0.001) # Small delay to avoid high CPU usage
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Read error: {e}")
|
||||
|
||||
print(f"[INFO] Read {len(buffer)} bytes from FT2232H")
|
||||
return buffer
|
||||
|
||||
def generate_angle_grid():
|
||||
"""Generate azimuth and elevation matrices."""
|
||||
azimuth_values = np.linspace(AZIMUTH_RANGE[0], AZIMUTH_RANGE[1], MATRIX_SIZE)
|
||||
elevation_values = np.linspace(ELEVATION_RANGE[0], ELEVATION_RANGE[1], MATRIX_SIZE)
|
||||
|
||||
azimuth_matrix, elevation_matrix = np.meshgrid(azimuth_values, elevation_values)
|
||||
return azimuth_matrix, elevation_matrix
|
||||
|
||||
def ca_cfar(matrix):
|
||||
"""
|
||||
Perform CFAR detection on the matrix using vectorized operations.
|
||||
"""
|
||||
detected_targets = np.zeros_like(matrix)
|
||||
|
||||
# Define sliding window
|
||||
window_size = 2 * (CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS) + 1
|
||||
for i in range(CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS, MATRIX_SIZE - CFAR_TRAINING_CELLS - CFAR_GUARD_CELLS):
|
||||
for j in range(CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS, MATRIX_SIZE - CFAR_TRAINING_CELLS - CFAR_GUARD_CELLS):
|
||||
# Extract window
|
||||
window = matrix[i - CFAR_GUARD_CELLS - CFAR_TRAINING_CELLS : i + CFAR_GUARD_CELLS + CFAR_TRAINING_CELLS + 1,
|
||||
j - CFAR_GUARD_CELLS - CFAR_TRAINING_CELLS : j + CFAR_GUARD_CELLS + CFAR_TRAINING_CELLS + 1]
|
||||
|
||||
# Exclude guard cells and CUT
|
||||
guard_area = matrix[i - CFAR_GUARD_CELLS : i + CFAR_GUARD_CELLS + 1,
|
||||
j - CFAR_GUARD_CELLS : j + CFAR_GUARD_CELLS + 1]
|
||||
training_cells = np.setdiff1d(window, guard_area)
|
||||
|
||||
# Compute noise threshold
|
||||
noise_level = np.mean(training_cells)
|
||||
threshold = CFAR_THRESHOLD_FACTOR * noise_level
|
||||
|
||||
# Compare CUT against threshold
|
||||
if matrix[i, j] > threshold:
|
||||
detected_targets[i, j] = matrix[i, j]
|
||||
|
||||
return detected_targets
|
||||
|
||||
|
||||
def process_radar_data(buffer):
|
||||
"""Convert buffer into an 83x83 matrix, apply CFAR, and find target angles."""
|
||||
if len(buffer) != BUFFER_SIZE:
|
||||
raise ValueError(f"Invalid buffer size! Expected {BUFFER_SIZE}, got {len(buffer)}")
|
||||
|
||||
# Reshape buffer into [83][83] matrix
|
||||
matrix = np.array(buffer).reshape((MATRIX_SIZE, MATRIX_SIZE))
|
||||
|
||||
# Apply CFAR
|
||||
detected_targets = ca_cfar(matrix)
|
||||
|
||||
# Find position of the max detected target
|
||||
max_index = np.unravel_index(np.argmax(detected_targets), matrix.shape)
|
||||
|
||||
# Generate azimuth and elevation mapping
|
||||
azimuth_matrix, elevation_matrix = generate_angle_grid()
|
||||
|
||||
# Get azimuth and elevation for detected target
|
||||
max_azimuth = azimuth_matrix[max_index]
|
||||
max_elevation = elevation_matrix[max_index]
|
||||
|
||||
return detected_targets, max_index, max_azimuth, max_elevation
|
||||
|
||||
def plot_target_position(ax, azimuth, elevation):
|
||||
"""Plot detected targets with CFAR and refresh display."""
|
||||
ax.clear()
|
||||
ax.set_xlim(AZIMUTH_RANGE)
|
||||
ax.set_ylim(ELEVATION_RANGE)
|
||||
ax.set_xlabel("Azimuth (°)")
|
||||
ax.set_ylabel("Elevation (°)")
|
||||
ax.set_title("RADAR Target Detection (CFAR)")
|
||||
|
||||
# Plot the detected target
|
||||
ax.scatter(azimuth, elevation, color='red', s=100, label="Detected Target")
|
||||
ax.legend()
|
||||
|
||||
plt.draw()
|
||||
plt.pause(0.01) # Pause to update plot
|
||||
|
||||
# %%
|
||||
# Function to process data
|
||||
def pulse_canceller(radar_data):
|
||||
global num_chirps, num_samples
|
||||
rx_chirps = []
|
||||
rx_chirps = radar_data
|
||||
# create 2 pulse canceller MTI array
|
||||
Chirp2P = np.empty([num_chirps, num_samples])*1j
|
||||
for chirp in range(num_chirps-1):
|
||||
chirpI = rx_chirps[chirp,:]
|
||||
chirpI1 = rx_chirps[chirp+1,:]
|
||||
chirp_correlation = np.correlate(chirpI, chirpI1, 'valid')
|
||||
angle_diff = np.angle(chirp_correlation, deg=False) # returns radians
|
||||
Chirp2P[chirp,:] = (chirpI1 - chirpI * np.exp(-1j*angle_diff[0]))
|
||||
# create 3 pulse canceller MTI array
|
||||
Chirp3P = np.empty([num_chirps, num_samples])*1j
|
||||
for chirp in range(num_chirps-2):
|
||||
chirpI = Chirp2P[chirp,:]
|
||||
chirpI1 = Chirp2P[chirp+1,:]
|
||||
Chirp3P[chirp,:] = chirpI1 - chirpI
|
||||
return Chirp2P, Chirp3P
|
||||
|
||||
def freq_process(data):
|
||||
rx_chirps_fft = np.fft.fftshift(abs(np.fft.fft2(data)))
|
||||
range_doppler_data = np.log10(rx_chirps_fft).T
|
||||
# or this is the longer way to do the fft2 function:
|
||||
# rx_chirps_fft = np.fft.fft(data)
|
||||
# rx_chirps_fft = np.fft.fft(rx_chirps_fft.T).T
|
||||
# rx_chirps_fft = np.fft.fftshift(abs(rx_chirps_fft))
|
||||
range_doppler_data = np.log10(rx_chirps_fft).T
|
||||
num_good = len(range_doppler_data[:,0])
|
||||
center_delete = 0 # delete ground clutter velocity bins around 0 m/s
|
||||
if center_delete != 0:
|
||||
for g in range(center_delete):
|
||||
end_bin = int(num_chirps/2+center_delete/2)
|
||||
range_doppler_data[:,(end_bin-center_delete+g)] = np.zeros(num_good)
|
||||
range_delete = 0 # delete the zero range bins (these are Tx to Rx leakage)
|
||||
if range_delete != 0:
|
||||
for r in range(range_delete):
|
||||
start_bin = int(len(range_doppler_data)/2)
|
||||
range_doppler_data[start_bin+r, :] = np.zeros(num_chirps)
|
||||
range_doppler_data = np.clip(range_doppler_data, min_scale, max_scale) # clip the data to control the max spectrogram scale
|
||||
return range_doppler_data
|
||||
|
||||
# %%
|
||||
|
||||
# Plot range doppler data, loop through at the end of the data set
|
||||
cmn = ''
|
||||
i = 0
|
||||
raw_data = freq_process(all_data[i])
|
||||
i=int((i+1) % len(all_data))
|
||||
range_doppler_fig, ax = plt.subplots(1, figsize=(7,7))
|
||||
extent = [-max_doppler_vel, max_doppler_vel, dist.min(), dist.max()]
|
||||
cmaps = ['inferno', 'plasma']
|
||||
cmn = cmaps[0]
|
||||
ax.set_xlim([-12, 12])
|
||||
ax.set_ylim([0, max_range])
|
||||
ax.set_yticks(np.arange(0, max_range, 10))
|
||||
ax.set_ylabel('Range [m]')
|
||||
ax.set_title('Range Doppler Spectrum')
|
||||
ax.set_xlabel('Velocity [m/s]')
|
||||
range_doppler = ax.imshow(raw_data, aspect='auto', extent=extent, origin='lower', cmap=matplotlib.colormaps.get_cmap(cmn))
|
||||
|
||||
print("CTRL + c to stop the loop")
|
||||
step_thru_plots = False
|
||||
if step_thru_plots == True:
|
||||
print("Press Enter key to adance to next frame")
|
||||
print("Press 0 then Enter to go back one frame")
|
||||
try:
|
||||
# Initialize the FTDI device
|
||||
ftdi = Ftdi()
|
||||
ftdi.open(vendor=VID, product=PID, interface=INTERFACE)
|
||||
# Initialize the FT2232HQ for synchronous FIFO mode
|
||||
initialize_ft2232hq(ftdi)
|
||||
# Initialize plot
|
||||
plt.ion()
|
||||
fig, ax = plt.subplots(figsize=(6, 6))
|
||||
|
||||
while True:
|
||||
receive_data() #receive data from FT2232
|
||||
all_data = read_data_from_ftdi(ftdi, num_bytes=buf_size)
|
||||
buffer = all_data
|
||||
# Process the buffer with CFAR
|
||||
detected_targets, max_pos, max_azimuth, max_elevation = process_radar_data(buffer)
|
||||
|
||||
# Print detected target angles
|
||||
print(f"Detected Target at {max_pos} -> Azimuth: {max_azimuth:.2f}°, Elevation: {max_elevation:.2f}°")
|
||||
|
||||
# Update plot
|
||||
plot_target_position(ax, max_azimuth, max_elevation)
|
||||
|
||||
# Refresh delay
|
||||
time.sleep(REFRESH_RATE)
|
||||
|
||||
if MTI_filter != 'none':
|
||||
Chirp2P, Chirp3P = pulse_canceller(all_data[i])
|
||||
if MTI_filter == '3pulse':
|
||||
freq_process_data = freq_process(Chirp3P)
|
||||
else:
|
||||
freq_process_data = freq_process(Chirp2P)
|
||||
else:
|
||||
freq_process_data = freq_process(all_data[i])
|
||||
range_doppler.set_data(freq_process_data)
|
||||
plt.show(block=False)
|
||||
plt.pause(0.1)
|
||||
if step_thru_plots == True:
|
||||
val = input()
|
||||
if val == '0':
|
||||
i=int((i-1) % len(all_data))
|
||||
else:
|
||||
i=int((i+1) % len(all_data))
|
||||
else:
|
||||
i=int((i+1) % len(all_data))
|
||||
except KeyboardInterrupt: # press ctrl-c to stop the loop
|
||||
pass
|
||||
@@ -0,0 +1,84 @@
|
||||
import numpy as np
|
||||
import scipy.signal as signal
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
# RADAR Parameters
|
||||
Fs = 60e6 # Sampling rate (60 Msps)
|
||||
Fc = 10.5e9 # Carrier frequency (10.5 GHz)
|
||||
Bw = 30e6 # LFM Bandwidth (30 MHz)
|
||||
T_chirp = 10e-6 # Chirp duration (10 µs)
|
||||
T_standby = 20e-6 # Chirp standby duration (20 µs)
|
||||
N_chirps = 50 # Number of chirps per ADAR1000 position
|
||||
c = 3e8 # Speed of light (m/s)
|
||||
max_range = 3000 # Maximum range (m)
|
||||
max_speed = 270 / 3.6 # Maximum speed (m/s)
|
||||
|
||||
# Derived Parameters
|
||||
num_samples = int(Fs * T_chirp) # Samples per chirp
|
||||
range_res = c / (2 * Bw) # Range resolution
|
||||
PRI = T_chirp + T_standby # Pulse Repetition Interval
|
||||
fd_max = max_speed / (c / Fc / 2) # Maximum Doppler frequency shift
|
||||
|
||||
# Generate LFM Chirp
|
||||
T = np.linspace(0, T_chirp, num_samples, endpoint=False)
|
||||
ref_chirp = signal.chirp(T, f0=0, f1=Bw, t1=T_chirp, method='linear')
|
||||
ref_chirp *= np.hamming(num_samples) # Apply window function
|
||||
|
||||
def simulate_targets(num_targets=1):
|
||||
targets = []
|
||||
for _ in range(num_targets):
|
||||
target_range = np.random.uniform(500, max_range)
|
||||
target_speed = np.random.uniform(-max_speed, max_speed)
|
||||
time_delay = 2 * target_range / c
|
||||
doppler_shift = 2 * target_speed * Fc / c
|
||||
targets.append((time_delay, doppler_shift))
|
||||
return targets
|
||||
|
||||
def generate_received_signal(targets):
|
||||
rx_signal = np.zeros(num_samples, dtype=complex)
|
||||
for time_delay, doppler_shift in targets:
|
||||
delayed_chirp = np.roll(ref_chirp, int(time_delay * Fs))
|
||||
rx_signal += delayed_chirp * np.exp(1j * 2 * np.pi * doppler_shift * T)
|
||||
rx_signal += 0.1 * (np.random.randn(len(rx_signal)) + 1j * np.random.randn(len(rx_signal)))
|
||||
return rx_signal
|
||||
|
||||
def cfar_ca1D(signal, guard_cells, training_cells, threshold_factor):
|
||||
num_cells = len(signal)
|
||||
cfar_output = np.zeros(num_cells, dtype=complex)
|
||||
|
||||
for i in range(training_cells + guard_cells, num_cells - training_cells - guard_cells):
|
||||
noise_level = np.mean(np.abs(signal[i - training_cells - guard_cells:i - guard_cells]))
|
||||
threshold = noise_level * threshold_factor
|
||||
cfar_output[i] = signal[i] if np.abs(signal[i]) > threshold else 0
|
||||
|
||||
return cfar_output
|
||||
|
||||
plt.figure(figsize=(10, 6))
|
||||
while True:
|
||||
targets = simulate_targets(num_targets=3)
|
||||
rx_signal = generate_received_signal(targets)
|
||||
compressed_signal = signal.fftconvolve(rx_signal, ref_chirp[::-1].conj(), mode='same')
|
||||
cfar_output = cfar_ca1D(compressed_signal, guard_cells=3, training_cells=10, threshold_factor=3)
|
||||
|
||||
# MTI Processing
|
||||
mti_filtered = np.diff(compressed_signal)
|
||||
fft_doppler = np.fft.fftshift(np.fft.fft(mti_filtered, N_chirps))
|
||||
speed_axis = np.linspace(-max_speed, max_speed, N_chirps)
|
||||
range_axis = np.linspace(0, max_range, num_samples)
|
||||
|
||||
# Apply CFAR to Doppler Spectrum
|
||||
cfar_doppler_output = cfar_ca1D(np.abs(fft_doppler), guard_cells=3, training_cells=10, threshold_factor=3)
|
||||
|
||||
# Enhanced Range-Doppler Map
|
||||
zero_padded_signal = np.pad(compressed_signal, (0, num_samples), mode='constant')
|
||||
range_doppler_map = np.abs(np.fft.fftshift(np.fft.fft2(zero_padded_signal.reshape(N_chirps, -1), axes=(0, 1)), axes=0))
|
||||
range_doppler_map = 20 * np.log10(range_doppler_map + 1e-6)
|
||||
|
||||
# Plot results
|
||||
plt.clf()
|
||||
plt.imshow(range_doppler_map, aspect='auto', extent=[0, max_range, -max_speed, max_speed], cmap='viridis')
|
||||
plt.colorbar(label='Magnitude (dB)')
|
||||
plt.xlabel("Range (m)")
|
||||
plt.ylabel("Speed (m/s)")
|
||||
plt.title("Real-time Range-Speed Detection")
|
||||
plt.pause(0.1)
|
||||
@@ -0,0 +1,276 @@
|
||||
# %% Imports
|
||||
import time
|
||||
import logging
|
||||
import matplotlib
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from scipy.signal import hilbert
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
|
||||
|
||||
# Constants for angle estimation
|
||||
MATRIX_SIZE = 83 # 83x83 matrix
|
||||
BUFFER_SIZE = MATRIX_SIZE * MATRIX_SIZE # 6889 elements
|
||||
AZIMUTH_RANGE = (-41.8, 41.8) # Azimuth range
|
||||
ELEVATION_RANGE = (41.8, -41.8) # Elevation range
|
||||
REFRESH_RATE = 0.5 # Refresh rate in seconds (adjust as needed)
|
||||
CFAR_GUARD_CELLS = 2 # Number of guard cells around CUT
|
||||
CFAR_TRAINING_CELLS = 5 # Number of training cells around CUT
|
||||
CFAR_THRESHOLD_FACTOR = 1.5 # CFAR scaling factor (reduced for better sensitivity)
|
||||
|
||||
# Radar parameters (user-configurable)
|
||||
sample_rate = 60e6 # 60 MHz
|
||||
chirp_BW = 30e6 # 30 MHz
|
||||
num_chirps = 459
|
||||
NUM_SAMPLES = 14
|
||||
NUM_CHIRPS = 459
|
||||
num_samples = 14
|
||||
ramp_time_s = 0.25e-6 # Ramp time in seconds
|
||||
frame_length_ms = 0.5e-3 # Frame length in milliseconds
|
||||
output_freq = 10.5e9
|
||||
signal_freq = 32e6
|
||||
max_range = 300 # Maximum range in meters
|
||||
max_speed = 50 # Maximum speed in m/s
|
||||
c = 3e8 # Speed of light
|
||||
wavelength = c / output_freq
|
||||
slope = chirp_BW / ramp_time_s
|
||||
PRI = frame_length_ms / 1e3 # Pulse repetition interval
|
||||
PRF = 1 / PRI # Pulse repetition frequency
|
||||
N_frame = int(PRI * float(sample_rate))
|
||||
freq = np.linspace(-sample_rate / 2, sample_rate / 2, N_frame)
|
||||
dist = (freq - signal_freq) * c / (2 * slope)
|
||||
R_res = c / (2 * chirp_BW)
|
||||
v_res = wavelength / (2 * num_chirps * PRI)
|
||||
max_doppler_freq = PRF / 2
|
||||
max_doppler_vel = max_doppler_freq * wavelength / 2
|
||||
|
||||
# Plotting parameters
|
||||
MTI_filter = '2pulse' # choices are none, 2pulse, or 3pulse
|
||||
min_scale = 4
|
||||
max_scale = 300
|
||||
|
||||
def generate_radar_data():
|
||||
"""
|
||||
Generate synthetic RADAR data with a target and noise.
|
||||
"""
|
||||
# Create an empty array
|
||||
data = np.zeros((MATRIX_SIZE, MATRIX_SIZE), dtype=np.uint8)
|
||||
|
||||
# Add a synthetic target (e.g., a high-value spike)
|
||||
target_row, target_col = MATRIX_SIZE // 2, MATRIX_SIZE // 2 # Place the target in the middle
|
||||
data[target_row, target_col] = 255 # Maximum value for uint8
|
||||
|
||||
# Add Gaussian noise
|
||||
noise = np.random.normal(0, 50, (MATRIX_SIZE, MATRIX_SIZE)).astype(np.uint8) # Adjust noise level as needed
|
||||
data = np.clip(data + noise, 0, 255) # Ensure values stay within uint8 range
|
||||
|
||||
return data.flatten() # Return as 1D array
|
||||
|
||||
def generate_angle_grid():
|
||||
"""
|
||||
Generate azimuth and elevation matrices.
|
||||
"""
|
||||
azimuth_values = np.linspace(AZIMUTH_RANGE[0], AZIMUTH_RANGE[1], MATRIX_SIZE)
|
||||
elevation_values = np.linspace(ELEVATION_RANGE[0], ELEVATION_RANGE[1], MATRIX_SIZE)
|
||||
azimuth_matrix, elevation_matrix = np.meshgrid(azimuth_values, elevation_values)
|
||||
return azimuth_matrix, elevation_matrix
|
||||
|
||||
def ca_cfar(matrix):
|
||||
"""
|
||||
Perform CFAR detection on the matrix using vectorized operations.
|
||||
"""
|
||||
detected_targets = np.zeros_like(matrix)
|
||||
for i in range(CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS, MATRIX_SIZE - CFAR_TRAINING_CELLS - CFAR_GUARD_CELLS):
|
||||
for j in range(CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS, MATRIX_SIZE - CFAR_TRAINING_CELLS - CFAR_GUARD_CELLS):
|
||||
# Define CFAR window
|
||||
window = matrix[i - CFAR_TRAINING_CELLS - CFAR_GUARD_CELLS : i + CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS + 1,
|
||||
j - CFAR_TRAINING_CELLS - CFAR_GUARD_CELLS : j + CFAR_TRAINING_CELLS + CFAR_GUARD_CELLS + 1]
|
||||
guard_area = matrix[i - CFAR_GUARD_CELLS : i + CFAR_GUARD_CELLS + 1,
|
||||
j - CFAR_GUARD_CELLS : j + CFAR_GUARD_CELLS + 1]
|
||||
training_cells = np.setdiff1d(window, guard_area)
|
||||
noise_level = np.mean(training_cells)
|
||||
threshold = CFAR_THRESHOLD_FACTOR * noise_level
|
||||
if matrix[i, j] > threshold:
|
||||
detected_targets[i, j] = matrix[i, j]
|
||||
|
||||
# Debug: Print the number of detected targets
|
||||
num_targets = np.count_nonzero(detected_targets)
|
||||
logging.info(f"CFAR detected {num_targets} targets.")
|
||||
|
||||
return detected_targets
|
||||
|
||||
def process_radar_data(buffer):
|
||||
"""
|
||||
Convert buffer into an 83x83 matrix, apply CFAR, and find target angles.
|
||||
"""
|
||||
if len(buffer) != BUFFER_SIZE:
|
||||
raise ValueError(f"Invalid buffer size! Expected {BUFFER_SIZE}, got {len(buffer)}")
|
||||
|
||||
# Reshape the 1D buffer into an 83x83 matrix
|
||||
matrix = buffer.reshape((MATRIX_SIZE, MATRIX_SIZE))
|
||||
detected_targets = ca_cfar(matrix)
|
||||
max_index = np.unravel_index(np.argmax(detected_targets), matrix.shape)
|
||||
azimuth_matrix, elevation_matrix = generate_angle_grid()
|
||||
max_azimuth = azimuth_matrix[max_index]
|
||||
max_elevation = elevation_matrix[max_index]
|
||||
return detected_targets, max_index, max_azimuth, max_elevation
|
||||
|
||||
def plot_target_position(ax, azimuth, elevation):
|
||||
"""
|
||||
Plot detected targets with CFAR and refresh display.
|
||||
"""
|
||||
ax.clear()
|
||||
ax.set_xlim(AZIMUTH_RANGE)
|
||||
ax.set_ylim(ELEVATION_RANGE)
|
||||
ax.set_xlabel("Azimuth (°)")
|
||||
ax.set_ylabel("Elevation (°)")
|
||||
ax.set_title("RADAR Target Detection (CFAR)")
|
||||
ax.scatter(azimuth, elevation, color='red', s=100, label="Detected Target")
|
||||
ax.legend()
|
||||
plt.draw()
|
||||
plt.pause(0.01)
|
||||
|
||||
# %%
|
||||
# Function to process data
|
||||
def pulse_canceller(radar_data):
|
||||
global num_chirps, num_samples
|
||||
rx_chirps = []
|
||||
rx_chirps = radar_data
|
||||
# create 2 pulse canceller MTI array
|
||||
Chirp2P = np.empty([num_chirps, num_samples])*1j
|
||||
for chirp in range(num_chirps-1):
|
||||
chirpI = rx_chirps[chirp,:]
|
||||
chirpI1 = rx_chirps[chirp+1,:]
|
||||
chirp_correlation = np.correlate(chirpI, chirpI1, 'valid')
|
||||
angle_diff = np.angle(chirp_correlation, deg=False) # returns radians
|
||||
Chirp2P[chirp,:] = (chirpI1 - chirpI * np.exp(-1j*angle_diff[0]))
|
||||
# create 3 pulse canceller MTI array
|
||||
Chirp3P = np.empty([num_chirps, num_samples])*1j
|
||||
for chirp in range(num_chirps-2):
|
||||
chirpI = Chirp2P[chirp,:]
|
||||
chirpI1 = Chirp2P[chirp+1,:]
|
||||
Chirp3P[chirp,:] = chirpI1 - chirpI
|
||||
return Chirp2P, Chirp3P
|
||||
|
||||
def freq_process(data):
|
||||
"""
|
||||
Process radar data to generate range-Doppler spectrum.
|
||||
"""
|
||||
# Reshape the 1D data into a 2D matrix
|
||||
|
||||
rx_chirps_fft = np.fft.fftshift(abs(np.fft.fft2(data)))
|
||||
range_doppler_data = np.log10(rx_chirps_fft).T
|
||||
# or this is the longer way to do the fft2 function:
|
||||
# rx_chirps_fft = np.fft.fft(data)
|
||||
# rx_chirps_fft = np.fft.fft(rx_chirps_fft.T).T
|
||||
# rx_chirps_fft = np.fft.fftshift(abs(rx_chirps_fft))
|
||||
range_doppler_data = np.log10(rx_chirps_fft).T
|
||||
num_good = len(range_doppler_data[:,0])
|
||||
center_delete = 0 # delete ground clutter velocity bins around 0 m/s
|
||||
if center_delete != 0:
|
||||
for g in range(center_delete):
|
||||
end_bin = int(num_chirps/2+center_delete/2)
|
||||
range_doppler_data[:,(end_bin-center_delete+g)] = np.zeros(num_good)
|
||||
range_delete = 0 # delete the zero range bins (these are Tx to Rx leakage)
|
||||
if range_delete != 0:
|
||||
for r in range(range_delete):
|
||||
start_bin = int(len(range_doppler_data)/2)
|
||||
range_doppler_data[start_bin+r, :] = np.zeros(num_chirps)
|
||||
range_doppler_data = np.clip(range_doppler_data, min_scale, max_scale) # clip the data to control the max spectrogram scale
|
||||
|
||||
|
||||
|
||||
# Debug: Print the range-Doppler data statistics
|
||||
logging.info(f"Range-Doppler data:{range_doppler_data}")
|
||||
logging.info(f"Range-Doppler data: min={np.min(range_doppler_data)}, max={np.max(range_doppler_data)}")
|
||||
|
||||
return range_doppler_data
|
||||
|
||||
k = 0
|
||||
buffer = generate_radar_data()
|
||||
data = buffer[:6426]
|
||||
data = data.reshape((NUM_SAMPLES,NUM_CHIRPS))
|
||||
k=int((k+1) % len(data))
|
||||
|
||||
# %% Main loop
|
||||
def main():
|
||||
try:
|
||||
global k
|
||||
|
||||
# Initialize plot
|
||||
plt.ion()
|
||||
|
||||
# Create a figure with two subplots: one for target position and one for range-Doppler spectrum
|
||||
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
|
||||
|
||||
# Initialize the range-Doppler plot
|
||||
extent = [-max_doppler_vel, max_doppler_vel, dist.min(), dist.max()]
|
||||
cmaps = ['inferno', 'plasma']
|
||||
cmn = cmaps[0]
|
||||
range_doppler_plot = ax2.imshow(np.zeros((NUM_SAMPLES, NUM_CHIRPS)), aspect='auto', extent=extent, origin='lower', cmap=plt.get_cmap(cmn))
|
||||
ax2.set_xlim([-max_speed, max_speed])
|
||||
ax2.set_ylim([0, max_range])
|
||||
ax2.set_yticks(np.arange(0, max_range, 10))
|
||||
ax2.set_ylabel('Range [m]')
|
||||
ax2.set_title('Range Doppler Spectrum')
|
||||
ax2.set_xlabel('Velocity [m/s]')
|
||||
|
||||
while True:
|
||||
# Generate synthetic RADAR data
|
||||
buffer = generate_radar_data()
|
||||
data = buffer[:6426]
|
||||
data = data.reshape((NUM_SAMPLES,NUM_CHIRPS))
|
||||
|
||||
|
||||
# Process radar data
|
||||
detected_targets, max_pos, max_azimuth, max_elevation = process_radar_data(buffer)
|
||||
|
||||
# Print detected target angles
|
||||
logging.info(f"Detected Target at {max_pos} -> Azimuth: {max_azimuth:.2f}°, Elevation: {max_elevation:.2f}°")
|
||||
|
||||
# Update target position plot
|
||||
plot_target_position(ax1, max_azimuth, max_elevation)
|
||||
|
||||
# Process range-Doppler data
|
||||
|
||||
range_doppler_data = freq_process(data)
|
||||
|
||||
# Update range-Doppler plot
|
||||
if MTI_filter != 'none':
|
||||
Chirp2P, Chirp3P = pulse_canceller(data[k])
|
||||
if MTI_filter == '3pulse':
|
||||
freq_process_data = freq_process(Chirp3P)
|
||||
else:
|
||||
freq_process_data = freq_process(Chirp2P)
|
||||
else:
|
||||
freq_process_data = freq_process(data[k])
|
||||
range_doppler.set_data(freq_process_data)
|
||||
plt.show(block=False)
|
||||
plt.pause(0.1)
|
||||
if step_thru_plots == True:
|
||||
val = input()
|
||||
if val == '0':
|
||||
k=int((k-1) % len(buffer))
|
||||
else:
|
||||
k=int((k+1) % len(buffer))
|
||||
else:
|
||||
k=int((k+1) % len(buffer))
|
||||
|
||||
#range_doppler_plot.set_data(range_doppler_data)
|
||||
#range_doppler_plot.set_clim(vmin=np.min(range_doppler_data), vmax=np.max(range_doppler_data)) # Adjust color scale
|
||||
#plt.draw()
|
||||
|
||||
# Pause to control the refresh rate
|
||||
plt.pause(REFRESH_RATE)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Process interrupted by user.")
|
||||
except Exception as e:
|
||||
logging.error(f"An error occurred: {e}")
|
||||
finally:
|
||||
logging.info("Processing stopped.")
|
||||
|
||||
# %% Run the script
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user