
asyncio 让你在单个线程上并发处理大量 IO 密集型任务。下面介绍如何正确使用它。
心智模型
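单线程、协作式多任务:同一时刻只有一个协程在运行,协程只在 await 处显式让出控制权。因此任何阻塞调用(例如同步的 requests.get 或 time.sleep)都会冻结整个事件循环,使所有其他协程停摆。对于大部分时间在等待网络或磁盘的 IO 密集型工作负载,并发执行往往能带来 10-100 倍的加速(具体倍数取决于等待时间所占比例);CPU 密集型工作无法从中获益,需要使用多进程。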
import asyncio, aiohttp
# 错误:阻塞整个事件循环
async def fetch_bad(url: str) -> str:
    """Anti-pattern: a coroutine that performs a *synchronous* HTTP request.

    ``requests.get`` is called without ``await``, so the event loop is stuck
    inside this call until the response arrives and no other coroutine can
    run in the meantime. Kept only as a counter-example to ``fetch_good``.
    """
    import requests

    response = requests.get(url)  # blocks every other coroutine on the loop
    return response.text
# 正确:非阻塞
async def fetch_good(session: aiohttp.ClientSession, url: str) -> str:
    """Non-blocking counterpart of ``fetch_bad``: GET *url* and return the body text.

    The session is supplied by the caller, so a single session can be reused
    across many fetches instead of being created per request.
    """
    async with session.get(url) as resp:
        # Each await hands control back to the event loop while data arrives.
        body = await resp.text()
    return body

并发请求
async def check_all(urls: list) -> list:
    """Request every URL concurrently and collect its HTTP status code.

    Args:
        urls: URLs to request with GET.

    Returns:
        One entry per URL, in the same order as *urls*: the integer status
        code, or the exception instance if that request failed
        (``return_exceptions=True``), so one bad URL does not abort the batch.
    """
    async def _get_status(session, url):
        # Defined locally so the example is self-contained and runnable.
        async with session.get(url) as response:
            return response.status

    async with aiohttp.ClientSession() as session:
        tasks = [_get_status(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

信号量(并发数限制——限制同时进行的请求数,而非每秒请求数)
async def scrape_urls(urls: list, max_concurrent: int = 20) -> list:
    """Download the body text of every URL with at most *max_concurrent* in flight.

    Args:
        urls: URLs to fetch with GET.
        max_concurrent: Upper bound on simultaneously running requests,
            enforced by an ``asyncio.Semaphore``.

    Returns:
        One entry per URL, in input order: the body text, or the exception
        instance if that request failed (``return_exceptions=True``).
    """
    sem = asyncio.Semaphore(max_concurrent)
    # One session shared by all fetches: creating a ClientSession per request
    # throws away connection reuse and is the pattern aiohttp warns against.
    async with aiohttp.ClientSession() as session:

        async def fetch(url):
            async with sem:
                async with session.get(url) as resp:
                    return await resp.text()

        return await asyncio.gather(*(fetch(u) for u in urls), return_exceptions=True)

生产者-消费者队列
async def pipeline(urls: list) -> list:
    """Fetch *urls* through a bounded producer/consumer queue with a fixed worker pool.

    A producer feeds URLs into an ``asyncio.Queue(maxsize=100)``; because
    ``put`` is awaited, the producer pauses whenever the queue is full.
    Five consumer coroutines share one HTTP session and drain the queue until
    each receives a ``None`` poison pill.

    Args:
        urls: URLs to fetch with GET.

    Returns:
        One dict per URL, in completion order (not necessarily input order):
        ``{'url': url, 'len': <body length>}`` on success, or
        ``{'url': url, 'error': <repr of exception>}`` when the request raised
        ``aiohttp.ClientError`` or ``asyncio.TimeoutError``. A failing URL no
        longer aborts the whole pipeline; other exception types still propagate.
    """
    queue = asyncio.Queue(maxsize=100)
    results = []
    n_workers = 5

    async def producer():
        for url in urls:
            await queue.put(url)
        for _ in range(n_workers):
            await queue.put(None)  # poison pill: one per consumer so each stops

    async def consumer(session):
        while True:
            url = await queue.get()
            try:
                if url is None:
                    return
                try:
                    async with session.get(url) as resp:
                        body = await resp.text()
                except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                    # Record the failure and keep this worker alive for the next URL.
                    results.append({'url': url, 'error': repr(exc)})
                else:
                    results.append({'url': url, 'len': len(body)})
            finally:
                queue.task_done()  # runs for pills and URLs alike

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(producer(), *(consumer(session) for _ in range(n_workers)))
    return results
CPU 密集型工作与 ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
def heavy_compute(data: list) -> dict:
    """Return the sum of squares of *data* under the ``'result'`` key.

    A stand-in CPU-bound workload. It is a plain (non-async) top-level
    function so that it can be shipped to a ``ProcessPoolExecutor`` worker.
    """
    total = sum(value ** 2 for value in data)
    return {'result': total}
async def run_heavy(data: list, executor=None) -> dict:
    """Run ``heavy_compute(data)`` in a worker process without blocking the event loop.

    Args:
        data: Input forwarded to ``heavy_compute``; it must be picklable to
            cross the process boundary.
        executor: Optional existing executor (e.g. a long-lived
            ``ProcessPoolExecutor``) to reuse. When omitted, a single-use pool
            is created for this call, which pays process start-up cost each time.

    Returns:
        The dict returned by ``heavy_compute``.
    """
    # Inside a coroutine, get_running_loop() is the recommended way to reach the loop.
    loop = asyncio.get_running_loop()
    if executor is not None:
        return await loop.run_in_executor(executor, heavy_compute, data)
    # This throwaway pool only ever receives one task, so one worker suffices;
    # sizing it to cpu_count() may start processes that would sit idle.
    # Note: leaving the `with` calls shutdown(wait=True), which briefly blocks
    # the loop while the worker exits -- another reason to pass a shared executor.
    with ProcessPoolExecutor(max_workers=1) as ex:
        return await loop.run_in_executor(ex, heavy_compute, data)
带重试和指数退避的错误处理
async def with_retry(coro_fn, max_attempts=3, base_delay=1.0, retry_on=None):
    """Await ``coro_fn()`` and retry transient failures with exponential backoff.

    Args:
        coro_fn: Zero-argument callable returning a fresh awaitable on each
            call (a coroutine object can be awaited only once, so pass the
            function itself, not ``coro_fn()``).
        max_attempts: Total number of attempts including the first; must be >= 1.
        base_delay: Seconds to wait after the first failure; the wait doubles
            after each subsequent failure (1s, 2s, 4s, ... with the default).
        retry_on: Exception type or tuple of types that trigger a retry.
            Defaults to ``(aiohttp.ClientError, asyncio.TimeoutError)``.

    Returns:
        The result of the first successful ``await coro_fn()``.

    Raises:
        ValueError: If ``max_attempts`` is less than 1 (otherwise ``coro_fn``
            would never be called and ``None`` would be returned silently).
        Exception: The last retryable exception once all attempts are used;
            exceptions not matched by ``retry_on`` propagate immediately.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")
    if retry_on is None:
        # Resolved at call time so the default needs aiohttp only when used.
        retry_on = (aiohttp.ClientError, asyncio.TimeoutError)
    last_attempt = max_attempts - 1
    for attempt in range(max_attempts):
        try:
            return await coro_fn()
        except retry_on:
            if attempt == last_attempt:
                raise
        # Only reached after a retryable failure that still has attempts left.
        await asyncio.sleep(base_delay * 2 ** attempt)
只要遵守 asyncio 的约束——只用它处理 IO 密集型工作、控制并发数量、把 CPU 密集型工作交给执行器——它就能带来可观的性能收益。
→ 使用「URL 解析器」工具测试你的异步 API 端点。