RCS Computation Toolkit This repository provides C++ and Python tools for computing radar cross section (RCS) from surface currents or analytical models
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

493 lines
14 KiB

import numpy as np
import os
import sys
import matplotlib.pyplot as plt
import csv
import pandas as pd
import fnmatch
from tqdm import tqdm
import random
import shutil
import copy
c0 = 3e8
# Filename (FileName): Simulation Filename
FileName="PPP"
# cur Filename (FileName): Simulation Filename
current_dir = os.getcwd()
curFolder = current_dir+"/CURRENT"
curFileName = current_dir+"/CURRENT/Currents_"+FileName+"_"
DEST = current_dir+"/freq"
# Freq (Freq): Central Modulation Frequency (MHz)
Freq=3000 * 1e6
# Planewave Waveform (ExcitationType): (0)Time Harmonic (1)Gauss (2)Neumann (3) Modulated Gauss
waveform=1
# Time Delay
Tdelay=25e-10
# Pulse Width
Tau=5e-10
# Incident Magnitude
Emagnitude = 1.0
# Final Simulation Time
FinalTime=1.2e-8
# Time interval (Sampling interval dt)
dt = 0.000000000013859
# Time step count
steps = 11
# Sampling frequency f_s = 1 / dt
f_s = 1.0 / dt
# Nyquist frequency
f_nyq = f_s / 2.0
# Chirp
fmin = 1e9
B = 3e9
Tchirp = 3e-9
# ----------------------------------------------- #
# Calculate the incident Condition (Analytical)
# ----------------------------------------------- #
# Define the signal generation function
def generate_incident_spectrum(FREQ, t_values, nfiles, NFFT, dt, To, Emagnitude, Tau=1e-9, TimePeriod=1e-8, fmin=1e9, B =1e3 , Tchirp=1e-8, excit_flag="Plane Wave"):
"""
Generates the FFT of the incident electric field based on excitation type.
Parameters:
FREQ: Target frequency (rad/s)
dt: Time step
NFFT: Number of FFT points
To: Time offset
Emagnitude: Electric field magnitude
excit_flag: "Plane Wave", "Gauss Pulse", or "Neumann Pulse"
ModuleFlag: whether to modulate with cosine
Tau: Width of Gaussian / Neumann pulse
TimePeriod: Total duration to simulate (in seconds)
Returns:
incident_fft: FFT of the incident electric field (np.ndarray)
"""
Ex = np.zeros(nfiles)
omega = 2.0 * np.pi * FREQ
for i, t in enumerate(t_values):
Exponent = t - To
SinModul = np.cos(omega * Exponent)
if excit_flag == "Plane Wave": # Plane wave
Ex[i] = Emagnitude * SinModul
elif excit_flag == "Gauss Pulse": # Gauss Pulse
Ex[i] = Emagnitude * SinModul * np.exp(-(Exponent**2) / (Tau**2))
elif excit_flag == "Neumann Pulse": # Neuman Pulse
Neuman = (2.0 * Exponent) / (Tau * Tau)
Ex[i] = Emagnitude * Neuman * np.exp(-(Exponent**2) / (Tau**2))
elif excit_flag == "Chirp":
if 0 <= Exponent <= Tchirp:
phi = 2 * np.pi * fmin * Exponent + np.pi * (B / Tchirp) * Exponent**2
Ex[i] = Emagnitude * np.cos(phi)
else:
Ex[i] = 0.0
incident_fft = np.fft.fft(Ex , n=NFFT) / NFFT
return Ex, incident_fft
# Count .curJ files
nfiles = len(fnmatch.filter(os.listdir(curFolder), '*.curJ'))
nfiles = int(nfiles)
print('Number of time domain current files:', nfiles)
# Sampling frequency f_s = 1 / dt
f_s = 1.0 / dt
# Nyquist frequency
f_nyq = f_s / 2.0
# FFT size
NFFT = nfiles * 2
# Frequency Resolution
df = f_s / NFFT
# time axis
time_axis = np.arange(nfiles) * dt
# frequency axis
freq_axis = np.arange(NFFT) * df / 1e9
# --- Step 1: Generate Signal ---
signal, incident_fft = generate_incident_spectrum(Freq, time_axis, nfiles, NFFT, dt, Tdelay, Emagnitude, Tau, FinalTime, fmin, B , Tchirp, "Gauss Pulse")
# --- Step 2: Find indices where magnitude > 10% of peak ---
fft_inc_magnitude = np.abs(incident_fft)
threshold = 0.1 * np.max(fft_inc_magnitude)
indices_above_threshold = np.where(fft_inc_magnitude[0:NFFT//2] > threshold)[0]
# Extract frequency range where magnitude > 10% of peak
freq_selected = freq_axis[indices_above_threshold]
fft_selected = fft_inc_magnitude[indices_above_threshold]
# Find min and max frequency for box range
fmin, fmax = np.min(freq_selected), np.max(freq_selected)
# Find indices of fmin and fmax
idx_min = np.searchsorted(freq_axis, fmin)
idx_max = np.searchsorted(freq_axis, fmax)
# Confirm the values at those indices
actual_fmin = freq_axis[idx_min]
actual_fmax = freq_axis[idx_max]
num_points = idx_max - idx_min
print(f"Index range: {idx_min} to {idx_max}")
print(f"Frequency range: {actual_fmin:.3f} GHz to {actual_fmax:.3f} GHz")
print(f"Number of frequency points in range: {num_points}")
# --- Step 3: Plot with translucent box ---
# Create subplots: time domain (top), frequency domain (bottom)
fig, axs = plt.subplots(2, 1, figsize=(10, 8))
# Plot time-domain signal
axs[0].plot(time_axis, signal)
axs[0].set_title("Time-Domain Signal (Gauss Pulse)")
axs[0].set_xlabel("Time (s)")
axs[0].set_ylabel("Amplitude")
axs[0].grid(True)
# Plot frequency-domain magnitude
axs[1].plot(freq_axis, fft_inc_magnitude)
axs[1].axvspan(fmin, fmax, color='red', alpha=0.3, label=">10% of Peak")
axs[1].set_title("Magnitude Spectrum of Incident Electric Field")
axs[1].set_xlabel("Frequency (Hz)")
axs[1].set_ylabel("Magnitude")
axs[1].grid(True)
axs[1].set_xlim([0, f_nyq / 1e9]) # Adjust based on your center freq
plt.legend()
plt.tight_layout()
plt.savefig("Incident.png")
# Create DataFrame
df = pd.DataFrame({
'Frequency (GHz)': freq_axis,
'fft_inc Magnitude': fft_inc_magnitude,
})
# Save to Excel
excel_path = "./cur_inc_freqdomain_analytical.xlsx"
df.to_excel(excel_path, index=False)
# ---------------------------------------------------
# Process .tri file
# Read number of triangles and nodes
def parse_mesh(filename):
with open(filename, 'r') as f:
lines = f.readlines()
scale = float(lines[0].strip())
num_nodes = int(lines[1].strip())
# Read nodes
nodes = np.array([list(map(float, line.strip().split())) for line in lines[2:2 + num_nodes]])
num_triangles = int(lines[3 + num_nodes].strip())
# Read triangles (1-based indexing in file, converting to 0-based)
triangles = np.array([list(map(int, line.strip().split())) for line in lines[4 + num_nodes:4 + num_nodes + num_triangles]])
print(num_triangles)
print(triangles[0])
print(triangles[-1])
return scale, nodes, triangles
def find_closest_node(query_pos, nodes):
dists = np.linalg.norm(nodes - query_pos, axis=1)
closest_index = np.argmin(dists)
return closest_index, dists[closest_index]
def find_triangles_with_node_and_position(triangles, node_index):
tri_indices = []
positions = []
for i, tri in enumerate(triangles):
for j, n in enumerate(tri):
if n == node_index:
tri_indices.append(i)
positions.append(j)
return np.array(tri_indices), np.array(positions)
tri_filename = FileName + "_out.tri"
scale, nodes, triangles = parse_mesh(tri_filename)
Num_Nodes = len(nodes)
Num_tri = len(triangles)
def count_triangles_from_timedomain_curfile(filename):
with open(filename, 'r') as f:
lines = f.readlines()
# Remove empty lines and strip
values = [line.strip() for line in lines if line.strip() != '']
total_values = len(values)
if total_values % 9 != 0:
print(f"Warning: File has {total_values} values which is not divisible by 9!")
num_triangles = total_values // 9
return num_triangles
sample_file = curFileName + "00000_BC.curJ"
num_tri = count_triangles_from_timedomain_curfile(sample_file)
print('Number of Triangles tri file =', Num_tri)
print('Number of Triangles time domain current file =', num_tri)
complex128_mem = 16
float_mem = 8
memory_usage_bytes = num_points * Num_tri * 3 * 3 * complex128_mem
print("Frequency Domain Memory = ", memory_usage_bytes/1e9," GB")
memory_usage_bytes = nfiles * Num_tri * 3 * 3 * float_mem
print("Time Domain Memory = ", memory_usage_bytes/1e9," GB")
# ---------------------------------------------------
# Multi-threading
import concurrent.futures
import urllib.request
# ----------------------------------------------- #
# Scattered Field
# ----------------------------------------------- #
# Read curJ and curM in time domain (Incident Field)
from concurrent import futures
# Read curJ and curM files using multi-threading
def read_cur_files(i, currentFilename, ext, Num_tri):
index = "%05d" % int(steps * i)
pfile = open(currentFilename + index + "_BC." + ext, 'r')
lines = pfile.readlines()
cur = np.zeros([Num_tri, 3, 3])
counter = 0
for t in range(Num_tri):
for n in range(3):
cur[t, n, 0] = float(lines[counter])
counter += 1
cur[t, n, 1] = float(lines[counter])
counter += 1
cur[t, n, 2] = float(lines[counter])
counter += 1
counter += 1
pfile.close()
return cur
# curJ = n x H
# Time, Triangle, node, xyz component
curJ_sc = np.zeros([nfiles, Num_tri, 3, 3])
# curM = E x n
# Time, Triangle, node, xyz component
curM_sc = np.zeros([nfiles, Num_tri, 3, 3])
print("Reading curJ sc ...")
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
tickets = {executor.submit(read_cur_files, i, curFileName, "curJ", Num_tri): i for i in range(nfiles)}
for future in tqdm(concurrent.futures.as_completed(tickets), total=nfiles, desc="curJ files"):
index = tickets[future]
try:
curJ_sc[index, :, :, :] = future.result()
except Exception as exc:
print(f"{index} generated an exception: {exc}")
print(np.max(curJ_sc))
print("Reading curM sc...")
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
tickets = {executor.submit(read_cur_files, i, curFileName, "curM", Num_tri): i for i in range(nfiles)}
for future in tqdm(concurrent.futures.as_completed(tickets), total=nfiles, desc="curM files"):
index = tickets[future]
try:
curM_sc[index, :, :, :] = future.result()
except Exception as exc:
print(f"{index} generated an exception: {exc}")
print(np.max(curM_sc))
def process_RCS(F_curJ, F_curM, FREQ, FREQ_STR, outfile, DEST, SURFACE_TRI_MESH, THETA_sc, PHI_sc):
# Create frequency folder and copy geometry
freq_dir = f"{current_dir}/freq/FREQ{FREQ_STR}"
os.makedirs(freq_dir, exist_ok=True)
print("Working Folder =",freq_dir)
# Change directory into frequency folder
os.chdir(freq_dir)
# Save results for current and magnetic field components
fid = open(outfile + ".curJ", 'w')
for t in tqdm(range(Num_tri)):
for n in range(3):
for c in range(3):
line = str(np.real(F_curJ[t, n, c])) + " " + str(np.imag(F_curJ[t, n, c])) + '\n'
fid.write(line)
fid.close()
fid = open(outfile + ".curM", 'w')
for t in tqdm(range(Num_tri)):
for n in range(3):
for c in range(3):
line = str(np.real(F_curM[t, n, c])) + " " + str(np.imag(F_curM[t, n, c])) + '\n'
fid.write(line)
fid.close()
# Create .region file
REGIONFILE = outfile
with open(REGIONFILE + ".region", "w") as filep:
filep.write("1\n")
filep.write("0 FEBI\n")
filep.write(f"0 FEBI {outfile}\n")
filep.write("Coupling\n")
# Copy mesh to expected filename
shutil.copy(f"{current_dir}/{SURFACE_TRI_MESH}_out.tri", f"{freq_dir}/{outfile}.tri")
shutil.copy(f"{current_dir}/n2f_main", f"{freq_dir}/n2f_main")
# Run field-to-far-field converter and translator
os.system(f"./n2f_main {REGIONFILE} {int(FREQ)} 0 0 0 180 360 0 1")
os.system(f"emsurftranslator -s {outfile}")
print("\n\n==== Process Farfield Result ====\n")
# Clean spacing in .cs file
os.system(f"sed -i 's/ / /g' {REGIONFILE}.cs")
# Read and split farfield RCS data
data = pd.read_csv(f"./{REGIONFILE}.cs", delimiter=" ", skiprows=1, header=None)
data1 = data.iloc[:361*10]
data1.drop(columns=[0], inplace=True)
data2 = data.iloc[361*10:]
data2.drop(columns=[data2.columns[-1]], inplace=True)
# Concatenate full pattern data
result = np.concatenate((data1.to_numpy(), data2.to_numpy()))
df = pd.DataFrame(result, columns=['Theta', 'Phi', 'RCS_V', 'RCS_H'])
# Extract RCS at specific angles
theta_df = df[df['Theta'] == THETA_sc]
theta_phi_df = theta_df[theta_df['Phi'] == PHI_sc]
print(theta_phi_df)
RCS_V = theta_phi_df['RCS_V'].values[0] # single value
RCS_H = theta_phi_df['RCS_H'].values[0] # single value
# Change back to base directory
os.chdir("..")
os.chdir("..")
return RCS_V, RCS_H
freq_all = []
RCS_V_all = []
RCS_H_all = []
print("----- Compute FFT -----")
curJ_FD = np.fft.fft(curJ_sc, n=NFFT, axis=0) / NFFT
curM_FD = np.fft.fft(curM_sc, n=NFFT, axis=0) / NFFT
print("----- Compute RCS -----")
for freq_ind in tqdm(range(num_points)):
freq_index = idx_min + freq_ind
SURFACE_TRI_MESH = FileName
outfile = "out"
FREQ = int(freq_axis[freq_index] * 1000)
FREQ_STR = str(FREQ)
THETA_sc = 180
PHI_sc = 0
print("---------------- Processing ", FREQ_STR, " MHz ---------------------")
norm = 1.0 / fft_inc_magnitude[freq_index]
F_curJ_single = curJ_FD[freq_index, :, :, :].copy() * norm
F_curM_single = curM_FD[freq_index, :, :, :].copy() * norm
RCS_V,RCS_H = process_RCS(F_curJ_single, F_curM_single, FREQ, FREQ_STR, outfile, DEST, SURFACE_TRI_MESH, THETA_sc, PHI_sc)
freq_all.append(FREQ/1000)
RCS_V_all.append(RCS_V)
RCS_H_all.append(RCS_H)
# Plotting
plt.figure(figsize=(8, 5))
plt.plot(freq_all, RCS_V_all, marker='o',label='RCS_V')
plt.plot(freq_all, RCS_H_all, marker='x',label='RCS_H')
plt.legend()
plt.grid(True)
plt.xlabel("Frequency (GHz)")
plt.ylabel("RCS (dBsm)")
plt.title("RCS vs Frequency")
plt.tight_layout()
plt.savefig("RCS_vs_Frequency_new.png", dpi=300)
# Save to Excel
df_rcs = pd.DataFrame({
"Frequency (GHz)": freq_all,
"RCS_V (dBsm)": RCS_V_all,
"RCS_H (dBsm)": RCS_H_all
})
df_rcs.to_excel("RCS_vs_Frequency_new.xlsx", index=False)
print("Saved to RCS_vs_Frequency_new.xlsx")
# ----------------------------------------------- #