手把手教你用Python bleak库连接Nordic蓝牙串口(NUS服务)做物联网数据收发
Python蓝牙开发实战:用bleak库实现Nordic设备NUS服务通信
蓝牙低功耗(BLE)技术已经成为物联网设备通信的主流方案之一。在众多蓝牙协议栈中,Nordic Semiconductor的nRF系列芯片因其稳定性和灵活性备受开发者青睐。本文将带你深入探索如何利用Python的bleak库,与搭载Nordic芯片的设备建立可靠的数据通道,实现类似串口的双向通信。
1. 环境准备与bleak库基础
在开始蓝牙开发前,我们需要确保开发环境配置正确。bleak作为一个跨平台的Python蓝牙库,支持Windows、Linux和macOS三大操作系统。对于物联网开发者来说,这意味着可以在不同设备上保持代码一致性,大幅提高开发效率。
安装bleak库非常简单,只需执行以下命令:
pip install bleak注意:在树莓派等Linux设备上,可能需要先安装bluez相关依赖:
sudo apt-get install bluetooth bluez libbluetooth-dev验证安装是否成功:
import bleak print(bleak.__version__)bleak库的核心优势在于其异步IO的设计模式,这使得它特别适合处理蓝牙这种需要长时间连接和实时数据交换的场景。与传统的pybluez相比,bleak提供了更现代的API设计和更好的跨平台兼容性。
提示:建议使用Python 3.7及以上版本,以获得最佳的异步特性支持
2. Nordic UART服务(NUS)协议解析
Nordic的UART服务(NUS)是一种基于BLE的虚拟串口协议,它通过两个主要特性实现双向通信:
- TX特性(UUID: 6E400003-B5A3-F393-E0A9-E50E24DCCA9E):设备向客户端发送数据
- RX特性(UUID: 6E400002-B5A3-F393-E0A9-E50E24DCCA9E):客户端向设备发送数据
这种设计模拟了传统串口的TX/RX线路,使得开发者可以像操作物理串口一样使用蓝牙连接。NUS服务的完整UUID为6E400001-B5A3-F393-E0A9-E50E24DCCA9E。
NUS通信流程:
- 客户端连接设备
- 订阅TX特性的通知(notify)
- 通过RX特性发送数据
- 通过TX特性的通知接收数据
这种模式在物联网应用中非常常见,比如:
- 传感器数据采集
- 设备固件升级(OTA)
- 远程控制指令传输
3. 设备发现与连接
使用bleak进行设备扫描时,我们通常有两种方式:简单扫描和过滤扫描。对于NUS设备,推荐使用过滤扫描以提高效率。
设备发现示例代码:
from bleak import BleakScanner, BleakClient from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData import asyncio UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" def nus_filter(device: BLEDevice, adv: AdvertisementData): return UART_SERVICE_UUID.lower() in adv.service_uuids async def discover_devices(): device = await BleakScanner.find_device_by_filter(nus_filter) if device: print(f"Found NUS device: {device.name} - {device.address}") return device else: print("No NUS device found") return None这段代码定义了一个过滤函数nus_filter,它只匹配广播中包含NUS服务UUID的设备。find_device_by_filter方法会返回第一个匹配的设备对象。
建立连接的完整流程:
- 创建BleakClient实例
- 连接设备
- 获取服务特性
- 设置通知回调
- 开始通信
async def connect_to_device(device_address): client = BleakClient(device_address) try: await client.connect() print(f"Connected to {device_address}") return client except Exception as e: print(f"Connection failed: {e}") return None4. 数据收发实现
成功连接后,我们需要设置通知处理函数来实现数据接收,并通过写入特性实现数据发送。
完整的数据收发实现:
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" def handle_rx(sender, data): print(f"Received: {bytes(data).decode('utf-8')}") async def run_communication(): device = await discover_devices() if not device: return async with BleakClient(device) as client: # 设置通知回调 await client.start_notify(UART_TX_CHAR_UUID, handle_rx) print("Connected, start typing and press ENTER to send...") while True: message = input() if message.lower() == 'exit': break # 发送数据 await client.write_gatt_char(UART_RX_CHAR_UUID, message.encode('utf-8')) await client.stop_notify(UART_TX_CHAR_UUID) # 运行主程序 asyncio.run(run_communication())这段代码实现了完整的NUS通信流程:
- 发现NUS设备
- 建立连接
- 设置TX特性的通知回调
- 循环读取用户输入并通过RX特性发送
- 接收设备返回的数据并打印
常见问题处理:
| 问题现象 | 可能原因 | 解决方案 |
|---|---|---|
| 连接超时 | 设备不在范围内 | 检查设备是否开机且在广播状态 |
| 特性读写失败 | UUID不匹配 | 确认设备确实支持NUS服务 |
| 数据乱码 | 编码不一致 | 确保发送和接收使用相同的编码(通常UTF-8) |
| 通知不工作 | 未正确订阅 | 检查start_notify调用是否成功 |
5. 实战案例:环境监测系统
让我们通过一个实际案例来展示bleak和NUS的强大组合。假设我们有一个基于nRF52的环境传感器,它通过NUS服务定期发送温度和湿度数据。
传感器数据格式:
TEMP:25.6,HUM:45.2数据解析与处理实现:
import asyncio from bleak import BleakClient class EnvironmentalMonitor: def __init__(self): self.temperature = None self.humidity = None def handle_sensor_data(self, sender, data): raw_data = bytes(data).decode('utf-8') try: parts = raw_data.split(',') temp_part = parts[0].split(':')[1] hum_part = parts[1].split(':')[1] self.temperature = float(temp_part) self.humidity = float(hum_part) print(f"Updated: Temp={self.temperature}°C, Hum={self.humidity}%") except (IndexError, ValueError) as e: print(f"Data parsing error: {e}") async def monitor_environment(): device = await discover_devices() if not device: return monitor = EnvironmentalMonitor() async with BleakClient(device) as client: await client.start_notify(UART_TX_CHAR_UUID, monitor.handle_sensor_data) print("Monitoring started...") while True: await asyncio.sleep(1) # 可以在此处添加发送控制指令的逻辑 # 例如:await client.write_gatt_char(UART_RX_CHAR_UUID, b"GET_DATA") asyncio.run(monitor_environment())这个案例展示了如何:
- 创建专门的数据处理器类
- 解析结构化传感器数据
- 实现长时间运行的监控循环
- 添加双向交互的可能性
6. 性能优化与高级技巧
在实际项目中,我们需要考虑通信的可靠性和效率。以下是几个提升NUS通信质量的技巧:
1. 连接参数优化
BLE连接参数直接影响通信的响应速度和功耗。可以通过以下方式调整:
async def connect_with_params(device_address): client = BleakClient( device_address, timeout=20.0, # 连接超时时间 disconnected_callback=lambda c: print("Disconnected!") ) # Windows平台特有参数 if os.name == 'nt': from bleak.winrt.client import WinRTClient client = WinRTClient( device_address, connection_parameters={ "IntervalMin": 15, # 毫秒 "IntervalMax": 30, "Latency": 0, "Timeout": 5000 } ) await client.connect() return client2. 数据分包处理
当传输较大数据时,需要考虑分包策略:
async def send_large_data(client, data, chunk_size=20): for i in range(0, len(data), chunk_size): chunk = data[i:i+chunk_size] await client.write_gatt_char(UART_RX_CHAR_UUID, chunk) await asyncio.sleep(0.01) # 防止发送过快3. 错误处理与重连机制
稳定的蓝牙通信需要完善的错误处理:
async def robust_communication(): retry_count = 0 max_retries = 3 while retry_count < max_retries: try: device = await discover_devices() if not device: await asyncio.sleep(2) retry_count += 1 continue async with BleakClient(device) as client: await client.start_notify(UART_TX_CHAR_UUID, handle_rx) print("Connection established") while True: # 主通信循环 await asyncio.sleep(1) except Exception as e: print(f"Error occurred: {e}") retry_count += 1 await asyncio.sleep(2) print("Max retries reached, exiting")4. 多设备管理
当需要同时管理多个NUS设备时:
class DeviceManager: def __init__(self): self.clients = {} async def add_device(self, address): if address in self.clients: return client = BleakClient(address) await client.connect() await client.start_notify(UART_TX_CHAR_UUID, self.handle_device_data) self.clients[address] = client def handle_device_data(self, sender, data): print(f"Data from {sender}: {data}") async def broadcast(self, message): for client in self.clients.values(): try: await client.write_gatt_char(UART_RX_CHAR_UUID, message.encode()) except Exception as e: print(f"Send to {client.address} failed: {e}")7. 跨平台兼容性实践
bleak最大的优势之一是其跨平台能力,但不同平台仍有细微差别需要注意:
平台特定注意事项:
| 平台 | 蓝牙栈 | 特殊要求 | 测试建议 |
|---|---|---|---|
| Windows | WinRT | 需要v16299+ | 检查Windows版本 |
| Linux | BlueZ | 需要5.43+ | 运行bluetoothd -v |
| macOS | Core Bluetooth | 10.11+ | 检查系统版本 |
| 树莓派 | BlueZ | 可能需要额外配置 | 测试基础蓝牙功能 |
平台兼容性测试代码:
async def test_platform_compatibility(): print("Testing BLE functionality...") try: # 测试扫描 print("Scanning for devices...") devices = await BleakScanner.discover() print(f"Found {len(devices)} devices") # 测试NUS设备连接 if devices: device = await discover_devices() if device: async with BleakClient(device) as client: print("Basic connection test passed") return True print("No suitable device found for full test") return False except Exception as e: print(f"Compatibility test failed: {e}") return False在实际项目中,建议针对目标平台进行充分测试,特别是当应用需要部署在不同操作系统上时。bleak虽然提供了统一的API,但底层蓝牙栈的实现差异可能导致不同的行为表现。
