
asyncio 在生产环境中的应用
Python 的 asyncio 擅长处理 I/O 密集型并发。以下是实际应用中重要的模式。

任务组(Python 3.11+)
import asyncio

import aiohttp
async def fetch_data(
    url: str,
    session: "aiohttp.ClientSession | None" = None,
) -> dict:
    """Issue an HTTP GET for ``url`` and return the decoded JSON body.

    Args:
        url: Address to request.
        session: Optional existing ``aiohttp.ClientSession`` to reuse. When
            omitted, a temporary session is opened for this single request
            and closed afterwards (the original behaviour).

    Returns:
        The value produced by ``response.json()``.
    """
    if session is not None:
        # The caller owns this session and stays responsible for closing it;
        # reusing it avoids building a new session for every request.
        async with session.get(url) as response:
            return await response.json()

    async with aiohttp.ClientSession() as own_session:
        async with own_session.get(url) as response:
            return await response.json()
async def main():
    """Fetch the three example endpoints concurrently.

    Returns:
        A list holding one ``fetch_data`` result per endpoint, in the same
        order as the endpoints are listed.
    """
    endpoints = (
        "https://api.example.com/users",
        "https://api.example.com/products",
        "https://api.example.com/orders",
    )
    # Leaving the TaskGroup block waits for every task; when one of them
    # fails, the remaining tasks are cancelled.
    async with asyncio.TaskGroup() as group:
        pending = [
            group.create_task(fetch_data(endpoint)) for endpoint in endpoints
        ]
    return [finished.result() for finished in pending]
信号量用于速率限制
async def fetch_with_limit(url: str, semaphore: asyncio.Semaphore) -> dict:
    """Fetch ``url`` as JSON while holding one slot of ``semaphore``.

    Args:
        url: Address to request.
        semaphore: Shared semaphore that bounds how many fetches run at once.

    Returns:
        The decoded JSON body returned by ``fetch_data``.
    """
    async with semaphore:
        # Delegate to the shared helper instead of repeating its
        # session/request/JSON sequence here.
        return await fetch_data(url)
async def fetch_many(urls: list[str], max_concurrent: int = 10):
    """Fetch every URL with at most ``max_concurrent`` requests in flight.

    Args:
        urls: Addresses to request.
        max_concurrent: Upper bound on simultaneous fetches; must be >= 1.

    Returns:
        A list aligned with ``urls``. Each entry is either the fetched value
        or the exception raised for that URL (``return_exceptions=True``).

    Raises:
        ValueError: If ``max_concurrent`` is smaller than 1. A semaphore
            created with 0 could never be acquired, so every fetch would
            wait forever.
    """
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")

    semaphore = asyncio.Semaphore(max_concurrent)
    return await asyncio.gather(
        *(fetch_with_limit(url, semaphore) for url in urls),
        return_exceptions=True,
    )
生产者-消费者模式与队列
import asyncio
from asyncio import Queue
async def producer(queue: Queue, items: list):
    """Put every element of ``items`` on ``queue``, then a ``None`` sentinel.

    Args:
        queue: Queue the items are published to.
        items: Work items, enqueued in order.
    """
    for entry in items:
        await queue.put(entry)
        # Simulated production latency between items.
        await asyncio.sleep(0.01)

    # ``None`` is the completion signal the consumers look for.
    await queue.put(None)
async def consumer(queue: Queue, worker_id: int, results: list):
    """Process queue items until the ``None`` sentinel is received.

    Args:
        queue: Queue to read work items from.
        worker_id: Identifier used in the progress message.
        results: Shared list that each ``process_item`` result is appended to.

    Raises:
        Whatever ``process_item`` raises; the item is still marked done so
        the queue's unfinished-task counter stays accurate.
    """
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            # Put the sentinel back so the next consumer also stops.
            await queue.put(None)
            break

        try:
            result = await process_item(item)
            results.append(result)
        finally:
            # Always balance the get() above, even when processing fails;
            # otherwise a later queue.join() would never return.
            queue.task_done()
        print(f"Worker {worker_id}: processed {item}")
