
一个可用的 Jupyter notebook 与生产级 ML 系统之间的差距是巨大的。本指南将向您展示如何弥合这一差距。
流水线抽象
Scikit-learn 的 Pipeline 将预处理和建模链接成一个可序列化的单元，使训练和推理阶段应用完全相同的转换，并确保交叉验证时预处理只在训练折上拟合，从而避免数据泄漏。
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
# Feature groups: each list is routed to its own preprocessing branch below.
numerical_cols = ['age', 'income', 'tenure_days']
categorical_cols = ['country', 'plan_type', 'channel']

# Numeric branch: fill gaps with the column median, then standardize.
numeric_steps = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical branch: fill gaps with the literal 'unknown', then one-hot
# encode. handle_unknown='ignore' keeps transform() from raising on
# categories that were not present when the encoder was fitted.
categorical_steps = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='unknown')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])

preprocessor = ColumnTransformer(transformers=[
    ('num', numeric_steps, numerical_cols),
    ('cat', categorical_steps, categorical_cols),
])

# Fixed random_state keeps the boosted trees reproducible across runs.
classifier = GradientBoostingClassifier(
    n_estimators=200,
    max_depth=4,
    learning_rate=0.05,
    random_state=42,
)

# Single estimator object: preprocessing and classifier are fitted,
# applied and serialized together.
model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', classifier),
])

正确评估
from sklearn.model_selection import StratifiedKFold, cross_validate, TimeSeriesSplit
# 5-fold stratified CV; shuffling with a fixed seed makes the fold
# assignment reproducible.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
metrics = ['roc_auc', 'precision', 'recall', 'f1']

# NOTE(review): X and y are not defined in this snippet; they are assumed
# to be the feature frame and binary target loaded elsewhere.
scores = cross_validate(
    model, X, y,
    cv=cv,
    scoring=metrics,
    return_train_score=True,
    n_jobs=-1,
)

# Report mean and spread of the held-out AUC across folds.
test_auc = scores['test_roc_auc']
print(f"AUC: {test_auc.mean():.3f} ± {test_auc.std():.3f}")

超参数调优
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform
# Sampling distributions, keyed as "<pipeline step>__<parameter>".
# scipy's randint(low, high) excludes `high`; uniform(loc, scale) samples
# from [loc, loc + scale], so learning_rate is drawn from [0.01, 0.21].
param_distributions = {
    'classifier__n_estimators': randint(100, 500),
    'classifier__max_depth': randint(3, 8),
    'classifier__learning_rate': uniform(0.01, 0.2),
}

# 50 random draws, each scored by 3-fold stratified CV on ROC AUC.
search = RandomizedSearchCV(
    estimator=model,
    param_distributions=param_distributions,
    n_iter=50,
    cv=StratifiedKFold(n_splits=3),
    scoring='roc_auc',
    n_jobs=-1,
    random_state=42,
)

# NOTE(review): X_train / y_train are not defined in this snippet; they are
# assumed to be a training split created elsewhere.
search.fit(X_train, y_train)
best_model = search.best_estimator_

模型注册表
import joblib, json
from datetime import datetime
from pathlib import Path
class ModelRegistry:
    """File-system model store with one timestamped directory per model.

    Each call to ``save`` creates ``<base_path>/<model_id>/`` containing
    ``model.joblib`` (the serialized model) and ``meta.json`` (its metadata).
    """

    def __init__(self, base_path: str):
        """Create the registry root directory if it does not exist yet.

        Args:
            base_path: Directory under which model directories are created.
        """
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def save(self, model, metadata: dict) -> str:
        """Persist ``model`` and ``metadata`` under a new timestamped id.

        Args:
            model: Object to serialize with ``joblib.dump``.
            metadata: JSON-serializable mapping written to ``meta.json``.
                It is not modified; the generated id is added to the
                stored copy only.

        Returns:
            The generated id, formatted ``model_YYYYMMDD_HHMMSS`` (UTC).

        Raises:
            FileExistsError: If a directory for the same id already exists,
                e.g. two saves within the same second.
        """
        # Local import: the file-level import only brings in ``datetime``.
        from datetime import timezone

        # Aware UTC "now" replaces the deprecated datetime.utcnow(); the
        # formatted timestamp is identical.
        stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        model_id = f"model_{stamp}"
        path = self.base_path / model_id
        path.mkdir()
        joblib.dump(model, path / 'model.joblib', compress=3)

        # Write a copy so the caller's dict is left untouched.
        record = {**metadata, 'model_id': model_id}
        # Context manager guarantees the file is flushed and closed.
        with open(path / 'meta.json', 'w', encoding='utf-8') as fh:
            json.dump(record, fh, indent=2)
        return model_id
