Python Async Programming: Mastering asyncio to Build High-Performance Applications

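An in-depth look at Python asyncio: coroutines, the event loop, tasks, semaphores, async context managers, and building high-performance async scrapers and API clients.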

asyncio makes it possible to run a large number of IO-bound operations concurrently on a single thread. Here is how to use it correctly.

Mental Model

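asyncio is single-threaded, cooperative multitasking: a coroutine hands control back to the event loop explicitly at each await. IO-bound workloads can see speedups on the order of 10-100x over sequential code, because the waits overlap; CPU-bound work gains nothing from this and needs multiple processes instead.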
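
To make the model concrete, here is a minimal sketch in which asyncio.sleep stands in for a network call. The names simulated_io and run_concurrently are illustrative only. Three 0.2-second waits overlap, so the whole batch should finish in roughly 0.2 seconds rather than 0.6.

```python
import asyncio
import time


async def simulated_io(name: str, delay: float) -> str:
    # asyncio.sleep stands in for a network call; the await hands
    # control back to the event loop while this coroutine waits.
    await asyncio.sleep(delay)
    return name


async def run_concurrently() -> tuple[list[str], float]:
    start = time.perf_counter()
    # gather schedules all three coroutines, so their waits overlap.
    results = await asyncio.gather(
        simulated_io("a", 0.2),
        simulated_io("b", 0.2),
        simulated_io("c", 0.2),
    )
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    names, elapsed = asyncio.run(run_concurrently())
    print(names, f"{elapsed:.2f}s")
```

The speedup comes entirely from overlapping the waits; replacing asyncio.sleep with a CPU-heavy loop would bring the total back to the sum of the individual times.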

import asyncio

import aiohttp

# Wrong: blocks the entire event loop
async def fetch_bad(url: str) -> str:
    import requests
    return requests.get(url).text  # synchronous call; every other coroutine stalls until it returns

# Right: non-blocking
async def fetch_good(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()
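
When a blocking library cannot be swapped for an async one, one option is to push the call onto a worker thread with asyncio.to_thread (Python 3.9+). The sketch below assumes a hypothetical blocking_fetch function that uses time.sleep in place of a real blocking HTTP call; it is an illustration, not a replacement for a proper async client.

```python
import asyncio
import time


def blocking_fetch(url: str) -> str:
    # Hypothetical stand-in for a blocking call such as requests.get(url).text
    time.sleep(0.2)
    return f"body of {url}"


async def fetch_in_thread(url: str) -> str:
    # to_thread runs the blocking function in a worker thread,
    # so the event loop stays free to run other coroutines.
    return await asyncio.to_thread(blocking_fetch, url)


async def fetch_both() -> list[str]:
    urls = ["https://example.com/a", "https://example.com/b"]
    return list(await asyncio.gather(*(fetch_in_thread(u) for u in urls)))


if __name__ == "__main__":
    print(asyncio.run(fetch_both()))
```

Threads suit blocking IO only; CPU-heavy functions still contend for the GIL and belong in a process pool.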

Concurrent Requests

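async def get_status(session: aiohttp.ClientSession, url: str) -> int:  # helper assumed by check_all
    async with session.get(url) as resp:
        return resp.status

async def check_all(urls: list) -> list:
    async with aiohttp.ClientSession() as session:
        tasks = [get_status(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)  # failures come back as values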
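
With return_exceptions=True the result list mixes successful values with exception objects, so the caller has to separate them. A minimal sketch of that step follows; check is a hypothetical stand-in for a real status request, and check_and_split is an illustrative name.

```python
import asyncio


async def check(url: str) -> int:
    # Hypothetical stand-in for a status request: fails for URLs containing "bad".
    await asyncio.sleep(0.01)
    if "bad" in url:
        raise ConnectionError(f"cannot reach {url}")
    return 200


async def check_and_split(
    urls: list[str],
) -> tuple[dict[str, int], dict[str, BaseException]]:
    outcomes = await asyncio.gather(
        *(check(u) for u in urls), return_exceptions=True
    )
    ok: dict[str, int] = {}
    failed: dict[str, BaseException] = {}
    # gather preserves input order, so zip pairs each URL with its outcome.
    for url, outcome in zip(urls, outcomes):
        if isinstance(outcome, BaseException):
            failed[url] = outcome
        else:
            ok[url] = outcome
    return ok, failed


if __name__ == "__main__":
    print(asyncio.run(check_and_split(["https://good.example", "https://bad.example"])))
```

Without return_exceptions=True, the first failure would propagate out of gather and the other results would be lost to the caller.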


Semaphores (Concurrency Limiting)

async def scrape_urls(urls: list, max_concurrent: int = 20) -> list:
    sem = asyncio.Semaphore(max_concurrent)  # at most max_concurrent requests in flight

    async with aiohttp.ClientSession() as session:  # one shared session reuses connections
        async def fetch(url):
            async with sem:
                async with session.get(url) as resp:
                    return await resp.text()

        return await asyncio.gather(*[fetch(u) for u in urls], return_exceptions=True)
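
A semaphore caps how many coroutines are inside the guarded block at once; it does not by itself limit requests per second. The following sketch, with the illustrative name run_limited and asyncio.sleep as simulated IO, records the peak number of jobs running at the same time to show that the cap holds.

```python
import asyncio


async def run_limited(n_jobs: int, max_concurrent: int) -> int:
    sem = asyncio.Semaphore(max_concurrent)
    active = 0  # jobs currently inside the semaphore
    peak = 0    # highest value of active seen so far

    async def job() -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated IO
            active -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_limited(n_jobs=50, max_concurrent=5)))
```

All 50 jobs are scheduled immediately, but only five ever hold the semaphore at once; the rest wait at async with sem.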


Producer-Consumer Queue

async def pipeline(urls: list) -> list:
    N = 5  # number of consumers
    queue = asyncio.Queue(maxsize=100)  # bounded queue applies backpressure to the producer
    results = []

    async def producer():
        for url in urls:
            await queue.put(url)
        for _ in range(N):
            await queue.put(None)  # poison pill: one per consumer

    async def consumer():
        async with aiohttp.ClientSession() as session:
            while True:
                url = await queue.get()
                if url is None:
                    queue.task_done()
                    break
                try:
                    async with session.get(url) as resp:
                        results.append({'url': url, 'len': len(await resp.text())})
                except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                    # Record the failure so one bad URL does not kill this consumer
                    results.append({'url': url, 'error': str(exc)})
                finally:
                    queue.task_done()

    await asyncio.gather(producer(), *[consumer() for _ in range(N)])
    return results
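
Poison pills are one way to shut consumers down. Another common pattern, sketched here under simplified assumptions, waits on queue.join() and then cancels the idle workers. The name process_all and the doubling "work" are illustrative only, with asyncio.sleep standing in for IO.

```python
import asyncio


async def process_all(items: list[int], n_workers: int = 3) -> list[int]:
    queue = asyncio.Queue(maxsize=10)
    results: list[int] = []

    async def worker() -> None:
        while True:
            item = await queue.get()
            try:
                await asyncio.sleep(0.001)  # simulated IO
                results.append(item * 2)
            finally:
                # Always mark the item done, or queue.join() never returns.
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(n_workers)]
    for item in items:
        await queue.put(item)  # blocks while the queue is full
    await queue.join()  # returns once every queued item is marked done
    for w in workers:
        w.cancel()  # workers are idle in queue.get(); stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(process_all(list(range(10))))))
```

Results arrive in completion order rather than input order, so sort them or carry an index if order matters.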

CPU-Bound Work with ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def heavy_compute(data: list) -> dict:
    # Must be a module-level function so it can be pickled and sent to a worker process
    return {'result': sum(x**2 for x in data)}

async def run_heavy(data: list) -> dict:
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
    with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as ex:
        return await loop.run_in_executor(ex, heavy_compute, data)

Error Handling with Retries

async def with_retry(coro_fn, max_attempts=3, base_delay=1.0):
    # coro_fn is a zero-argument callable returning a fresh coroutine for each attempt
    for attempt in range(max_attempts):
        try:
            return await coro_fn()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt < max_attempts - 1:
                # Exponential backoff: base_delay, 2x, 4x, ...
                await asyncio.sleep(base_delay * (2 ** attempt))
            else:
                raise
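
As a usage sketch, the following has the same shape as with_retry but catches built-in exceptions so it runs without aiohttp. The names retry, FlakyService, slow, and call_with_timeout are all hypothetical. It also shows one way a timeout error can reach the retry loop, namely asyncio.wait_for.

```python
import asyncio


async def retry(coro_fn, max_attempts: int = 3, base_delay: float = 0.01):
    # Same structure as with_retry, with built-in exception types.
    for attempt in range(max_attempts):
        try:
            return await coro_fn()
        except (ConnectionError, asyncio.TimeoutError):
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))


class FlakyService:
    """Hypothetical service that fails twice, then succeeds."""

    def __init__(self) -> None:
        self.calls = 0

    async def call(self) -> str:
        self.calls += 1
        if self.calls < 3:
            raise ConnectionError("transient failure")
        return "ok"


async def slow() -> str:
    await asyncio.sleep(1)
    return "too late"


async def call_with_timeout() -> str:
    # wait_for cancels slow() and raises a timeout error after 0.05 s.
    return await asyncio.wait_for(slow(), timeout=0.05)


if __name__ == "__main__":
    service = FlakyService()
    print(asyncio.run(retry(service.call)), service.calls)
```

Passing a callable rather than a coroutine object matters: a coroutine can be awaited only once, so each attempt needs a fresh one.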

asyncio rewards developers who respect its limits: keep the work IO-bound, keep concurrency bounded, and push CPU work into an executor.