用Python+PyModbus模拟一个Modbus RTU从站:从功能码到数据帧的完整实战
用Python+PyModbus构建Modbus RTU从站:从协议解析到实战调试
在工业自动化领域,Modbus RTU协议因其简单可靠的特点,已成为连接PLC、传感器和上位机的通用语言。但对于开发者而言,仅理解协议规范远远不够——当需要模拟设备行为、测试主站程序或排查通信故障时,一个可编程的虚拟从站能极大提升工作效率。本文将带您用Python的PyModbus库,从零构建支持8种核心功能码的Modbus RTU从站,并通过Wireshark抓包分析真实数据流,掌握协议实现的每个技术细节。
1. 环境搭建与基础配置
1.1 PyModbus库安装与虚拟串口设置
首先通过pip安装最新版PyModbus(3.5.0+)及其依赖:
pip install pymodbus==3.5.0 pyserial==3.5为模拟真实硬件环境,建议使用虚拟串口工具创建端口对。在Windows上可以使用com0com:
# 验证串口可用性 import serial.tools.list_ports print([port.device for port in serial.tools.list_ports.comports()])1.2 从站基础框架搭建
PyModbus提供了同步和异步两种API,我们选择更易上手的同步版本。以下代码创建了一个支持RTU模式的从站实例:
from pymodbus.server import StartSerialServer from pymodbus.datastore import ModbusSequentialDataBlock from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext # 初始化数据存储区 coils = ModbusSequentialDataBlock(0, [False]*100) # 线圈状态 discrete_inputs = ModbusSequentialDataBlock(0, [True]*100) # 离散输入 holding_registers = ModbusSequentialDataBlock(0, [0]*100) # 保持寄存器 input_registers = ModbusSequentialDataBlock(0, [0]*100) # 输入寄存器 # 创建从站上下文 slave_context = ModbusSlaveContext( di=discrete_inputs, co=coils, hr=holding_registers, ir=input_registers ) context = ModbusServerContext(slaves=slave_context, single=True) # 启动RTU从站 StartSerialServer( context=context, port='COM2', # 虚拟串口 framer=ModbusRtuFramer, baudrate=19200, timeout=0.005 )2. 核心功能码实现解析
2.1 读操作功能码(0x01-0x04)
四种读操作虽然对象不同,但处理逻辑高度相似。以0x03读保持寄存器为例,观察实际数据交互:
主站请求帧(十六进制表示):
01 03 00 00 00 03 05 CB01:从站地址03:功能码00 00:起始地址00 03:寄存器数量05 CB:CRC校验
从站响应帧:
01 03 06 00 21 00 00 00 00 9D 7206:返回字节数(3寄存器×2字节)00 21等:寄存器值(大端格式)
在代码中可以通过自定义数据块实现动态响应:
class DynamicDataBlock(ModbusSequentialDataBlock): def getValues(self, address, count=1): # 模拟温度传感器:地址0返回随机温度值 if address == 0: return [random.randint(200, 250)] return super().getValues(address, count)2.2 写操作功能码(0x05-0x10)
单线圈写入(0x05)的请求/响应帧完全一致,这是协议设计的特殊之处:
01 05 00 00 FF 00 8C 3AFF 00表示置位,00 00表示复位
对于多寄存器写入(0x10),需要注意字节序处理。以下代码演示如何添加写入回调:
def on_write(context, slave, address, values): print(f"寄存器 {address} 被修改为: {values}") slave_context = ModbusSlaveContext( # ...其他参数... write_callback=on_write )3. 高级调试技巧
3.1 Wireshark抓包分析配置
使用Wireshark分析Modbus RTU需要特殊配置:
- 安装USBPcap捕获USB转串口流量
- 设置显示过滤器:
modbus || crc16 - 关键字段解析技巧:
- 帧间隔(3.5字符时间)是RTU模式的重要特征
- CRC校验错误通常表现为
[Malformed Packet]
3.2 异常响应处理
PyModbus默认会处理异常情况,但有时需要自定义异常码:
from pymodbus.exceptions import ModbusException from pymodbus.pdu import ExceptionResponse def handle_request(request, client): try: return request.execute(context) except ModbusException as exc: return ExceptionResponse(request.function_code, exc.exception_code)常见异常码:
| 代码 | 含义 | 触发场景 |
|---|---|---|
| 0x01 | 非法功能码 | 接收到未实现的功能码 |
| 0x02 | 非法数据地址 | 访问不存在的寄存器地址 |
| 0x03 | 非法数据值 | 写入值超出允许范围 |
4. 实战案例:模拟温度控制器
综合应用各功能码,实现一个带报警功能的虚拟温度控制器:
class TemperatureController: def __init__(self): self.temp_registers = [0]*10 self.alarm_coil = False def update(self): # 寄存器0:当前温度(模拟值) self.temp_registers[0] = random.randint(180, 300) # 寄存器1:高温阈值(可写) if self.temp_registers[0] > self.temp_registers[1]: self.alarm_coil = True # 寄存器2:低温阈值(可写) if self.temp_registers[0] < self.temp_registers[2]: self.alarm_coil = True # 集成到从站 controller = TemperatureController() def custom_read(context, slave, address, count): if address == 0: # 温度寄存器区 controller.update() return controller.temp_registers[address:address+count] return context[slave].getValues(address, count)通过Modbus Poll等测试工具,可以观察到:
- 保持寄存器1/2可读写(设置阈值)
- 线圈0反映报警状态
- 输入寄存器0实时变化(温度值)
