当前位置: 首页 > news >正文

Python网络编程与Socket通信

Python网络编程与Socket通信

一、Socket基础概念

Socket(套接字)是网络通信的端点,提供了进程间通信的机制。Python的socket模块提供了对底层网络接口的访问。

import socket

# 创建TCP Socket
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 创建UDP Socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 地址族
# AF_INET - IPv4
# AF_INET6 - IPv6
# AF_UNIX - Unix域套接字

# Socket类型
# SOCK_STREAM - TCP(面向连接,可靠)
# SOCK_DGRAM - UDP(无连接,不可靠但快速)


二、TCP服务器与客户端

2.1 简单TCP服务器

import socket
import threading

class TCPServer:
def __init__(self, host='0.0.0.0', port=8080):
self.host = host
self.port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

def start(self):
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
print(f"服务器启动在 {self.host}:{self.port}")

try:
while True:
client_socket, address = self.server_socket.accept()
print(f"新连接来自 {address}")
thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address)
)
thread.daemon = True
thread.start()
except KeyboardInterrupt:
print("服务器关闭")
finally:
self.server_socket.close()

def handle_client(self, client_socket, address):
try:
while True:
data = client_socket.recv(4096)
if not data:
break
message = data.decode('utf-8')
print(f"收到来自 {address}: {message}")
response = f"服务器收到: {message}"
client_socket.send(response.encode('utf-8'))
except ConnectionResetError:
print(f"客户端 {address} 断开连接")
finally:
client_socket.close()

2.2 TCP客户端

class TCPClient:
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def connect(self):
self.socket.connect((self.host, self.port))
print(f"已连接到 {self.host}:{self.port}")

def send(self, message):
self.socket.send(message.encode('utf-8'))
response = self.socket.recv(4096)
return response.decode('utf-8')

def close(self):
self.socket.close()

# 使用
client = TCPClient()
client.connect()
response = client.send("Hello, Server!")
print(response)
client.close()


三、带协议的通信

网络通信需要定义消息边界,常见方案:

3.1 固定长度头部协议

import struct
import json

class MessageProtocol:
"""消息格式:4字节长度头 + JSON消息体"""
HEADER_SIZE = 4

@staticmethod
def encode(data: dict) -> bytes:
body = json.dumps(data).encode('utf-8')
header = struct.pack('!I', len(body)) # 大端序4字节无符号整数
return header + body

@staticmethod
def decode(socket_conn) -> dict:
# 读取头部
header = MessageProtocol._recv_exact(socket_conn, MessageProtocol.HEADER_SIZE)
if not header:
return None
body_length = struct.unpack('!I', header)[0]

# 读取消息体
body = MessageProtocol._recv_exact(socket_conn, body_length)
return json.loads(body.decode('utf-8'))

@staticmethod
def _recv_exact(conn, length):
"""确保接收指定长度的数据"""
data = b''
while len(data) < length:
chunk = conn.recv(length - len(data))
if not chunk:
return None
data += chunk
return data

3.2 使用协议的服务器

class ProtocolServer:
def __init__(self, host='0.0.0.0', port=9000):
self.host = host
self.port = port
self.handlers = {}

def register_handler(self, action, handler):
self.handlers[action] = handler

