当前位置: 首页 > news >正文

emqx作为ws服务器

先说我的需求、我现在需要一个支持非常多客户端的ws服务器。emqx支持、但是我这边需要先登录。所以需要配置emqx的客户端认证,我这里会自己写一个自定义认证

安装emqx https://github.com/emqx/emqx/releases/tag/6.1.1

我这里下的二进制包,启动

./bin/emqx start

image

1:登录后台,默认账号、密码是:admin public、第一次登录会让改密码

2:设置自定义认证。我这里测试环境用的账号密码,实际环境其实可以直接传一个字段,比如密码、jwt。因为客户端默认先走网关根据账号密码生成jwt,然后我们单独写一个jwt认证的接口用于处理emqx的自定义认证

image
image

3:启动自定义的接口服务,我这用的是go。其他语言看公司需要,返回的deny、allow是定死的

package main
import ("github.com/gin-gonic/gin""net/http"
)
// 定义 EMQX 发来的 JSON 结构
type AuthReq struct {Username string `json:"username"`Password string `json:"password"`
}
func emqxAuth(c *gin.Context) {var req AuthReq// 1. 接收 JSONif err := c.ShouldBindJSON(&req); err != nil {c.JSON(http.StatusOK, gin.H{"result": "deny"})return}// 2. 你自己的账号密码(改成你要的)correctUsername := "admin"correctPassword := "123456"// 3. 校验if req.Username == correctUsername && req.Password == correctPassword {// 验证通过c.JSON(http.StatusOK, gin.H{"result": "allow"})return}// 验证失败c.JSON(http.StatusOK, gin.H{"result": "deny"})
}
func main() {r := gin.Default()// EMQX HTTP 认证接口r.POST("/", emqxAuth)// 启动服务r.Run(":8080")
}

4:测试单点登录(同一设备id会顶掉)+发消息+私聊+群聊

image

ScreenShot_2026-03-27_103102_335

5: 另一个客户端:代码

package mainimport ("encoding/json""fmt""log""os""os/signal""syscall""time"mqtt "github.com/eclipse/paho.mqtt.golang"
)// ===================== 【配置】 =====================
const (EMQX_BROKER = "tcp://172.17.0.185:1883/mqtt" // mqtt之ws的地址USER_ID     = "admin"PASSWORD    = "123456"
)// ===================== 主题 =====================
const (TOPIC_P2P   = "p2p/%s"TOPIC_GROUP = "group/%s"
)var (clientGlobal mqtt.Client // 全局clientmyP2pTopic   stringmyGroupTopic string
)// 消息回调
func messageCallback(client mqtt.Client, msg mqtt.Message) {msg.Ack() // ✅ 必须确认fmt.Printf("✅ 收到消息 -> %s | %s\n", msg.Topic(), string(msg.Payload()))
}// 订阅函数(重连时也要调用)
func doSubscribe() {// 订阅自己的私聊if token := clientGlobal.Subscribe(myP2pTopic, 1, messageCallback); token.Wait() && token.Error() != nil {log.Fatal("订阅失败:", token.Error())}fmt.Printf("✅ 已订阅: %s\n", myP2pTopic)// 订阅群聊if token := clientGlobal.Subscribe(myGroupTopic, 1, messageCallback); token.Wait() && token.Error() != nil {log.Fatal("订阅失败:", token.Error())}fmt.Printf("✅ 已订阅: %s\n", myGroupTopic)
}// 连接EMQX(稳定版)
func connectEMQX() mqtt.Client {opts := mqtt.NewClientOptions()opts.AddBroker(EMQX_BROKER)opts.SetClientID("emqx_MzkxND") // 固定ID,单点登录opts.SetUsername(USER_ID)opts.SetPassword(PASSWORD)// ✅【关键】自动重连opts.SetAutoReconnect(true)opts.SetMaxReconnectInterval(5 * time.Second)// ✅【关键】重连后 自动重新订阅!!!opts.OnConnect = func(c mqtt.Client) {fmt.Println("\n✅ EMQX 已连接")clientGlobal = cdoSubscribe() // 重连后自动订阅!!!}opts.OnConnectionLost = func(c mqtt.Client, err error) {fmt.Printf("\n❌ 连接断开: %v,正在自动重连...\n", err)}client := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {log.Fatalf("连接失败: %v", token.Error())}return client
}// 发送消息
func publish(topic, content string) {payload, _ := json.Marshal(map[string]string{"from":    USER_ID,"content": content,"time":    time.Now().Format("15:04:05"),})token := clientGlobal.Publish(topic, 1, false, payload)token.Wait()if token.Error() != nil {fmt.Println("发送失败:", token.Error())return}fmt.Printf("✅ 发送 -> %s | %s\n", topic, content)
}// ===================== 主函数 =====================
func main() {// 初始化主题myP2pTopic = fmt.Sprintf(TOPIC_P2P, USER_ID)myGroupTopic = fmt.Sprintf(TOPIC_GROUP, "888")// 连接client := connectEMQX()clientGlobal = clientfmt.Println("\n======================")fmt.Println("🎉 聊天服务已启动!")fmt.Println("当前用户:", USER_ID)fmt.Println("======================\n")// 测试消息time.Sleep(1 * time.Second)publish(fmt.Sprintf(TOPIC_P2P, "user_1002"), "你好!这是私聊消息")publish(myGroupTopic, "大家好,我是群成员 "+USER_ID)// 阻塞quit := make(chan os.Signal, 1)signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)<-quitclient.Disconnect(250)
}

