当前位置: 首页 > news >正文

locust-WebSocket压测

连接WS的库有的是支持异步IO的,项目中我们推荐这样的库,但是压测时还是要选择同步的库

异步

安装

pip install websockets

代码示例

import asyncio
import websockets
import json
import randomasync def mytest():async with websockets.connect('wss://sockettest.xhkjedu.com/ws') as websocket:num = random.randint(0, 10000000)msg = {"b": {"num": num},"c": 123456,}msgstr = json.dumps(msg)await websocket.send(msgstr)print(f"↑: {msgstr}")greeting = await websocket.recv()print(f"↓: {greeting}")asyncio.get_event_loop().run_until_complete(mytest())

同步

官网地址

https://pypi.org/project/websocket-client/

安装

pip install websocket-client

示例

from websocket import create_connection
import json
import randomws = create_connection("wss://sockettest.xhkjedu.com/ws")num = random.randint(0, 10000000)
msg = {"b": {"num": num},"c": 123456,
}msgstr = json.dumps(msg)
print("Sending " + msgstr)
ws.send(msgstr)result = ws.recv()
print("Received '%s'" % result)
ws.close()
import websocketdef on_message(ws, message):print(ws)print(message)def on_error(ws, error):print(ws)print(error)def on_close(ws):print(ws)print("### closed ###")websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://127.0.0.1:8888/track",on_message=on_message,on_error=on_error,on_close=on_close)ws.run_forever()

Websocket压测

Jmeter要测试websocket接口,需要先下载安装一个websocket samplers by peter doornbosch的插件

而locust因为是代码实现,所以可以进行任何的测试,引用相应的库即可。

from locust import User, task, events
import time
from websocket import create_connection
import json
import randomdef success_call(name, recvText, total_time):events.request_success.fire(request_type="[Success]",name=name,response_time=total_time,response_length=len(recvText))def fail_call(name, total_time, e):events.request_failure.fire(request_type="[Fail]",name=name,response_time=total_time,response_length=0,exception=e,)class WebSocketClient(object):def __init__(self, host):self.host = hostself.ws = Nonedef connect(self, burl):self.ws = create_connection(burl)def recv(self):return self.ws.recv()def send(self, msg):self.ws.send(msg)class WebsocketUser(User):abstract = Truedef __init__(self, *args, **kwargs):super(WebsocketUser, self).__init__(*args, **kwargs)self.client = WebSocketClient(self.host)self.client._locust_environment = self.environmentclass ApiUser(WebsocketUser):host = "wss://sockettest.xhkjedu.com/"@task(1)def pft(self):# wss 地址self.url = 'wss://sockettest.xhkjedu.com/ws'print("连接前")start_time = time.time()try:self.client.connect(self.url)print("连接后")# 发送的订阅请求num = random.randint(0, 10000000)msg = {"b": {"num": num},"c": 123456,}msgstr = json.dumps(msg)self.client.send(msgstr)print(f"↑: {msgstr}")greeting = self.client.recv()print(f"↓: {greeting}")except Exception as e:total_time = int((time.time() - start_time) * 1000)fail_call("Send", total_time, e)else:total_time = int((time.time() - start_time) * 1000)success_call("Send", "success", total_time)
http://www.jsqmd.com/news/38575/

相关文章:

  • 11.11 CSP-S 模拟赛 T3. square
  • 2025年排渣阀订制厂家权威推荐榜单:陶瓷阀门/搪瓷阀门/铸铁阀门源头厂家精选
  • locust常用类和方法解析
  • locust高级特性详解
  • Aoao Round 2 比赛总结
  • 基于遗传算法的PID控制器参数整定方法详解
  • QT项目复盘:如何在有限资源下把桌面端做成‘高端应用’?
  • 揭开时序数据库的秘密:为何它是数据存储的未来?
  • 11月12日打卡
  • Java中将String字符串转换为算术表达式并计算
  • 按钮固定在底部
  • locust基础
  • 基于HSMS通信标准的SECS通讯程序
  • 设置fdfs自动启动
  • 完整教程:Redis GEO 模块深度解析:从原理到高可用架构实践
  • 2025/11/9
  • 办公楼设计多少钱一平?广州办公楼设计收费标准
  • 2025/11/8
  • macOS 下载汇总 (系统、应用和教程) - macOS Tahoe 26
  • 使用page-meta为u-popup的遮罩层添加穿透屏蔽
  • 2025年广州到吉尔吉斯斯坦海运公司权威推荐榜单:广州到吉尔吉斯斯坦运输/广州到吉尔吉斯斯坦双清门到门/广州到吉尔吉斯斯坦双清源头公司精选
  • AI人力资源管理系统如何让HR的工作更高效、更有判断力
  • etcd 参数调整
  • 2026年HR系统选型全攻略:功能、成本与落地建议
  • 实用指南:AI应用架构师眼中的智能家居AI智能体:开启智能化居家生活的新机遇
  • 11.10 联考总结
  • PPT-EA:PPT自动生成器 - 详解
  • 锦州西林瓶灌装压塞机厂家终身维护服务及费用指南
  • 微算法科技(NASDAQ MLGO)开发基于优先级的区块链交易打包算法,提高云边协同计算环境下的交易效率
  • 肇庆化妆品西林瓶灌装线推荐:食品级材质接触部件解析