当前位置: 首页 > news >正文

用Python和pymodbus库模拟Modbus RTU主从通信(附完整代码与数据帧解析)

用Python和pymodbus库模拟Modbus RTU主从通信(附完整代码与数据帧解析)

在工业自动化领域,Modbus RTU协议因其简单可靠的特点,成为连接PLC、传感器和上位机系统的通用语言。本文将带您从零开始,使用Python的pymodbus库构建完整的Modbus RTU通信测试环境,通过代码实例深入解析协议底层的数据帧结构。

1. 环境搭建与基础配置

1.1 虚拟串口工具配置

在开始编码前,我们需要模拟物理串口环境。Windows平台推荐使用com0com虚拟串口工具,Linux/Mac可使用socat命令创建虚拟串口对:

# Linux/Mac虚拟串口创建 socat -d -d pty,raw,echo=0 pty,raw,echo=0

安装Python环境依赖:

pip install pymodbus==3.0.0 serial

注意:pymodbus 3.x版本重构了异步IO支持,本文示例基于同步API以保证兼容性

1.2 协议基础参数设置

Modbus RTU的关键参数需要主从设备严格一致:

参数典型值说明
波特率19200常见工业设备标准速率
数据位8固定配置
停止位1常见配置
校验位None/Even根据设备要求设置
响应超时1秒等待从站响应的最长时间

2. 构建Modbus从站模拟器

2.1 初始化从站服务

