当前位置: 首页 > news >正文

用Python和pymodbus库模拟Modbus RTU主从通信(附完整代码)

用Python和pymodbus实现Modbus RTU主从通信全流程实战

当你需要测试Modbus设备却手头没有PLC硬件时,Python的pymodbus库能帮你搭建完整的虚拟测试环境。本文将带你从零开始,用代码构建Modbus RTU主站和从站,实现工业通信协议的核心功能交互。

1. 环境搭建与基础配置

首先确保你的Python环境已安装3.7+版本。通过pip安装必要依赖:

pip install pymodbus==3.0.0 serial==0.0.97

pymodbus 3.0的重大改进包括:

  • 异步IO支持提升通信效率
  • 更完善的RTU帧处理机制
  • 简化的同步/异步API切换

创建虚拟串口的两种方案对比:

工具适用平台配置复杂度性能表现
com0comWindows中等优秀
socatLinux/macOS简单良好
虚拟串口工具跨平台复杂一般

提示:开发阶段推荐使用socat,一条命令即可创建虚拟串口对:
socat -d -d pty,raw,echo=0 pty,raw,echo=0

2. 构建Modbus从站模拟器

从站是数据提供方,我们先实现一个支持核心功能码的虚拟设备:

from pymodbus.server import StartSerialServer from pymodbus.datastore import ModbusSequentialDataBlock from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext def run_slave(): # 初始化数据存储 coil_block = ModbusSequentialDataBlock(0x00, [False]*100) reg_block = ModbusSequentialDataBlock(0x00, [0]*100) store = ModbusSlaveContext( di=coil_block, co=coil_block, hr=reg_block, ir=reg_block ) context = ModbusServerContext(slaves=store, single=True) # 启动RTU从站 StartSerialServer( context=context, port='/dev/ttyS1', # 修改为你的虚拟串口 framer=ModbusRtuFramer, baudrate=19200, timeout=0.005 )

关键参数解析:

  • timeout:影响RTU帧间隔检测,典型值3.5个字符时间
  • baudrate:需与主站严格一致,常见值9600/19200/38400
  • datastore:维护四种Modbus对象的数据状态

3. 实现Modbus主站客户端

主站作为通信发起方,需要处理更复杂的超时重试机制:

from pymodbus.client import ModbusSerialClient from pymodbus.exceptions import ModbusIOException class ModbusMaster: def __init__(self, port): self.client = ModbusSerialClient( method='rtu', port=port, baudrate=19200, timeout=1, retries=3 ) def read_holding_registers(self, addr, count, unit=1): try: response = self.client.read_holding_registers( address=addr, count=count, slave=unit ) if response.isError(): raise ModbusIOException(str(response)) return response.registers except Exception as e: print(f"Read failed: {str(e)}") return None def write_register(self, addr, value, unit=1): return self.client.write_register( address=addr, value=value, slave=unit )

通信异常处理要点:

  • CRC校验失败会自动重试
  • 响应超时需业务层判断
  • 从站忙状态(0x06)需延迟重试

4. 核心功能码实战演示

4.1 03功能码:读保持寄存器

典型请求-响应过程:

主站发送: 01 03 00 00 00 02 C4 0B 从站回复: 01 03 04 00 0A 00 0B 85 84

对应Python实现:

# 主站读取寄存器0x0000-0x0001 registers = master.read_holding_registers(0, 2) print(f"Register values: {registers}") # 从站预处理方法(在StartSerialServer前添加) def process_03_request(context, client): address = client.registers[0].address values = [0x000A, 0x000B] # 模拟数据 return ModbusRegisterValues(values)

4.2 06功能码:写单个寄存器

完整数据流示例:

主站发送: 01 06 00 02 00 FF 89 F8 从站回复: 01 06 00 02 00 FF 89 F8

Python交互代码:

# 主站写入操作 result = master.write_register(2, 0x00FF) if not result.isError(): print("Write success") # 从站数据验证 assert store.getValues(3, 2)[0] == 0x00FF

5. 高级功能与调试技巧

5.1 自定义异常响应

实现从站主动返回错误码:

from pymodbus.pdu import ExceptionResponse def process_request(self, request): if request.function_code == 0x03: if request.address > 100: return ExceptionResponse( request.function_code, ModbusExceptions.IllegalAddress ) return super().process_request(request)

常见异常代码:

代码含义触发条件
0x01非法功能码未实现的功能码请求
0x02非法数据地址地址超出从站范围
0x03非法数据值数据不符合从站要求
0x04从站设备故障从站执行过程中发生错误

5.2 通信质量监测指标

通过以下参数评估通信可靠性:

# 在主站类中添加统计方法 def get_communication_metrics(self): return { 'success_rate': self.success_count / self.total_requests, 'avg_response_time': self.total_latency / self.success_count, 'crc_errors': self.crc_error_count, 'timeouts': self.timeout_count }

优化建议:

  • 调整timeout值平衡响应速度与误判率
  • 大数据量传输时适当降低波特率
  • 关键操作添加二次验证机制

