正在加载,请稍候…

Python asyncio 高级模式:无线程并发

掌握 Python asyncio 在生产环境中的高级用法,包括任务组、信号量、队列、连接池、取消、超时处理以及集成同步库。

Python asyncio 高级模式:无线程并发

asyncio 在生产环境中的应用

Python 的 asyncio 擅长处理 I/O 密集型并发。以下是实际应用中重要的模式。

Python asyncio 高级模式:无线程并发 插图

任务组(Python 3.11+)

import asyncio

async def fetch_data(url: str) -> dict:
    """Issue an HTTP GET for ``url`` and return the decoded JSON body.

    A new ``aiohttp.ClientSession`` is opened for this single call and is
    closed again before the function returns.
    """
    # A single ``async with`` enters the session first and the request second,
    # and leaves them in reverse order, exactly like the nested form.
    async with aiohttp.ClientSession() as session, session.get(url) as resp:
        payload = await resp.json()
    return payload

async def main():
    """Fetch the example endpoints concurrently and return their payloads.

    The returned list is in the same order as the URL list.
    """
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/products",
        "https://api.example.com/orders",
    ]

    pending = []
    # TaskGroup cancels every remaining task as soon as one of them fails;
    # leaving the block waits until all tasks have finished.
    async with asyncio.TaskGroup() as tg:
        for url in urls:
            pending.append(tg.create_task(fetch_data(url)))

    return [task.result() for task in pending]

使用信号量限制并发数

async def fetch_with_limit(url: str, semaphore: asyncio.Semaphore) -> dict:
    """Fetch ``url`` as JSON while holding one slot of ``semaphore``.

    The semaphore is acquired before the HTTP session is opened and released
    only after the response body has been decoded.
    """
    async with semaphore, aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            body = await response.json()
    return body

async def fetch_many(urls: list[str], max_concurrent: int = 10):
    """Fetch every URL, allowing at most ``max_concurrent`` fetches at a time.

    Returns a list aligned with ``urls``. Because ``return_exceptions=True``
    is used, a failed fetch shows up as the exception object in its slot
    instead of aborting the whole batch.
    """
    limiter = asyncio.Semaphore(max_concurrent)
    return await asyncio.gather(
        *(fetch_with_limit(url, limiter) for url in urls),
        return_exceptions=True,
    )

生产者-消费者模式与队列

import asyncio
from asyncio import Queue

async def producer(queue: Queue, items: list):
    """Put every element of ``items`` on ``queue``, then a ``None`` sentinel.

    ``queue.put`` is awaited, so a full bounded queue makes the producer wait
    until space is available.
    """
    pause = 0.01  # simulated production delay between items
    for entry in items:
        await queue.put(entry)
        await asyncio.sleep(pause)

    # ``None`` marks the end of the stream.
    await queue.put(None)

async def consumer(queue: Queue, worker_id: int, results: list):
    """Drain ``queue`` until the ``None`` sentinel is seen.

    Each real item is passed to ``process_item`` and the outcome is appended
    to ``results``. On the sentinel the worker marks it done, puts it back so
    the next consumer also sees it, and returns.
    """
    while (item := await queue.get()) is not None:
        results.append(await process_item(item))
        queue.task_done()
        print(f"Worker {worker_id}: processed {item}")

    # Sentinel received: acknowledge it, then hand it on to the next consumer.
    queue.task_done()
    await queue.put(None)

