当前位置: 首页 > news >正文

Python WebSocket 实时通信实战:构建实时Web应用

Python WebSocket 实时通信实战:构建实时Web应用

引言

在现代Web开发中,实时通信是构建交互式应用的关键技术。作为一名从Rust转向Python的后端开发者,我深刻体会到WebSocket在构建实时功能方面的重要性。WebSocket提供了双向通信能力,使得服务器和客户端可以实时交换数据。

WebSocket 核心概念

什么是WebSocket

WebSocket是一种在单个TCP连接上进行全双工通信的协议,具有以下特点:

  • 双向通信:服务器和客户端可以互相发送消息
  • 低延迟:建立连接后保持持久连接,避免HTTP请求开销
  • 实时性:支持实时推送数据
  • 跨域支持:支持跨域通信

架构设计

┌─────────────────────────────────────────────────────────────┐ │ WebSocket 客户端 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ JavaScript WebSocket API │ │ │ │ 连接 → 发送消息 → 接收消息 → 断开连接 │ │ │ └─────────────────────────┬─────────────────────────┘ │ └─────────────────────────────┼─────────────────────────────┘ │ WebSocket协议 ▼ ┌─────────────────────────────────────────────────────────────┐ │ WebSocket 服务端 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Python WebSocket Server │ │ │ │ 接收连接 → 处理消息 → 广播消息 → 管理连接 │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘

环境搭建与基础配置

安装依赖

pip install websockets fastapi uvicorn

基本WebSocket服务器

import asyncio import websockets connected_clients = set() async def handle_client(websocket, path): connected_clients.add(websocket) print(f"Client connected. Total clients: {len(connected_clients)}") try: async for message in websocket: print(f"Received: {message}") # 广播消息给所有客户端 for client in connected_clients: if client != websocket: await client.send(message) finally: connected_clients.remove(websocket) print(f"Client disconnected. Total clients: {len(connected_clients)}") start_server = websockets.serve(handle_client, "localhost", 8765) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()

客户端实现

const ws = new WebSocket('ws://localhost:8765'); ws.onopen = function() { console.log('Connected to server'); ws.send('Hello, Server!'); }; ws.onmessage = function(event) { console.log('Received:', event.data); }; ws.onclose = function() { console.log('Disconnected from server'); };

FastAPI WebSocket集成

创建WebSocket端点

from fastapi import FastAPI, WebSocket, WebSocketDisconnect from typing import List app = FastAPI() class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) try: while True: data = await websocket.receive_text() await manager.broadcast(f"Client says: {data}") except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast("A client has disconnected")

运行服务器

uvicorn main:app --reload

高级特性实战

消息认证

from fastapi import WebSocket, Depends async def get_token(websocket: WebSocket): token = websocket.query_params.get("token") if token != "secret": await websocket.close(code=1008, reason="Invalid token") raise WebSocketDisconnect(code=1008) return token @app.websocket("/ws/auth") async def websocket_auth(websocket: WebSocket, token: str = Depends(get_token)): await websocket.accept() await websocket.send_text(f"Authenticated with token: {token}") while True: data = await websocket.receive_text() await websocket.send_text(f"Received: {data}")

分组广播

class RoomManager: def __init__(self): self.rooms: dict[str, List[WebSocket]] = {} async def join_room(self, room_id: str, websocket: WebSocket): if room_id not in self.rooms: self.rooms[room_id] = [] self.rooms[room_id].append(websocket) await websocket.accept() def leave_room(self, room_id: str, websocket: WebSocket): if room_id in self.rooms: self.rooms[room_id].remove(websocket) async def broadcast_to_room(self, room_id: str, message: str): if room_id in self.rooms: for connection in self.rooms[room_id]: await connection.send_text(message) room_manager = RoomManager() @app.websocket("/ws/room/{room_id}") async def websocket_room(websocket: WebSocket, room_id: str): await room_manager.join_room(room_id, websocket) try: while True: data = await websocket.receive_text() await room_manager.broadcast_to_room(room_id, f"Room {room_id}: {data}") except WebSocketDisconnect: room_manager.leave_room(room_id, websocket)

二进制数据传输

@app.websocket("/ws/binary") async def websocket_binary(websocket: WebSocket): await websocket.accept() while True: data = await websocket.receive_bytes() print(f"Received binary data: {len(data)} bytes") # 处理二进制数据 response = process_binary(data) await websocket.send_bytes(response)

实际业务场景

场景一:实时聊天应用

from datetime import datetime @app.websocket("/ws/chat/{username}") async def chat_endpoint(websocket: WebSocket, username: str): await manager.connect(websocket) await manager.broadcast(f"{username} joined the chat") try: while True: message = await websocket.receive_text() timestamp = datetime.now().strftime("%H:%M:%S") await manager.broadcast(f"[{timestamp}] {username}: {message}") except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast(f"{username} left the chat")

场景二:实时数据推送

