This repository serve as a backup for my Maxwell-TD code
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

400 lines
13 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Compute port power from time-modulated modal fields sampled at triangle quadrature points.
Inputs (in working dir):
- computeDGTD.log : contains PERFORMING INFORMATION + PORT line (see regex below)
- port_inc_field.csv : columns:
exc_face_id, local_face_idx, quad_idx,
x1,y1,z1, x2,y2,z2, x3,y3,z3, # triangle vertices (meters)
nx,ny,nz, # QUADRATURE-POINT coordinates (meters)
Et_x,Et_y,Et_z, # modal tangential E at quad (shape only)
Ht_x,Ht_y,Ht_z # modal tangential H at quad (shape only)
Outputs:
- port_power_spectrum.csv/.png : per-frequency incident power (W) vs f
- port_power_time_trace.csv : instantaneous P(t) (W) and time-axis
- console print of <P(t)> and sum P_k consistency check
Notes:
- If you want TEM-consistent amplitudes, set Hmag = Emag / Zc instead of Hmag = Emag.
- Ensure quad_idx ordering matches your 6/9-pt rule tables (it does in this script).
"""
from pathlib import Path
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import fnmatch
import os
# =========================
# Config / paths
# =========================
LOG_PATH = Path("computeDGTD.log")
CSV_PATH = Path("port_inc_field.csv")
PROBE_FOLDER = Path("./PROBES")
SAVE_PREFIX = "port"
# =========================
# Constants
# =========================
Pi = np.pi
c0 = 299792458.0 # m/s
# =========================
# Parse computeDGTD.log
# =========================
def extract_selected_log_values(filepath: Path):
targets = {
"Final Time(sec)": None,
"dt_sample": None,
"tsPerSampling": None
}
with filepath.open("r") as f:
lines = f.readlines()
in_block = False
for line in lines:
if "==========================================" in line and "PERFORMING INFORMATION" in line:
in_block = True
continue
if in_block and "==========================================" in line:
break
for key in targets:
if key in line:
m = re.search(r'=\s*([\d.eE+-]+)', line)
if m:
val = float(m.group(1))
if key == "tsPerSampling":
targets[key] = int(val)
else:
targets[key] = val
return {
"FinalTime": targets["Final Time(sec)"],
"dt_sample": targets["dt_sample"],
"tsPerSampling": targets["tsPerSampling"],
}
PORT_RE = re.compile(
r"PORT:\s*ro=\[(?P<ro>[^]]+)\],\s*r1=\[(?P<r1>[^]]+)\],\s*Z=(?P<Z>[+\-0-9.eE]+),\s*\|E\|=(?P<Emag>[+\-0-9.eE]+),\s*Flag=(?P<Flag>\d+),\s*Tdist=(?P<Tdist>\d+),\s*f=(?P<f>[+\-0-9.eE]+),\s*t0=(?P<t0>[+\-0-9.eE]+),\s*tau=(?P<tau>[+\-0-9.eE]+)"
)
def _parse_bracket_vec(s: str):
parts = [float(p) for p in s.split(",")]
if len(parts) != 3:
raise ValueError(f"Expected 3 components in vector, got: {s}")
return np.array(parts, dtype=float)
def parse_port_from_log(log_path: Path):
with log_path.open("r") as f:
for line in f:
if "PORT:" in line:
m = PORT_RE.search(line)
if not m:
continue
ro = _parse_bracket_vec(m.group("ro"))
r1 = _parse_bracket_vec(m.group("r1"))
Z = float(m.group("Z"))
Emag= float(m.group("Emag"))
Flag= int(m.group("Flag"))
Tdist=int(m.group("Tdist"))
fMHz = float(m.group("f")) # MHz
t0 = float(m.group("t0"))
tau = float(m.group("tau"))
khat = r1 - ro
nrm = np.linalg.norm(khat)
khat = khat / nrm if nrm > 0 else np.array([0.0, 0.0, 1.0], float)
return {
"ro": ro, "r1": r1, "Z": Z, "Emag": Emag,
"Flag": Flag, "Tdist": Tdist, "fMHz": fMHz,
"t0": t0, "tau": tau, "khat": khat
}
raise RuntimeError("No parsable PORT line found in log.")
# =========================
# Quadrature (6-pt / 9-pt) weights in same ordering as your C macros
# =========================
G6_W = np.array([0.109951743655322]*3 + [0.223381589678011]*3) # sum = 1/2 on ref triangle
G9_W = np.array([0.205950504760887]*3 + [0.063691414286223]*6) # sum = 1/2 on ref triangle
# =========================
# Geometry helpers
# =========================
def tri_area(v0, v1, v2):
return 0.5 * np.linalg.norm(np.cross(v1 - v0, v2 - v0))
def tri_normal(v0, v1, v2):
n = np.cross(v1 - v0, v2 - v0)
nn = np.linalg.norm(n)
if nn == 0:
raise ValueError("Degenerate triangle")
return n / nn
def build_port_quadrature_from_csv(df):
"""
df columns required:
exc_face_id, local_face_idx, quad_idx,
x1,y1,z1, x2,y2,z2, x3,y3,z3,
nx,ny,nz, # QUADRATURE-POINT coordinates on the face
Et_x,Et_y,Et_z, Ht_x,Ht_y,Ht_z
Returns:
rQ (Q,3) quad-point positions (nx,ny,nz)
EtQ (Q,3) modal tangential E at quad points
HtQ (Q,3) modal tangential H at quad points
w_phys (Q,) physical weights = (2*A_face) * W_ref[quad_idx]
n_face (Q,3) per-quad face unit normal (same per face),
flipped to point 'into device' if provided
"""
needed = ["exc_face_id","local_face_idx","quad_idx",
"x1","y1","z1","x2","y2","z2","x3","y3","z3",
"nx","ny","nz","Et_x","Et_y","Et_z","Ht_x","Ht_y","Ht_z"]
for col in needed:
if col not in df.columns:
raise ValueError(f"Missing column in CSV: {col}")
rQ = df[["nx","ny","nz"]].to_numpy(float)
EtQ = df[["Et_x","Et_y","Et_z"]].to_numpy(float)
HtQ = df[["Ht_x","Ht_y","Ht_z"]].to_numpy(float)
w_phys = np.zeros(len(df), float)
n_face = np.zeros((len(df), 3), float)
# normalize reference direction (into device) if provided
u_enf = np.array([0,0,1])
# group by face
groups = {}
for i, row in df.iterrows():
key = (int(row["exc_face_id"]), int(row["local_face_idx"]))
groups.setdefault(key, []).append(i)
for key, idxs in groups.items():
qidxs = df.loc[idxs, "quad_idx"].to_numpy(int)
Nq = int(qidxs.max() + 1)
if Nq == 6:
Wref = G6_W
elif Nq == 9:
Wref = G9_W
else:
raise ValueError(f"Face {key} has {Nq} quad points; expected 6 or 9.")
row0 = df.loc[idxs[0]]
v0 = np.array([row0["x1"], row0["y1"], row0["z1"]], float)
v1 = np.array([row0["x2"], row0["y2"], row0["z2"]], float)
v2 = np.array([row0["x3"], row0["y3"], row0["z3"]], float)
A = tri_area(v0, v1, v2)
n = tri_normal(v0, v1, v2)
if u_enf is not None and np.dot(n, u_enf) < 0:
n = -n
w_face = (2.0 * A) * Wref[qidxs] # physical quad weights
w_phys[idxs] = w_face
n_face[idxs, :] = n
# Optional quick check: per-face sum weights ≈ area
# import numpy as _np
# _np.testing.assert_allclose(w_face.sum(), A, rtol=1e-10, atol=1e-12)
return rQ, EtQ, HtQ, w_phys, n_face
Pi = np.pi
MEGA = 1e6
Vo = c0
def time_modulation_inc(flag, t, t0, khat, r, r0, freq_m, Emag, Hmag, tau):
omega = 2.0 * Pi * freq_m * MEGA
t = np.asarray(t, float)
if flag == 0:
phase = - omega * t[:, None]
IncE = Emag * np.sin(phase)
IncH = Hmag * np.sin(phase)
elif flag == 1:
Exponent = t[:, None] - t0
CosMod = np.cos(omega * (t[:, None] - t0))
env = np.exp(-(Exponent**2) / (tau*tau))
IncE = Emag * CosMod * env
IncH = Hmag * CosMod * env
elif flag == 2:
Exponent = t[:, None] - t0
Neuman = (2.0 * Exponent) / (tau*tau)
env = np.exp(-(Exponent**2) / (tau*tau))
IncE = Emag * Neuman * env
IncH = Hmag * Neuman * env
else:
raise ValueError("Unknown TimeDistributionFlag")
return IncE, IncH
# =========================
# Main
# =========================
# ---- parse log ----
sim = extract_selected_log_values(LOG_PATH)
port = parse_port_from_log(LOG_PATH)
dt_sample = sim["dt_sample"]
steps = sim["tsPerSampling"]
# Sampling frequency f_s = 1 / dt
f_s = 1.0 / dt_sample
# Nyquist frequency
f_nyq = f_s / 2.0
# Count .curJ files
nfiles = len(fnmatch.filter(os.listdir(PROBE_FOLDER), '*.csv'))
nfiles = int(nfiles)
print('Number of time domain files:', nfiles)
print("\n---- Parsed sim/log ----")
print(f"FinalTime = {sim['FinalTime']}")
print(f"dt_sample = {dt_sample}")
print(f"steps = {steps}")
print("\n---- Parsed PORT ----")
for k, v in port.items():
print(f"{k:>6} : {v}")
# Number of FFT samples
NFFT = nfiles * 5
# Time axis from sampling
t_idx = np.arange(nfiles, dtype=int) # 0..steps-1
t_sec = t_idx * dt_sample # seconds
# ---- load CSV ----
df = pd.read_csv(CSV_PATH)
# ---- build quad geometry/weights/normals ----
rQ, EtQ, HtQ, w_phys, n_face = build_port_quadrature_from_csv(df)
# ---- apply time modulation at quad points ----
Emag = port["Emag"]
# TEM-consistent option: Hmag = Emag / port["Z"]
Hmag = Emag
IncE, IncH = time_modulation_inc(
port["Tdist"], t_sec, port["t0"], port["khat"],
rQ, port["ro"], port["fMHz"], Emag, Hmag, port["tau"]
) # (T,Q)
# build vector fields (T,Q,3)
E_inc = IncE[:, :, None] * EtQ[None, :, :]
H_inc = IncH[:, :, None] * HtQ[None, :, :]
# ---- time-domain instantaneous & average power (surface Poynting) ----
S_t = np.cross(E_inc, H_inc) # (T,Q,3)
S_n_t = np.einsum('tqj,qj->tq', S_t, n_face) # project onto normal
P_t = np.sum(S_n_t * w_phys[None, :], axis=1) # (T,)
P_timeavg_W = float(np.mean(P_t))
print(f"\n[Port] Time-average incident power: {P_timeavg_W:.6e} W")
# ---- frequency-domain per-bin power (single-sided) ----
freqs = np.fft.rfftfreq(NFFT, d=dt_sample) # Hz
freqs_ghz = freqs * 1e-9
E_f = np.fft.rfft(E_inc, n=NFFT, axis=0) / NFFT # (F,Q,3)
H_f = np.fft.rfft(H_inc, n=NFFT, axis=0) / NFFT # (F,Q,3)
Cross_F = np.cross(E_f, np.conj(H_f)) # (F,Q,3) note H*
S_n_F = np.einsum('fqj,qj->fq', Cross_F, n_face) # (F,Q)
Sum_F = np.sum(S_n_F * w_phys[None, :], axis=1) # (F,) complex
scale = np.ones_like(Sum_F, float)
if scale.size > 2:
scale[1:-1] = 2.0 # single-sided doubling
P_per_freq_W = scale * np.real(Sum_F) # (F,)
P_total_bins_W = float(np.sum(P_per_freq_W))
print(f"[Port] Sum of per-frequency power: {P_total_bins_W:.6e} W")
if P_timeavg_W != 0:
rel = abs(P_total_bins_W - P_timeavg_W)/abs(P_timeavg_W)
print(f"[check] rel diff sum(P_k) vs <P(t)> = {rel:.3e}")
# ---- save outputs ----
# spectrum CSV
pd.DataFrame({
"f_Hz": freqs,
"f_GHz": freqs_ghz,
"P_W": P_per_freq_W
}).to_csv(f"{SAVE_PREFIX}_power_spectrum.csv", index=False)
# spectrum PNG
plt.figure(figsize=(9,5))
plt.plot(freqs_ghz, P_per_freq_W, lw=1.4)
plt.grid(True)
plt.xlabel("Frequency (GHz)")
plt.ylabel("Port power per bin (W)")
plt.title("Incident Port Power Spectrum (surface Poynting, single-sided)")
plt.tight_layout()
plt.savefig(f"{SAVE_PREFIX}_power_spectrum.png", dpi=180)
plt.close()
# time trace CSV
pd.DataFrame({"t_s": t_sec, "P_inst_W": P_t}).to_csv(f"{SAVE_PREFIX}_power_time_trace.csv", index=False)
print("\nSaved:")
print(f" - {SAVE_PREFIX}_power_spectrum.csv / .png")
print(f" - {SAVE_PREFIX}_power_time_trace.csv")
# ======= Area-weighted average port fields vs time (add after E_inc/H_inc) =======
A_total = np.sum(w_phys) # total port area
# Area-weighted averages of the *vector* fields at each time sample
# Shapes: (T,3)
E_avg_t = np.sum(E_inc * w_phys[None, :, None], axis=1) / A_total
H_avg_t = np.sum(H_inc * w_phys[None, :, None], axis=1) / A_total
# Magnitudes vs time
E_avg_mag = np.linalg.norm(E_avg_t, axis=1) # (T,)
H_avg_mag = np.linalg.norm(H_avg_t, axis=1) # (T,)
# Save to CSV
df_avg = pd.DataFrame({
"t_s": t_sec,
"Eavg_x": E_avg_t[:, 0], "Eavg_y": E_avg_t[:, 1], "Eavg_z": E_avg_t[:, 2],
"Eavg_mag": E_avg_mag,
"Havg_x": H_avg_t[:, 0], "Havg_y": H_avg_t[:, 1], "Havg_z": H_avg_t[:, 2],
"Havg_mag": H_avg_mag,
})
df_avg.to_csv(f"{SAVE_PREFIX}_avg_field_time.csv", index=False)
# Plot magnitudes vs time
# Plot E and H magnitudes vs time with separate axes
fig, ax1 = plt.subplots(figsize=(9,5))
color_e = "tab:blue"
ax1.set_xlabel("Time (s)")
ax1.set_ylabel("|E_avg(t)|", color=color_e)
ax1.plot(t_sec, E_avg_mag, color=color_e, lw=1.4)
ax1.tick_params(axis="y", labelcolor=color_e)
# Twin y-axis for H
ax2 = ax1.twinx()
color_h = "tab:red"
ax2.set_ylabel("|H_avg(t)|", color=color_h)
ax2.plot(t_sec, H_avg_mag, color=color_h, lw=1.4, linestyle="--")
ax2.tick_params(axis="y", labelcolor=color_h)
plt.title("Area-weighted average tangential fields vs time")
fig.tight_layout()
plt.savefig(f"{SAVE_PREFIX}_avg_field_time.png", dpi=180)
plt.close(fig)
print(f" - {SAVE_PREFIX}_avg_field_time.csv / .png")