6. 典型问题排查指南

6.1 常见错误现象与解决方案

现象可能原因排查步骤
通信完全无响应串口配置错误1. 检查端口号
2. 验证波特率
3. 测试串口连通性
偶发性CRC校验失败电磁干扰/波特率不匹配1. 降低波特率测试
2. 检查硬件连接
3. 添加磁环
响应数据格式错误从站数据处理逻辑异常1. 抓取原始数据帧
2. 检查从站数据存储
3. 验证功能码实现
主站收到无效从站地址响应从站地址冲突/广播问题1. 检查从站ID配置
2. 禁用广播功能
3. 网络终端电阻检查

6.2 数据帧分析工具推荐

  • Windows平台

    • Modbus Poll(主站模拟)
    • Modbus Slave(从站模拟)
    • AccessPort(串口监控)
  • Linux/macOS

    # 十六进制查看串口数据 stty -F /dev/ttyS1 19200 raw cat /dev/ttyS1 | hexdump -C
  • 跨平台方案

    # 使用pyserial嗅探数据 import serial sniffer = serial.Serial('/dev/ttyS1', 19200) while True: print(sniffer.read().hex())

在实际项目中,建议先通过虚拟串口测试所有功能逻辑,再迁移到物理设备。当遇到通信问题时,采用分治法逐步隔离问题范围——先确保物理层连通性,再验证协议层交互,最后检查业务数据处理逻辑。

http://www.jsqmd.com/news/966659/

相关文章:

  • 命令行一键下载百度搜图结果,轻量Python脚本支持自定义页数和保存路径
  • 告别依赖地狱:用AppImage在Ubuntu 22.04上安装最新版Neovim(附FUSE问题解决)
  • 从CNN到LSTM:拆解吴恩达《深度学习》课程中的核心项目与代码实践
  • 昆明市2026年最新黄金+白银+铂金+K金回收门店及联系方式电话推荐 黄金回收店铺TOP5排行榜 - 盛世金银回收
  • ai赋能matlab编程:通过快马调用大模型智能生成遗传算法求解优化问题
  • PyTorch版GITGAN脑电生成代码包:含OpenBMI与BCICIV2a数据集支持及完整训练流程
  • 【字节跳动】SEED·C语言宏定义版(.h头文件)
  • STM32CubeMX配置FreeRTOS内存管理:从heap1到heap5,你的项目到底该选哪个?
  • 不跳出应用也能拿到评分,HarmonyOS 评论弹窗方案实测
  • MinIO Admin 命令实战:从用户权限到集群修复,一份保姆级运维手册
  • Windows下MFC+Halcon实现的九点手眼标定与镜头畸变校正工程源码包
  • 别再折腾了!用Visual Studio 2019 + CMake编译FreeCAD 0.19.1源码的完整避坑指南
  • 从Point A到BWP:手把手拆解5G NR物理资源分配的完整逻辑链
  • 免费Colab跑通LLaMA 2聊天机器人:4-bit量化+Gradio实战指南
  • 【模型改进】DORGM 改进 YOLO 系列:面向 VisDrone 小目标检测的多尺度特征解耦与软路由增强
  • 实战演练:在快马平台模拟多种商务场景,掌握“都合”询问的高阶回复策略
  • ANSYS HFSS 主从边界条件全解析:从‘Master/Slave’到‘Primary/Secondary’的设计思维转变
  • 别再死记硬背了!用Python+NumPy可视化理解冲激函数如何‘抓取’信号值
  • Android平台可直接运行的WebRTC点对点视频对讲工程源码
  • 来宾市2026年最新黄金+白银+铂金+K金回收门店及联系方式电话推荐 黄金回收店铺TOP5排行榜 - 盛世金银回收
  • 性能提升秘籍:如何用Java并行处理(CompletableFuture)批量给上百页PDF去斜体水印?
  • PointMVSNet ICCV‘19可运行复现包:论文+中文详解+带注释代码+一键训练测试脚本
  • 解决ORB-SLAM3相机快速转动丢失?试试用GCNv2替换特征点提取器(Ubuntu 18.04 + CUDA 10.2实战)
  • 别再死记硬背公式了!用PyTorch和TensorFlow实战理解交叉熵损失函数
  • 从《现代大学英语精读》到真实沟通:如何用Python爬虫和NLP分析课文高频词,提升英语学习效率
  • 从安装到实战:用快马AI生成支持动态页面与数据入库的openclaw项目模板
  • 兰州市2026年最新黄金+白银+铂金+K金回收门店及联系方式电话推荐 黄金回收店铺TOP5排行榜 - 盛世金银回收
  • Ray实战指南:AI工程化落地的分布式运行时核心
  • 2026年q2切角塑封包装机厂家实测评测:全自动热缩膜包装机厂家/切角塑封包装机厂家/开箱机厂家/性价比对决 - 优质品牌商家
  • 手把手教你用C++实现PL/0表达式语法分析器(附完整源码与递归下降子程序详解)