当前位置: 首页 > news >正文

Pythonasyncio子进程管理

=================================================================
Python asyncio 子进程管理:异步执行外部命令
=================================================================

asyncio 提供了 create_subprocess_exec 和 create_subprocess_shell
来异步管理子进程,避免阻塞事件循环。本文详解完整用法。

=================================================================
一、create_subprocess_exec:执行外部程序
=================================================================

import asyncio
import sys

async def run_cmd_simple():
"""
异步执行一个外部命令,等待其完成。
create_subprocess_exec 直接执行程序(不经过 shell)。
"""
print("开始执行 'ls -la' ...")
# 创建子进程:第一个参数是可执行文件路径,后续是参数
process = await asyncio.create_subprocess_exec(
"ls" if sys.platform != "win32" else "dir", # Linux/macOS 用 ls
"-la",
# 默认继承父进程的标准流
stdout=asyncio.subprocess.PIPE, # 捕获标准输出
stderr=asyncio.subprocess.PIPE, # 捕获标准错误
)
# communicate() 等待进程结束并返回 (stdout, stderr)
stdout, stderr = await process.communicate()
print(f"返回码: {process.returncode}")
if stdout:
print(f"标准输出 ({len(stdout)} 字节):")
print(stdout.decode(errors="replace")[:500])
if stderr:
print(f"标准错误: {stderr.decode(errors='replace')}")

=================================================================
二、create_subprocess_shell:通过 Shell 执行
=================================================================

