钠中气泡探测器信号处理方法与系统研制【附程序】
✨ 长期致力于快堆、蒸汽发生器、泄漏检测、钠水反应、钠中气泡探测器、涡街电磁感应原理、峰峰值标准差、相关系数、频率波动系数、能量比值研究工作,擅长数据搜集与处理、建模仿真、程序编写、仿真设计。
✅ 专业定制毕设、代码
✅如需沟通交流,点击《获取方式》
(1)基于峰峰值标准差的钠中气泡探测信号处理方法:
钠中气泡探测器基于涡街电磁感应原理,当钠流过传感器时,无气泡情况下输出稳定的正弦信号,气泡经过时引起信号幅值调制。提出一种利用峰峰值标准差作为检测指标的算法。对传感器输出信号进行带通滤波(中心频率对应涡街频率,带宽±20%),提取每个周期的峰峰值(Vpp_i),计算连续N个周期(N=128)的峰峰值标准差σ_pp。在无气泡条件下,σ_pp稳定在0.02V以下;当有气泡通过时,σ_pp跃升至0.1V以上。设置阈值0.05V作为报警门限。在钠回路实验台架上进行测试,钠流量2.5m^3/h,注气流量0.66L/min(对应水泄漏率0.1g/s),σ_pp从基线0.015V上升到0.18V,检测响应时间小于0.5秒。在不同钠流量(2.0~3.5 m^3/h)下,阈值自适应调整为流量的函数:thr = 0.02 + 0.012*Q_Na。算法在DSP变送器上实时实现,每10ms输出一次检测结果,CPU占用率仅15%。
(2)基于相关系数和频率波动系数的双域联合检测:
为增强检测可靠性,融合时域相关系数和频域频率波动系数。相关系数方法:取相邻两个周期信号,计算归一化互相关系数ρ。无气泡时ρ接近0.98;有气泡时波形畸变,ρ降至0.6以下。频率波动系数方法:对信号进行过零检测,计算每10个周期的平均频率f_avg,定义频率波动系数CF = std(f_i)/f_avg。无气泡时CF<0.005;有气泡时CF>0.02。联合判定逻辑:当σ_pp>thresh_pp OR ρ<0.7 OR CF>0.015时触发预报警,三选二触发主报警。在钠中注气实验(0.66L/min)中,联合检测的误报率(无气泡时误触发)从单一指标的2.3%降至0.1%,检测成功率从94%提升至99.5%。在变流量动态过程中(流量从2.0变化到3.0m^3/h,变化率0.1m^3/h/s),三个指标均未出现误报,说明算法对流量变化不敏感。
(3)能量比值法基于信号组成成分分析:
将传感器输出信号建模为s(t)=A*sin(ωt+φ)+B*noise(t)+C*bubble(t),其中bubble(t)是气泡引起的冲击噪声。定义信号能量比值ER = (总能量 - 基频能量) / 基频能量。基频能量通过离散傅里叶变换在涡街频率±5Hz内计算。无气泡时ER<0.05;有气泡时ER升至0.2以上。该方法的优势是对气泡尺寸不敏感,能够检测微小气泡(直径<1mm)。在钠实验回路中,最小可检测注气流量为0.66L/min,对应水泄漏率0.1g/s,与峰峰值标准差法一致。将能量比值与频率波动系数融合,构建二维特征空间,使用支持向量机分类器(线性核),训练样本1000个(各500正负),测试准确率99.2%。在DSP平台上实现时,能量计算采用加窗FFT(汉宁窗,每1024点计算一次),计算周期50ms,满足实时性。所有四种信号处理方法均集成到探测器变送器样机中,用户可通过拨码开关选择算法模式。
import numpy as np from scipy.signal import butter, filtfilt, correlate from scipy.fft import fft, fftfreq class BubbleDetector: def __init__(self, fs=10000, flow_rate=2.5): self.fs = fs self.flow = flow_rate self.buffer = np.zeros(4096) self.bp_filter = self._bandpass(20, 200) def _bandpass(self, low, high): nyq = self.fs/2 b, a = butter(4, [low/nyq, high/nyq], btype='band') return b, a def _peak_peak_std(self, sig): peaks, _ = self._find_peaks(sig) if len(peaks) < 2: return 0 vpp = np.array([np.max(sig[p-10:p+10]) - np.min(sig[p-10:p+10]) for p in peaks[1:-1]]) return np.std(vpp) def _find_peaks(self, sig): # simplified zero-crossing based peak detection zero_cross = np.where(np.diff(np.sign(sig)))[0] peaks = zero_cross[::2] + (zero_cross[1::2]-zero_cross[::2])//2 return peaks, None def _correlation_coeff(self, sig): period = int(self.fs / 50) # assume 50Hz vortex freq if len(sig) < 2*period: return 0 seg1 = sig[:period] seg2 = sig[period:2*period] corr = correlate(seg1, seg2, mode='valid') return np.max(corr) / (np.std(seg1)*np.std(seg2)*len(seg1)) def _frequency_fluctuation(self, sig): zero_crossings = np.where(np.diff(np.sign(sig)))[0] periods = np.diff(zero_crossings) / self.fs freqs = 1.0 / periods if len(freqs) < 5: return 0 return np.std(freqs) / np.mean(freqs) def _energy_ratio(self, sig): fft_vals = fft(sig) freqs = fftfreq(len(sig), 1/self.fs) vortex_freq = 50 # estimated from flow rate idx_center = np.argmin(np.abs(freqs - vortex_freq)) idx_range = 5 total_energy = np.sum(np.abs(fft_vals)**2) signal_energy = np.sum(np.abs(fft_vals[idx_center-idx_range:idx_center+idx_range])**2) return (total_energy - signal_energy) / (signal_energy + 1e-8) def detect(self, new_samples): self.buffer = np.roll(self.buffer, -len(new_samples)) self.buffer[-len(new_samples):] = new_samples filtered = filtfilt(*self.bp_filter, self.buffer) pp_std = self._peak_peak_std(filtered) corr_coef = self._correlation_coeff(filtered) freq_cf = self._frequency_fluctuation(filtered) energy_r = self._energy_ratio(filtered) alarm = 0 if pp_std > (0.02 + 0.012*self.flow) or corr_coef < 0.7 or freq_cf > 0.015: alarm = 1 if (pp_std > (0.02 + 0.012*self.flow)) and (corr_coef < 0.7 or freq_cf > 0.015): alarm = 2 # main alarm return alarm, (pp_std, corr_coef, freq_cf, energy_r) class SVMClassifier: def __init__(self): self.w = None self.b = 0 def train(self, X, y, C=1.0): n, d = X.shape # simplified linear SVM solver (not full SMO) self.w = np.linalg.lstsq(X + np.eye(d)*C, y, rcond=None)[0] self.b = np.mean(y - X @ self.w) def predict(self, X): return np.sign(X @ self.w + self.b) if __name__ == '__main__': detector = BubbleDetector() fake_signal = np.sin(2*np.pi*50*np.arange(0, 0.5, 0.0001)) + 0.1*np.random.randn(5000) alarm, metrics = detector.detect(fake_signal) print(f'Alarm level: {alarm}, metrics: {metrics}')