import asyncio from fastapi import BackgroundTasks async def send_realtime_data(websocket: WebSocket): while True: data = generate_realtime_data() await websocket.send_json(data) await asyncio.sleep(1) @app.websocket("/ws/realtime") async def realtime_endpoint(websocket: WebSocket): await websocket.accept() task = asyncio.create_task(send_realtime_data(websocket)) try: while True: await websocket.receive_text() except WebSocketDisconnect: task.cancel()

场景三:协作编辑

document_store = {} @app.websocket("/ws/collab/{doc_id}") async def collab_edit(websocket: WebSocket, doc_id: str): await websocket.accept() if doc_id not in document_store: document_store[doc_id] = {"content": "", "users": []} document_store[doc_id]["users"].append(websocket) try: while True: update = await websocket.receive_json() document_store[doc_id]["content"] = update["content"] for user in document_store[doc_id]["users"]: if user != websocket: await user.send_json(update) except WebSocketDisconnect: document_store[doc_id]["users"].remove(websocket)

性能优化

异步处理

async def process_message(message: str) -> str: # 异步处理消息 await asyncio.sleep(0.1) return f"Processed: {message}" @app.websocket("/ws/async") async def async_websocket(websocket: WebSocket): await websocket.accept() while True: message = await websocket.receive_text() processed = await process_message(message) await websocket.send_text(processed)

消息压缩

import gzip @app.websocket("/ws/compressed") async def compressed_websocket(websocket: WebSocket): await websocket.accept() while True: compressed_data = await websocket.receive_bytes() data = gzip.decompress(compressed_data).decode('utf-8') response = f"Received: {data}" compressed_response = gzip.compress(response.encode('utf-8')) await websocket.send_bytes(compressed_response)

总结

WebSocket为Python后端开发者提供了构建实时Web应用的强大工具。通过双向通信和低延迟特性,WebSocket在实时聊天、数据推送和协作编辑等场景中表现出色。从Rust开发者的角度来看,Python的WebSocket实现虽然在性能上不如Rust,但在开发效率和生态成熟度方面具有优势。

在实际项目中,建议合理使用分组广播和消息认证来构建安全、高效的实时应用。

http://www.jsqmd.com/news/810502/

相关文章:

  • 告别答辩PPT焦虑:百考通AI一键生成,高效备战毕业答辩
  • AI时代的战神金刚——构建你的外部大脑与商业闭环@围巾哥萧尘
  • 【AI响应速度生死线】:Haiku在实时客服/编程助手/边缘设备的3大不可替代性验证
  • NotebookLM播客生成质量暴跌真相:训练数据污染率高达18.7%?我们逆向拆解了其RAG音频对齐层
  • LabVIEW主要设计特性与工程价值
  • STM32实战:BMP280气压模块IIC驱动与数据精准采集
  • 不靠感觉写代码:Matt Pocock 的 Skills 如何让 AI 写出你真正想要的代码
  • 半导体行业周期解析:从供需失衡到产业链博弈的生存指南
  • 终极音乐解锁指南:免费工具让你在任何设备播放加密音乐
  • 从底层逻辑了解AI
  • 基于SimpleX协议构建私有AI通信通道:OpenClaw插件部署指南
  • 氛围工程指南:如何量化与塑造技术团队的健康氛围
  • gptstudio:R语言数据分析的AI副驾驶,重塑RStudio工作流
  • 【ChatGPT Slogan生成黄金法则】:20年品牌技术专家亲授3步高转化文案炼金术
  • 假冒 TronLink 的 MV3 扩展钓鱼攻击机理与 Web3 钱包安全防御
  • 隐私保护机器学习技术:MPC与FHE对比与应用
  • 快速原型开发中利用Taotoken分钟级接入验证创意
  • PS图片文字修改教程 简单几步完美替换文字内容
  • 137.从 CUDA 环境到模型部署!YOLOv8 全流程实战,适配工业质检 / 自动驾驶多场景
  • 【实战指南】App Inventor对接阿里云:打造STM32温湿度数据可视化APP
  • 使用 OpenClaw 配置 Taotoken 作为其 AI 供应商的详细步骤
  • ComfyUI-FramePackWrapper:8GB显存也能流畅生成高质量AI视频的终极方案
  • 高效清理磁盘空间:DupeGuru重复文件查找工具完整指南 [特殊字符]
  • superpowers skill 6.2: receiving-code-review
  • 2026年金华餐饮SaaS系统选型参考:推荐3家具备落地服务能力的本地服务商 - 产业观察网
  • AI智能体驱动B2B线索挖掘:开源模板实现自动化客户发现与评分
  • 告别熬夜硬肝!百考通AI 助力高效完成答辩PPT,让毕业季更从容
  • 终极指南:3分钟掌握RPG Maker加密资源解密技巧
  • 告别卡顿!用EnhancedScroller插件优化Unity UI长列表的完整配置流程(含性能对比)
  • WeChatFerry:基于RPC与DLL注入的微信PC端自动化框架深度解析