当前位置: 首页 > news >正文

Python读取光谱仪数据的完整代码示例

一、前言

上一篇文章我们介绍了光谱仪的工作原理,本文将手把手教你用 Python 连接辰昶光纤光谱仪,实现光谱数据的实时读取、显示和保存。

本文亮点

  • ✅ 完整的设备连接代码
  • ✅ 实时光谱采集
  • ✅ 数据可视化
  • ✅ 文件保存与管理
  • ✅ 错误处理机制


二、开发环境准备

2.1 硬件要求

  • 辰昶光纤光谱仪(本文以 EQ2000/ER4000 系列为例)
  • USB 数据线或以太网连接
  • 光纤跳线(推荐 SMA905 接口)

2.2 软件依赖

pip install numpy pandas matplotlib pyserial pyusb

依赖说明

包名用途
numpy数值计算
pandas数据存储
matplotlib光谱可视化
pyserial串口通信
pyusbUSB设备连接

三、基础连接代码

3.1 USB连接方式

import numpy as np import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation import time class ChopticsSpectrometer: """ 辰昶光纤光谱仪 Python 控制类 支持 EQ2000、ER4000、EK2000 Pro 系列 """ def __init__(self, serial_port='COM3', baudrate=115200): self.serial_port = serial_port self.baudrate = baudrate self.is_connected = False self.serial = None def connect(self): """连接光谱仪""" import serial try: self.serial = serial.Serial( port=self.serial_port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1.0 ) self.is_connected = True print(f"✅ 光谱仪已连接: {self.serial_port}") # 获取设备信息 self._get_device_info() return True except serial.SerialException as e: print(f"❌ 连接失败: {e}") return False def _get_device_info(self): """获取设备基本信息""" # 发送查询命令(具体命令格式请参考辰昶通信协议文档) self.serial.write(b'*IDN?\r\n') response = self.serial.readline().decode('utf-8').strip() print(f"设备信息: {response}") def set_integration_time(self, time_ms): """ 设置积分时间(毫秒) 辰昶光谱仪范围: 1ms ~ 65000ms """ command = f'INTEGRATION,{time_ms}\r\n' self.serial.write(command.encode()) response = self.serial.readline().decode('utf-8').strip() print(f"积分时间设置: {response}") def acquire_spectrum(self): """ 获取单次光谱数据 返回: (wavelengths, intensities) """ if not self.is_connected: raise RuntimeError("请先连接光谱仪") # 发送采集命令 self.serial.write(b'SPECTRUM\r\n') # 读取波长数据 num_pixels = 2048 # EQ2000系列 wavelength_data = [] for _ in range(num_pixels): line = self.serial.readline().decode('utf-8').strip() if line: wavelength_data.append(float(line)) # 读取强度数据 intensity_data = [] for _ in range(num_pixels): line = self.serial.readline().decode('utf-8').strip() if line: intensity_data.append(float(line)) return np.array(wavelength_data), np.array(intensity_data) def close(self): """关闭连接""" if self.serial and self.serial.is_open: self.serial.close() self.is_connected = False print("🔌 光谱仪连接已关闭") # 使用示例 if __name__ == "__main__": spectrometer = ChopticsSpectrometer(serial_port='COM3') if spectrometer.connect(): # 设置积分时间 100ms spectrometer.set_integration_time(100) # 获取光谱 wavelengths, intensities = spectrometer.acquire_spectrum() # 绘制光谱图 plt.figure(figsize=(12, 6)) plt.plot(wavelengths, intensities, 'b-', linewidth=1) plt.xlabel('波长 (nm)', fontsize=12) plt.ylabel('强度 (counts)', fontsize=12) plt.title('光纤光谱仪实时数据', fontsize=14) plt.grid(True, alpha=0.3) plt.show() spectrometer.close()

3.2 以太网连接方式(工业级)