async def run_cmd_shell():
"""
通过系统 shell 执行命令。
注意:存在 shell 注入风险,不要用于不可信的输入。
"""
process = await asyncio.create_subprocess_shell(
"echo 'Hello from asyncio' && sleep 1 && echo 'Done'",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
print(f"Shell 输出: {stdout.decode()}")

# create_subprocess_exec 与 create_subprocess_shell 的区别:
# - exec: 直接执行程序,更安全,不需要 shell 转义
# - shell: 通过 /bin/sh (Windows: cmd.exe) 执行,支持管道、重定向
# - exec 推荐用于所有可以避免使用 shell 的场景

=================================================================
三、Process 类详解
=================================================================

async def process_class_demo():
"""
展示 asyncio.subprocess.Process 的完整方法。
Process 对象通过 create_subprocess_* 返回。
"""
process = await asyncio.create_subprocess_exec(
"python3", "--version",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
# 工作目录
cwd="/tmp",
# 环境变量
env={"PYTHONUNBUFFERED": "1"},
)

# 进程标识
print(f"PID: {process.pid}")

# 等待进程结束,获取返回码
returncode = await process.wait()
print(f"返回码: {returncode}")

# 检查进程状态
if process.returncode == 0:
print("执行成功")
else:
print(f"执行失败,返回码: {process.returncode}")

# 发送信号(仅 Unix)
# process.send_signal(signal.SIGTERM)
# process.terminate()
# process.kill()

=================================================================
四、读取子进程输出:逐行读取
=================================================================

async def stream_subprocess_output():
"""
逐行读取子进程输出,适用于长时间运行的进程。
使用 stdout.readline() 而不是 communicate(),
后者会等待进程结束才返回所有数据。
"""
process = await asyncio.create_subprocess_exec(
"ping", "-c", "5", "127.0.0.1",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

print("开始逐行读取 ping 输出:")
# 逐行读取标准输出(实时读取)
while True:
line = await process.stdout.readline()
if not line:
break # EOF,进程已结束
# 解码并去除换行符
text = line.decode("utf-8", errors="replace").rstrip()
print(f"[ping] {text}")

# 确保进程已完全结束
await process.wait()
print(f"进程结束,返回码: {process.returncode}")

=================================================================
五、超时控制
=================================================================

async def subprocess_with_timeout():
"""
为子进程设置超时。
使用 asyncio.wait_for 包装 communicate()。
"""
process = await asyncio.create_subprocess_exec(
"sleep", "10", # 持续 10 秒
stdout=asyncio.subprocess.PIPE,
)

try:
# 设置 3 秒超时
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=3.0
)
except asyncio.TimeoutError:
print("子进程执行超时,正在终止...")
process.terminate() # 发送 SIGTERM
# 等待进程真正结束
await process.wait()
print(f"进程已终止,返回码: {process.returncode}")
except Exception as e:
print(f"其他错误: {e}")
process.kill()
await process.wait()

=================================================================
六、并发执行多个子进程
=================================================================

async def run_multiple_subprocesses():
"""
使用 asyncio.gather 同时执行多个子进程。
这比串行执行快得多。
"""
async def run_one(cmd: str, delay: int):
"""运行一个子进程并返回结果"""
print(f"开始执行: {cmd}")
process = await asyncio.create_subprocess_shell(
f"echo 'Starting {cmd}' && sleep {delay} && echo '{cmd} done'",
stdout=asyncio.subprocess.PIPE,
shell=True,
)
stdout, _ = await process.communicate()
return stdout.decode().strip()

# 并发执行 3 个任务
results = await asyncio.gather(
run_one("Task-1", 2),
run_one("Task-2", 1),
run_one("Task-3", 3),
)

for result in results:
print(f"结果: {result}")

=================================================================
七、与子进程交互:写入 stdin
=================================================================

async def interact_with_subprocess():
"""
向子进程的标准输入写入数据,并读取输出。
适用于交互式程序。
"""
process = await asyncio.create_subprocess_exec(
"python3", "-c",
"""
import sys
for line in sys.stdin:
sys.stdout.write(f"处理: {line.strip().upper()}\\n")
sys.stdout.flush()
""",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

# 向 stdin 写入数据
process.stdin.write(b"hello\nworld\nasyncio\n")
# 务必调用 drain() 刷新缓冲区
await process.stdin.drain()
# 关闭 stdin 表示输入结束
process.stdin.close()

# 读取所有输出
stdout, stderr = await process.communicate()
print("子进程输出:")
print(stdout.decode())

=================================================================
八、总结
=================================================================

# 1. create_subprocess_exec 直接执行程序,更安全
# 2. create_subprocess_shell 经过 shell,有注入风险
# 3. communicate() 同时读写 stdin/stdout/stderr 并等待结束
# 4. wait() 仅等待进程结束,获取返回码
# 5. 逐行读取适用于实时流式输出场景
# 6. asyncio.wait_for 可为子进程设置超时
# 7. asyncio.gather 可并发管理多个子进程
# 8. 通过 stdin.write() 和 drain() 向子进程发送数据

if __name__ == "__main__":
asyncio.run(stream_subprocess_output())

http://www.jsqmd.com/news/918349/

相关文章:

  • 从“水缸”到“高速公路”:用生活化比喻彻底搞懂电容的滤波、旁路与去耦(附LTspice仿真)
  • 终极Maya动画师效率革命:Studio Library姿势管理完全指南 [特殊字符]
  • 3分钟搞定Axure汉化:告别英文界面,产品经理的救星来了! [特殊字符]
  • 原型设计工具对比与校园失物招领系统原型设计
  • 别再只会用PEC了!CST材料库保姆级使用指南:从Normal介质到Lossy Metal的实战选择
  • 科瑞昌省电空调选购指南:工业大空间降温选型全攻略 - 资讯纵览
  • Android音乐播放器实战工程:带用户系统、本地数据库与四大组件完整实现
  • 花卉图片分类实战包:Python数据读取、自动划分与模型识别全流程代码
  • 从零打造仿生机械手:Arduino、Python与伺服电机的工程实践
  • 2026年10款降AIGC工具对比:最高AI率100%直降至0.12%
  • 智能电视上网难?TV Bro电视浏览器如何让大屏浏览变得轻松愉悦?
  • Google Drive自动化下载技术深度解析与Python实用指南
  • RNN案例_seq2seq 英译法案例
  • VR-Reversal:如何免费将3D视频转换为2D的终极指南
  • 2026护网行动全指南(干货版):从认知到实战,攻防落地可照搬
  • 保姆级教程:在CentOS 7上用源码编译安装Netdata 1.0.0,并配置开机自启
  • 从数据孤岛到智能闭环,AI与CRM深度整合的4层架构设计全解析,含可复用API对接清单
  • 从下载镜像到进入桌面:一份给纯小白的 VirtualBox 装 Ubuntu 22.04 LTS 保姆级避坑记录
  • 实用高效:chfsgui文件共享工具5分钟快速配置完整指南
  • Windows安卓应用安装器:三步实现电脑运行手机应用
  • 2026年硬核亲测:10款降AIGC工具深度横评(附对比表)
  • 2026年苏州本地建筑防水补漏专业服务机构选型核心要点与合规服务商梳理 专业防水公司排名推荐(2026年5月防水补漏最新TOP权威排名) - 鼎壹万修缮说
  • 微信聊天记录永久保存终极指南:如何一键导出所有聊天数据
  • 3步掌握Unity游戏马赛克移除:UniversalUnityDemosaics完整指南
  • 破解雨衣批发痛点:FEP一体化方法论如何实现高性价比稳定供应? - 资讯纵览
  • 北京中央电化教育馆家庭教育指导师报名入口:中山优才教育 - 当下教育培训干货
  • Win10硬盘‘失联’全记录:从拍打到换盘,我的No Bootable Device修复踩坑指南
  • HFSS 2023 R2 新界面速览:从菜单栏到状态栏,高效建模你必须知道的几个冷门技巧
  • UE5 Niagara避坑指南:GPU粒子不支持灯光渲染?这些性能优化技巧你得知道
  • 【AI运维生死线】:当LangChain链式调用突然卡死——3层异步栈追踪+实时可观测性注入方案