from pymodbus.server.sync import StartSerialServer from pymodbus.datastore import ModbusSequentialDataBlock from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext def run_slave(): store = ModbusSlaveContext( di=ModbusSequentialDataBlock(0, [0]*100), # 离散输入 co=ModbusSequentialDataBlock(0, [0]*100), # 线圈 hr=ModbusSequentialDataBlock(0, [0]*100), # 保持寄存器 ir=ModbusSequentialDataBlock(0, [0]*100)) # 输入寄存器 context = ModbusServerContext(slaves=store, single=True) StartSerialServer( context, port='/dev/ttyS1', # 虚拟串口设备 baudrate=19200, timeout=1)

2.2 数据帧结构验证

当主站发送读取保持寄存器请求(功能码0x03)时,从站响应的原始数据帧可通过Wireshark捕获分析:

响应帧示例: 01 03 02 00 0A 45 CD
  • 01:从站地址
  • 03:功能码
  • 02:数据字节数
  • 00 0A:寄存器值(十进制10)
  • 45 CD:CRC校验码

3. 实现Modbus主站功能

3.1 同步读取操作实例

from pymodbus.client.sync import ModbusSerialClient client = ModbusSerialClient( method='rtu', port='/dev/ttyS0', baudrate=19200, timeout=1) # 读取保持寄存器 result = client.read_holding_registers( address=0, # 起始地址 count=5, # 寄存器数量 unit=1) # 从站地址 print("寄存器值:", result.registers)

3.2 数据帧构造原理

底层请求帧的构建过程:

def build_read_frame(slave_id, function_code, address, count): # 构造PDU pdu = bytes([function_code]) + \ address.to_bytes(2, 'big') + \ count.to_bytes(2, 'big') # 添加CRC校验 crc = compute_crc(bytes([slave_id]) + pdu) return bytes([slave_id]) + pdu + crc.to_bytes(2, 'little') # 示例:构造读取保持寄存器请求 frame = build_read_frame( slave_id=1, function_code=0x03, address=0, count=5) print("原始请求帧:", frame.hex())

4. 高级功能实现与调试

4.1 批量写入操作

# 写入多个保持寄存器(功能码0x10) values = [10, 20, 30, 40] result = client.write_registers( address=0, values=values, unit=1) if result.isError(): print("写入失败:", result) else: print("写入成功")

对应的请求帧解析:

01 10 00 00 00 04 08 00 0A 00 14 00 1E 00 28 42 1F
  • 00 00:起始地址
  • 00 04:寄存器数量
  • 08:数据字节长度
  • 后续8字节为4个寄存器的值

4.2 异常处理机制

Modbus协议定义的异常响应格式:

字段示例值说明
从站地址01
异常功能码83原功能码+0x80
异常码02非法数据地址

捕获异常响应的代码实现:

try: response = client.read_coils(0, 10, unit=1) if response.isError(): print("Modbus异常码:", response.exception_code) except Exception as e: print("通信错误:", str(e))

5. 协议分析工具链

5.1 使用Wireshark解码Modbus RTU

配置步骤:

  1. 安装Wireshark并启用串口捕获
  2. 添加Modbus RTU解析器
  3. 过滤语法:modbus || modbusadu

典型数据包分析:

Frame 1 (Request): 0000 01 03 00 00 00 02 44 09 ......D. Frame 2 (Response): 0000 01 03 04 00 0a 00 14 f5 33 ......ó3

5.2 自定义数据监控工具

import matplotlib.pyplot as plt from collections import deque class ModbusMonitor: def __init__(self, max_points=50): self.data = deque(maxlen=max_points) def update(self, values): self.data.extend(values) plt.clf() plt.plot(self.data) plt.pause(0.01) # 使用示例 monitor = ModbusMonitor() while True: result = client.read_holding_registers(0, 5, unit=1) monitor.update(result.registers)

6. 性能优化实践

6.1 多寄存器读取优化

# 批量读取策略对比 single_read_time = timeit.timeit( lambda: client.read_holding_registers(0, 1, unit=1), number=100) batch_read_time = timeit.timeit( lambda: client.read_holding_registers(0, 10, unit=1), number=100) print(f"单点读取耗时: {single_read_time:.3f}s") print(f"批量读取耗时: {batch_read_time:.3f}s")

6.2 连接池管理

from pymodbus.client.sync import ModbusConnectionPool pool = ModbusConnectionPool( client_class=ModbusSerialClient, max_size=5, **client_params) def safe_read(address): with pool.acquire() as conn: return conn.read_holding_registers(address, 1)

在实际项目中,通过合理设置超时时间和重试机制,可以将通信成功率提升至99.9%以上。一个常见的经验是,对于19200波特率的网络,单个请求的超时时间不应低于300ms。

http://www.jsqmd.com/news/970008/

相关文章:

  • 实习生转正路上的踩坑与复盘:校招生工程化成长路径
  • 嵌入式汉字编码与输入法实战:从GB2312原理到MCU实现
  • 2026年广元装修市场调查:铂金精工标准下的服务力深度评测 - 优家闲谈
  • EncodingChecker:解决多语言文件编码检测的终极方案
  • RL-Kernel
  • COM3D2.MaidFiddler:解锁COM3D2实时角色编辑的强大工具
  • 一个 VS Code 插件,干翻了 GitHub 3800 个内部仓库
  • 从CCFL到RGB-LED:显示背光技术演进与色彩革命
  • 比亚迪入局机器人赛道:内部消化订单跳过商业化等待期,能否复刻电池芯片成功路径?
  • 惠州宽带安装自有师傅一对一,满意再付钱 - mougen1
  • 串口通信中0x0C清屏指令的原理与应用实践
  • 从0到1搭建CSDN AI内容获客体系:3步建模、7天冷启动、22天实现线索成本低于行业均值58%
  • Xiaomi Miot Auto本地模式终极解决方案:深度解析离线运行疑难
  • 软件过程与管理知识回顾1 -
  • 告别依赖地狱:手把手教你用AppImage在Ubuntu 22.04上安装最新版Neovim(附FUSE问题解决)
  • 2026 无锡锡山区漏水维修攻略|苏易修缮推荐:卫生间/阳台/外墙/屋顶/地下室漏水|靠谱防水门店推荐 - 苏易修缮
  • AMD Ryzen硬件调试终极指南:SMUDebugTool专业使用手册
  • Thought-Action-Observation闭环:AI工程化协作的核心范式
  • 046、NPU的利用率:如何避免计算单元空闲?
  • 华强北元器件分销商资源整合:从策略联盟到资本联姻的破局之路
  • 当AI学会编程——从ZeroLang到供应链攻击,开发者的护城河还剩什么?
  • SpringBoot针式打印机连续套打工具包(支持前后入纸切换与多联单据精准定位)
  • 【头部科技公司内部报告】:为什么他们把37%的数字营销预算转向CSDN AI内容池?
  • WebPlotDigitizer 4.0全功能开源包:网页运行的曲线图取数工具,带批量处理和热图生成能力
  • 工业串口抗干扰实战:从RS-232烧毁到RS-485防护电路设计
  • 点狮HRM企业级HRM薪资计算系统架构设计
  • 宠乐圈 宠物领养互助平台
  • 为什么92%的运营人买错了CSDN AI套餐?资深签约顾问亲授季度锁价黄金窗口期
  • 番茄小说下载器:终极免费工具,5大实用技巧轻松收藏小说
  • 2026年5月技术拾遗:Agent 编程语言崛起与本地推理爆发