async def pipeline(items: list, num_workers: int = 4):
    """Run ``items`` through a producer and ``num_workers`` consumers.

    Args:
        items: Work items handed to ``producer``.
        num_workers: Number of concurrent ``consumer`` tasks; must be >= 1.

    Returns:
        The list of results the consumers collected (completion order).

    Raises:
        ValueError: If ``num_workers`` is smaller than 1, because nothing
            would ever drain the queue.
        Exception: The first error raised by the producer or a consumer;
            every remaining task is cancelled before it propagates.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")

    queue = Queue(maxsize=100)  # Bounded queue provides backpressure.
    results = []

    tasks = [
        asyncio.create_task(consumer(queue, i, results))
        for i in range(num_workers)
    ]
    # The producer runs as a task too, so a consumer failure can interrupt
    # it even while it is blocked on a full queue.
    tasks.append(asyncio.create_task(producer(queue, items)))

    try:
        await asyncio.gather(*tasks)
    except BaseException:
        # On failure or cancellation, stop everything still running and
        # wait for it to finish so no task outlives this call.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
    return results
超时与取消
import asyncio
async def fetch_with_timeout(url: str, timeout: float = 5.0) -> dict | None:
    """Fetch ``url`` via ``fetch_data``, giving up after ``timeout`` seconds.

    Args:
        url: Address to request.
        timeout: Time limit in seconds passed to ``asyncio.timeout``.

    Returns:
        The fetched value, or ``None`` when the time limit is exceeded.
    """
    try:
        # asyncio.timeout requires Python 3.11+.
        async with asyncio.timeout(timeout):
            payload = await fetch_data(url)
    except TimeoutError:
        print(f"Timeout fetching {url}")
        return None
    return payload
# Handle cancellation gracefully.
async def long_running_task():
    """Run 100 work steps, cleaning up before propagating a cancellation."""
    try:
        for step in range(100):
            await do_work(step)
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("Task cancelled — cleaning up")
        await cleanup()
        # Re-raise so the cancellation propagates to whoever awaits the task.
        raise
# 取消一个任务
task = asyncio.create_task(long_running_task())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
连接池模式
import asyncio
from asyncio import Queue
import asyncpg
class AsyncConnectionPool:
    """Async context manager around an ``asyncpg`` connection pool.

    The pool is created when the ``async with`` block is entered and closed
    when it is left. ``execute`` and ``fetch`` borrow one connection per
    call and raise ``RuntimeError`` if the pool is not open.
    """

    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        """Store the pool settings; no connection is opened here.

        Args:
            dsn: Connection string passed to ``asyncpg.create_pool``.
            min_size: ``min_size`` passed to ``asyncpg.create_pool``.
            max_size: ``max_size`` passed to ``asyncpg.create_pool``.
        """
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self._pool: asyncpg.Pool | None = None

    async def __aenter__(self):
        """Create the underlying pool and return this wrapper."""
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
        )
        return self

    async def __aexit__(self, *args):
        """Close the pool, if one was opened, and forget it."""
        # Clear the attribute first so a closed pool can never be reused.
        pool, self._pool = self._pool, None
        if pool is not None:
            await pool.close()

    def _require_pool(self) -> "asyncpg.Pool":
        """Return the open pool or raise a descriptive error."""
        if self._pool is None:
            raise RuntimeError(
                "AsyncConnectionPool is not open; use it inside 'async with'"
            )
        return self._pool

    async def execute(self, query: str, *args):
        """Run ``query`` on a pooled connection and return its result.

        Raises:
            RuntimeError: If the pool has not been opened.
        """
        async with self._require_pool().acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args) -> list:
        """Run ``query`` on a pooled connection and return the fetched rows.

        Raises:
            RuntimeError: If the pool has not been opened.
        """
        async with self._require_pool().acquire() as conn:
            return await conn.fetch(query, *args)
在线程中运行同步代码
import asyncio
from concurrent.futures import ThreadPoolExecutor
# Shared worker threads for blocking calls.
executor = ThreadPoolExecutor(max_workers=10)


async def run_sync(func, *args):
    """Run the blocking callable ``func(*args)`` on the shared thread pool.

    Args:
        func: Synchronous callable to execute off the event loop.
        *args: Positional arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns.
    """
    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine; get_event_loop() is a legacy lookup.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, func, *args)
# Example: blocking PIL image processing.
async def process_image(image_path: str):
    """Resize the image at ``image_path`` to 800x600 off the event loop.

    The result is written next to the source file with ``_resized``
    inserted before the extension (``photo.jpg`` -> ``photo_resized.jpg``).

    Args:
        image_path: Path of the image to resize.
    """

    def _process(path: str):
        from pathlib import Path

        from PIL import Image

        source = Path(path)
        # Derive the output name from the file name only. A plain
        # str.replace('.jpg', ...) would leave non-.jpg paths unchanged (so
        # the source file was overwritten) and would also rewrite
        # directories whose names contain '.jpg'.
        target = source.with_name(f"{source.stem}_resized{source.suffix}")
        # The context manager closes the underlying file handle.
        with Image.open(source) as img:
            img.resize((800, 600)).save(target)

    await run_sync(_process, image_path)
结构化并发模式
from contextlib import asynccontextmanager
from typing import AsyncIterator
@asynccontextmanager
async def managed_tasks(*coros) -> AsyncIterator[list]:
    """Run ``coros`` as tasks for the lifetime of an ``async with`` block.

    Args:
        *coros: Coroutines to schedule as tasks on entry.

    Yields:
        The list of created tasks, in the order the coroutines were given.

    Raises:
        Exception: Any error raised by the block body or by one of the
            tasks; all tasks are cancelled and awaited before it propagates.
        asyncio.CancelledError: If the enclosing task is cancelled; the
            child tasks are cancelled and awaited first.
    """
    tasks = [asyncio.create_task(coro) for coro in coros]
    try:
        yield tasks
        # Normal exit: wait for every task to finish.
        await asyncio.gather(*tasks)
    except (Exception, asyncio.CancelledError):
        # CancelledError derives from BaseException, so it must be listed
        # explicitly; otherwise cancelling the enclosing task would leave
        # the child tasks running.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
async def main():
    """Run the three startup coroutines under ``managed_tasks``."""
    startup = (fetch_users(), fetch_products(), warm_cache())
    async with managed_tasks(*startup) as tasks:
        # Nothing to do in the body: on exit either every task has finished
        # or all of them have been cancelled.
        pass
异步上下文变量
from contextvars import ContextVar
# Context-local request identifier; reads return '' when nothing has been set.
request_id: ContextVar[str] = ContextVar('request_id', default='')
async def handle_request(req_id: str):
    """Bind ``req_id`` to ``request_id`` while ``process_request`` runs.

    Args:
        req_id: Identifier to expose through the ``request_id`` variable.
    """
    previous = request_id.set(req_id)
    try:
        await process_request()
    finally:
        # Restore the value that was visible before this request, even if
        # processing raised.
        request_id.reset(previous)
async def process_request():
    """Print the request id bound in the current context."""
    rid = request_id.get()
    print(f"Processing request {rid}")


# Works correctly even when requests are handled concurrently.
性能提示
- 使用 `asyncio.gather()` 处理并行的独立任务
- 使用 `asyncio.Queue` 实现管道中的背压
- 避免使用 `time.sleep()` — 改用 `asyncio.sleep()`
- 使用 `aiomonitor` 或 `py-spy` 进行性能分析
- 在开发环境中设置 `PYTHONASYNCIODEBUG=1` 以捕获问题