声明
本文章中所有内容仅供学习交流使用,不用于其他任何目的,抓包 内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关!侵权通过头像私信或名字简介叫我删除博客谢谢。
部分python代码
import asyncio import execjs import websockets import json import ssl import time from datetime import datetime from Crypto.Cipher import AES from Crypto.Util.Padding import unpad import base64 class FootballScoreClient: def __init__(self, ads_tracker_baidu): ads = ads_tracker_baidu ck = f'ads-tracker-baidu={ads}' self.url = "scorepush/football" self.headers = { 'cookie':ck } self.initial_message = { "device": "pc", "topic": "USER.topic.app.8" } self.websocket = None self.is_running = False async def connect(self): """建立WebSocket连接""" try: ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE self.websocket = await websockets.connect( self.url, extra_headers=self.headers, ) print(f"[{self._get_timestamp()}] 已连接到服务器") return True except Exception as e: print(f"[{self._get_timestamp()}] 连接失败: {e}") return False async def send_initial_message(self): """发送初始消息""" try: await self.websocket.send(json.dumps(self.initial_message)) print(f"[{self._get_timestamp()}] 发送初始消息: {self.initial_message}") except Exception as e: print(f"[{self._get_timestamp()}] 发送消息失败: {e}") async def receive_messages(self): """接收消息循环""" try: async for message in self.websocket: print(f"[{self._get_timestamp()}] 收到消息: {message}") await self.handle_message(message) except websockets.exceptions.ConnectionClosed as e: print(f"[{self._get_timestamp()}] 连接关闭: {e}") self.is_running = False except Exception as e: print(f"[{self._get_timestamp()}] 接收消息错误: {e}") self.is_running = False async def handle_message(self, message): """处理接收到的消息""" try: data = decrypt_aes_cbc(message) # 在这里添加你的消息处理逻辑 print(f"解析后的数据: {data}") pass except json.JSONDecodeError: print(f"无法解析消息: {message}") async def run(self, auto_reconnect=True): """主运行函数""" self.is_running = True while self.is_running: if await self.connect(): await self.send_initial_message() await self.receive_messages() if auto_reconnect and self.is_running: print(f"[{self._get_timestamp()}] 5秒后重新连接...") await asyncio.sleep(5) else: break async def stop(self): """停止客户端""" self.is_running = False if self.websocket: await self.websocket.close() def _get_timestamp(self): """获取当前时间戳""" return datetime.now().strftime("%Y-%m-%d %H:%M:%S") async def main(): cp = execjs.compile(open('1.js', 'r', encoding='utf-8', errors='ignore').read()) ads_tracker_baidu = cp.call('getAdsTrackerBaidu').split("ads-tracker-baidu=")[-1] print(ads_tracker_baidu) client = FootballScoreClient(ads_tracker_baidu) try: await client.run() except KeyboardInterrupt: print("\n正在关闭客户端...") await client.stop() if __name__ == "__main__": asyncio.run(main())
结果
![]()
总结
1.出于安全考虑,本章未提供完整流程,调试环节省略较多,只提供大致思路,具体细节要你自己还原,相信你也能调试出来。
2.具体更多细节请看名字进入详情了解更多细节,具体细节要你自己还原,相信你也能调试出来。