当前位置: 首页 > news >正文

折腾笔记[41]-使用mqtt透传ollama的api

摘要

使用mqtt透传ollama的api端点以应对客户端与服务器网络连接不稳定的情况.

实现

服务端

1. 启动mqtt broker(也可以使用公共broker)

  • mqtt://127.0.0.1:8907, 允许匿名登陆
docker pull m.daocloud.io/docker.io/eclipse-mosquitto:latest
sudo mkdir -p /home/server/mosquitto
sudo touch /home/server/mosquitto/mosquitto.conf
sudo chown -R qsbye /home/server/mosquitto
cat > /home/server/mosquitto/mosquitto.conf <<'EOD'
listener 8907 0.0.0.0
protocol mqtt
allow_anonymous true
EOD
docker run --restart=always -p 8907:8907 -v "/home/server/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf" -d m.daocloud.io/docker.io/eclipse-mosquitto:latest

2. 配置mqtt透传转发ollama

命令:

uv init
uv python pin 3.13
uv add paho-mqtt aiohttp
vim mqtt_bridge_ollama.py

代码:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件: mqtt_bridge_ollama.py
功能: MQTT 透明代理 127.0.0.1:8907 <-> Ollama 127.0.0.1:11435
用法: python mqtt_bridge_ollama.py
"""
import asyncio
import json
import logging
import signal
import sys
import time
from datetime import datetime
from typing import Any, Dict, Optionalimport aiohttp
import paho.mqtt.client as mqtt# ========== 配置 ==========
OLLAMA_BASE = "http://127.0.0.1:11435"
MQTT_BROKER = "tcp://127.0.0.1:8907"
ROOT_TOPIC  = "/api/ollama"
QOS         = 1
# ==========================logging.basicConfig(level=logging.INFO,format="[mqtt-ollama] %(asctime)s  %(message)s",datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)# ---------- 工具 ----------
def gen_id() -> str:return str(int(datetime.now().timestamp() * 1_000_000))# ---------- MQTT ----------
class Bridge:def __init__(self) -> None:self._mqtt: Optional[mqtt.Client] = Noneself._session: Optional[aiohttp.ClientSession] = Noneself._loop: Optional[asyncio.AbstractEventLoop] = None# 入口def run(self) -> None:self._loop = asyncio.new_event_loop()asyncio.set_event_loop(self._loop)# 优雅退出for sig in (signal.SIGINT, signal.SIGTERM):signal.signal(sig, lambda *_: self._loop.create_task(self.stop()))try:self._loop.run_until_complete(self._start())finally:self._loop.close()async def stop(self) -> None:log.info("收到退出信号")if self._mqtt:self._mqtt.disconnect()if self._session:await self._session.close()self._loop.stop()# 连接 MQTTasync def _start(self) -> None:# 在异步上下文中创建 aiohttp sessionself._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=0))while True:try:self._mqtt = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="mqtt-ollama-bridge")self._mqtt.on_connect = self._on_connectself._mqtt.connect("127.0.0.1", 8907, 60)self._mqtt.loop_start()log.info("MQTT 连接成功")# 保持运行await asyncio.Event().wait()except Exception as e:log.error("连接失败: %s ,5s 后重试", e)await asyncio.sleep(5)# 订阅主题def _on_connect(self, client: mqtt.Client, *_: Any, properties=None) -> None:topic = f"{ROOT_TOPIC}/#"client.subscribe(topic, qos=QOS)client.message_callback_add(topic, self._on_message)log.info("订阅成功: %s", topic)# 收到 MQTT 消息def _on_message(self, client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage) -> None:asyncio.run_coroutine_threadsafe(self._handle(msg), self._loop)# 异步处理async def _handle(self, msg: mqtt.MQTTMessage) -> None:topic = msg.topiclog.info("收到 topic=%s", topic)# 解析 topic: api/ollama/<method>/<path...>suffix = topic[len(ROOT_TOPIC) + 1 :]if "/" not in suffix:log.warning("topic 格式错误")returnmethod, path = suffix.split("/", 1)# 提取 req_idtry:payload: Dict[str, Any] = json.loads(msg.payload) if msg.payload else {}except Exception:payload = {}req_id: str = payload.pop("_req_id", None) or gen_id()# 转发try:await self._forward(method, path, req_id, payload)except Exception as e:log.error("转发错误: %s", e)self._publish(req_id, {"error": str(e)})# 真正发 HTTP 并流式回 MQTTasync def _forward(self, method: str, path: str, req_id: str, body: Dict[str, Any]) -> None:url = f"{OLLAMA_BASE}/{path}"headers = {"Content-Type": "application/json"} if body else {}async with self._session.request(method, url, json=body or None, headers=headers) as resp:# 逐行读取 SSE / 普通 bodyasync for line in resp.content:line = line.rstrip(b"\r\n")if line:self._publish(req_id, line.decode("utf-8"))# 发布单条数据def _publish(self, req_id: str, data: Any) -> None:topic = f"{ROOT_TOPIC}/response/{req_id}"payload = json.dumps(data) if not isinstance(data, str) else dataself._mqtt.publish(topic, payload, qos=QOS)# ---------- main ----------
if __name__ == "__main__":Bridge().run()

运行:

nohup uv run mqtt_bridge_ollama.py &

客户端

1. 测试连接mqtt话题

命令:

uv init
uv python pin 3.13
uv add paho-mqtt aiohttp
vim test_mqtt_ollama.py

代码:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试:MQTT 透传 → Ollama 流式生成 Rust 代码
用法: python test_mqtt_ollama.py
"""
import json
import queue
import random
import time
import paho.mqtt.client as mqttMQTT_HOST = "10.8.8.130"
MQTT_PORT = 8907
ROOT_TOPIC = "/api/ollama"req_id = f"rust_demo_{random.randint(1000, 9999)}"
q = queue.Queue()def on_connect(cli, _ud, _flags, rc, _properties=None):if rc == 0:print("✅ MQTT 连接成功")cli.subscribe(f"{ROOT_TOPIC}/response/{req_id}", qos=1)else:print("❌ 连接失败,rc =", rc)def on_message(cli, _ud, msg):q.put(msg.payload)client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_HOST, MQTT_PORT, 60)
client.loop_start()# 等连上
while not q.empty():q.get()
time.sleep(0.5)# 发请求
payload = {"_req_id": req_id,"model": "modelscope.cn/Qwen/Qwen3-30B-A3B-GGUF:Qwen3-30B-A3B-Q8_0.gguf","prompt": "请只输出一段最简 Rust 代码,打印 hello ollama,不要任何解释。","stream": True
}
client.publish(f"{ROOT_TOPIC}/post/api/generate",json.dumps(payload), qos=1)# 收流式回答
print("\n--- 流式回答开始 ---")
done = False
while not done:try:pkt = q.get(timeout=10)if not pkt:continue# 只提取并打印 response 字段try:data = json.loads(pkt.decode('utf-8'))print(data.get("response", ""), end="", flush=True)if data.get("done"):done = Trueexcept json.JSONDecodeError:# 非 JSON 包直接丢弃continueexcept queue.Empty:print("\n⚠️  10s 没收到新包,退出")break
print("\n--- 流式回答结束 ---")client.loop_stop()
client.disconnect()

