跳到主要内容

子进程管理

asyncio 提供了异步创建和管理子进程的能力,可以在不阻塞事件循环的情况下执行外部命令、与子进程交互。本章介绍如何使用 asyncio 进行子进程管理。

为什么需要异步子进程?

传统使用 subprocess.run()os.system() 执行外部命令会阻塞整个线程:

import subprocess
import time

# 同步方式:阻塞等待
result = subprocess.run(['sleep', '5'], capture_output=True)
# 这 5 秒内,整个程序都无法执行其他操作

使用 asyncio 可以在等待子进程完成的同时继续执行其他任务:

import asyncio

async def run_command():
# 异步执行命令
proc = await asyncio.create_subprocess_exec('sleep', '5')
# 可以在这里做其他事情
await proc.wait()

asyncio.run(run_command())

创建子进程

create_subprocess_exec()

使用 create_subprocess_exec() 执行命令(推荐方式):

import asyncio

async def run_ls():
# 执行 ls -la 命令
proc = await asyncio.create_subprocess_exec(
'ls', '-la',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

# 等待命令完成并获取输出
stdout, stderr = await proc.communicate()

print(f"返回码: {proc.returncode}")
print(f"标准输出:\n{stdout.decode()}")
print(f"标准错误:\n{stderr.decode()}")

asyncio.run(run_ls())

create_subprocess_exec() 的参数:

  • 第一个参数:可执行程序路径或名称
  • 后续参数:命令行参数(每个参数单独传入)
  • 关键字参数:重定向输入输出

create_subprocess_shell()

使用 create_subprocess_shell() 执行 shell 命令:

import asyncio

async def run_shell_command():
# 执行 shell 命令(支持管道、重定向等 shell 特性)
proc = await asyncio.create_subprocess_shell(
'ls -la | grep ".py"',
stdout=asyncio.subprocess.PIPE
)

stdout, _ = await proc.communicate()
print(stdout.decode())

asyncio.run(run_shell_command())

注意create_subprocess_shell() 存在 shell 注入风险,如果命令参数来自用户输入,必须进行严格的验证或转义。对于已知命令,推荐使用 create_subprocess_exec()

两种方式的区别

特性create_subprocess_execcreate_subprocess_shell
安全性更安全,无 shell 注入风险有 shell 注入风险
Shell 特性不支持管道、通配符等支持完整 shell 语法
参数传递参数分开传入整个命令作为字符串
推荐场景已知命令,参数可控需要 shell 特性

进程输入输出

标准输出捕获

import asyncio

async def capture_output():
proc = await asyncio.create_subprocess_exec(
'echo', 'Hello, asyncio!',
stdout=asyncio.subprocess.PIPE
)

stdout, _ = await proc.communicate()
print(f"输出: {stdout.decode().strip()}")

asyncio.run(capture_output())

标准错误捕获

import asyncio

async def capture_stderr():
proc = await asyncio.create_subprocess_exec(
'python', '-c', 'import sys; print("错误信息", file=sys.stderr)',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

stdout, stderr = await proc.communicate()
print(f"标准输出: {stdout.decode()}")
print(f"标准错误: {stderr.decode()}")

asyncio.run(capture_stderr())

向进程发送输入

import asyncio

async def send_input():
# 创建一个需要输入的进程
proc = await asyncio.create_subprocess_exec(
'python', '-c', 'print(input("请输入: "))',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

# 发送输入并获取输出
stdout, stderr = await proc.communicate(input=b'Hello from asyncio\n')

print(f"输出: {stdout.decode()}")

asyncio.run(send_input())

输入输出重定向到文件

import asyncio

async def redirect_to_file():
# 将输出重定向到文件
with open('output.txt', 'w') as f:
proc = await asyncio.create_subprocess_exec(
'ls', '-la',
stdout=f # 直接传入文件对象
)

await proc.wait()

print("输出已写入 output.txt")

asyncio.run(redirect_to_file())

输出重定向选项

选项说明
None(默认)继承父进程的输出
asyncio.subprocess.PIPE创建管道,可以通过 communicate() 读取
asyncio.subprocess.DEVNULL丢弃输出
文件对象重定向到文件

与进程交互

communicate() - 一次性交互

communicate() 等待进程完成并返回所有输出:

import asyncio

async def communicate_example():
proc = await asyncio.create_subprocess_exec(
'python', '-c', '''
for i in range(5):
print(f"Line {i}")
''',
stdout=asyncio.subprocess.PIPE
)

# 一次性获取所有输出
stdout, _ = await proc.communicate()
print(stdout.decode())

asyncio.run(communicate_example())

注意communicate() 会等待进程结束,如果进程持续输出,程序会一直等待。

逐行读取输出

对于长时间运行的进程,可以逐行读取:

import asyncio

async def read_line_by_line():
proc = await asyncio.create_subprocess_exec(
'python', '-c', '''
import time
for i in range(5):
print(f"Line {i}")
time.sleep(1)
''',
stdout=asyncio.subprocess.PIPE
)

# 逐行读取
while True:
line = await proc.stdout.readline()
if not line:
break
print(f"收到: {line.decode().strip()}")

await proc.wait()

asyncio.run(read_line_by_line())

实时交互

对于需要实时交互的进程:

import asyncio

async def real_time_interaction():
proc = await asyncio.create_subprocess_exec(
'python', '-i', # Python 交互模式
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

async def read_output():
"""持续读取输出"""
while True:
line = await proc.stdout.readline()
if not line:
break
print(f"输出: {line.decode().strip()}")

async def send_commands():
"""发送命令"""
commands = [
b'1 + 1\n',
b'2 * 3\n',
b'exit()\n'
]

for cmd in commands:
proc.stdin.write(cmd)
await proc.stdin.drain()
await asyncio.sleep(0.5)

# 并发执行读写
await asyncio.gather(
read_output(),
send_commands()
)

asyncio.run(real_time_interaction())

进程控制

等待进程完成

import asyncio

async def wait_for_process():
proc = await asyncio.create_subprocess_exec('sleep', '3')

print("等待进程完成...")
returncode = await proc.wait() # 阻塞直到进程结束

print(f"进程完成,返回码: {returncode}")

asyncio.run(wait_for_process())

带超时的等待

import asyncio

async def wait_with_timeout():
proc = await asyncio.create_subprocess_exec('sleep', '10')

try:
returncode = await asyncio.wait_for(proc.wait(), timeout=3)
print(f"进程完成,返回码: {returncode}")
except asyncio.TimeoutError:
print("进程超时,终止进程")
proc.terminate()
await proc.wait()

asyncio.run(wait_with_timeout())

终止进程

import asyncio
import signal

async def terminate_process():
proc = await asyncio.create_subprocess_exec('sleep', '100')

await asyncio.sleep(1)

# 发送 SIGTERM 信号(请求进程正常退出)
proc.terminate()

await proc.wait()
print(f"进程已终止,返回码: {proc.returncode}")

asyncio.run(terminate_process())

强制终止进程

import asyncio

async def kill_process():
proc = await asyncio.create_subprocess_exec('sleep', '100')

await asyncio.sleep(1)

# 发送 SIGKILL 信号(强制终止)
proc.kill()

await proc.wait()
print(f"进程已强制终止,返回码: {proc.returncode}")

asyncio.run(kill_process())

终止方式对比

方法信号说明
terminate()SIGTERM请求进程正常退出,进程可以捕获并清理
kill()SIGKILL强制终止,进程无法捕获
send_signal()自定义发送任意信号

发送自定义信号

import asyncio
import signal

async def send_custom_signal():
proc = await asyncio.create_subprocess_exec('python', '-c', '''
import signal
import time

def handler(signum, frame):
print(f"收到信号 {signum}")
exit(0)

signal.signal(signal.SIGUSR1, handler)
print("等待信号...")
while True:
time.sleep(1)
''')

await asyncio.sleep(1)

# 发送 SIGUSR1 信号
proc.send_signal(signal.SIGUSR1)

await proc.wait()
print(f"进程结束,返回码: {proc.returncode}")

# Unix 系统有效
# asyncio.run(send_custom_signal())

检查进程状态

returncode 属性

进程的返回码存储在 returncode 属性中:

import asyncio

async def check_returncode():
# 成功的命令
proc1 = await asyncio.create_subprocess_exec('true')
await proc1.wait()
print(f"true 返回码: {proc1.returncode}") # 0

# 失败的命令
proc2 = await asyncio.create_subprocess_exec('false')
await proc2.wait()
print(f"false 返回码: {proc2.returncode}") # 1

asyncio.run(check_returncode())

返回码含义:

  • None:进程仍在运行
  • 0:成功
  • 非 0:失败(具体含义取决于程序)
  • -N:被信号 N 终止

检查进程是否运行

import asyncio

async def check_running():
proc = await asyncio.create_subprocess_exec('sleep', '5')

print(f"运行中? returncode={proc.returncode}") # None 表示运行中

await proc.wait()

print(f"运行中? returncode={proc.returncode}") # 有值表示已结束

asyncio.run(check_running())

并发执行多个进程

并发执行多个命令

import asyncio

async def run_command(cmd):
"""执行单个命令"""
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
return stdout.decode()

async def run_multiple():
commands = [
['echo', 'Hello'],
['echo', 'World'],
['echo', 'asyncio'],
]

# 并发执行所有命令
tasks = [run_command(cmd) for cmd in commands]
results = await asyncio.gather(*tasks)

for result in results:
print(result.strip())

asyncio.run(run_multiple())

限制并发数量

import asyncio

async def run_command_with_semaphore(semaphore, cmd):
"""带信号量限制的命令执行"""
async with semaphore:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
return stdout.decode()

async def run_limited():
semaphore = asyncio.Semaphore(3) # 最多 3 个并发

commands = [['sleep', '1'] for _ in range(10)]

tasks = [
run_command_with_semaphore(semaphore, cmd)
for cmd in commands
]

await asyncio.gather(*tasks)
print("所有命令完成")

asyncio.run(run_limited())

实践示例

示例 1:异步 Git 操作

import asyncio

async def git_status():
"""获取 git 状态"""
proc = await asyncio.create_subprocess_exec(
'git', 'status', '--short',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

stdout, stderr = await proc.communicate()

if proc.returncode != 0:
print(f"错误: {stderr.decode()}")
return

for line in stdout.decode().strip().split('\n'):
if line:
print(f"文件状态: {line}")

asyncio.run(git_status())

示例 2:批量图片转换

import asyncio
import os

async def convert_image(input_path, output_path):
"""使用 ffmpeg 转换图片"""
proc = await asyncio.create_subprocess_exec(
'ffmpeg', '-i', input_path, '-y', output_path,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)

_, stderr = await proc.communicate()

if proc.returncode == 0:
print(f"转换成功: {output_path}")
else:
print(f"转换失败: {stderr.decode()}")

async def batch_convert(input_dir, output_dir):
"""批量转换"""
os.makedirs(output_dir, exist_ok=True)

semaphore = asyncio.Semaphore(4) # 限制并发数

async def limited_convert(filename):
async with semaphore:
input_path = os.path.join(input_dir, filename)
output_path = os.path.join(output_dir, filename.replace('.png', '.jpg'))
await convert_image(input_path, output_path)

tasks = []
for filename in os.listdir(input_dir):
if filename.endswith('.png'):
tasks.append(limited_convert(filename))

await asyncio.gather(*tasks)

# asyncio.run(batch_convert('input', 'output'))

示例 3:实时日志监控

import asyncio

async def monitor_log(filepath):
"""实时监控日志文件"""
proc = await asyncio.create_subprocess_exec(
'tail', '-f', filepath,
stdout=asyncio.subprocess.PIPE
)

print(f"监控日志: {filepath}")

try:
while True:
line = await proc.stdout.readline()
if not line:
break
print(f"[日志] {line.decode().strip()}")
except asyncio.CancelledError:
proc.terminate()
await proc.wait()

async def main():
task = asyncio.create_task(monitor_log('/var/log/system.log'))

# 运行 10 秒后停止
await asyncio.sleep(10)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("监控已停止")

# asyncio.run(main())

示例 4:命令执行器类

import asyncio
from dataclasses import dataclass
from typing import Optional, List

@dataclass
class CommandResult:
"""命令执行结果"""
returncode: int
stdout: str
stderr: str
success: bool

class AsyncCommand:
"""异步命令执行器"""

def __init__(self, command: List[str], timeout: Optional[float] = None):
self.command = command
self.timeout = timeout

async def run(self) -> CommandResult:
"""执行命令"""
proc = await asyncio.create_subprocess_exec(
*self.command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(),
timeout=self.timeout
)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
raise

return CommandResult(
returncode=proc.returncode,
stdout=stdout.decode(),
stderr=stderr.decode(),
success=proc.returncode == 0
)

async def main():
# 简单使用
cmd = AsyncCommand(['echo', 'Hello'])
result = await cmd.run()
print(result)

# 带超时
cmd_with_timeout = AsyncCommand(['sleep', '10'], timeout=2)
try:
result = await cmd_with_timeout.run()
except asyncio.TimeoutError:
print("命令超时")

asyncio.run(main())

示例 5:管道连接多个进程

import asyncio

async def pipeline():
"""将多个进程通过管道连接(类似 shell 的 |)"""
# 进程1:生成数据
proc1 = await asyncio.create_subprocess_exec(
'echo', '1\n2\n3\n4\n5',
stdout=asyncio.subprocess.PIPE
)

# 进程2:过滤数据
proc2 = await asyncio.create_subprocess_exec(
'grep', '[135]',
stdin=proc1.stdout,
stdout=asyncio.subprocess.PIPE
)

# 进程3:处理数据
proc3 = await asyncio.create_subprocess_exec(
'wc', '-l',
stdin=proc2.stdout,
stdout=asyncio.subprocess.PIPE
)

# 关闭不需要的管道端
proc1.stdout.close()
proc2.stdout.close()

stdout, _ = await proc3.communicate()
print(f"结果: {stdout.decode().strip()} 行")

asyncio.run(pipeline())

小结

asyncio 子进程管理的核心 API:

API用途
create_subprocess_exec()安全执行命令(推荐)
create_subprocess_shell()执行 shell 命令
communicate()一次性发送输入并获取输出
wait()等待进程完成
terminate()发送 SIGTERM 信号
kill()发送 SIGKILL 信号
send_signal()发送自定义信号

最佳实践:

  1. 优先使用 create_subprocess_exec() 避免 shell 注入
  2. 对于长时间运行的进程,使用逐行读取而非 communicate()
  3. 使用信号量限制并发进程数量
  4. 为进程执行添加超时控制
  5. 正确处理进程的 stdout 和 stderr 管道,避免死锁
  6. 使用 try/finally 确保进程被正确终止

asyncio 的子进程管理功能非常适合需要执行大量外部命令的场景,如构建系统、批处理任务、CI/CD 管道等。