FastAPI 服务
from fastapi import FastAPI
from pydantic import BaseModel
import joblib, pandas as pd
# Decision threshold applied to the positive-class probability.
CHURN_THRESHOLD = 0.5

app = FastAPI()
# Loaded once at import time so every request reuses the same object.
# NOTE(review): joblib.load unpickles the file and can execute arbitrary
# code; only load artifacts from a trusted location.
model = joblib.load('./models/current/model.joblib')


class PredictRequest(BaseModel):
    """Request body for ``POST /predict``: one row of model features."""

    age: float
    income: float
    tenure_days: int
    country: str
    plan_type: str
    # The training pipeline lists 'channel' among its categorical columns,
    # so the frame passed to the model must contain it. The default keeps
    # existing clients working and matches the imputer's fill value.
    # NOTE(review): assumes the served artifact is that pipeline; confirm.
    channel: str = 'unknown'


@app.post("/predict")
def predict(req: PredictRequest):
    """Score one customer and return the churn probability and decision.

    Declared with plain ``def`` rather than ``async def``: ``predict_proba``
    is blocking, CPU-bound work, and FastAPI runs sync handlers in its
    thread pool instead of on the event loop.

    Args:
        req: Validated feature values for a single customer.

    Returns:
        ``{"churn_probability": float, "will_churn": bool}`` where
        ``will_churn`` is true when the probability exceeds
        ``CHURN_THRESHOLD``.
    """
    # pydantic v2 renamed .dict() to .model_dump(); support both versions.
    payload = req.model_dump() if hasattr(req, 'model_dump') else req.dict()
    df = pd.DataFrame([payload])
    # Column 1 of predict_proba is the positive class.
    proba = float(model.predict_proba(df)[0, 1])
    return {"churn_probability": proba, "will_churn": proba > CHURN_THRESHOLD}
漂移检测
from scipy import stats
import numpy as np
class DriftDetector:
    """Flag numeric features whose distribution differs from a reference.

    Each numeric column of the reference frame is compared against the same
    column of the current frame with a two-sample Kolmogorov-Smirnov test.
    """

    def __init__(self, reference: pd.DataFrame, threshold: float = 0.05):
        """Store the reference sample and the significance threshold.

        Args:
            reference: Baseline data to compare against.
            threshold: p-value below which a column is reported as drifted.
        """
        self.reference = reference
        self.threshold = threshold

    def check(self, current: pd.DataFrame) -> dict:
        """Run the KS test on every numeric reference column.

        Args:
            current: Data to compare against the reference. It must contain
                every numeric column of the reference frame.

        Returns:
            ``{column: {'p_value': float, 'drift': bool}}`` using built-in
            Python types, so the result can be passed to ``json.dumps``.
            A column with no non-null values on either side cannot be
            tested and is reported with ``p_value`` NaN and ``drift`` False.

        Raises:
            KeyError: If ``current`` lacks a numeric reference column.
        """
        results = {}
        numeric_cols = self.reference.select_dtypes(include=np.number).columns
        for col in numeric_cols:
            ref_sample = self.reference[col].dropna()
            cur_sample = current[col].dropna()

            # ks_2samp rejects empty input on some SciPy versions; report
            # the column as untestable instead of aborting the whole check.
            if ref_sample.empty or cur_sample.empty:
                results[col] = {'p_value': float('nan'), 'drift': False}
                continue

            _, p = stats.ks_2samp(ref_sample, cur_sample)
            # Convert NumPy scalars to built-ins (np.bool_ is not
            # JSON-serializable).
            p_value = float(p)
            results[col] = {
                'p_value': p_value,
                'drift': bool(p_value < self.threshold),
            }
        return results
生产级 ML 需要可测试的流水线、版本化的模型和漂移监控。代码是容易的部分。
→ 使用 JSON Viewer 工具分析您的模型输出。