当前位置: 首页 > news >正文

电力自动化通信入门:手把手教你用Python模拟IEC104协议的数据采集与遥控

电力自动化通信实战:Python模拟IEC104协议的数据采集与遥控

在工业自动化领域,电力系统的远程监控与控制是保障电网稳定运行的关键技术。IEC 60870-5-104(简称IEC104)作为电力自动化系统中广泛采用的通信协议,实现了控制站与受控站之间的高效数据交互。本文将带您用Python构建一个完整的IEC104协议模拟环境,无需真实硬件设备即可深入理解协议核心机制。

1. IEC104协议基础与环境搭建

IEC104协议采用客户端/服务器架构,基于TCP/IP实现实时数据传输。控制站(客户端)负责发送控制命令和接收数据,而受控站(服务器)则执行命令并上报设备状态。协议支持两种传输模式:

  • 非平衡模式:仅控制站可发起通信
  • 平衡模式(常用):双方均可主动传输数据

开发环境配置

# 安装必要库 pip install socket struct time random

协议帧结构由APCI(控制信息)和ASDU(应用数据)组成。关键帧类型包括:

帧类型功能描述典型应用场景
I格式信息传输遥测、遥信数据上报
S格式确认帧接收序号确认
U格式控制帧启动/停止数据传输

2. TCP连接与基础控制帧实现

建立可靠通信的第一步是TCP连接管理。以下是Python实现的服务器端基础框架:

import socket class IEC104Server: def __init__(self, host='0.0.0.0', port=2404): self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((host, port)) self.server_socket.listen(1) print(f"Server listening on {host}:{port}") def handle_connection(self): conn, addr = self.server_socket.accept() print(f"Connected by {addr}") # 默认处于STOPDT状态 data_transmission_active = False while True: data = conn.recv(255) # APDU最大长度255字节 if not data: break # 解析APCI start_byte, length = data[0], data[1] if start_byte != 0x68: print("Invalid start byte") continue control_field = data[2:6] frame_type = control_field[0] & 0x03 # 处理U格式帧 if frame_type == 3: if control_field[0] == 0x07: # STARTDT激活 print("Received STARTDT activate") conn.send(bytes([0x68, 0x04, 0x0B, 0x00, 0x00, 0x00])) # STARTDT确认 data_transmission_active = True elif control_field[0] == 0x13: # TESTFR激活 print("Received TESTFR activate") conn.send(bytes([0x68, 0x04, 0x83, 0x00, 0x00, 0x00])) # TESTFR确认

注意:实际实现中需要添加超时处理和心跳机制,确保连接稳定性

3. 遥信数据采集与上报

单点遥信(SIQ)是电力系统中最基础的状态量,表示开关、断路器等设备的开合状态。其数据结构包含状态值和质量标志:

SIQ字节结构: bit 0: 状态值 (0=分/开, 1=合/关) bit 1-3: 保留 bit 4: 封锁标志 (BL) bit 5: 取代标志 (SB) bit 6: 刷新标志 (NT) bit 7: 有效标志 (IV)

实现周期性数据上报的服务器端代码:

def send_spontaneous_data(conn, io_address, value): # 构建ASDU type_id = 0x01 # 单点遥信 vsq = 0x01 # 单个信息对象 cot = 0x03 # 突发传输原因 asdu_addr = 0x0001 io_addr_bytes = io_address.to_bytes(3, 'little') # 构建SIQ (假设质量标志全正常) siq = value & 0x01 # 组装APDU apci = bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) # I格式,序号0 asdu = bytes([type_id, vsq, cot, asdu_addr & 0xFF, (asdu_addr >> 8) & 0xFF]) + io_addr_bytes + bytes([siq]) conn.send(apci + asdu) # 在服务器主循环中添加 if data_transmission_active and random.random() < 0.3: # 30%概率上报 io_address = random.randint(1, 100) state = random.randint(0, 1) send_spontaneous_data(conn, io_address, state)