async def pipeline(items: list, num_workers: int = 4):
    """Process ``items`` with one producer and ``num_workers`` consumers.

    Args:
        items: Work items handed to ``producer``.
        num_workers: Number of ``consumer`` tasks to start. With zero workers
            nothing drains the queue, so the producer blocks once the queue
            holds 100 entries.

    Returns:
        The list that the consumers appended their results to. The order
        depends on which worker finishes first.

    Raises:
        The first exception raised by the producer or any consumer. All other
        tasks are cancelled before it propagates.
    """
    queue = Queue(maxsize=100)  # bounded queue gives back-pressure
    results = []

    consumers = [
        asyncio.create_task(consumer(queue, i, results))
        for i in range(num_workers)
    ]
    # Run the producer as a task too. If it were awaited inline, consumers
    # that crash would go unnoticed, and once the bounded queue filled up the
    # producer's ``put`` would block forever.
    producer_task = asyncio.create_task(producer(queue, items))
    everything = (producer_task, *consumers)

    try:
        await asyncio.gather(*everything)
    except BaseException:
        # gather() does not cancel the other awaitables on failure, so do it
        # here and wait for the cancellations to finish before re-raising.
        for task in everything:
            task.cancel()
        await asyncio.gather(*everything, return_exceptions=True)
        raise

    return results

Python asyncio 高级模式:无线程并发 插图

超时与取消

import asyncio

async def fetch_with_timeout(url: str, timeout: float = 5.0) -> dict | None:
    """Fetch ``url`` via ``fetch_data``, giving up after ``timeout`` seconds.

    Returns the fetched payload, or ``None`` (after printing a message) when
    a ``TimeoutError`` is raised inside the timed section.
    """
    try:
        # asyncio.timeout() requires Python 3.11+.
        async with asyncio.timeout(timeout):
            payload = await fetch_data(url)
    except TimeoutError:
        print(f"Timeout fetching {url}")
        return None
    return payload

# Handle cancellation gracefully
async def long_running_task():
    """Run 100 work steps, cleaning up if the task is cancelled part-way.

    On cancellation the task prints a notice, awaits ``cleanup()`` and then
    re-raises ``asyncio.CancelledError``.
    """
    try:
        for step in range(100):
            await do_work(step)
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("Task cancelled — cleaning up")
        await cleanup()
        # Re-raise so the cancellation propagates to whoever awaits the task.
        raise

# Cancel a task
# NOTE(review): the bare ``await`` statements below are only valid inside a
# coroutine (or an asyncio-aware REPL); presumably this snippet is meant to be
# placed in an ``async def`` -- confirm before using it at module level.
task = asyncio.create_task(long_running_task())
await asyncio.sleep(1)  # let the task run for one second first
task.cancel()  # request cancellation of the running task
try:
    # long_running_task re-raises CancelledError after its cleanup, so
    # awaiting the cancelled task raises it here.
    await task
except asyncio.CancelledError:
    print("Task was cancelled")

连接池模式

import asyncio
from asyncio import Queue
import asyncpg

class AsyncConnectionPool:
    """Async context manager wrapping an ``asyncpg`` connection pool.

    The pool is created on ``__aenter__`` and closed on ``__aexit__``.
    ``execute`` and ``fetch`` borrow a connection for one statement and
    return it to the pool afterwards.

    Args:
        dsn: Connection string passed to ``asyncpg.create_pool``.
        min_size: Passed through as the pool's ``min_size``.
        max_size: Passed through as the pool's ``max_size``.
    """

    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        # None until the context manager has been entered.
        self._pool: asyncpg.Pool | None = None

    async def __aenter__(self) -> "AsyncConnectionPool":
        """Create the underlying pool and return this wrapper."""
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
        )
        return self

    async def __aexit__(self, *args) -> None:
        """Close the pool if one is open; exceptions are not suppressed."""
        # Drop the reference first so a closed pool can never be reused, and
        # so closing a never-opened wrapper is a harmless no-op.
        pool, self._pool = self._pool, None
        if pool is not None:
            await pool.close()

    def _require_pool(self) -> "asyncpg.Pool":
        """Return the open pool, or raise if the wrapper is not entered."""
        if self._pool is None:
            raise RuntimeError(
                "AsyncConnectionPool is not open; use it inside 'async with'"
            )
        return self._pool

    async def execute(self, query: str, *args):
        """Run ``query`` with ``args`` and return ``conn.execute``'s result.

        Raises:
            RuntimeError: If the pool has not been opened.
        """
        async with self._require_pool().acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args) -> list:
        """Run ``query`` with ``args`` and return the fetched rows.

        Raises:
            RuntimeError: If the pool has not been opened.
        """
        async with self._require_pool().acquire() as conn:
            return await conn.fetch(query, *args)

