This repository serve as a backup for my Maxwell-TD code
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1182 lines
40 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Pure per-point comparison of incident vs total vs scattered E-fields.
NO quadrature: no areas, no weights, no normals.
Now with an E-only S11 calculation at the bottom, computed ONLY over
frequency bins where the incident-field spectrum energy ≥ 10% of peak.
Inputs (in working dir):
- computeDGTD.log
- port_inc_field.csv (columns: nx,ny,nz, Et_x,Et_y,Et_z, Ht_x,Ht_y,Ht_z)
- PROBES/Probes_<FILE_BASENAME>_*.csv (or Currents_<FILE_BASENAME>_*.csv)
- <FILE_BASENAME>.probe (X,Y,Z for the same points used in PROBES)
Outputs:
- compare_E_per_point_stats.csv
- compare_E_per_point_time_series.npz
- Heatmaps: compare_E_heatmap_*.png
- Sample traces: compare_E_time_point_p*.csv
- S11_selected.csv, S11_selected.png, S11_bands_info.txt
"""
from pathlib import Path
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import fnmatch, glob, os
os.environ["MPLBACKEND"] = "Agg"
import matplotlib
matplotlib.use("Agg", force=True)
import matplotlib.pyplot as plt
import imageio
from mpl_toolkits.mplot3d import Axes3D # noqa: F401
from matplotlib import cm
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import concurrent
from concurrent import futures
from typing import Tuple, List, Optional
import os
import shutil
# =========================
# Config / paths
# =========================
current_dir = os.getcwd()
LOG_PATH = Path("computeDGTD.log")
CSV_PATH = Path("./PortInc/Port0.csv")
CSV_PATH_1 = Path("./PortInc/Port1.csv")
PROBE_FOLDER = Path("./PortProbes/Port0")
PROBE_FOLDER_1 = Path("./PortProbes/Port1") # TOTAL field CSVs at Port1
FILE_BASENAME = "waveguide" # matches <FILE_BASENAME>.probe and files in PROBES
MAKE_GIFS = True # set False to skip GIFs
GIF_MAX_FRAMES = 500
SAVE_PREFIX = "compare"
fname = "waveguide"
DEST = current_dir+"/RCS"
port_er = 1.0
# =========================
# Parse computeDGTD.log (timing + port envelope)
# =========================
def extract_selected_log_values(filepath: Path):
targets = {"Final Time(sec)": None, "dt_sample": None, "tsPerSampling": None}
with filepath.open("r") as f:
lines = f.readlines()
in_block = False
for line in lines:
if "==========================================" in line and "PERFORMING INFORMATION" in line:
in_block = True
continue
if in_block and "==========================================" in line:
break
for key in targets:
if key in line:
m = re.search(r'=\s*([\d.eE+-]+)', line)
if m:
val = float(m.group(1))
targets[key] = int(val) if key == "tsPerSampling" else val
return {"FinalTime": targets["Final Time(sec)"], "dt_sample": targets["dt_sample"], "tsPerSampling": targets["tsPerSampling"]}
def _parse_bracket_vec(s: str):
parts = [float(p) for p in s.split(",")]
if len(parts) != 3:
raise ValueError(f"Expected 3 components in vector, got: {s}")
return np.array(parts, float)
# Regex for new-style "PORT[<idx>] ..." lines
PORT_RE = re.compile(
r"""^
PORT\[(?P<idx>\d+)\]\s+ # PORT[0]
BC=(?P<BC>\d+)\s+
ro=\[(?P<ro>[^\]]+)\]\s+
r1=\[(?P<r1>[^\]]+)\]\s+
Z=(?P<Z>[+\-0-9.eE]+)\s+
\|E\|=(?P<Emag>[+\-0-9.eE]+)\s+
Flag=(?P<Flag>\d+)\s+
Tdist=(?P<Tdist>\d+)\s+
f=(?P<f>[+\-0-9.eE]+)\s+
t0=(?P<t0>[+\-0-9.eE]+)\s+
tau=(?P<tau>[+\-0-9.eE]+)
(?:\s+BW=(?P<BW>[+\-0-9.eE]+))? # optional BW
""",
re.VERBOSE,
)
def parse_port_from_log(log_path: Path):
"""
Parse only 'PORT[<idx>] BC=...' style lines.
Returns dict with:
idx, bc, ro, r1, khat, Z, Emag, Flag, Tdist,
fMHz, t0, tau, BW (may be None if absent).
"""
with log_path.open("r") as f:
for line in f:
if not line.strip().startswith("PORT["):
continue
m = PORT_RE.search(line)
if not m:
continue
ro = _parse_bracket_vec(m.group("ro"))
r1 = _parse_bracket_vec(m.group("r1"))
khat = r1 - ro
nrm = np.linalg.norm(khat)
khat = khat / nrm if nrm > 0 else np.array([0.0, 0.0, 1.0])
return {
"idx": int(m.group("idx")),
"bc": int(m.group("BC")),
"ro": ro,
"r1": r1,
"khat": khat,
"Z": float(m.group("Z")),
"Emag": float(m.group("Emag")),
"Flag": int(m.group("Flag")),
"Tdist":int(m.group("Tdist")),
"fMHz": float(m.group("f")),
"t0": float(m.group("t0")),
"tau": float(m.group("tau")),
"BW": float(m.group("BW")) if m.group("BW") else None,
}
raise RuntimeError("No parsable PORT[<idx>] line found in log.")
# =========================
# NO-quadrature CSV reader
# =========================
def build_port_points_from_csv(df: pd.DataFrame):
"""
Parse a port-centroid CSV and return:
r : (Q,3) centroid coordinates
Et: (Q,3) tangential E at centroid
Ht: (Q,3) tangential H at centroid
a : (Q,) triangle area (or None if not present)
Accepts several coordinate header conventions for robustness:
- preferred: x1,y1,z1 (centroid)
- legacy: rx,ry,rz or nx,ny,nz
"""
# 1) coordinates (centroid)
coord_header_options = [
("x1", "y1", "z1"), # your new centroid columns
("rx", "ry", "rz"), # earlier “quadrature position” columns
("nx", "ny", "nz"), # (legacy) some dumps used these names for points
]
coord_cols = None
for cand in coord_header_options:
if all(c in df.columns for c in cand):
coord_cols = cand
break
if coord_cols is None:
raise ValueError("No coordinate columns found. Expected one of "
"[x1,y1,z1] or [rx,ry,rz] or [nx,ny,nz].")
r = df[list(coord_cols)].to_numpy(float)
# 2) fields at centroid
req_EH = ["Et_x","Et_y","Et_z","Ht_x","Ht_y","Ht_z"]
missing = [c for c in req_EH if c not in df.columns]
if missing:
raise ValueError(f"Missing columns in CSV: {missing}")
Et = df[["Et_x","Et_y","Et_z"]].to_numpy(float)
Ht = df[["Ht_x","Ht_y","Ht_z"]].to_numpy(float)
# 3) area (optional but preferred now)
if "area" in df.columns:
a = df["area"].to_numpy(float)
else:
a = None # Without vertices we cannot derive area; keep as None.
return r, Et, Ht, a
def fft_incident_port_power(E_inc, H_inc, dt, areas=None, khat=None, window=None,
n_fft = None,
pad_factor = None,
pad_multiple = None,
pad_to_pow2 = False):
"""
Compute port power vs frequency from incident fields AND overall (time-average) power.
Args
----
E_inc : (T, Q, 3) real
Time-domain incident E at Q surface points (tri centroids).
H_inc : (T, Q, 3) real
Time-domain incident H at the same points.
dt : float
Time step (seconds).
areas : (Q,) optional
Triangle areas for power integration. If None, uses ones.
khat : (3,) optional
Unit normal for power flow (+z default). Will be normalized.
window: {'hann','hamming'} | array(T) | None
Optional time window for FFT; coherent gain is compensated.
Returns
-------
f : (F,)
One-sided frequency bins (Hz).
P : (F,)
One-sided total incident power vs frequency (W).
S_n : (F, Q)
One-sided per-triangle normal power density (W/m^2) vs frequency.
S_t : (T, Q, 3)
Time domain Poynting
P_avg_time : float
Overall (time-average) incident power over the record (W).
P_t : (T,)
Instantaneous incident power vs time (W).
Ef : (F, Q, 3)
Frequency domain Electric Field
Hf : (F, Q, 3)
Frequency domain Magnetic Field
"""
E_inc = np.asarray(E_inc)
H_inc = np.asarray(H_inc)
T, Q, _ = E_inc.shape
if areas is None:
areas = np.ones(Q, dtype=float)
else:
areas = np.asarray(areas, float)
if khat is None:
khat = np.array([0.0, 0.0, 1.0], float)
else:
khat = np.asarray(khat, float)
nrm = np.linalg.norm(khat)
if nrm == 0:
raise ValueError("khat must be nonzero")
khat = khat / nrm
# ---------- Time-domain overall power ----------
# Instantaneous Poynting (real fields): S(t,q) = E x H [W/m^2]
S_t = np.cross(E_inc, H_inc, axis=2) # (T, Q, 3)
# Project onto khat (normal component), integrate over area -> power
S_n_t = S_t @ khat # (T, Q)
P_t = (S_n_t * areas[None, :]).sum(axis=1) # (T,)
P_avg_time = float(P_t.mean()) # scalar W
# ---------- Frequency-domain spectrum ----------
# Optional window (leakage control) with coherent-gain compensation
if window is None:
w = np.ones(T, float)
cg = 1.0
elif isinstance(window, str):
wl = window.lower()
if wl == "hann":
w = np.hanning(T)
elif wl == "hamming":
w = np.hamming(T)
else:
raise ValueError("Unsupported window; use 'hann', 'hamming', or pass an array.")
cg = w.mean()
else:
w = np.asarray(window, float)
if w.shape[0] != T:
raise ValueError("Custom window length must equal T.")
cg = w.mean() if w.mean() != 0 else 1.0
Ew = w[:, None, None] * E_inc
Hw = w[:, None, None] * H_inc
# ---------- choose FFT length ----------
Npad = T
if n_fft is not None:
Npad = max(T, int(n_fft))
elif pad_factor is not None:
Npad = max(T, int(T * pad_factor))
else:
if pad_to_pow2:
Npow = 1 << (T - 1).bit_length()
Npad = max(Npad, Npow)
if pad_multiple is not None and pad_multiple > 1:
Nmul = ((T + pad_multiple - 1) // pad_multiple) * pad_multiple
Npad = max(Npad, Nmul)
# ---------- FFT ----------
Ef = np.fft.rfft(Ew, n=Npad, axis=0) / (cg * T) * 2.0
Hf = np.fft.rfft(Hw, n=Npad, axis=0) / (cg * T) * 2.0
f = np.fft.rfftfreq(Npad, d=dt); F = f.size
# one-sided scaling (even/odd Npad)
onesided = np.ones(F, float)
if Npad % 2 == 0:
if F > 2: onesided[1:-1] = 2.0
else:
if F > 1: onesided[1:] = 2.0
cross = np.cross(Ef, np.conj(Hf), axis=2)
S_n = 0.5 * np.real(cross @ khat) * onesided[:, None]
P = (S_n * areas[None, :]).sum(axis=1)
return f, P, S_n, S_t, P_avg_time, P_t, Ef, Hf, Npad
# --- helper to find the 10% band around the main lobe ---
def ten_percent_band(f, P, frac=0.10):
i0 = int(np.nanargmax(P))
thr = (frac)*P[i0]
# walk left
iL = i0
while iL > 0 and P[iL] >= thr:
iL -= 1
iL = max(iL, 0)
# walk right
iR = i0
while iR < len(P)-1 and P[iR] >= thr:
iR += 1
iR = min(iR, len(P)-1)
band = slice(iL, iR+1)
return band, thr, i0
# =========================
# Incident modulation
# =========================
Pi = np.pi
MEGA = 1e6
c0 = 299792458.0
def time_modulation_inc(
flag,
t, t0, # t: (T,) seconds ; t0: START time for flag 3 ('to' in C++)
khat, r, r0, # used for flags 0/1/2 retarded-time
freq_m, # MHz
Emag, Hmag, # amplitudes (Hmag typically Emag/eta)
tau, # flags 1/2: Gaussian width (s); flag 3: DURATION (s)
bw_mhz=None # for flag 3: bandwidth in MHz (CHIRP_BW_MHZ)
):
"""
flag:
0 = CW
1 = Gaussian-cos
2 = Gaussian-derivative
3 = Linear FM chirp (Hann-tapered): starts at t0, lasts tau, B from bw_mhz
"""
t = np.asarray(t, float) # (T,)
r = np.asarray(r, float) # (Q,3)
r0 = np.asarray(r0, float)
khat= np.asarray(khat, float)
khat= khat / max(np.linalg.norm(khat), 1e-30)
# Retarded-time (only for flags 0/1/2)
proj = (r - r0) @ khat # (Q,)
Exponent = t[:, None] - proj[None, :] / c0
omega = 2.0 * Pi * freq_m * MEGA
Q = r.shape[0]
if flag == 0:
phase = -omega * Exponent
IncE = Emag * np.sin(phase)
IncH = Hmag * np.sin(phase)
elif flag == 1:
env = np.exp(-((Exponent - t0)**2) / (tau * tau))
cosmd = np.cos(omega * (Exponent - t0))
IncE = Emag * cosmd * env
IncH = Hmag * cosmd * env
elif flag == 2:
env = np.exp(-((Exponent - t0)**2) / (tau * tau))
neuman = 2.0 * (Exponent - t0) / (tau * tau)
IncE = Emag * neuman * env
IncH = Hmag * neuman * env
elif flag == 3:
# ===================== Linear FM chirp (Hann-tapered) =====================
# 'to' is the START time; chirp runs on [t0, t0+tau]
# Sweep centered at freq_m (MHz), bandwidth bw_mhz (MHz), k = B/tau
if bw_mhz is None:
raise ValueError("For flag=3, please pass bw_mhz (CHIRP_BW_MHZ in MHz).")
B_Hz = float(bw_mhz) * MEGA # bandwidth [Hz]
fc = float(freq_m) * MEGA # center freq [Hz]
f0 = fc - 0.5 * B_Hz # start freq [Hz]
k = (B_Hz / max(float(tau), 1e-30)) # chirp rate [Hz/s]
# time since chirp start; shape (T,)
tt = t - float(t0)
# Gate to [0, tau]
gate = (tt >= 0.0) & (tt <= float(tau))
# Column form (T,1) to match broadcasting with (Q,)
tt_col = tt[:, None]
# Phase: 2π( f0*tt + 0.5*k*tt^2 )
phase = 2.0 * Pi * (f0 * tt_col + 0.5 * k * tt_col * tt_col)
# Hann window on [0, tau]: 0.5*(1 - cos(2π*tt/tau)), zero outside gate
# Avoid division by zero if tau==0 (already guarded above)
w = 0.5 * (1.0 - np.cos(2.0 * Pi * (tt_col / float(tau))))
w = np.where(gate[:, None], w, 0.0)
s = np.sin(phase) * w
# Chirp is independent of position here (no retarded term per your CUDA)
IncE = Emag * s # (T,1) -> caller multiplies by Et[None,:,:] to get (T,Q,3)
IncH = Hmag * s
elif flag == 4:
# Linear FM chirp, rectangular gate on [0, tau]
if bw_mhz is None:
raise ValueError("For flag=4, please pass bw_mhz (bandwidth in MHz).")
B_Hz = float(bw_mhz) * MEGA
fc = float(freq_m) * MEGA
f0 = fc - 0.5 * B_Hz
k = B_Hz / max(float(tau), 1e-30)
tt = t - float(t0) # (T,)
gate = (tt >= 0.0) & (tt <= float(tau))
ttcol = tt[:, None] # (T,1)
phase = 2.0 * Pi * (f0 * ttcol + 0.5 * k * ttcol * ttcol)
s = np.sin(phase)
# Rectangular gate (apply it!)
s = np.where(gate[:, None], s, 0.0) # (T,1)
# Broadcast to all spatial points (no retarded term)
onesQ = np.ones((1, Q), dtype=float)
IncE = Emag * s @ onesQ # (T,Q)
IncH = Hmag * s @ onesQ
else:
raise ValueError("Unknown TimeDistributionFlag")
return IncE, IncH
def parse_mesh(filename):
with open(filename, 'r') as f:
lines = f.readlines()
scale = float(lines[0].strip())
num_nodes = int(lines[1].strip())
# Read nodes
nodes = np.array([list(map(float, line.strip().split())) for line in lines[2:2 + num_nodes]])
num_triangles = int(lines[3 + num_nodes].strip())
# Read triangles (1-based indexing in file, converting to 0-based)
triangles = np.array([list(map(int, line.strip().split())) for line in lines[4 + num_nodes:3 + num_nodes + num_triangles]])
print(num_triangles)
print(triangles[0])
print(triangles[-1])
return scale, nodes, triangles
def process_GAIN(F_curJ, F_curM, FREQ, FREQ_STR, outfile, DEST, SURFACE_TRI_MESH, Num_tri, Pin, port_er):
# Create frequency folder and copy geometry
freq_dir = f"{current_dir}/freq/FREQ{FREQ_STR}"
os.makedirs(freq_dir, exist_ok=True)
print("Working Folder =",freq_dir)
# Change directory into frequency folder
os.chdir(freq_dir)
# Save results for current and magnetic field components
fid = open(outfile + ".curJ", 'w')
for t in tqdm(range(Num_tri)):
for n in range(3):
for c in range(3):
line = str(np.real(F_curJ[t, n, c])) + " " + str(np.imag(F_curJ[t, n, c])) + '\n'
fid.write(line)
fid.close()
fid = open(outfile + ".curM", 'w')
for t in tqdm(range(Num_tri)):
for n in range(3):
for c in range(3):
line = str(np.real(F_curM[t, n, c])) + " " + str(np.imag(F_curM[t, n, c])) + '\n'
fid.write(line)
fid.close()
# Create .region file
REGIONFILE = outfile
with open(REGIONFILE + ".region", "w") as filep:
filep.write("1\n")
filep.write("0 FEBI\n")
filep.write(f"0 FEBI {outfile}\n")
filep.write("Coupling\n")
# Copy mesh to expected filename
shutil.copy(f"{current_dir}/{SURFACE_TRI_MESH}_out.tri", f"{freq_dir}/{outfile}.tri")
shutil.copy(f"{current_dir}/n2f_main", f"{freq_dir}/n2f_main")
# Run field-to-far-field converter and translator
# Option for file format: 0(magnitude), 1(real and imaginary)
# Input the mode:0(radiation),1(scattering)
os.system(f"./n2f_main {REGIONFILE} {int(FREQ)} 0 0 0 180 360 1 0")
os.system(f"emsurftranslator -s {outfile}")
print("\n\n==== Process Farfield Result ====\n")
ap_path = f"./{REGIONFILE}.ap" # or your explicit path
Pin_W = Pin * np.sqrt(port_er)
ETA0 = 120.0 * np.pi
# --- read .ap robustly (skip counts line) ---
raw = pd.read_csv(
ap_path,
sep=r"\s+",
engine="python",
skiprows=1,
header=None,
on_bad_lines="skip",
na_values=["inf","-inf","INF","-INF","nan","NaN","Infinity","-Infinity"]
)
if raw.shape[1] < 6:
raise RuntimeError(f".ap format needs 6 cols: Theta Phi Re(Eθ) Im(Eθ) Re(Eφ) Im(Eφ); got {raw.shape[1]}")
df = pd.DataFrame({
"Theta": pd.to_numeric(raw.iloc[:,0], errors="coerce"),
"Phi": pd.to_numeric(raw.iloc[:,1], errors="coerce"),
"Eth_re": pd.to_numeric(raw.iloc[:,2], errors="coerce"),
"Eth_im": pd.to_numeric(raw.iloc[:,3], errors="coerce"),
"Eph_re": pd.to_numeric(raw.iloc[:,4], errors="coerce"),
"Eph_im": pd.to_numeric(raw.iloc[:,5], errors="coerce"),
}).dropna().reset_index(drop=True)
# Angles (deg), normalize
df["Theta"] = df["Theta"].round().astype(int)
df["Phi"] = df["Phi"].round().astype(int) % 360
# Complex fields and |E|^2
Eth = df["Eth_re"].to_numpy() + 1j*df["Eth_im"].to_numpy()
Eph = df["Eph_re"].to_numpy() + 1j*df["Eph_im"].to_numpy()
E2 = (np.abs(Eth)**2 + np.abs(Eph)**2) # (V/m)^2
# Radiation intensity at r=1 m: U = |E|^2 / (2*eta0) [W/sr]
U = E2 / (2.0*ETA0)
# Gain with known Pin: G_lin = 4π U / Pin
G_lin = (4.0*Pi) * U / float(Pin_W)
G_lin = np.clip(G_lin, 1e-20, 1e30) # guard
G_dBi = 10.0*np.log10(G_lin)
out = df[["Theta","Phi"]].copy()
out["Gain_lin"] = G_lin
out["Gain_dBi"] = G_dBi
out.to_csv("ap_gain.csv", index=False)
print("Wrote ap_gain.csv")
# ---- Build merged cuts (φ=0/180 and φ=90/270), no averaging at 0° ----
pat = out.copy()
pat["Theta"] = pat["Theta"].astype(int)
pat["Phi"] = (pat["Phi"].astype(int) % 360)
# Keep only θ in [0,90]
th_mask = (pat["Theta"] >= 0) & (pat["Theta"] <= 90)
# φ = 0/180 merged cut:
# +θ from φ=0 (include θ=0), −θ from φ=180 (exclude θ=0), then combine
pos0 = pat[(pat["Phi"] == 0) & th_mask].copy()
pos0.loc[:, "Ang"] = pos0["Theta"]
neg0 = pat[(pat["Phi"] == 180) & th_mask & (pat["Theta"] > 0)].copy()
neg0.loc[:, "Ang"] = -neg0["Theta"]
cut0 = pd.concat(
[neg0[["Ang", "Gain_dBi"]], pos0[["Ang", "Gain_dBi"]]],
ignore_index=True
).sort_values("Ang").reset_index(drop=True)
cut0.to_csv("GainCut_phi0_merged_-90_90_dBi.csv", index=False)
# φ = 90/270 merged cut:
pos90 = pat[(pat["Phi"] == 90) & th_mask].copy()
pos90.loc[:, "Ang"] = pos90["Theta"]
neg90 = pat[(pat["Phi"] == 270) & th_mask & (pat["Theta"] > 0)].copy()
neg90.loc[:, "Ang"] = -neg90["Theta"]
cut90 = pd.concat(
[neg90[["Ang", "Gain_dBi"]], pos90[["Ang", "Gain_dBi"]]],
ignore_index=True
).sort_values("Ang").reset_index(drop=True)
cut90.to_csv("GainCut_phi90_merged_-90_90_dBi.csv", index=False)
# ---- Plot the two cuts ----
plt.figure(figsize=(7,5))
plt.plot(cut0["Ang"], cut0["Gain_dBi"], label="phi 0/180 merged", marker="o")
plt.plot(cut90["Ang"], cut90["Gain_dBi"], label="phi 90/270 merged", marker="x")
# Full angular span
plt.xlim(-90, 90)
# Robust y-limits (ignore NaN/±inf just in case)
y_all = np.concatenate([cut0["Gain_dBi"].to_numpy(), cut90["Gain_dBi"].to_numpy()])
y_all = y_all[np.isfinite(y_all)]
if y_all.size:
lo, hi = np.percentile(y_all, [1, 99])
pad = 0.05 * max(1.0, (hi - lo))
plt.ylim(lo - pad, hi + pad)
plt.xlabel("Angle (deg) [-90 … +90]")
plt.ylabel("Gain (dBi)")
plt.title("Merged Gain cuts (from .ap, with known Pin)")
plt.grid(True); plt.legend()
plt.tight_layout()
plt.savefig("GainCuts_merged_phi0_phi90_dBi.png", dpi=200, bbox_inches="tight")
plt.close()
# Change back to base directory
os.chdir("..")
os.chdir("..")
i = np.where(np.isclose(cut0['Ang'].to_numpy(), 0))[0]
G0_phi0 = float(cut0['Gain_dBi'].iloc[i[0] if i.size else cut0['Ang'].abs().argmin()])
return G0_phi0
# =========================
# Total Fields
# =========================
_CENTER_COLS = ["x1","y1","z1"]
_TF_COLS = ["Ex","Ey","Ez","Hx","Hy","Hz"]
def _extract_tidx(fname: str) -> int:
"""
Parse the time index from a filename ending in _####.csv
Example: Port_Probes_0_0037.csv -> 37
Port_Probes_0037.csv -> 37
"""
m = re.search(r"_([0-9]+)\.csv$", fname)
if not m:
raise ValueError(f"Cannot parse time index from filename: {fname}")
return int(m.group(1))
def _read_one_port_csv(path: str) -> pd.DataFrame:
"""
Read a single per-port CSV with centroid fields.
Expected columns (case-insensitive): x1,y1,z1, Ex,Ey,Ez,Hx,Hy,Hz [, area]
"""
df = pd.read_csv(path, sep=None, engine="python", header=0)
df.columns = [c.strip().lower() for c in df.columns]
for c in _CENTER_COLS:
if c not in df.columns:
raise ValueError(f"Missing '{c}' in {path}")
keep = list(_CENTER_COLS) + [c.lower() for c in _TF_COLS]
missing = [c for c in keep if c not in df.columns]
if missing:
raise ValueError(f"Missing columns in {path}: {missing}")
out = df[keep].copy()
# Optional area
if "area" in df.columns:
out["area"] = df["area"].astype(float)
out["t_idx"] = _extract_tidx(Path(path).name)
return out
def read_port_centroid_series(port_dir: Path,
workers: int = 8
) -> Tuple[np.ndarray, Optional[np.ndarray],
np.ndarray, np.ndarray,
np.ndarray, List[str]]:
"""
Read *all* CSVs in a per-port directory (one file per timestep),
return:
coords: (Q,3) centroid coordinates (from the FIRST file)
areas : (Q,) or None (if 'area' not present)
E : (T,Q,3)
H : (T,Q,3)
tvals : (T,) sorted time indices (integers from filenames)
files : list of file paths in read order
Notes:
- Assumes same triangle order across timesteps (as written by your code).
- If 'area' is missing, returns None (you can pass None to the FFT/power functions).
"""
port_dir = Path(port_dir)
files = sorted(glob.glob(str(port_dir / "*.csv")))
if not files:
raise FileNotFoundError(f"No CSVs found under {port_dir}")
# Load all in parallel
dfs = []
with ThreadPoolExecutor(max_workers=workers) as ex:
futs = {ex.submit(_read_one_port_csv, f): f for f in files}
for fut in as_completed(futs):
dfs.append(fut.result())
df_all = pd.concat(dfs, ignore_index=True)
df_all.sort_values(["t_idx"], inplace=True, kind="mergesort")
tvals = np.sort(df_all["t_idx"].unique())
T = len(tvals)
# Use first timestep as geometry reference
df0 = df_all[df_all["t_idx"] == tvals[0]].reset_index(drop=True)
Q = len(df0)
coords = df0[_CENTER_COLS].to_numpy(float) # (Q,3)
# Build arrays
E = np.empty((T, Q, 3), float)
H = np.empty((T, Q, 3), float)
# Fast path: split by t_idx in order
for ti, t in enumerate(tvals):
dft = df_all[df_all["t_idx"] == t].reset_index(drop=True)
if len(dft) != Q:
raise ValueError(f"Inconsistent face count at t={t}: got {len(dft)}, expected {Q}")
E[ti, :, 0] = dft["ex"].to_numpy(float)
E[ti, :, 1] = dft["ey"].to_numpy(float)
E[ti, :, 2] = dft["ez"].to_numpy(float)
H[ti, :, 0] = dft["hx"].to_numpy(float)
H[ti, :, 1] = dft["hy"].to_numpy(float)
H[ti, :, 2] = dft["hz"].to_numpy(float)
return coords, E, H, tvals, files
# ---------------------------------------------------------------------
# Quick plotting helpers (keep it simple)
# ---------------------------------------------------------------------
def plot_time_series(t: np.ndarray,
E: np.ndarray,
P_t: np.ndarray,
out_png: str = "port_time.png"):
"""
E: (T,Q,3). We plot <|E|> over Q vs time, and P_t vs time (2 rows).
"""
Emag = np.linalg.norm(E, axis=2) # (T,Q)
Emag_mean = Emag.mean(axis=1) # (T,)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 7), sharex=True)
ax1.plot(t, Emag_mean)
ax1.set_ylabel("<|E|> over faces")
ax1.set_title("Field magnitude (mean over port faces)")
ax2.plot(t, P_t)
ax2.set_xlabel("Time (s)")
ax2.set_ylabel("Total Power (W)")
ax2.set_title("Power vs time")
fig.tight_layout()
fig.savefig(out_png, dpi=150)
plt.close(fig)
def plot_spectrum(f: np.ndarray,
P: np.ndarray,
band_mask: np.ndarray,
thr: float,
S_n_band: Optional[np.ndarray] = None, # kept for API compatibility; unused
out_png: str = "port_freq.png",
xtick_step: Optional[float] = None):
"""
Plot incident power spectrum with a blue highlight over the ≥10% band.
"""
fig, axP = plt.subplots(1, 1, figsize=(12, 5))
# Main spectrum + threshold
axP.plot(f, P)
axP.axhline(thr, ls="--", label="10% threshold")
# Blue band highlight
fL = f[band_mask][0]
fR = f[band_mask][-1]
axP.axvspan(fL, fR, facecolor="tab:blue", edgecolor="tab:blue", alpha=0.15, label="10% band")
# Labels & ticks
axP.set_xlabel("Frequency (Hz)")
axP.set_ylabel("Power (W)")
axP.set_title("Power Spectrum")
if xtick_step is not None and xtick_step > 0:
xmax = max(f[-1], fR)
xticks = np.arange(0.0, xmax + xtick_step, xtick_step)
axP.set_xticks(xticks)
axP.legend()
minF = max(0.0,fL-1e9)
axP.set_xlim(minF,fR+1e9)
fig.tight_layout()
fig.savefig(out_png, dpi=150)
plt.close(fig)
#################
## S parameters
#################
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
def _mode_power_Nm(Em, Hm, areas, nhat):
"""
Nm = 0.5 * ∫ dS ( Em × Hm* ) · n
"""
Em = np.asarray(Em, complex) # (Q,3)
Hmc = np.conj(np.asarray(Hm, complex))
nh = np.asarray(nhat, float) / max(np.linalg.norm(nhat), 1e-30)
a = np.asarray(areas, float)
cx = np.cross(Em, Hmc) # (Q,3)
dot = cx @ nh # (Q,)
Nm = 0.5 * np.sum(dot * a) # complex (usually real)
return Nm
def mode_overlap_s11(E_tot_f, H_tot_f, E_mode_f, H_mode_f, areas, nhat):
"""
Rigorous single-mode overlap on a port plane.
Inputs
------
E_tot_f, H_tot_f : (F,Q,3) complex
TOTAL fields in frequency domain on the port surface.
E_mode_f, H_mode_f : (F,Q,3) complex
Mode fields you use as the incident reference (your E_inc_freq/H_inc_freq).
areas : (Q,) float
Triangle areas [m^2].
nhat : (3,) float
Unit normal pointing in the +propagation direction of the *forward* mode.
Returns
-------
a_f, b_f : (F,) complex
Forward and backward modal amplitudes of the TOTAL field.
S11 : (F,) complex
Reflection coefficient versus frequency (b_f / a_inc).
I1, I2 : (F,) complex
Overlap integrals (with the 0.5 factor included).
Nm : (F,) complex
Mode normalization 0.5 ∫ (E_m × H_m*)·n dS (per frequency).
"""
E_tot_f = np.asarray(E_tot_f); H_tot_f = np.asarray(H_tot_f)
E_mode_f = np.asarray(E_mode_f); H_mode_f = np.asarray(H_mode_f)
F, Q, _ = E_tot_f.shape
assert E_mode_f.shape == (F,Q,3) and H_mode_f.shape == (F,Q,3)
areas = np.asarray(areas, float); assert areas.shape == (Q,)
nhat = np.asarray(nhat, float); nhat = nhat / np.linalg.norm(nhat)
def _surface_int_EH(Ef, Hf):
# 0.5 * ∫ (E × H*)·n dS (vectorized over F and Q)
c = np.cross(Ef, np.conj(Hf), axis=2) # (F,Q,3)
s = c @ nhat # (F,Q)
return 0.5 * (s * areas[None, :]).sum(axis=1) # (F,)
# Normalization of the chosen mode
Nm = _surface_int_EH(E_mode_f, H_mode_f) # (F,)
# Overlap integrals with TOTAL field:
# I1 = 0.5 ∫ (E_tot × H_m*)·n dS = (a + b) Nm
# I2 = 0.5 ∫ (E_m* × H_tot)·n dS = (a - b) Nm*
I1 = _surface_int_EH(E_tot_f, H_mode_f) # (F,)
# For I2 use E_m* × H_tot, so conjugate E_m inside the cross:
I2 = 0.5 * ( (np.cross(np.conj(E_mode_f), H_tot_f, axis=2) @ nhat)
* areas[None, :] ).sum(axis=1)
tiny = 1e-30
Nm_c = np.conj(Nm)
Nm_nz = np.where(np.abs(Nm) > tiny, Nm, np.inf)
NmC_nz= np.where(np.abs(Nm_c) > tiny, Nm_c, np.inf)
# Solve for a and b (per frequency)
a_f = 0.5 * (I1 / Nm_nz + I2 / NmC_nz)
b_f = 0.5 * (I1 / Nm_nz - I2 / NmC_nz)
# Incident amplitude (from the mode itself).
# Using the same formulas with (E_tot,H_tot) replaced by (E_mode,H_mode):
I1_inc = _surface_int_EH(E_mode_f, H_mode_f) # equals Nm if the mode is forward-only
I2_inc = 0.5 * ( (np.cross(np.conj(E_mode_f), H_mode_f, axis=2) @ nhat)
* areas[None, :] ).sum(axis=1) # equals Nm* for forward-only
a_inc = 0.5 * (I1_inc / Nm_nz + I2_inc / NmC_nz)
S11 = np.where(np.abs(a_inc) > tiny, b_f / a_inc, 0.0 + 0.0j)
return a_f, b_f, S11, I1, I2, Nm
def plot_s11_band(f, S11_mag, band_mask, out_png="S11_band.png", xtick_step=None, title="S11 (mode overlap)"):
fig, ax = plt.subplots(1,1, figsize=(12,4))
ax.plot(f, 20*np.log10(np.clip(S11_mag, 1e-20, None)), label='|S11| (dB)')
flo, fhi = f[band_mask][0], f[band_mask][-1]
ax.axhline(-10, ls=":", color='0.4', lw=1) # handy guide
ax.set_xlabel("Frequency (Hz)")
ax.set_ylabel("|S11| (dB)")
ax.set_title(title)
if xtick_step:
xt = np.arange(0.0, fhi + xtick_step, xtick_step)
ax.set_xticks(xt)
ax.legend()
fig.tight_layout()
fig.savefig(out_png, dpi=150)
plt.close(fig)
def export_s11_xlsx(f, S11_mag, band_mask, path="S11_mode_overlap.xlsx"):
f_band = f[band_mask]
S11_band = S11_mag[band_mask]
df = pd.DataFrame({
"freq_Hz": f_band,
"S11_mag": S11_band,
"S11_dB" : 20*np.log10(np.clip(S11_band, 1e-20, None))
})
try:
import xlsxwriter # if available
df.to_excel(path, index=False)
print(f"Wrote {path}")
except Exception as e:
csv = path.rsplit(".",1)[0] + ".csv"
df.to_csv(csv, index=False)
print(f"xlsxwriter not available ({e}); wrote {csv} instead.")
import numpy as np
import pandas as pd
def export_s11_to_excel(outfile, freqs, S11, band_mask, a_f=None, b_f=None):
"""
Write S11 vs frequency to an .xlsx file.
- outfile : "Sparams.xlsx"
- freqs : (F,) Hz
- S11 : (F,) complex reflection coefficient
- band_mask : slice or boolean mask selecting the 10% band
- a_f,b_f : (F,) complex forward/backward amplitudes (optional; saved if given)
"""
S11 = np.asarray(S11)
freqs = np.asarray(freqs, float)
df_full = pd.DataFrame({
"f_Hz": freqs,
"f_GHz": freqs/1e9,
"S11_mag": np.abs(S11),
"S11_dB": 20*np.log10(np.clip(np.abs(S11), 1e-20, None)),
"S11_ang_deg": np.angle(S11, deg=True),
"in_10pct_band": False
})
df_full.loc[band_mask, "in_10pct_band"] = True
if a_f is not None:
df_full["a_f_mag"] = np.abs(a_f)
df_full["a_f_ang"] = np.angle(a_f, deg=True)
if b_f is not None:
df_full["b_f_mag"] = np.abs(b_f)
df_full["b_f_ang"] = np.angle(b_f, deg=True)
df_band = df_full.loc[band_mask].reset_index(drop=True)
# Try openpyxl first, then xlsxwriter; else CSV fallback.
try:
with pd.ExcelWriter(outfile, engine="openpyxl") as xl:
df_full.to_excel(xl, sheet_name="S11_full", index=False)
df_band.to_excel(xl, sheet_name="S11_10pct_band", index=False)
print(f"Wrote Excel: {outfile}")
except Exception as e1:
try:
with pd.ExcelWriter(outfile, engine="xlsxwriter") as xl:
df_full.to_excel(xl, sheet_name="S11_full", index=False)
df_band.to_excel(xl, sheet_name="S11_10pct_band", index=False)
print(f"Wrote Excel (xlsxwriter): {outfile}")
except Exception as e2:
csv_fallback = outfile.replace(".xlsx", ".csv")
df_full.to_csv(csv_fallback, index=False)
print(f"No Excel engine available; wrote CSV fallback: {csv_fallback}")
from concurrent import futures
# Read curJ and curM files using multi-threading
def read_cur_files(i, currentFilename, ext, Num_tri, steps):
index = "%05d" % int(steps * i)
pfile = open(currentFilename + index + "_BC." + ext, 'r')
lines = pfile.readlines()
cur = np.zeros([Num_tri, 3, 3])
counter = 0
for t in range(Num_tri):
for n in range(3):
cur[t, n, 0] = float(lines[counter])
counter += 1
cur[t, n, 1] = float(lines[counter])
counter += 1
cur[t, n, 2] = float(lines[counter])
counter += 1
counter += 1
pfile.close()
return cur
# =========================
# Main (comparison + S11 over ≥10% bins)
# =========================
def main():
# --- parse log for timing + port envelope ---
sim = extract_selected_log_values(LOG_PATH)
port = parse_port_from_log(LOG_PATH)
dt_sample = sim["dt_sample"]
tsPerSampling = sim["tsPerSampling"]
print("dt_sample = " ,dt_sample)
print("tsPerSampling = ", tsPerSampling)
# --- load Incident Point ---
df_pts = pd.read_csv(CSV_PATH)
r, Et, Ht, a = build_port_points_from_csv(df_pts)
Q = r.shape[0]
print("Point count (Q):", Q)
if a is not None:
print("Total area on file:", np.sum(a))
print(port["fMHz"])
#################################################
####### TOTAL FIELDS ############################
#################################################
# --- choose a port directory and sample time ---
port_dir = Path("./PortProbes/Port0") # e.g., Port0_0000.csv, Port0_0036.csv, ...
# 1) Read total-field time series from per-timestep CSVs
coords, E_tot, H_tot, t_idx, files = read_port_centroid_series(port_dir)
# Build a time axis from file index and dt
t_sec = t_idx.astype(float) * dt_sample / tsPerSampling
# 2) Compute spectrum + time-domain power (same function works for totals)
freqs, P_tot, S_tot_n_fq, S_tot_time, P_tot_avg_time, P_tot_t, E_tot_freq, H_tot_freq, Npad = fft_incident_port_power(E_tot, H_tot, dt_sample, areas=a, pad_factor=3)
# 2) Compute spectrum + time-domain power (same function works for totals)
trunc = 210
E_tot_trunc = np.zeros_like(E_tot)
H_tot_trunc = np.zeros_like(H_tot)
E_tot_trunc[0:trunc] = E_tot[0:trunc]
H_tot_trunc[0:trunc] = H_tot[0:trunc]
freqs, Pinc_tot, Sinc_tot_n_fq, Sinc_tot_time, Pinc_tot_avg_time, Pinc_tot_t, Einc_tot_freq, Hinc_tot_freq, Npad = fft_incident_port_power(E_tot, H_tot, dt_sample, areas=a, pad_factor=4)
###################################################
####### Incident Field ############################
###################################################
Emag = port["Emag"]
Hmag = Emag
IncE, IncH = time_modulation_inc(port["Tdist"], t_sec, port["t0"], port["khat"], r, port["ro"], port["fMHz"], Emag, Hmag, port["tau"], port["BW"])
E_inc = IncE[:, :, None] * Et[None, :, :] # (T,Q,3)
H_inc = IncH[:, :, None] * Ht[None, :, :] # (T,Q,3)
freqs, P_inc, S_n_fq, S_time, P_avg_time, P_t, E_inc_freq, H_inc_freq, Npad = fft_incident_port_power(E_inc, H_inc, dt_sample, areas=a, pad_factor=3)
band, thr, i0 = ten_percent_band(freqs, P_inc, frac=0.10)
f_band = freqs[band]
band_mask = np.zeros_like(freqs, dtype=bool)
band_mask[band] = True
flo, fhi = freqs[band][0], freqs[band][-1]
print(f"10% band: [{flo:.6g}, {fhi:.6g}] Hz (BW={fhi-flo:.6g} Hz)")
print(t_sec)
plot_time_series(t_sec, E_inc, P_t, out_png=str("./Inc_time.png"))
S_band = S_n_fq[band, :]
plot_spectrum(freqs, P_inc, band, thr, S_n_band=S_band,
out_png=str("./Inc_freq.png"),
xtick_step=0.1e9)
print(f"[Incident] time-average power over record: {P_avg_time:.6g} W")
# 3) Plots (time and spectrum). Spectrum uses 0.1 GHz ticks.
plot_time_series(t_sec, E_tot, P_tot_t, out_png=str("./TOTAL_time.png"))
S_band = S_tot_n_fq[band, :]
plot_spectrum(freqs, P_tot, band, thr, S_n_band=S_band,
out_png=str("./TOTAL_freq.png"),
xtick_step=0.1e9)
# 4) Console summaries
print(f"[TOTAL] time-average power over record: {P_avg_time:.6g} W")
f0 = freqs[i0]
flo, fhi = freqs[band][0], freqs[band][-1]
print(f"[TOTAL] Peak at {f0:.6g} Hz; 10% band: [{flo:.6g}, {fhi:.6g}] Hz (BW={fhi-flo:.6g} Hz)")
################################
### Calculate the Sparameters
print(H_inc_freq.shape)
a_f, b_f, S11_mag, I1, I2, Nm = mode_overlap_s11(E_tot_freq, H_tot_freq, E_inc_freq, H_inc_freq, areas=a, nhat=np.array([0,0,1]))
xtick_step = 0.1e9
fig, ax = plt.subplots(1,1, figsize=(12,4))
ax.plot(freqs[band_mask], 20*np.log10(np.clip(abs(S11_mag[band_mask]), 1e-20, None)), label='|S11| (dB)')
flo, fhi = freqs[band_mask][0], freqs[band_mask][-1]
ax.axvspan(flo, fhi, color='C0', alpha=0.15, label='10% band') # filled box
ax.axhline(-10, ls=":", color='0.4', lw=1) # handy guide
ax.set_xlabel("Frequency (Hz)")
ax.set_ylabel("|S11| (dB)")
ax.set_title("S Parameter")
if xtick_step:
xt = np.arange(0.0, fhi + xtick_step, xtick_step)
ax.set_xticks(xt)
ax.legend()
ax.set_xlim([flo,fhi])
fig.tight_layout()
fig.savefig("S11para.png", dpi=150)
plt.close(fig)
export_s11_to_excel("S11params.xlsx", freqs, S11_mag, band_mask, a_f=a_f, b_f=b_f)
if __name__ == "__main__":
main()