子进程管理
asyncio 提供了异步创建和管理子进程的能力,可以在不阻塞事件循环的情况下执行外部命令、与子进程交互。本章介绍如何使用 asyncio 进行子进程管理。
为什么需要异步子进程?
传统使用 subprocess.run() 或 os.system() 执行外部命令会阻塞整个线程:
import subprocess
import time
# 同步方式:阻塞等待
result = subprocess.run(['sleep', '5'], capture_output=True)
# 这 5 秒内,整个程序都无法执行其他操作
使用 asyncio 可以在等待子进程完成的同时继续执行其他任务:
import asyncio
async def run_command():
# 异步执行命令
proc = await asyncio.create_subprocess_exec('sleep', '5')
# 可以在这里做其他事情
await proc.wait()
asyncio.run(run_command())
创建子进程
create_subprocess_exec()
使用 create_subprocess_exec() 执行命令(推荐方式):
import asyncio
async def run_ls():
# 执行 ls -la 命令
proc = await asyncio.create_subprocess_exec(
'ls', '-la',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# 等待命令完成并获取输出
stdout, stderr = await proc.communicate()
print(f"返回码: {proc.returncode}")
print(f"标准输出:\n{stdout.decode()}")
print(f"标准错误:\n{stderr.decode()}")
asyncio.run(run_ls())
create_subprocess_exec() 的参数:
- 第一个参数:可执行程序路径或名称
- 后续参数:命令行参数(每个参数单独传入)
- 关键字参数:重定向输入输出
create_subprocess_shell()
使用 create_subprocess_shell() 执行 shell 命令:
import asyncio
async def run_shell_command():
# 执行 shell 命令(支持管道、重定向等 shell 特性)
proc = await asyncio.create_subprocess_shell(
'ls -la | grep ".py"',
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
print(stdout.decode())
asyncio.run(run_shell_command())
注意:create_subprocess_shell() 存在 shell 注入风险,如果命令参数来自用户输入,必须进行严格的验证或转义。对于已知命令,推荐使用 create_subprocess_exec()。
两种方式的区别
| 特性 | create_subprocess_exec | create_subprocess_shell |
|---|---|---|
| 安全性 | 更安全,无 shell 注入风险 | 有 shell 注入风险 |
| Shell 特性 | 不支持管道、通配符等 | 支持完整 shell 语法 |
| 参数传递 | 参数分开传入 | 整个命令作为字符串 |
| 推荐场景 | 已知命令,参数可控 | 需要 shell 特性 |
进程输入输出
标准输出捕获
import asyncio
async def capture_output():
proc = await asyncio.create_subprocess_exec(
'echo', 'Hello, asyncio!',
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
print(f"输出: {stdout.decode().strip()}")
asyncio.run(capture_output())
标准错误捕获
import asyncio
async def capture_stderr():
proc = await asyncio.create_subprocess_exec(
'python', '-c', 'import sys; print("错误信息", file=sys.stderr)',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
print(f"标准输出: {stdout.decode()}")
print(f"标准错误: {stderr.decode()}")
asyncio.run(capture_stderr())
向进程发送输入
import asyncio
async def send_input():
# 创建一个需要输入的进程
proc = await asyncio.create_subprocess_exec(
'python', '-c', 'print(input("请输入: "))',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# 发送输入并获取输出
stdout, stderr = await proc.communicate(input=b'Hello from asyncio\n')
print(f"输出: {stdout.decode()}")
asyncio.run(send_input())
输入输出重定向到文件
import asyncio
async def redirect_to_file():
# 将输出重定向到文件
with open('output.txt', 'w') as f:
proc = await asyncio.create_subprocess_exec(
'ls', '-la',
stdout=f # 直接传入文件对象
)
await proc.wait()
print("输出已写入 output.txt")
asyncio.run(redirect_to_file())
输出重定向选项
| 选项 | 说明 |
|---|---|
None(默认) | 继承父进程的输出 |
asyncio.subprocess.PIPE | 创建管道,可以通过 communicate() 读取 |
asyncio.subprocess.DEVNULL | 丢弃输出 |
| 文件对象 | 重定向到文件 |
与进程交互
communicate() - 一次性交互
communicate() 等待进程完成并返回所有输出:
import asyncio
async def communicate_example():
proc = await asyncio.create_subprocess_exec(
'python', '-c', '''
for i in range(5):
print(f"Line {i}")
''',
stdout=asyncio.subprocess.PIPE
)
# 一次性获取所有输出
stdout, _ = await proc.communicate()
print(stdout.decode())
asyncio.run(communicate_example())
注意:communicate() 会等待进程结束,如果进程持续输出,程序会一直等待。
逐行读取输出
对于长时间运行的进程,可以逐行读取:
import asyncio
async def read_line_by_line():
proc = await asyncio.create_subprocess_exec(
'python', '-c', '''
import time
for i in range(5):
print(f"Line {i}")
time.sleep(1)
''',
stdout=asyncio.subprocess.PIPE
)
# 逐行读取
while True:
line = await proc.stdout.readline()
if not line:
break
print(f"收到: {line.decode().strip()}")
await proc.wait()
asyncio.run(read_line_by_line())
实时交互
对于需要实时交互的进程:
import asyncio
async def real_time_interaction():
proc = await asyncio.create_subprocess_exec(
'python', '-i', # Python 交互模式
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
async def read_output():
"""持续读取输出"""
while True:
line = await proc.stdout.readline()
if not line:
break
print(f"输出: {line.decode().strip()}")
async def send_commands():
"""发送命令"""
commands = [
b'1 + 1\n',
b'2 * 3\n',
b'exit()\n'
]
for cmd in commands:
proc.stdin.write(cmd)
await proc.stdin.drain()
await asyncio.sleep(0.5)
# 并发执行读写
await asyncio.gather(
read_output(),
send_commands()
)
asyncio.run(real_time_interaction())
进程控制
等待进程完成
import asyncio
async def wait_for_process():
proc = await asyncio.create_subprocess_exec('sleep', '3')
print("等待进程完成...")
returncode = await proc.wait() # 阻塞直到进程结束
print(f"进程完成,返回码: {returncode}")
asyncio.run(wait_for_process())
带超时的等待
import asyncio
async def wait_with_timeout():
proc = await asyncio.create_subprocess_exec('sleep', '10')
try:
returncode = await asyncio.wait_for(proc.wait(), timeout=3)
print(f"进程完成,返回码: {returncode}")
except asyncio.TimeoutError:
print("进程超时,终止进程")
proc.terminate()
await proc.wait()
asyncio.run(wait_with_timeout())
终止进程
import asyncio
import signal
async def terminate_process():
proc = await asyncio.create_subprocess_exec('sleep', '100')
await asyncio.sleep(1)
# 发送 SIGTERM 信号(请求进程正常退出)
proc.terminate()
await proc.wait()
print(f"进程已终止,返回码: {proc.returncode}")
asyncio.run(terminate_process())
强制终止进程
import asyncio
async def kill_process():
proc = await asyncio.create_subprocess_exec('sleep', '100')
await asyncio.sleep(1)
# 发送 SIGKILL 信号(强制终止)
proc.kill()
await proc.wait()
print(f"进程已强制终止,返回码: {proc.returncode}")
asyncio.run(kill_process())
终止方式对比
| 方法 | 信号 | 说明 |
|---|---|---|
terminate() | SIGTERM | 请求进程正常退出,进程可以捕获并清理 |
kill() | SIGKILL | 强制终止,进程无法捕获 |
send_signal() | 自定义 | 发送任意信号 |
发送自定义信号
import asyncio
import signal
async def send_custom_signal():
proc = await asyncio.create_subprocess_exec('python', '-c', '''
import signal
import time
def handler(signum, frame):
print(f"收到信号 {signum}")
exit(0)
signal.signal(signal.SIGUSR1, handler)
print("等待信号...")
while True:
time.sleep(1)
''')
await asyncio.sleep(1)
# 发送 SIGUSR1 信号
proc.send_signal(signal.SIGUSR1)
await proc.wait()
print(f"进程结束,返回码: {proc.returncode}")
# Unix 系统有效
# asyncio.run(send_custom_signal())
检查进程状态
returncode 属性
进程的返回码存储在 returncode 属性中:
import asyncio
async def check_returncode():
# 成功的命令
proc1 = await asyncio.create_subprocess_exec('true')
await proc1.wait()
print(f"true 返回码: {proc1.returncode}") # 0
# 失败的命令
proc2 = await asyncio.create_subprocess_exec('false')
await proc2.wait()
print(f"false 返回码: {proc2.returncode}") # 1
asyncio.run(check_returncode())
返回码含义:
None:进程仍在运行0:成功非 0:失败(具体含义取决于程序)-N:被信号 N 终止
检查进程是否运行
import asyncio
async def check_running():
proc = await asyncio.create_subprocess_exec('sleep', '5')
print(f"运行中? returncode={proc.returncode}") # None 表示运行中
await proc.wait()
print(f"运行中? returncode={proc.returncode}") # 有值表示已结束
asyncio.run(check_running())
并发执行多个进程
并发执行多个命令
import asyncio
async def run_command(cmd):
"""执行单个命令"""
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
return stdout.decode()
async def run_multiple():
commands = [
['echo', 'Hello'],
['echo', 'World'],
['echo', 'asyncio'],
]
# 并发执行所有命令
tasks = [run_command(cmd) for cmd in commands]
results = await asyncio.gather(*tasks)
for result in results:
print(result.strip())
asyncio.run(run_multiple())
限制并发数量
import asyncio
async def run_command_with_semaphore(semaphore, cmd):
"""带信号量限制的命令执行"""
async with semaphore:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
return stdout.decode()
async def run_limited():
semaphore = asyncio.Semaphore(3) # 最多 3 个并发
commands = [['sleep', '1'] for _ in range(10)]
tasks = [
run_command_with_semaphore(semaphore, cmd)
for cmd in commands
]
await asyncio.gather(*tasks)
print("所有命令完成")
asyncio.run(run_limited())
实践示例
示例 1:异步 Git 操作
import asyncio
async def git_status():
"""获取 git 状态"""
proc = await asyncio.create_subprocess_exec(
'git', 'status', '--short',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
print(f"错误: {stderr.decode()}")
return
for line in stdout.decode().strip().split('\n'):
if line:
print(f"文件状态: {line}")
asyncio.run(git_status())
示例 2:批量图片转换
import asyncio
import os
async def convert_image(input_path, output_path):
"""使用 ffmpeg 转换图片"""
proc = await asyncio.create_subprocess_exec(
'ffmpeg', '-i', input_path, '-y', output_path,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
_, stderr = await proc.communicate()
if proc.returncode == 0:
print(f"转换成功: {output_path}")
else:
print(f"转换失败: {stderr.decode()}")
async def batch_convert(input_dir, output_dir):
"""批量转换"""
os.makedirs(output_dir, exist_ok=True)
semaphore = asyncio.Semaphore(4) # 限制并发数
async def limited_convert(filename):
async with semaphore:
input_path = os.path.join(input_dir, filename)
output_path = os.path.join(output_dir, filename.replace('.png', '.jpg'))
await convert_image(input_path, output_path)
tasks = []
for filename in os.listdir(input_dir):
if filename.endswith('.png'):
tasks.append(limited_convert(filename))
await asyncio.gather(*tasks)
# asyncio.run(batch_convert('input', 'output'))
示例 3:实时日志监控
import asyncio
async def monitor_log(filepath):
"""实时监控日志文件"""
proc = await asyncio.create_subprocess_exec(
'tail', '-f', filepath,
stdout=asyncio.subprocess.PIPE
)
print(f"监控日志: {filepath}")
try:
while True:
line = await proc.stdout.readline()
if not line:
break
print(f"[日志] {line.decode().strip()}")
except asyncio.CancelledError:
proc.terminate()
await proc.wait()
async def main():
task = asyncio.create_task(monitor_log('/var/log/system.log'))
# 运行 10 秒后停止
await asyncio.sleep(10)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("监控已停止")
# asyncio.run(main())
示例 4:命令执行器类
import asyncio
from dataclasses import dataclass
from typing import Optional, List
@dataclass
class CommandResult:
"""命令执行结果"""
returncode: int
stdout: str
stderr: str
success: bool
class AsyncCommand:
"""异步命令执行器"""
def __init__(self, command: List[str], timeout: Optional[float] = None):
self.command = command
self.timeout = timeout
async def run(self) -> CommandResult:
"""执行命令"""
proc = await asyncio.create_subprocess_exec(
*self.command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(),
timeout=self.timeout
)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
raise
return CommandResult(
returncode=proc.returncode,
stdout=stdout.decode(),
stderr=stderr.decode(),
success=proc.returncode == 0
)
async def main():
# 简单使用
cmd = AsyncCommand(['echo', 'Hello'])
result = await cmd.run()
print(result)
# 带超时
cmd_with_timeout = AsyncCommand(['sleep', '10'], timeout=2)
try:
result = await cmd_with_timeout.run()
except asyncio.TimeoutError:
print("命令超时")
asyncio.run(main())
示例 5:管道连接多个进程
import asyncio
async def pipeline():
"""将多个进程通过管道连接(类似 shell 的 |)"""
# 进程1:生成数据
proc1 = await asyncio.create_subprocess_exec(
'echo', '1\n2\n3\n4\n5',
stdout=asyncio.subprocess.PIPE
)
# 进程2:过滤数据
proc2 = await asyncio.create_subprocess_exec(
'grep', '[135]',
stdin=proc1.stdout,
stdout=asyncio.subprocess.PIPE
)
# 进程3:处理数据
proc3 = await asyncio.create_subprocess_exec(
'wc', '-l',
stdin=proc2.stdout,
stdout=asyncio.subprocess.PIPE
)
# 关闭不需要的管道端
proc1.stdout.close()
proc2.stdout.close()
stdout, _ = await proc3.communicate()
print(f"结果: {stdout.decode().strip()} 行")
asyncio.run(pipeline())
小结
asyncio 子进程管理的核心 API:
| API | 用途 |
|---|---|
create_subprocess_exec() | 安全执行命令(推荐) |
create_subprocess_shell() | 执行 shell 命令 |
communicate() | 一次性发送输入并获取输出 |
wait() | 等待进程完成 |
terminate() | 发送 SIGTERM 信号 |
kill() | 发送 SIGKILL 信号 |
send_signal() | 发送自定义信号 |
最佳实践:
- 优先使用
create_subprocess_exec()避免 shell 注入 - 对于长时间运行的进程,使用逐行读取而非
communicate() - 使用信号量限制并发进程数量
- 为进程执行添加超时控制
- 正确处理进程的 stdout 和 stderr 管道,避免死锁
- 使用
try/finally确保进程被正确终止
asyncio 的子进程管理功能非常适合需要执行大量外部命令的场景,如构建系统、批处理任务、CI/CD 管道等。