在线程中运行同步代码

import asyncio
from concurrent.futures import ThreadPoolExecutor

# Shared thread pool for blocking calls; at most 10 of them run at once.
executor = ThreadPoolExecutor(max_workers=10)

async def run_sync(func, *args):
    """Run the blocking callable ``func(*args)`` on the shared thread pool.

    Args:
        func: A synchronous callable.
        *args: Positional arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns. An exception raised by ``func`` propagates
        to the awaiting coroutine.
    """
    # Inside a coroutine the running loop always exists; get_running_loop()
    # is the documented way to obtain it and never creates a new loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, func, *args)

# Example: blocking PIL image processing
async def process_image(image_path: str):
    """Resize the image at ``image_path`` to 800x600 without blocking the loop.

    The blocking PIL work runs through ``run_sync``. The result is written
    next to the source as ``<stem>_resized<suffix>``; for example
    ``photo.jpg`` becomes ``photo_resized.jpg``.
    """
    def _process(path: str):
        from pathlib import Path

        from PIL import Image

        source = Path(path)
        # Derive the target from the file name only. A plain
        # str.replace('.jpg', ...) would overwrite the source for any other
        # extension and would also rewrite '.jpg' inside directory names.
        target = source.with_name(f"{source.stem}_resized{source.suffix}")
        # The context manager closes the source file handle when done.
        with Image.open(source) as img:
            img.resize((800, 600)).save(target)

    await run_sync(_process, image_path)

Python asyncio 高级模式:无线程并发 插图

结构化并发模式

from contextlib import asynccontextmanager
from typing import AsyncIterator

@asynccontextmanager
async def managed_tasks(*coros) -> AsyncIterator[list]:
    """Run ``coros`` as tasks that cannot outlive the ``async with`` block.

    The tasks are started on entry and yielded to the body. On a normal exit
    the context manager waits for all of them. If the body or any task
    raises, or the surrounding task is cancelled, every task is cancelled and
    awaited before the exception is re-raised.

    Yields:
        The list of created tasks, in the order of ``coros``.
    """
    tasks = [asyncio.create_task(coro) for coro in coros]
    try:
        yield tasks
        await asyncio.gather(*tasks)
    except BaseException:
        # BaseException rather than Exception: asyncio.CancelledError and
        # KeyboardInterrupt do not derive from Exception, and the child
        # tasks must not be left running in those cases either.
        for task in tasks:
            task.cancel()
        # Wait for the cancellations to finish; collect errors, don't raise.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

async def main():
    """Run the three start-up coroutines under ``managed_tasks``."""
    jobs = (fetch_users(), fetch_products(), warm_cache())
    async with managed_tasks(*jobs):
        # Nothing to do in the body: on exit either every task has completed
        # or all of them have been cancelled.
        pass

异步上下文变量

from contextvars import ContextVar

# Context-local request identifier; get() yields '' when nothing has been set.
request_id: ContextVar[str] = ContextVar('request_id', default='')

async def handle_request(req_id: str):
    """Bind ``req_id`` to ``request_id`` while the request is processed.

    The previous value is restored afterwards, even if processing raises.
    """
    reset_token = request_id.set(req_id)
    try:
        await process_request()
    finally:
        # Undo exactly the set() above using the token it returned.
        request_id.reset(reset_token)

async def process_request():
    """Print the request id bound in the current context."""
    # Works correctly even when several requests are handled concurrently.
    rid = request_id.get()
    print(f"Processing request {rid}")

性能提示

  • 使用 asyncio.gather() 并发执行相互独立的任务
  • 使用 asyncio.Queue 实现管道中的背压
  • 避免使用 time.sleep() — 改用 asyncio.sleep()
  • 使用 aiomonitor 或 py-spy 进行性能分析
  • 在开发环境中设置 PYTHONASYNCIODEBUG=1 以捕获问题