go版本:

package mainimport ("encoding/json""fmt""log""math/rand""time"mqtt "github.com/eclipse/paho.mqtt.golang"
)const (MQTT_HOST  = "10.8.8.130"MQTT_PORT  = 8907ROOT_TOPIC = "/api/ollama"
)func main() {// 生成随机请求 IDreqId := fmt.Sprintf("rust_demo_%d", rand.Intn(9000)+1000)// 创建消息队列(用带缓冲的 channel 模拟)msgQueue := make(chan mqtt.Message, 10)// MQTT 连接选项opts := mqtt.NewClientOptions()opts.AddBroker(fmt.Sprintf("tcp://%s:%d", MQTT_HOST, MQTT_PORT))opts.SetClientID(reqId)opts.SetAutoReconnect(true)// 连接成功回调:订阅响应主题opts.SetOnConnectHandler(func(c mqtt.Client) {fmt.Println("✅ MQTT 连接成功")topic := fmt.Sprintf("%s/response/%s", ROOT_TOPIC, reqId)if token := c.Subscribe(topic, 1, nil); token.Wait() && token.Error() != nil {log.Fatal(token.Error())}})// 消息到达回调:写入队列opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {msgQueue <- m})// 连接并启动网络循环client := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {log.Fatal("❌ 连接失败:", token.Error())}defer client.Disconnect(250)// 等待连上并清空残留消息time.Sleep(500 * time.Millisecond)for len(msgQueue) > 0 {<-msgQueue}// 构造并发送请求payload := map[string]interface{}{"_req_id": reqId,"model":   "modelscope.cn/Qwen/Qwen3-30B-A3B-GGUF:Qwen3-30B-A3B-Q8_0.gguf","prompt":  "请只输出一段最简 Rust 代码,打印 hello ollama,不要任何解释。","stream":  true,}jsonBytes, _ := json.Marshal(payload)topic := fmt.Sprintf("%s/post/api/generate", ROOT_TOPIC)token := client.Publish(topic, 1, false, jsonBytes)token.Wait()// 收流式回答fmt.Println("\n--- 流式回答开始 ---")done := falsefor !done {select {case msg := <-msgQueue:// 只提取并打印 response 字段var data map[string]interface{}if err := json.Unmarshal(msg.Payload(), &data); err != nil {// 非 JSON 包直接丢弃continue}fmt.Print(data["response"])if v, ok := data["done"].(bool); ok && v {done = true}case <-time.After(10 * time.Second):fmt.Println("\n⚠️  10s 没收到新包,退出")return}}fmt.Println("\n--- 流式回答结束 ---")
}

