
# MLOps：生产级 ML 流水线
## 使用 MLflow 进行实验跟踪
import mlflow, mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import os

# Configure where runs are recorded. The standard MLFLOW_TRACKING_URI
# environment variable takes precedence so other deployments can point at a
# different tracking server; otherwise fall back to the default used here.
mlflow.set_tracking_uri(
    os.environ.get("MLFLOW_TRACKING_URI") or "http://mlflow-server:5000"
)
# All runs started by this module are grouped under this experiment.
mlflow.set_experiment("customer-churn-prediction")
def train(X_train, y_train, X_test, y_test, params):
    """Train a random-forest classifier and record the run in MLflow.

    Within a new MLflow run this logs the effective hyperparameters, the
    test-set accuracy and weighted F1, and the fitted model, which is also
    registered as ``customer-churn-rf``.

    Args:
        X_train: Training features.
        y_train: Training labels.
        X_test: Held-out features used to compute the logged metrics.
        y_test: Held-out labels used to compute the logged metrics.
        params: Keyword arguments for ``RandomForestClassifier``. ``random_state``
            defaults to 42 and may be overridden here. ``None`` means no extra
            parameters.

    Returns:
        The MLflow run id of the logged run (e.g. to pass to ``promote``).
    """
    from mlflow.models.signature import infer_signature

    # Merge the seed into the params instead of passing it as a separate
    # keyword: a caller-supplied ``random_state`` would otherwise raise
    # "got multiple values for keyword argument".
    model_params = {"random_state": 42, **(params or {})}

    with mlflow.start_run() as run:
        # Log the params actually used (including the seed) for reproducibility.
        mlflow.log_params(model_params)
        model = RandomForestClassifier(**model_params)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        mlflow.log_metrics({
            "accuracy": accuracy_score(y_test, y_pred),
            "f1": f1_score(y_test, y_pred, average="weighted"),
        })
        mlflow.sklearn.log_model(
            model, "model",
            # Input and output examples must correspond: y_pred was predicted from X_test.
            signature=infer_signature(X_test, y_pred),
            registered_model_name="customer-churn-rf",
        )
        run_id = run.info.run_id
    return run_id

## 模型注册
from mlflow.tracking import MlflowClient
def promote(model_name, run_id, stage="Staging"):
    """Move the version of *model_name* registered by *run_id* to *stage*.

    Versions already in *stage* are archived.

    Args:
        model_name: Name of the registered model.
        run_id: MLflow run id that registered the version to promote.
        stage: Target stage name, e.g. "Staging" or "Production".

    Returns:
        The promoted version number, as reported by MLflow.

    Raises:
        ValueError: If no version of *model_name* was registered by *run_id*.
    """
    client = MlflowClient()
    # Filter by model name as well as run id: one run can register versions of
    # several models, so the first run_id match may belong to another model.
    versions = client.search_model_versions(
        f"name='{model_name}' and run_id='{run_id}'"
    )
    if not versions:
        raise ValueError(
            f"No version of model {model_name!r} was registered by run {run_id!r}"
        )
    # If the run registered this model more than once, promote the newest.
    version = max(versions, key=lambda v: int(v.version)).version
    # NOTE(review): stage transitions are deprecated in recent MLflow releases
    # in favour of model version aliases; kept here for compatibility.
    client.transition_model_version_stage(
        name=model_name, version=version, stage=stage,
        archive_existing_versions=True,
    )
    print(f"{model_name} v{version} -> {stage}")
    return version

## Kubeflow 流水线
import kfp
from kfp import dsl
@dsl.component(packages_to_install=["scikit-learn", "pandas", "mlflow"])
def train_component(data: kfp.dsl.Input[kfp.dsl.Dataset],
                    model_out: kfp.dsl.Output[kfp.dsl.Model]):
    """Train a churn classifier from a CSV dataset and write it as a pickle.

    The input CSV must contain a ``churn`` label column; all other columns are
    used as features. 80% of the rows train the model, and the remaining 20%
    are used to compute a held-out accuracy. That accuracy is stored in the
    model artifact's metadata under ``test_accuracy``.

    Imports live inside the function because KFP lightweight components are
    executed as standalone functions.
    """
    import pickle

    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(data.path)
    X, y = df.drop("churn", axis=1), df["churn"]
    # Fixed seeds make both the split and the forest reproducible across runs.
    X_tr, X_te, y_tr, y_te = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    model = RandomForestClassifier(n_estimators=200, random_state=42)
    model.fit(X_tr, y_tr)
    # Use the held-out split instead of discarding it.
    model_out.metadata["test_accuracy"] = float(
        accuracy_score(y_te, model.predict(X_te))
    )
    # The pickle is produced by this component itself (trusted output).
    with open(model_out.path, "wb") as f:
        pickle.dump(model, f)
@dsl.pipeline(name="churn-pipeline")
def pipeline(raw_data: str):
    """Two-step churn pipeline: preprocess *raw_data*, then train on it.

    NOTE(review): ``preprocess`` is not defined in this section and must be a
    component exposing an output named "out". If it is a KFP v2 component,
    positional arguments may be rejected; the call likely needs a keyword
    (``preprocess(<param>=raw_data)``). Confirm against its signature.
    """
    preprocess_task = preprocess(raw_data)
    cleaned_dataset = preprocess_task.outputs["out"]
    train_component(data=cleaned_dataset)

## 漂移检测
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
class DriftMonitor:
    """Check new data for dataset-level drift against a fixed reference set.

    Uses an Evidently ``Report`` with ``DataDriftPreset``.
    NOTE(review): ``evidently.report`` / ``evidently.metric_preset`` belong to
    Evidently's older API; newer releases may require different import paths.
    Verify against the installed version.
    """

    def __init__(self, reference_data):
        """Store *reference_data* as the baseline for later ``check`` calls."""
        self.ref = reference_data

    def check(self, current_data) -> bool:
        """Return True if Evidently flags *current_data* as drifted from the reference."""
        report = Report(metrics=[DataDriftPreset()])
        report.run(reference_data=self.ref, current_data=current_data)
        return self._extract_dataset_drift(report.as_dict())

    @staticmethod
    def _extract_dataset_drift(report_dict) -> bool:
        """Return the ``dataset_drift`` flag from a report's ``as_dict()`` output.

        Scans every metric instead of assuming the dataset-level one comes
        first, so the lookup does not depend on the preset's metric order.

        Raises:
            KeyError: If no metric result contains ``dataset_drift``.
        """
        for metric in report_dict["metrics"]:
            result = metric.get("result") or {}
            if "dataset_drift" in result:
                return bool(result["dataset_drift"])
        raise KeyError("dataset_drift")
## MLOps 成熟度等级
| 等级 | 描述 |
|------|------|
| 0 | 手动流程(Notebook) |
| 1 | ML 流水线自动化 |
| 2 | 面向 ML 的 CI/CD |
| 3 | 完整 MLOps + 监控 |