正在加载,请稍候…

使用 Scikit-Learn 构建生产级 ML 流水线:从原型到部署

使用 scikit-learn 构建稳健的机器学习流水线,涵盖特征工程、交叉验证、超参数调优、模型序列化及生产部署模式。

使用 Scikit-Learn 构建生产级 ML 流水线:从原型到部署

一个可用的 Jupyter notebook 与生产级 ML 系统之间的差距是巨大的。本指南将向您展示如何弥合这一差距。

流水线抽象

Scikit-learn 的 Pipeline 将预处理和建模链接成一个可序列化的单元。

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier

numerical_cols = ['age', 'income', 'tenure_days']
categorical_cols = ['country', 'plan_type', 'channel']

preprocessor = ColumnTransformer([
    ('num', Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ]), numerical_cols),
    ('cat', Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='unknown')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ]), categorical_cols),
])

model = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', GradientBoostingClassifier(
        n_estimators=200, max_depth=4, learning_rate=0.05, random_state=42
    ))
])

使用 Scikit-Learn 构建生产级 ML 流水线:从原型到部署 插图

正确评估

from sklearn.model_selection import StratifiedKFold, cross_validate, TimeSeriesSplit

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_validate(model, X, y, cv=cv,
    scoring=['roc_auc', 'precision', 'recall', 'f1'],
    return_train_score=True, n_jobs=-1)
print(f"AUC: {scores['test_roc_auc'].mean():.3f} ± {scores['test_roc_auc'].std():.3f}")

使用 Scikit-Learn 构建生产级 ML 流水线:从原型到部署 插图

超参数调优

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform

search = RandomizedSearchCV(
    model,
    {
        'classifier__n_estimators': randint(100, 500),
        'classifier__max_depth': randint(3, 8),
        'classifier__learning_rate': uniform(0.01, 0.2),
    },
    n_iter=50, cv=StratifiedKFold(n_splits=3),
    scoring='roc_auc', n_jobs=-1, random_state=42
)
search.fit(X_train, y_train)
best_model = search.best_estimator_

使用 Scikit-Learn 构建生产级 ML 流水线:从原型到部署 插图

模型注册表

import joblib, json
from datetime import datetime
from pathlib import Path

class ModelRegistry:
    def __init__(self, base_path: str):
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def save(self, model, metadata: dict) -> str:
        model_id = f"model_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
        path = self.base_path / model_id
        path.mkdir()
        joblib.dump(model, path / 'model.joblib', compress=3)
        metadata['model_id'] = model_id
        json.dump(metadata, open(path / 'meta.json', 'w'), indent=2)
        return model_id

FastAPI 服务

from fastapi import FastAPI
from pydantic import BaseModel
import joblib, pandas as pd

app = FastAPI()
model = joblib.load('./models/current/model.joblib')

class PredictRequest(BaseModel):
    age: float
    income: float
    tenure_days: int
    country: str
    plan_type: str

@app.post("/predict")
async def predict(req: PredictRequest):
    df = pd.DataFrame([req.dict()])
    proba = float(model.predict_proba(df)[0, 1])
    return {"churn_probability": proba, "will_churn": proba > 0.5}

漂移检测

from scipy import stats
import numpy as np

class DriftDetector:
    def __init__(self, reference: pd.DataFrame, threshold: float = 0.05):
        self.reference = reference
        self.threshold = threshold

    def check(self, current: pd.DataFrame) -> dict:
        results = {}
        for col in self.reference.select_dtypes(include=np.number).columns:
            _, p = stats.ks_2samp(self.reference[col].dropna(), current[col].dropna())
            results[col] = {'p_value': p, 'drift': p < self.threshold}
        return results

生产级 ML 需要可测试的流水线、版本化的模型和漂移监控。代码是容易的部分。

→ 使用 JSON Viewer 工具分析您的模型输出。