客户端解析SIQ数据的示例:

def parse_siq(asdu): type_id = asdu[0] if type_id != 0x01: return None io_address = int.from_bytes(asdu[6:9], 'little') siq = asdu[9] state = siq & 0x01 iv = (siq >> 7) & 0x01 # 有效性标志 return { 'address': io_address, 'state': 'ON' if state else 'OFF', 'valid': iv == 0, 'timestamp': time.time() }

4. 遥控命令实现与安全机制

遥控命令(SCO)允许控制站改变受控站设备状态,典型应用包括断路器分合闸操作。IEC104支持两种控制模式:

  1. 直接执行模式:单步完成命令下发与执行
  2. 选择-执行模式:先选择验证,再确认执行

选择-执行模式实现

def handle_control_command(conn, data): type_id = data[6] if type_id != 0x2D: # 单命令C_SC_NA_1 return False cot = data[8] io_address = int.from_bytes(data[10:13], 'little') sco = data[13] # 解析控制命令 execute = (sco & 0x80) == 0 command = sco & 0x01 if cot == 0x06: # 选择命令 print(f"Select command for address {io_address}") # 返回确认帧(肯定确认) confirm_asdu = bytes([0x2D, 0x01, 0x07, 0x01, 0x00]) + data[10:14] conn.send(bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) + confirm_asdu) return True elif cot == 0x08: # 执行命令 print(f"Execute {'ON' if command else 'OFF'} for address {io_address}") # 更新设备状态并返回执行确认 confirm_asdu = bytes([0x2D, 0x01, 0x0A, 0x01, 0x00]) + data[10:14] conn.send(bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) + confirm_asdu) return True return False

安全提示:实际系统中应实现操作权限验证、防误操作闭锁等安全机制

客户端发送遥控命令的示例流程:

def send_control_command(conn, io_address, command, select_execute=True): # 构建ASDU type_id = 0x2D # 单命令 vsq = 0x01 # 单个信息对象 cot = 0x06 if select_execute else 0x08 # 选择/执行阶段 asdu_addr = 0x0001 io_addr_bytes = io_address.to_bytes(3, 'little') # 构建SCO (选择阶段设置最高位为1) sco = 0x80 if select_execute else 0x00 sco |= (command & 0x01) # 组装APDU apci = bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) # I格式,序号0 asdu = bytes([type_id, vsq, cot, asdu_addr & 0xFF, (asdu_addr >> 8) & 0xFF]) + io_addr_bytes + bytes([sco]) conn.send(apci + asdu) # 等待确认 response = conn.recv(255) if len(response) < 6: return False # 验证确认帧 return response[6] == type_id and response[8] == (cot + 1)

5. 高级功能与性能优化

完整的IEC104实现还需要考虑以下高级功能:

时钟同步机制

def handle_time_sync(conn, data): type_id = data[6] if type_id != 0x67: # 时钟同步命令C_CS_NA_1 return False # 解析时间 (CP56Time2a格式) milliseconds = int.from_bytes(data[10:12], 'little') minutes = data[12] & 0x3F hours = data[13] & 0x1F day = data[14] & 0x1F month = data[15] & 0x0F year = data[16] & 0x7F # 返回带时标的确认 current_time = time.time() time_bytes = convert_to_cp56time2a(current_time) confirm_asdu = bytes([0x67, 0x01, 0x07, 0x01, 0x00]) + time_bytes conn.send(bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) + confirm_asdu) return True

性能优化技巧

  1. 使用缓冲区减少小包传输
  2. 实现帧序号自动管理
  3. 异步处理耗时操作
  4. 连接状态监控与自动恢复
