
使用 Tokio 进行 Rust 异步编程:构建高性能 Web 服务
Rust 的异步生态系统已经显著成熟,Tokio 成为生产服务的实际标准运行时。从底层理解 Rust 异步——Future trait、执行器模型以及实用模式——对于构建高性能网络应用至关重要。
理解 Future Trait
Rust 异步模型的核心是 Future trait:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
当你编写 async fn 时,编译器会将其转换为实现 Future 的状态机。执行器调用 poll 直到返回 Ready。
设置 Tokio
将 Tokio 添加到你的 Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
axum = "0.7"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tower = "0.4"
tracing = "0.1"
tracing-subscriber = "0.3"
最小的 Tokio 运行时:
#[tokio::main]
async fn main() {
println!("Running on Tokio!");
let result = fetch_data("https://api.example.com").await;
println!("{:?}", result);
}
async fn fetch_data(url: &str) -> Result<String, reqwest::Error> {
reqwest::get(url).await?.text().await
}
生成任务
tokio::spawn 创建独立的任务并发运行:
use tokio::task::JoinHandle;
async fn concurrent_tasks() {
let handle1: JoinHandle<i32> = tokio::spawn(async {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
42
});
let handle2: JoinHandle<i32> = tokio::spawn(async {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
7
});
let (r1, r2) = tokio::join!(handle1, handle2);
println!("{} {}", r1.unwrap(), r2.unwrap());
}
async fn try_concurrent() -> Result<(), Box<dyn std::error::Error>> {
let (r1, r2) = tokio::try_join!(
fetch_data("https://api1.example.com"),
fetch_data("https://api2.example.com"),
)?;
println!("{} {}", r1, r2);
Ok(())
}
异步代码中的共享状态
在异步任务之间共享状态需要小心处理:
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
type SharedMap = Arc<Mutex<HashMap<String, String>>>;
async fn write_to_map(map: SharedMap, key: String, value: String) {
let mut guard = map.lock().await;
guard.insert(key, value);
}
async fn read_from_map(map: SharedMap, key: &str) -> Option<String> {
let guard = map.lock().await;
guard.get(key).cloned()
}
type SharedCache = Arc<RwLock<HashMap<String, Vec<u8>>>>;
async fn cache_put(cache: SharedCache, key: String, data: Vec<u8>) {
let mut w = cache.write().await;
w.insert(key, data);
}
async fn cache_get(cache: SharedCache, key: &str) -> Option<Vec<u8>> {
let r = cache.read().await;
r.get(key).cloned()
}
构建 Axum Web 服务
Axum 是建立在 Tokio 和 Tower 之上的领先 Rust Web 框架:
use axum::{
extract::{Path, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct User {
id: u64,
name: String,
email: String,
}
type UserStore = Arc<RwLock<HashMap<u64, User>>>;
async fn get_user(
Path(id): Path<u64>,
State(store): State<UserStore>,
) -> Result<Json<User>, StatusCode> {
let users = store.read().await;
users.get(&id)
.cloned()
.map(Json)
.ok_or(StatusCode::NOT_FOUND)
}
#[derive(Deserialize)]
struct CreateUser {
name: String,
email: String,
}
async fn create_user(
State(store): State<UserStore>,
Json(payload): Json<CreateUser>,
) -> (StatusCode, Json<User>) {
let mut users = store.write().await;
let id = users.len() as u64 + 1;
let user = User { id, name: payload.name, email: payload.email };
users.insert(id, user.clone());
(StatusCode::CREATED, Json(user))
}
#[tokio::main]
async fn main() {
let store: UserStore = Arc::new(RwLock::new(HashMap::new()));
let app = Router::new()
.route("/users", get(list_users).post(create_user))
.route("/users/:id", get(get_user))
.with_state(store);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
使用 Tower 的中间件
use axum::middleware::{self, Next};
use axum::extract::Request;
use axum::response::Response;
use std::time::Instant;
async fn timing_middleware(req: Request, next: Next) -> Response {
let start = Instant::now();
let path = req.uri().path().to_string();
let method = req.method().clone();
let response = next.run(req).await;
let duration = start.elapsed();
println!("{} {} took {}ms", method, path, duration.as_millis());
response
}
async fn auth_middleware(req: Request, next: Next) -> Result<Response, StatusCode> {
let auth_header = req.headers()
.get("authorization")
.and_then(|v| v.to_str().ok());
match auth_header {
Some(token) if token.starts_with("Bearer ") => Ok(next.run(req).await),
_ => Err(StatusCode::UNAUTHORIZED),
}
}
性能调优
Tokio 运行时配置
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
// 使用 4 个工作线程
}
fn build_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(num_cpus::get())
.enable_all()
.thread_stack_size(3 * 1024 * 1024)
.build()
.unwrap()
}
避免在异步上下文中阻塞
use tokio::task;
// 错误:阻塞 I/O 会阻塞 Tokio 线程
async fn bad_file_read(path: &str) -> String {
std::fs::read_to_string(path).unwrap() // 阻塞!
}
// 正确:使用 tokio 的异步 I/O
async fn good_file_read(path: &str) -> tokio::io::Result<String> {
tokio::fs::read_to_string(path).await
}
// 对于 CPU 密集型工作:使用 spawn_blocking
async fn cpu_heavy_work(data: Vec<u8>) -> Vec<u8> {
task::spawn_blocking(move || {
data.iter().map(|b| b.wrapping_add(1)).collect()
})
.await
.unwrap()
}
错误处理最佳实践
use thiserror::Error;
#[derive(Error, Debug)]
enum AppError {
#[error("User not found: {id}")]
NotFound { id: u64 },
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Validation error: {message}")]
Validation { message: String },
}
impl axum::response::IntoResponse for AppError {
fn into_response(self) -> axum::response::Response {
let (status, message) = match self {
AppError::NotFound { id } => (
StatusCode::NOT_FOUND,
format!("User {} not found", id),
),
AppError::Database(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Database error: {}", e),
),
AppError::Validation { message } => (
StatusCode::BAD_REQUEST,
message,
),
};
(status, Json(serde_json::json!({"error": message}))).into_response()
}
}
Rust 的异步生态系统在 2026 年提供了可与 C++ 媲美的卓越性能,同时具备使生产系统可靠的内存安全保证。Tokio 和 Axum 为处理每秒数百万请求且延迟可预测的服务提供了基础。