import socket import struct class EthernetSpectrometer: """ 以太网连接光谱仪(适合工业在线检测) 支持 TCP/IP 协议 """ def __init__(self, ip_address='192.168.1.100', port=5000): self.ip_address = ip_address self.port = port self.socket = None def connect(self): """建立TCP连接""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.ip_address, self.port)) self.socket.settimeout(5.0) print(f"✅ 已连接到 {self.ip_address}:{self.port}") return True except Exception as e: print(f"❌ 连接失败: {e}") return False def send_command(self, command): """发送命令""" self.socket.sendall((command + '\r\n').encode()) def receive_data(self, expected_bytes): """接收原始数据""" data = b'' while len(data) < expected_bytes: packet = self.socket.recv(4096) if not packet: break data += packet return data def acquire_spectrum(self): """采集光谱数据""" self.send_command('ACQUIRE') # EQ2000: 2048像素,16位 = 4096字节 raw_data = self.receive_data(4096) # 解析数据 intensities = np.array(struct.unpack(f'{2048}H', raw_data)) # 生成波长数组(根据校准参数) start_wavelength = 200 # nm end_wavelength = 1100 # nm wavelengths = np.linspace(start_wavelength, end_wavelength, 2048) return wavelengths, intensities def close(self): if self.socket: self.socket.close()

四、实时显示与数据处理

4.1 实时采集动画

class RealTimeSpectrometerDisplay: """ 实时光谱显示类 支持动态更新、峰值标记、数据统计 """ def __init__(self, spectrometer, interval=100): """ Args: spectrometer: ChopticsSpectrometer 实例 interval: 采集间隔(毫秒) """ self.spectrometer = spectrometer self.interval = interval self.spectra_history = [] # 存储历史数据 def update_plot(self, frame): """更新光谱曲线""" try: wavelengths, intensities = self.spectrometer.acquire_spectrum() # 绘制当前光谱 self.ax.clear() self.ax.plot(wavelengths, intensities, 'b-', linewidth=1.5, label='实时光谱') # 标记峰值 peak_indices, _ = find_peaks(intensities, height=1000) if len(peak_indices) > 0: peak_wavelengths = wavelengths[peak_indices] peak_intensities = intensities[peak_indices] self.ax.scatter(peak_wavelengths, peak_intensities, c='red', s=50, zorder=5, label='峰值') # 标注峰值波长 for w, i in zip(peak_wavelengths, peak_intensities): self.ax.annotate(f'{w:.1f}nm', (w, i), textcoords="offset points", xytext=(0,10), ha='center', fontsize=9) # 显示统计信息 stats_text = f'最大值: {intensities.max()}\n' stats_text += f'平均值: {intensities.mean():.1f}\n' stats_text += f'标准差: {intensities.std():.1f}' self.ax.text(0.02, 0.98, stats_text, transform=self.ax.transAxes, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) self.ax.set_xlabel('波长 (nm)', fontsize=12) self.ax.set_ylabel('强度 (counts)', fontsize=12) self.ax.set_title('光纤光谱仪实时监测', fontsize=14) self.ax.grid(True, alpha=0.3) self.ax.legend(loc='upper right') self.ax.set_xlim(200, 1100) # 保存历史数据 self.spectra_history.append(intensities.copy()) except Exception as e: print(f"采集异常: {e}") def start(self): """启动实时显示""" fig, self.ax = plt.subplots(figsize=(14, 6)) ani = FuncAnimation(fig, self.update_plot, interval=self.interval, save_count=100) plt.show() def save_data(self, filename='spectrum_data.csv'): """保存数据到CSV""" import pandas as pd if not self.spectra_history: print("无历史数据") return df = pd.DataFrame(self.spectra_history) df.to_csv(filename, index=False) print(f"✅ 数据已保存: {filename}") # 启动实时显示 if __name__ == "__main__": spec = ChopticsSpectrometer(serial_port='COM3') spec.connect() spec.set_integration_time(50) # 50ms积分时间 display = RealTimeSpectrometerDisplay(spec, interval=100) display.start()

4.2 数据平滑与去噪

from scipy.signal import savgol_filter, medfilt from scipy.ndimage import gaussian_filter1d def preprocess_spectrum(intensities, method='sg'): """ 光谱数据预处理 Parameters: intensities: 原始光谱强度数据 method: 'sg' (Savitzky-Golay) 或 'gauss' (高斯) 或 'median' (中值) Returns: 平滑后的光谱数据 """ if method == 'sg': # Savitzky-Golay滤波器(保持峰形) return savgol_filter(intensities, window_length=11, polyorder=3) elif method == 'gauss': # 高斯平滑 return gaussian_filter1d(intensities, sigma=2) elif method == 'median': # 中值滤波(去除脉冲噪声) return medfilt(intensities, kernel_size=5) else: return intensities def remove_baseline(intensities, lambda_param=1e5): """ 基线校正(AsLS算法) 用于去除荧光背景、散射背景等 """ from scipy import sparse from scipy.sparse.linalg import spsolve n = len(intensities) L = sparse.diags([1, -2, 1], [0, -1, -2], shape=(n, n-2)) L = lambda_param * L.dot(L.transpose()) D = sparse.diags([1, -1], [0, -1], shape=(n, n)) w = np.ones(n) W = sparse.diags(w, 0) for _ in range(10): # 迭代优化 W.setdiag(w) Z = W + L baseline = spsolve(Z, w * intensities) w = 1 * (intensities > baseline) + 0.001 * (intensities <= baseline) return intensities - baseline

五、数据保存与导出

5.1 多种格式保存

import json from datetime import datetime class SpectrumDataManager: """光谱数据管理器""" def __init__(self, base_path='./data'): self.base_path = base_path self.current_session = datetime.now().strftime('%Y%m%d_%H%M%S') def save_csv(self, wavelengths, intensities, filename=None): """保存为CSV格式""" if filename is None: filename = f'spectrum_{self.current_session}.csv' filepath = os.path.join(self.base_path, filename) df = pd.DataFrame({ 'Wavelength_nm': wavelengths, 'Intensity_counts': intensities }) df.to_csv(filepath, index=False) print(f"✅ CSV已保存: {filepath}") return filepath def save_numpy(self, wavelengths, intensities, filename=None): """保存为NumPy格式(保留高精度)""" if filename is None: filename = f'spectrum_{self.current_session}.npz' filepath = os.path.join(self.base_path, filename) np.savez(filepath, wavelengths=wavelengths, intensities=intensities) print(f"✅ NumPy已保存: {filepath}") return filepath def save_with_metadata(self, wavelengths, intensities, metadata=None): """ 保存带元数据的光谱文件 包含仪器参数、测量条件等信息 """ if metadata is None: metadata = {} # 元数据 metadata.update({ 'timestamp': datetime.now().isoformat(), 'instrument': '辰昶光纤光谱仪', 'num_points': len(wavelengths), 'wavelength_range': f'{wavelengths.min():.2f}-{wavelengths.max():.2f}nm' }) filename = f'spectrum_{self.current_session}_meta.json' filepath = os.path.join(self.base_path, filename) data = { 'metadata': metadata, 'wavelengths': wavelengths.tolist(), 'intensities': intensities.tolist() } with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) print(f"✅ 带元数据文件已保存: {filepath}") return filepath

六、工业级应用示例

6.1 在线浓度监测

class ConcentrationMonitor: """ 在线浓度监测系统 基于Beer-Lambert定律: A = ε·c·l """ def __init__(self, spectrometer, wavelength_absorbance): self.spectrometer = spectrometer self.wavelength_absorbance = wavelength_absorbance self.calibration_curve = {} # 浓度标定曲线 def set_calibration(self, concentrations, absorbances): """ 设置标定曲线 concentrations: 标准浓度列表 absorbances: 对应的吸光度值 """ from scipy.optimize import curve_fit def linear(x, a, b): return a * x + b popt, _ = curve_fit(linear, concentrations, absorbances) self.calibration_curve = {'a': popt[0], 'b': popt[1]} print(f"标定完成: A = {popt[0]:.4f}×C + {popt[1]:.4f}") def measure_concentration(self): """测量当前浓度""" wavelengths, intensities = self.spectrometer.acquire_spectrum() # 计算吸光度(需要参考光谱) # A = log10(I_ref / I_sample) absorbance = np.log10(self.reference_intensity / intensities) # 在指定波长处读取吸光度 idx = np.argmin(np.abs(wavelengths - self.wavelength_absorbance)) A = absorbance[idx] # 根据标定曲线计算浓度 if self.calibration_curve: C = (A - self.calibration_curve['b']) / self.calibration_curve['a'] return C, A return None, A

七、常见问题与解决方案

问题可能原因解决方案
连接超时串口被占用检查端口号,更换USB口
采集数据为0光纤未连接检查光纤接口,确保光路导通
噪声过大积分时间过短增加积分时间,或使用平均采集
基线漂移环境温度变化使用基线校正算法
通信不稳定USB供电不足使用带供电的USB集线器
💡辰昶仪器提示:辰昶光谱仪提供完整的SDK开发包和技术支持,包括Python、LabVIEW、C#、Java等多语言接口。

八、总结

本文提供了完整的 Python 光谱仪数据读取方案:

  1. 基础连接类- 支持USB和以太网两种连接方式
  2. 实时显示- 动态更新、峰值标记、统计信息
  3. 数据预处理- 平滑、去噪、基线校正
  4. 多格式保存- CSV、NumPy、带元数据的JSON
  5. 工业应用- 浓度监测、在线分析示例
http://www.jsqmd.com/news/978066/

相关文章:

  • 2026年q2达州门窗定制厂家实测评测:达州家装门窗设计/达州封窗/达州断桥铝门窗/谁更靠谱 - 优质品牌商家
  • 基于深度学习YOLOv8的白细胞类型检测系统(YOLOv8+YOLO数据集+UI界面+Python项目源码+模型)
  • 终极SPT-AKI存档编辑器完全指南:简单快速修改你的单机塔科夫存档 [特殊字符]
  • 后 | 室 Backrooms
  • 实战指南 | 企业Geo运营方法论:AI搜索优化实战指南
  • 告别混乱:用Apollo配置中心统一管理Spring Boot多环境配置(附Idea/Eclipse实战)
  • 2026年新能源类本科院校技术办学实力实测与推荐:航空办学特色大学推荐/航空航天类大学推荐/优选推荐 - 优质品牌商家
  • Java final 关键字精讲:变量、方法与类的终极约束
  • 30岁的女人适合考个什么证
  • MyBatis-Plus 分页查询实战
  • 面向对象设计(OOP)核心思想与 Java 实践总结
  • 食品异物赔偿协商录音泄露,舆情处置时沟通记录别踩坑
  • 丰田电动SUV热销,为何此时却放缓电动化步伐?
  • 2026 推荐|OpenClaw 全平台部署包,Windows/Mac 通用
  • 河南工科类院校技术维度实测:安阳工学院核心竞争力解析 - 优质品牌商家
  • 掌握Agent技术,抢占高薪先机!小白程序员必备收藏指南
  • FinalShell密码忘了别慌!手把手教你从本地文件找回服务器密码(附Java解密脚本)
  • 2026年企业门户管理平台推荐
  • 别再只用v-if了!用Vue3自定义指令实现这3个超实用的业务场景(附完整代码)
  • 2026年迪拜公司注册权威机构排行:危险化学品许可证/吉尔吉斯斯坦公司注册/哈萨克斯坦公司注册/合规服务对比 - 优质品牌商家
  • 深度学习泛化性的几何视角与嵌入空间分析
  • 小白程序员必备!3个月从零掌握大模型,附收藏版AI学习路线图
  • OpenClaw 一键部署包|内置全部依赖,开箱即用
  • 2026年汽车贴膜性价比哪家高? - myqiye
  • RepoDoc:用知识图谱重构代码文档生成与增量更新
  • CAS 为什么效率高?
  • 【RT-DETR实战】168、交通监控综合项目:跟踪与计数功能扩展实战手记
  • 磁力链接转种子文件:Magnet2Torrent完整指南与核心技术解析
  • 前端超能力:让浏览器听你指挥——技术基石:Web API 的“听觉”与“理解”能力
  • 别再硬啃原生小程序了!用Vue语法+Uni-app快速搞定微信登录注册(附SpringBoot后端接口设计思路)