class IEC104Buffer: def __init__(self): self.send_seq = 0 self.recv_seq = 0 self.send_buffer = [] def add_frame(self, apdu): self.send_buffer.append(apdu) self.send_seq = (self.send_seq + 1) % 32768 def get_ack_frames(self, ack_seq): # 返回已确认的帧 acked = [] while self.send_buffer and self.send_buffer[0]['seq'] <= ack_seq: acked.append(self.send_buffer.pop(0)) return acked

通过这个Python实现,我们构建了一个完整的IEC104协议模拟环境,涵盖了从基础连接建立到数据采集、遥控命令等核心功能。这种模拟方法不仅适用于学习协议原理,也可用于自动化测试系统的开发。

http://www.jsqmd.com/news/920396/

相关文章:

  • 别再死记payload了!手把手教你用PHP代码动态生成CTF序列化利用点
  • FPGA图像缩放+GTX光传输+UDP网传:一个视频处理系统的数据流完整解析(附源码)
  • 终极指南:如何深度配置Jellyfin Android TV打造专业级家庭影院体验
  • 保姆级教程:Win10系统下MATLAB 2021b从下载到激活的完整避坑指南
  • 别再死记硬背Payload了!手把手教你用PHP代码动态生成序列化攻击字符串
  • 保姆级教程:用Ansys Workbench 2023 R2找出BGA焊点最容易坏的位置(附模型文件)
  • 别再死记硬背了!用‘重叠区域’和PD图直观理解SRT除法器设计
  • 10分钟掌握AI音频修复:VoiceFixer的完整免费指南
  • WeMod终极功能解锁指南:快速免费激活高级特性完整教程
  • 避坑指南:交叉编译ZLMediaKit启用WebRTC时,OpenSSL和libsrtp的配置雷区全解析
  • ECB02蓝牙模块避坑指南:主机模式连接不上?从AT指令调试到绑定失败的5个常见问题排查
  • 深度解析:如何用LeagueAkari实现英雄联盟游戏效率翻倍
  • FPGA开发板吃灰了?用拨码开关和LED灯做个4位乘法器“计算器”吧(Quartus II实战)
  • 别再只记payload了!深入理解PHP is_numeric()与strcmp()的‘坑’与绕过姿势
  • 10分钟精通:西安交通大学LaTeX论文模板的终极排版解决方案
  • CM211-1刷Armbian避坑大全:从S905L3固件选择、网络修复到长期稳定运行指南
  • 从‘conda not found’到流畅使用:Miniconda3在Windows/Linux/macOS上的完整配置与避坑指南
  • 2026年4月技术好的一体化泵站制造厂家推荐,不锈钢智慧泵房/碳钢户外泵房/变频控制柜,一体化泵站销售商推荐 - 品牌推荐师
  • 告别IP核!手把手教你用Verilog在Quartus II里从零实现一个4位乘法器(附仿真与引脚绑定)
  • 保姆级教程:在STM32CubeMX生成的FreeRTOS工程里,手把手移植一个稳定的软件IIC驱动(附AT24C02测试代码)
  • 企业安全正在从账号安全走向执行安全
  • WechatDecrypt终极指南:三步快速掌握微信聊天记录解密技术
  • 2026年4月高评价电缆沟盖板推荐指南:卡槽式电缆沟盖、双层井盖、变电站室外电缆沟盖板、复合树脂井盖、复合树脂盖板选择指南 - 优质品牌商家
  • 从自动售货机到快递路线:贪心算法在真实软件开发中的3个应用场景与Python实现
  • Android 11 User版本编译实战:为线上设备安全开启su与root账户(附完整SELinux策略修改清单)
  • 朝着可靠的合成控制
  • 不止是填参数:深入理解ZYNQ MPSoC DDR子系统时钟、位宽与PCB设计的关联
  • 别再死记硬背了!用这个“电压转电流”的比喻,5分钟搞懂MOSFET跨导gm
  • ESP32开发板到手别吃灰!5分钟搞定VSCode环境,让板载LED闪起来
  • Realtek RTL8821CE驱动技术深度解析:Linux无线连接问题的硬核解决方案