def start(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((self.host, self.port))
server.listen(5)
print(f"协议服务器启动在 {self.host}:{self.port}")

while True:
conn, addr = server.accept()
threading.Thread(target=self._handle, args=(conn, addr)).start()

def _handle(self, conn, addr):
try:
while True:
message = MessageProtocol.decode(conn)
if message is None:
break

action = message.get('action')
handler = self.handlers.get(action)

if handler:
response = handler(message.get('data'))
else:
response = {'error': f'未知操作: {action}'}

conn.send(MessageProtocol.encode(response))
finally:
conn.close()

# 注册处理器
server = ProtocolServer()
server.register_handler('echo', lambda data: {'echo': data})
server.register_handler('time', lambda data: {'time': str(datetime.now())})


四、UDP通信

import socket

class UDPServer:
def __init__(self, host='0.0.0.0', port=8888):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind((host, port))
print(f"UDP服务器启动在 {host}:{port}")

def run(self):
while True:
data, addr = self.socket.recvfrom(65535)
message = data.decode('utf-8')
print(f"收到来自 {addr}: {message}")
response = f"ACK: {message}"
self.socket.sendto(response.encode('utf-8'), addr)

class UDPClient:
def __init__(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(5.0) # 设置超时

def send(self, message, host='localhost', port=8888):
self.socket.sendto(message.encode('utf-8'), (host, port))
try:
data, addr = self.socket.recvfrom(65535)
return data.decode('utf-8')
except socket.timeout:
return None


五、select/poll多路复用

import select

class SelectServer:
"""使用select实现非阻塞IO多路复用"""
def __init__(self, host='0.0.0.0', port=8080):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.setblocking(False)
self.server.bind((host, port))
self.server.listen(100)

self.inputs = [self.server]
self.outputs = []
self.message_queues = {}

def run(self):
print("Select服务器启动")
while self.inputs:
readable, writable, exceptional = select.select(
self.inputs, self.outputs, self.inputs, 1.0
)

for sock in readable:
if sock is self.server:
# 新连接
conn, addr = sock.accept()
conn.setblocking(False)
self.inputs.append(conn)
self.message_queues[conn] = []
else:
# 已有连接的数据
data = sock.recv(4096)
if data:
self.message_queues[sock].append(data)
if sock not in self.outputs:
self.outputs.append(sock)
else:
self._remove_connection(sock)

for sock in writable:
if self.message_queues.get(sock):
msg = self.message_queues[sock].pop(0)
sock.send(msg)
else:
self.outputs.remove(sock)

for sock in exceptional:
self._remove_connection(sock)

def _remove_connection(self, sock):
if sock in self.inputs:
self.inputs.remove(sock)
if sock in self.outputs:
self.outputs.remove(sock)
del self.message_queues[sock]
sock.close()


六、selectors高级接口

import selectors

class SelectorServer:
"""使用selectors模块(推荐方式)"""
def __init__(self, host='0.0.0.0', port=8080):
self.sel = selectors.DefaultSelector()
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.setblocking(False)
self.server.bind((host, port))
self.server.listen(100)
self.sel.register(self.server, selectors.EVENT_READ, self._accept)

def _accept(self, sock, mask):
conn, addr = sock.accept()
conn.setblocking(False)
self.sel.register(conn, selectors.EVENT_READ, self._read)

def _read(self, conn, mask):
data = conn.recv(4096)
if data:
conn.send(data) # echo
else:
self.sel.unregister(conn)
conn.close()

def run(self):
print("Selector服务器启动")
while True:
events = self.sel.select(timeout=1)
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)


七、HTTP客户端实现

class SimpleHTTPClient:
"""手动实现HTTP/1.1客户端"""
def __init__(self):
self.socket = None

def request(self, method, url, headers=None, body=None):
from urllib.parse import urlparse

parsed = urlparse(url)
host = parsed.hostname
port = parsed.port or 80
path = parsed.path or '/'

# 建立连接
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((host, port))

# 构建请求
request_line = f"{method} {path} HTTP/1.1\r\n"
default_headers = {
'Host': host,
'Connection': 'close',
'User-Agent': 'PythonSocket/1.0',
}
if headers:
default_headers.update(headers)
if body:
default_headers['Content-Length'] = str(len(body))

header_str = ''.join(f"{k}: {v}\r\n" for k, v in default_headers.items())
request = f"{request_line}{header_str}\r\n"

if body:
request += body

self.socket.send(request.encode('utf-8'))

# 接收响应
response = b''
while True:
chunk = self.socket.recv(4096)
if not chunk:
break
response += chunk

self.socket.close()
return self._parse_response(response)

def _parse_response(self, raw):
header_end = raw.find(b'\r\n\r\n')
header_part = raw[:header_end].decode('utf-8')
body = raw[header_end + 4:]

lines = header_part.split('\r\n')
status_line = lines[0]
status_code = int(status_line.split(' ')[1])

headers = {}
for line in lines[1:]:
key, value = line.split(': ', 1)
headers[key.lower()] = value

return {'status': status_code, 'headers': headers, 'body': body}


八、WebSocket简易实现

import hashlib
import base64

class WebSocketServer:
MAGIC_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def __init__(self, host='0.0.0.0', port=8765):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((host, port))
self.server.listen(5)

def _handshake(self, conn):
"""WebSocket握手"""
data = conn.recv(4096).decode('utf-8')
key = None
for line in data.split('\r\n'):
if 'Sec-WebSocket-Key' in line:
key = line.split(': ')[1].strip()
break

if not key:
return False

accept_key = base64.b64encode(
hashlib.sha1((key + self.MAGIC_STRING).encode()).digest()
).decode()

response = (
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
f"Sec-WebSocket-Accept: {accept_key}\r\n\r\n"
)
conn.send(response.encode())
return True

def _decode_frame(self, data):
"""解码WebSocket帧"""
if len(data) < 2:
return None

opcode = data[0] & 0x0F
masked = data[1] & 0x80
payload_length = data[1] & 0x7F

offset = 2
if payload_length == 126:
payload_length = struct.unpack('!H', data[2:4])[0]
offset = 4
elif payload_length == 127:
payload_length = struct.unpack('!Q', data[2:10])[0]
offset = 10

if masked:
mask = data[offset:offset+4]
offset += 4
payload = bytearray(data[offset:offset+payload_length])
for i in range(len(payload)):
payload[i] ^= mask[i % 4]
else:
payload = data[offset:offset+payload_length]

return {'opcode': opcode, 'payload': bytes(payload)}

def _encode_frame(self, message):
"""编码WebSocket帧"""
payload = message.encode('utf-8')
frame = bytearray()
frame.append(0x81) # FIN + text opcode

length = len(payload)
if length < 126:
frame.append(length)
elif length < 65536:
frame.append(126)
frame.extend(struct.pack('!H', length))
else:
frame.append(127)
frame.extend(struct.pack('!Q', length))

frame.extend(payload)
return bytes(frame)


九、实用工具函数

def get_local_ip():
"""获取本机IP地址"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 80))
return s.getsockname()[0]
finally:
s.close()

def port_scan(host, start_port, end_port, timeout=0.5):
"""端口扫描"""
open_ports = []
for port in range(start_port, end_port + 1):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
if result == 0:
open_ports.append(port)
sock.close()
return open_ports

def resolve_dns(hostname):
"""DNS解析"""
try:
ip = socket.gethostbyname(hostname)
all_ips = socket.gethostbyname_ex(hostname)
return {'ip': ip, 'all': all_ips}
except socket.gaierror as e:
return {'error': str(e)}


十、总结

网络编程要点:
1. TCP适合需要可靠传输的场景,UDP适合实时性要求高的场景
2. 必须处理消息边界问题(TCP是字节流,不保证消息完整性)
3. 生产环境使用selectors或asyncio处理并发连接
4. 注意设置超时,避免永久阻塞
5. 实际项目中优先使用高层库(requests、aiohttp、websockets)

http://www.jsqmd.com/news/987531/

相关文章:

  • 想用 Claude Fable 5?AWS Bedrock 用户得把数据交给 Anthropic 30 天,我看完蚌埠住了
  • 2026怀化防水补漏哪家靠谱?正规公司排名及避坑价格指南 - 苏易修缮
  • 2026 福州欧米茄回收行情|海马 / 蝶飞 / 超霸,热门款价格走势 - 奢侈品回收评测
  • 音高与色彩的物理映射:跨模态设计中的光谱-情感协同方法
  • 休闲食品行业数据分析平台建设方案,揭秘增长新引擎!
  • 眉山全屋定制橱柜品牌排行 核心维度实测对比 - 起跑123
  • OfficeLite中更改机器人型号
  • 收银机用途------自动点赞评论
  • 适合送礼的时尚百搭女生手表品牌排行参考 - 互联网科技品牌测评
  • 伍德氏紫光灯怎么样?:瑞德医生精准专业 - 思溯深度专栏
  • 如果有一副眼镜,你打手语,它帮你“说”出来,有人需要吗?
  • Sqribble:模板即规则的文档操作系统解析
  • 蓝底证件照哪个app好2026?4款必备小程序对比测评
  • 2026年 呼和浩特汽车膜推荐榜:隔热防晒/防爆隐私/改色车衣全案品牌工厂实力解析 - 品牌发掘
  • MSP430F149实战入门包:带中文资料、IAR可运行工程和DS18B20+LCD1602温显例程
  • QMCDecode终极指南:如何轻松解锁QQ音乐加密音频文件
  • 2026年 知识产权代办机构推荐榜:深圳/全球专利商标软件服务,工业设计赋能产品外观与家电设备创新! - 品牌发掘
  • 2026年矿用防爆伺服电机服务商避坑与技术选型深度指南 - 品牌报告
  • 2026重庆钻石回收安全榜单|收的顶满分登顶,安心变现不踩雷 - 奢侈品回收测评
  • Windows系统安装包管理工具Chocolatey的安装使用教程
  • 出海企业如何高效匹配全球市场调研供应商?
  • Three.js火焰特效演示包:含着色器源码、可直接运行的WebGL火光模拟
  • chunking和embedding的差别
  • Python项目结构从混乱到清晰的组织之道
  • 2026武汉黄金回收实测排行——本地人变现闭眼选不踩坑 - 奢侈品回收测评
  • 金价大跳水!2026年6月黄金持续大跌,后市还要跌,青岛人抓紧高位落袋 - 奢侈品回收测评
  • 哈尔滨黄金首饰回收黑幕:奢二网教你一招识破鬼秤调包套路 - 讯息早知道
  • 拼多多代运营电话_拼多多代运营公司联系方式_杭州百推官方热线 13968060425 - 品牌榜中榜
  • 眉山全屋定制衣柜品牌排行:实测维度对比解析 - 起跑123
  • Google 26 vo辅助真题分享