当前位置: 首页 > news >正文

python mqgg 发送 json 文件

pip install paho-mqtt

发送json,含有音频文件;

import json, base64 import paho.mqtt.client as mqtt import time def on_connect(client, userdata, flags, rc): if rc == 0: print("[A] Connected to broker") client.subscribe(TOPIC_SUB) print(f"[A] Subscribed to {TOPIC_SUB}") else: print(f"[A] Connection failed: {rc}") def on_message(client, userdata, msg): print(f"[A] Received message on '{msg.topic}': {msg.payload.decode()}") if __name__ == '__main__': #install "pip install paho-mqtt" BROKER = "192.168.8.127" BROKER = "localhost" PORT = 1883 TOPIC_SUB = "ue/state" # 订阅状态 TOPIC_PUB = "ue/command" # 发布命令 client = mqtt.Client(client_id="ClientA") client.on_connect = on_connect client.on_message = on_message client.connect(BROKER, PORT, keepalive=30) # non-blocking loop client.loop_start() wav_path = r"D:\data\audios\post_2.wav" # 发送一些命令 for i in range(5000): msg = f"command {i}" with open(wav_path, "rb") as f: audio_bytes = f.read() audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") msg_audio = json.dumps({ "type": "audio", "data": audio_b64, }) client.publish("ue/messages", msg_audio) # ===== 发送命令 ===== msg_cmd = json.dumps({ "type": "command", "data": "pause", }) # client.publish("ue/messages", msg_cmd) print(f"[A] Sent: {msg}") time.sleep(3) client.loop_stop() client.disconnect()

接收json,含有音频文件

import json import base64 import paho.mqtt.client as mqtt import os # ====== 配置 ====== BROKER = "localhost" # MQTT Broker PORT = 1883 TOPIC_SUB = "ue/messages" # 订阅 Topic SAVE_DIR = r"D:\data\audios_received" os.makedirs(SAVE_DIR, exist_ok=True) file_index = 0 # ====== MQTT 回调 ====== def on_connect(client, userdata, flags, rc): if rc == 0: print("[B] Connected to broker") client.subscribe(TOPIC_SUB) print(f"[B] Subscribed to {TOPIC_SUB}") else: print(f"[B] Connection failed: {rc}") def on_message(client, userdata, msg): global file_index try: payload_str = msg.payload.decode('utf-8') data = json.loads(payload_str) if data.get("type") == "audio": audio_b64 = data.get("data", "") if not audio_b64: return audio_bytes = base64.b64decode(audio_b64) file_path = os.path.join(SAVE_DIR, f"received_{file_index}.wav") file_index += 1 # 自动判断是否是完整 WAV if is_wav_file(audio_bytes): # 已经是完整 WAV,直接写入文件 with open(file_path, 'wb') as f: f.write(audio_bytes) print(f"[B] Saved complete WAV to {file_path}") else: # 裸 PCM,需要自己加 WAV 头 # 默认假设 16-bit PCM,单声道,采样率 44100 save_pcm_as_wav(file_path, audio_bytes, sample_rate=44100, num_channels=1) print(f"[B] Saved PCM as WAV to {file_path}") elif data.get("type") == "command": cmd = data.get("data") print(f"[B] Received command: {cmd}") except Exception as e: print(f"[B] Error processing message: {e}") # ====== 判断是否完整 WAV 文件 ====== def is_wav_file(data_bytes): # WAV 文件头前4字节是 "RIFF" return data_bytes[:4] == b'RIFF' # ====== 将 PCM 数据写入 WAV 文件 ====== def save_pcm_as_wav(file_path, audio_bytes, sample_rate=44100, num_channels=1): import wave with wave.open(file_path, 'wb') as wf: wf.setnchannels(num_channels) wf.setsampwidth(2) # 16-bit wf.setframerate(sample_rate) wf.writeframes(audio_bytes) # ====== 主程序 ====== if __name__ == "__main__": client = mqtt.Client(client_id="ClientB") client.on_connect = on_connect client.on_message = on_message client.connect(BROKER, PORT, keepalive=30) client.loop_forever()
http://www.jsqmd.com/news/280663/

相关文章:

  • 学习日记day64
  • Java毕设项目:基于springboot的智慧医疗网上预约系统(源码+文档,讲解、调试运行,定制等)
  • GGUF、Safetensors、ONNX三种格式
  • springboot_ssm815大学生校园图书借阅购买管理系统--论文
  • #对象模型
  • 强烈安利8个AI论文平台,本科生搞定毕业论文!
  • springboot_ssm816大学运动场地预约器材租借管理系统--论文
  • springboot_ssm817学生信息管理系统--论文
  • leetcode 热题
  • [ACTF2020 新生赛]Upload 1(一句话木马加蚁剑)
  • 【深度测评】2026年护考刷题APP算法横评:为何“易小考”能成为护资备考首选?
  • 【毕业设计】基于springboot的智慧医疗网上预约系统(源码+文档+远程调试,全bao定制等)
  • RHCSA结课综合作业
  • 【课程设计/毕业设计】基于springboot的智慧医疗网上预约系统医院在线挂号与患者预约管理【附源码、数据库、万字文档】
  • springboot_ssm807古诗词数字化分享平台--论文
  • 异常检测:提示工程架构师如何识别提示数据中的异常行为?
  • 字节面试官:问你C++观察者模式,你答了7点他说不够深
  • 分流抢票软件bypass,Bypass-分流抢票:让你秒杀抢票,稳定捡漏的神器!
  • springboot_ssm808图书借阅挂失崔还系统功能全--论文
  • K8s修改Pod的Command/Args参数报错?这篇实操指南帮你搞定
  • K8s Nginx Pod 出现 CrashLoopBackOff?从配置排查到彻底解决
  • Ubuntu系统移植
  • 【奖励到账】CSDN AI 社区镜像创作激励活动第三批奖励正式发放!
  • [特殊字符] 最新版 | Windows10 Win11系统终极优化神器RyTuneX完全安装配置指南 [特殊字符]
  • springboot_ssm809基于SSM架构的网上书城系统图书销售--论文
  • 全面优化你的Windows,RyTuneX系统全能优化神器
  • 【课程设计/毕业设计】基于SpringBoot的宝贝回家走失儿童报备系统基于springboot的走失儿童认领与登记系统【附源码、数据库、万字文档】
  • springboot_ssm810基于SSM的校园音乐平台--论文
  • Flux2 Klein 闪电急速出图 WebUI整合包体验版下载及使用教程【上篇】(模型与性能解析)
  • springboot_ssm811基于web的特殊药品商城管理系统--论文