正在加载,请稍候…

使用 Tokio 进行 Rust 异步编程:构建高性能 Web 服务

使用 async/await 和 Tokio 构建高性能 Rust Web 服务。涵盖 Future trait、Tokio 运行时、任务生成、共享状态、Axu

使用 Tokio 进行 Rust 异步编程:构建高性能 Web 服务

使用 Tokio 进行 Rust 异步编程:构建高性能 Web 服务

Rust 的异步生态系统已经显著成熟,Tokio 成为生产服务的实际标准运行时。从底层理解 Rust 异步——Future trait、执行器模型以及实用模式——对于构建高性能网络应用至关重要。

理解 Future Trait

Rust 异步模型的核心是 Future trait:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

当你编写 async fn 时,编译器会将其转换为实现 Future 的状态机。执行器调用 poll 直到返回 Ready

使用 Tokio 进行 Rust 异步编程:构建高性能 Web 服务 插图

设置 Tokio

将 Tokio 添加到你的 Cargo.toml

[dependencies]
tokio = { version = "1", features = ["full"] }
axum = "0.7"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tower = "0.4"
tracing = "0.1"
tracing-subscriber = "0.3"

最小的 Tokio 运行时:

#[tokio::main]
async fn main() {
    println!("Running on Tokio!");
    let result = fetch_data("https://api.example.com").await;
    println!("{:?}", result);
}

async fn fetch_data(url: &str) -> Result<String, reqwest::Error> {
    reqwest::get(url).await?.text().await
}

生成任务

tokio::spawn 创建独立的任务并发运行:

use tokio::task::JoinHandle;

async fn concurrent_tasks() {
    let handle1: JoinHandle<i32> = tokio::spawn(async {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        42
    });

    let handle2: JoinHandle<i32> = tokio::spawn(async {
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        7
    });

    let (r1, r2) = tokio::join!(handle1, handle2);
    println!("{} {}", r1.unwrap(), r2.unwrap());
}

async fn try_concurrent() -> Result<(), Box<dyn std::error::Error>> {
    let (r1, r2) = tokio::try_join!(
        fetch_data("https://api1.example.com"),
        fetch_data("https://api2.example.com"),
    )?;
    println!("{} {}", r1, r2);
    Ok(())
}

异步代码中的共享状态

在异步任务之间共享状态需要小心处理:

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};

type SharedMap = Arc<Mutex<HashMap<String, String>>>;

async fn write_to_map(map: SharedMap, key: String, value: String) {
    let mut guard = map.lock().await;
    guard.insert(key, value);
}

async fn read_from_map(map: SharedMap, key: &str) -> Option<String> {
    let guard = map.lock().await;
    guard.get(key).cloned()
}

type SharedCache = Arc<RwLock<HashMap<String, Vec<u8>>>>;

async fn cache_put(cache: SharedCache, key: String, data: Vec<u8>) {
    let mut w = cache.write().await;
    w.insert(key, data);
}

async fn cache_get(cache: SharedCache, key: &str) -> Option<Vec<u8>> {
    let r = cache.read().await;
    r.get(key).cloned()
}

使用 Tokio 进行 Rust 异步编程:构建高性能 Web 服务 插图

构建 Axum Web 服务

Axum 是建立在 Tokio 和 Tower 之上的领先 Rust Web 框架:

use axum::{
    extract::{Path, State},
    http::StatusCode,
    response::Json,
    routing::{get, post},
    Router,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct User {
    id: u64,
    name: String,
    email: String,
}

type UserStore = Arc<RwLock<HashMap<u64, User>>>;

async fn get_user(
    Path(id): Path<u64>,
    State(store): State<UserStore>,
) -> Result<Json<User>, StatusCode> {
    let users = store.read().await;
    users.get(&id)
        .cloned()
        .map(Json)
        .ok_or(StatusCode::NOT_FOUND)
}

#[derive(Deserialize)]
struct CreateUser {
    name: String,
    email: String,
}

async fn create_user(
    State(store): State<UserStore>,
    Json(payload): Json<CreateUser>,
) -> (StatusCode, Json<User>) {
    let mut users = store.write().await;
    let id = users.len() as u64 + 1;
    let user = User { id, name: payload.name, email: payload.email };
    users.insert(id, user.clone());
    (StatusCode::CREATED, Json(user))
}

#[tokio::main]
async fn main() {
    let store: UserStore = Arc::new(RwLock::new(HashMap::new()));

    let app = Router::new()
        .route("/users", get(list_users).post(create_user))
        .route("/users/:id", get(get_user))
        .with_state(store);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

使用 Tower 的中间件

use axum::middleware::{self, Next};
use axum::extract::Request;
use axum::response::Response;
use std::time::Instant;

async fn timing_middleware(req: Request, next: Next) -> Response {
    let start = Instant::now();
    let path = req.uri().path().to_string();
    let method = req.method().clone();
    let response = next.run(req).await;
    let duration = start.elapsed();
    println!("{} {} took {}ms", method, path, duration.as_millis());
    response
}

async fn auth_middleware(req: Request, next: Next) -> Result<Response, StatusCode> {
    let auth_header = req.headers()
        .get("authorization")
        .and_then(|v| v.to_str().ok());

    match auth_header {
        Some(token) if token.starts_with("Bearer ") => Ok(next.run(req).await),
        _ => Err(StatusCode::UNAUTHORIZED),
    }
}

性能调优

使用 Tokio 进行 Rust 异步编程:构建高性能 Web 服务 插图

Tokio 运行时配置

#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    // 使用 4 个工作线程
}

fn build_runtime() -> tokio::runtime::Runtime {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(num_cpus::get())
        .enable_all()
        .thread_stack_size(3 * 1024 * 1024)
        .build()
        .unwrap()
}

避免在异步上下文中阻塞

use tokio::task;

// 错误:阻塞 I/O 会阻塞 Tokio 线程
async fn bad_file_read(path: &str) -> String {
    std::fs::read_to_string(path).unwrap() // 阻塞!
}

// 正确:使用 tokio 的异步 I/O
async fn good_file_read(path: &str) -> tokio::io::Result<String> {
    tokio::fs::read_to_string(path).await
}

// 对于 CPU 密集型工作:使用 spawn_blocking
async fn cpu_heavy_work(data: Vec<u8>) -> Vec<u8> {
    task::spawn_blocking(move || {
        data.iter().map(|b| b.wrapping_add(1)).collect()
    })
    .await
    .unwrap()
}

错误处理最佳实践

use thiserror::Error;

#[derive(Error, Debug)]
enum AppError {
    #[error("User not found: {id}")]
    NotFound { id: u64 },
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("Validation error: {message}")]
    Validation { message: String },
}

impl axum::response::IntoResponse for AppError {
    fn into_response(self) -> axum::response::Response {
        let (status, message) = match self {
            AppError::NotFound { id } => (
                StatusCode::NOT_FOUND,
                format!("User {} not found", id),
            ),
            AppError::Database(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Database error: {}", e),
            ),
            AppError::Validation { message } => (
                StatusCode::BAD_REQUEST,
                message,
            ),
        };
        (status, Json(serde_json::json!({"error": message}))).into_response()
    }
}

Rust 的异步生态系统在 2026 年提供了可与 C++ 媲美的卓越性能,同时具备使生产系统可靠的内存安全保证。Tokio 和 Axum 为处理每秒数百万请求且延迟可预测的服务提供了基础。