image

http://www.jsqmd.com/news/543549/

相关文章:

  • 工会活动服务选哪家,湖南星火传承教育咨询靠谱吗? - 工业品网
  • 联邦学习环境下的算法保护
  • 别再让时钟信号‘跑偏’了!手把手教你理解ADC中DCC电路的设计要点
  • 人大金仓V8数据库授权过期别慌!手把手教你5分钟搞定license文件替换(附官方下载地址)
  • 将-Streamlit-应用程序部署到-AWS
  • PPTAgent终极指南:10分钟掌握智能演示文稿生成技术
  • 如何快速掌握FModel:解锁虚幻引擎游戏资源的完整实战指南 [特殊字符]
  • 东莞化妆学校排名出炉!前三名推荐名单 - 梅1梅
  • 告别设备标识混乱!用uniappx插件Ba-IdCode-U一站式获取OAID/AndroidID/IMEI(附隐私合规指南)
  • 支付宝红包套装回收避坑指南:教你选正规靠谱的变现渠道 - 团团收购物卡回收
  • 将-TensorFlow-模型转换为-PyTorch-的挑战
  • 别再只会while(1)了!聊聊MCU裸机开发的6种实用架构,从51到STM32都能用
  • 卡证检测矫正模型性能调优:降低延迟与提升吞吐量实践
  • 如何通过Akagi提升麻将水平:从新手到高手的智能助手指南
  • HunyuanVideo-Foley效果展示:AI生成的量子计算实验室环境音效(科技感)
  • 别只点‘Passive’!深入理解Altium Designer引脚电气类型,从根源上杜绝原理图ERC错误
  • Phi-4-Reasoning-Vision惊艳效果:复杂构图中空间关系与因果逻辑推理
  • AutoTask:让安卓自动化任务变得简单高效的开源工具
  • 将产品数据转化为战略决策
  • 终极免费生态系统模拟器Ecosim:如何用5分钟创建你的虚拟生态世界
  • Windows HEIC缩略图终极指南:3分钟让iPhone照片在Windows完美预览
  • 将领域专业知识注入您的-AI-系统
  • GitHub下载加速终极指南:告别龟速,3分钟让下载速度飙升300%
  • 避坑指南:K8s集群APIServer IP修改后kubectl不可用的解决方案
  • 将视觉-语言智能引入-RAG-的-ColPali
  • 嵌入式系统的启动流程与初始化详解
  • GIL已死,GIL万岁?——2024大厂Python并发岗面试题库首发(含性能压测对比数据)
  • STM32 GPIO模式实战:开漏输出与推挽输出的5个常见应用场景解析
  • CasRel模型智能体(Agent)应用:自主进行信息检索与关系归纳
  • 如何用WebPlotDigitizer快速提取论文图表数据?5分钟学会高效科研技巧