输出:

  MQTT 连接成功--- 流式回答开始 ---
<think>
好的,用户让我提供一个最简的Rust代码,打印“hello ollama”,而且不要任何解释。首先,我需要确认用户的需求是什么。他们可能想要一个非常基础的示例,可能用于测试环境或者快速演示。Rust的Hello World通常使用println!宏。所以最简单的代码应该是fn main() { println!("hello ollama"); }。不过用户要求最简,可能需要更简短的写法。但Rust的语法必须有函数定义,所以fn main()是必须的。有没有可能更简?比如使用main函数的另一种形式?或者有没有其他方式?比如使用宏展开?不过可能不会更简。比如,直接写println!会报错,因为需要在函数体内。所以必须有main函数。那正确的最简代码应该是fn main(){println!("hello ollama");}。去掉空格的话,可能更简,但用户可能希望有适当的格式。不过用户要求最简,所以可能不需要空格。比如fn main(){println!("hello ollama");}这样。检查是否有其他可能的错误。比如,是否需要use std::println;?不过println!是宏,通常自动引入。所以不需要额外的use语句。因此,这段代码应该可以直接运行。确认用户不要任何解释,所以只需要输出代码。所以最终答案应该是这个代码段。
</think>fn main(){println!("hello ollama");}
--- 流式回答结束 ---
http://www.jsqmd.com/news/263394/

相关文章:

  • Python+django的高校教师科研项目管理系统的设计与实现
  • 2026的每一滴血
  • 2026最新贵州大平层装修/跃层装修/复式楼装修/装修设计/实景还原家装/改善型装修公司优选超世家装!贵阳家装标杆品牌,28年实力铸就品质之家 - 品牌推荐2026
  • 【计算机毕业设计案例】基于springboot+微信小程序的城镇职工基本医保云上管理系统(程序+文档+讲解+定制)
  • 2026年市面上热门的顶托企业口碑推荐,u型丝预埋件/钢支撑/脚手架/不锈钢止水钢板/顶托,顶托源头厂家排行榜单 - 品牌推荐师
  • web入门101-110
  • 开题报告怎么写不返工?宏智树 AI 教你一招搞定学术敲门砖
  • 2026.1
  • 学习unigui【46】让客户端浏览器可以选下载你的apk
  • 告别 SPSS/Excel 数据分析噩梦!宏智树 AI:论文实证研究的智能数据管家
  • 毕业论文通关指南:宏智树 AI 教你避开写作那些坑
  • 【AUTOSAR AP Core 】AUTOSAR AP Core集成测试关键策略
  • 程序员必看:大模型时代如何突围?从地铁求职广告看AI转型之路
  • 深度测评9个AI论文软件,助本科生轻松搞定毕业论文!
  • 告别文献堆砌!宏智树 AI:一键解锁文献综述的逻辑进阶术
  • 从“需求解读员“到“大模型兜底侠“:我的AI产品实践 | 程序员必藏
  • 学霸同款9个AI论文软件,继续教育学生必备!
  • springboot的智能民宿预定与游玩系统设计与实现
  • 太原文创伴手礼定制哪家性价比高
  • 【毕业设计】基于微信小程序的博物馆文创系统的设计与实现基于springboot+微信小程序的多平台的博物馆预约系统的设计与实现(源码+文档+远程调试,全bao定制等)
  • 小程序毕设选题推荐:基于微信小程序的博物馆服务系统的设计与实现基于springboot+微信小程序的多平台的博物馆预约系统的设计与实现【附源码、mysql、文档、调试+代码讲解+全bao等】
  • 【课程设计/毕业设计】基于协同过滤算法的彩妆商城系统的设计与实现基于springboot+协同过滤算法的美妆护理类的购物平台小程序【附源码、数据库、万字文档】
  • 氪金手游
  • ABC 441 G(线段树多懒标记维护)
  • 小程序计算机毕设之基于Java+SpringBoot+Vue的博物馆游客预约系统基于springboot+微信小程序的多平台的博物馆预约系统的设计与实现(完整前后端代码+说明文档+LW,调试定制等)
  • 必看!零基础转行AIGC产品经理的完整路径,收藏这篇就够了
  • STM32F0实战:基于HAL库开发【1.4】
  • 基于AI客服链动2+1模式商城小程序的社群运营策略研究——以千人社群活跃度提升为例
  • 勇气
  • STM32F0实战:基于HAL库开发【1.5】