
MLOps: Building Production-Grade ML Pipelines with MLflow and Kubeflow

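A practical guide to MLOps for production ML systems, covering experiment tracking with MLflow, pipeline orchestration, the model registry, A/B testing, and drift detection.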

MLOps: Production-Grade ML Pipelines

Experiment Tracking with MLflow

import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

# Point the client at the tracking server and select the experiment.
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer-churn-prediction")

def train(X_train, y_train, X_test, y_test, params):
    """Train a random forest and log params, metrics, and the model to MLflow."""
    with mlflow.start_run():
        mlflow.log_params(params)
        # params must not contain "random_state", which is fixed here.
        model = RandomForestClassifier(**params, random_state=42)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        mlflow.log_metrics({
            "accuracy": accuracy_score(y_test, y_pred),
            "f1": f1_score(y_test, y_pred, average="weighted"),
        })
        # Infer the schema from matching inputs and predictions (X_test, y_pred),
        # then log the model and register it as a new version.
        mlflow.sklearn.log_model(
            model, "model",
            signature=infer_signature(X_test, y_pred),
            registered_model_name="customer-churn-rf",
        )
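
Because `train` above opens one MLflow run per call, a hyperparameter sweep reduces to calling it once per parameter combination. The helper below is a minimal sketch of that idea; `expand_grid`, `GRID`, and the grid values are illustrative assumptions, not part of the original example.

```python
from itertools import product


def expand_grid(grid):
    """Yield one params dict per combination of the listed values."""
    keys = sorted(grid)  # fixed key order keeps the sweep order reproducible
    for values in product(*(grid[k] for k in keys)):
        yield dict(zip(keys, values))


# Hypothetical grid; the values are placeholders, not tuned recommendations.
GRID = {"n_estimators": [100, 200], "max_depth": [5, 10]}
CONFIGS = list(expand_grid(GRID))
```

Each entry of `CONFIGS` can then be passed as `params` to `train(X_train, y_train, X_test, y_test, params)`, giving one run per configuration in the same experiment. Note that `train` as written registers a new model version on every call, so for larger sweeps it may be preferable to register only the best run.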

Model Registry

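from mlflow.tracking import MlflowClient

def promote(model_name, run_id, stage="Staging"):
    """Move the model version created by run_id into the given stage."""
    client = MlflowClient()
    # A run may have registered versions under several names; keep only ours.
    versions = [
        v for v in client.search_model_versions(f"run_id='{run_id}'")
        if v.name == model_name
    ]
    if not versions:
        raise ValueError(f"No version of {model_name} found for run {run_id}")
    version = versions[0].version
    # Stage transitions are deprecated in recent MLflow releases in favor of
    # model version aliases; this example keeps the stage-based workflow.
    client.transition_model_version_stage(
        name=model_name, version=version, stage=stage,
        archive_existing_versions=True,
    )
    print(f"{model_name} v{version} -> {stage}")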
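
With one version in Production and a candidate in Staging, an A/B test needs a stable way to decide which version serves each request. The sketch below shows one common approach, deterministic hash-based bucketing. The function name `assign_variant`, the salt, and the 10% default split are assumptions for illustration and are not part of MLflow.

```python
import hashlib


def assign_variant(user_id: str, staging_share: float = 0.1,
                   salt: str = "churn-ab-v1") -> str:
    """Map a user to "Staging" or "Production" deterministically.

    The same user_id always lands in the same bucket, so a user sees
    consistent predictions for the whole experiment.
    """
    if not 0.0 <= staging_share <= 1.0:
        raise ValueError("staging_share must be between 0 and 1")
    digest = hashlib.sha256(f"{salt}:{user_id}".encode("utf-8")).hexdigest()
    # First 8 hex digits -> integer in [0, 2**32), scaled to [0, 1).
    bucket = int(digest[:8], 16) / 2**32
    return "Staging" if bucket < staging_share else "Production"
```

The returned stage name could then be used to build a model URI of the form `models:/customer-churn-rf/<stage>` when loading the model to serve. Changing the salt reshuffles users between experiments, which helps avoid carrying one experiment's assignment into the next.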

Kubeflow Pipelines

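from kfp import dsl

@dsl.component(packages_to_install=["pandas"])
def preprocess(raw_data: str, out: dsl.Output[dsl.Dataset]):
    # Placeholder step, assumed here because the original omits its definition:
    # read the raw CSV and drop incomplete rows.
    import pandas as pd
    pd.read_csv(raw_data).dropna().to_csv(out.path, index=False)

@dsl.component(packages_to_install=["scikit-learn", "pandas", "mlflow"])
def train_component(data: dsl.Input[dsl.Dataset],
                    model_out: dsl.Output[dsl.Model]):
    # Imports live inside the function because each component runs
    # in its own container.
    import pickle
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    df = pd.read_csv(data.path)
    X, y = df.drop("churn", axis=1), df["churn"]
    # Hold out 20% of the rows; fixed seeds keep reruns comparable.
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
    model = RandomForestClassifier(n_estimators=200, random_state=42)
    model.fit(X_tr, y_tr)
    # Persist the model to the artifact path that Kubeflow provides.
    with open(model_out.path, "wb") as f:
        pickle.dump(model, f)

@dsl.pipeline(name="churn-pipeline")
def pipeline(raw_data: str):
    # KFP v2 components must be called with keyword arguments.
    prep = preprocess(raw_data=raw_data)
    train_component(data=prep.outputs["out"])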

Drift Detection

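# These import paths match the Evidently Report API this example was written
# for; newer Evidently releases have reorganized the package.
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

class DriftMonitor:
    """Compare incoming data against a fixed reference dataset."""

    def __init__(self, reference_data):
        self.ref = reference_data  # e.g. the training DataFrame

    def check(self, current_data) -> bool:
        """Return True if Evidently flags dataset-level drift."""
        report = Report(metrics=[DataDriftPreset()])
        report.run(reference_data=self.ref, current_data=current_data)
        # The preset's first metric carries the dataset-level drift flag.
        return bool(report.as_dict()["metrics"][0]["result"]["dataset_drift"])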
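
To make the drift check less of a black box, the following is a minimal sketch of one widely used per-feature drift statistic, the Population Stability Index (PSI). It is not a reproduction of what `DataDriftPreset` computes internally; the function name `psi`, the bin count, and the smoothing constant `eps` are assumptions chosen for illustration.

```python
import math


def psi(reference, current, bins=10, eps=1e-6):
    """Population Stability Index between two numeric samples."""
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / bins or 1.0  # avoid zero width for constant data

    def proportions(values):
        counts = [0] * bins
        for v in values:
            # Clamp so values outside the reference range fall in edge bins.
            idx = min(max(int((v - lo) / width), 0), bins - 1)
            counts[idx] += 1
        # eps keeps empty bins from producing log(0) or division by zero.
        return [max(c / len(values), eps) for c in counts]

    ref_p, cur_p = proportions(reference), proportions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_p, cur_p))
```

A common rule of thumb treats a PSI above roughly 0.2 as meaningful drift, though the threshold is a convention rather than a guarantee and is usually tuned per feature.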

MLOps Maturity Levels

Level   Description
0       Manual notebooks
1       ML pipeline automation
2       CI/CD for ML
3       Full MLOps with monitoring