Deploying Machine Learning Models with Kubernetes and Seldon Core

Learn how to deploy machine learning models at scale with Kubernetes, Seldon Core, and KServe, covering canary deployments, A/B testing, shadow mode, and model serving patterns.

Seldon Core Deployment

# seldon-deployment.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: churn-model
  namespace: ml-serving
spec:
  name: churn-predictor
  predictors:
    - name: default
      replicas: 3
      graph:
        name: classifier
        implementation: SKLEARN_SERVER
        modelUri: gs://my-models/churn-v1.2
        envSecretRefName: gcp-credentials
      componentSpecs:
        - spec:
            containers:
              - name: classifier
                resources:
                  requests:
                    cpu: "0.5"
                    memory: "1Gi"
                  limits:
                    cpu: "1"
                    memory: "2Gi"
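
Once a deployment like the one above is running, clients usually reach it over REST. The sketch below builds a request for Seldon Core's v1 REST protocol, which accepts predictions at `/seldon/<namespace>/<deployment>/api/v1.0/predictions` with an `ndarray` payload. The ingress host and the feature values are placeholders, the helper name `build_seldon_request` is hypothetical, and nothing is sent over the network.

```python
import json
from urllib.request import Request


def build_seldon_request(host, namespace, deployment, rows):
    """Build (but do not send) a Seldon Core v1 REST prediction request."""
    url = f"http://{host}/seldon/{namespace}/{deployment}/api/v1.0/predictions"
    body = json.dumps({"data": {"ndarray": rows}}).encode("utf-8")
    return Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumed ingress host and made-up feature values, for illustration only.
request = build_seldon_request(
    "localhost:8080", "ml-serving", "churn-model", [[0.1, 3.0, 42.0]]
)
print(request.full_url)
```

Passing `request` to `urllib.request.urlopen` would send it, assuming the ingress is reachable at that host.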

KServe InferenceService

# kserve-inference.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-classifier
spec:
  predictor:
    sklearn:
      storageUri: "gs://my-models/sklearn/v1"
      resources:
        requests:
          cpu: "500m"
          memory: "1Gi"
  transformer:
    containers:
      - name: preprocessor
        image: myregistry/preprocessor:v1
        resources:
          requests:
            cpu: "250m"
            memory: "512Mi"
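
For the InferenceService above, a client would typically speak the KServe v1 inference protocol: a POST to `/v1/models/<name>:predict` with an `instances` list, answered by a JSON object holding `predictions`. The sketch below only builds and parses those messages. The helper names are hypothetical and the sample response is made up for illustration.

```python
import json


def kserve_predict_path(model_name):
    """Path of the v1 predict endpoint for a given model name."""
    return f"/v1/models/{model_name}:predict"


def kserve_payload(rows):
    """Serialize feature rows into a v1 protocol request body."""
    return json.dumps({"instances": rows})


def parse_predictions(response_body):
    """Extract the predictions list from a v1 protocol response body."""
    return json.loads(response_body)["predictions"]


path = kserve_predict_path("sklearn-classifier")
payload = kserve_payload([[0.1, 3.0, 42.0], [0.7, 1.0, 12.0]])
# A made-up response body, standing in for what the server would return.
predictions = parse_predictions('{"predictions": [1, 0]}')
print(path, predictions)
```

When a transformer is configured, as in the manifest above, the request reaches the transformer first, so `instances` should carry whatever raw input the preprocessor expects.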

Canary Deployment

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: churn-model-canary
spec:
  predictors:
    - name: main
      replicas: 4
      traffic: 90  # 90% of traffic
      graph:
        name: v1
        implementation: SKLEARN_SERVER
        modelUri: gs://models/churn-v1

    - name: canary
      replicas: 1
      traffic: 10  # 10% canary traffic
      graph:
        name: v2
        implementation: SKLEARN_SERVER
        modelUri: gs://models/churn-v2
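
To build intuition for what the `traffic` weights mean, the sketch below simulates a weighted split in plain Python. It is not how Seldon Core routes requests (that is normally handled by the ingress or service mesh); the function name `simulate_split` and the request count are assumptions chosen for illustration.

```python
import random
from collections import Counter


def simulate_split(weights, n_requests, seed=0):
    """Assign each simulated request to a predictor in proportion to its weight."""
    rng = random.Random(seed)  # seeded so repeated runs give the same split
    names = list(weights)
    picks = rng.choices(names, weights=[weights[n] for n in names], k=n_requests)
    return Counter(picks)


counts = simulate_split({"main": 90, "canary": 10}, n_requests=10_000)
print(counts)
```

With a 10% weight the canary sees roughly one request in ten, so low-traffic services may need a longer observation window before the canary's error rate is meaningful.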

Python Model Server

from typing import List, Optional

import joblib
import numpy as np
from seldon_core.user_model import SeldonComponent


class ChurnPredictor(SeldonComponent):
    def __init__(self):
        # Artifacts are loaded in load(), not at construction time.
        self.model = None
        self.scaler = None

    def load(self):
        # Invoked by the Seldon Core wrapper when the server starts.
        self.model = joblib.load("/mnt/models/model.pkl")
        self.scaler = joblib.load("/mnt/models/scaler.pkl")

    def predict(self, X: np.ndarray, features_names: Optional[List[str]] = None) -> np.ndarray:
        # Apply the scaler fitted at training time, then return class probabilities.
        X_scaled = self.scaler.transform(X)
        return self.model.predict_proba(X_scaled)

    def health_status(self) -> dict:
        return {"status": "OK", "model_loaded": self.model is not None}

Model Monitoring with Prometheus

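import time

import numpy as np
from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Call start_http_server(port) once at startup so Prometheus can scrape these metrics.
prediction_counter = Counter("predictions_total", "Total predictions", ["model_version", "outcome"])
prediction_latency = Histogram("prediction_latency_seconds", "Prediction latency")
feature_drift = Gauge("feature_drift_score", "Feature drift score", ["feature"])


class MonitoredPredictor:
    def __init__(self, model, version, feature_names, drift_detector):
        self.model = model
        self.version = version
        self.feature_names = feature_names
        self.drift_detector = drift_detector

    def predict(self, X: np.ndarray) -> np.ndarray:
        # Time only the model call so the histogram reflects inference latency.
        start = time.perf_counter()
        predictions = self.model.predict(X)
        prediction_latency.observe(time.perf_counter() - start)

        # Count predictions per model version and predicted class.
        for pred in predictions:
            prediction_counter.labels(
                model_version=self.version,
                outcome=str(pred)
            ).inc()

        # Check each feature column for drift.
        for i, col in enumerate(self.feature_names):
            drift = self.drift_detector.score(X[:, i])
            feature_drift.labels(feature=col).set(drift)

        return predictions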
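
The monitoring code above calls `self.drift_detector.score(...)` without showing what the detector is. One simple possibility, sketched here as an assumption rather than the article's actual detector, is to measure how far a batch mean has moved from a reference mean in units of the reference standard deviation. The class name `MeanShiftDetector` is hypothetical, and in practice one instance per feature would be needed, since each holds the statistics of a single column.

```python
import statistics


class MeanShiftDetector:
    """Scores drift as the batch mean's distance from a reference mean, in reference std units."""

    def __init__(self, reference):
        # Reference statistics, e.g. computed from one training-set column.
        self.ref_mean = statistics.fmean(reference)
        self.ref_std = statistics.pstdev(reference)

    def score(self, values):
        shift = abs(statistics.fmean(values) - self.ref_mean)
        if self.ref_std == 0:
            # Constant reference column: any shift at all counts as unbounded drift.
            return 0.0 if shift == 0 else float("inf")
        return shift / self.ref_std


detector = MeanShiftDetector([1, 2, 3, 4, 5])
print(detector.score([1, 2, 3, 4, 5]), detector.score([6, 7, 8]))
```

A mean-shift score misses changes in spread or shape; tests such as Kolmogorov-Smirnov or the population stability index are common alternatives.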

Shadow Mode Testing

# Shadow mode: the shadow predictor receives mirrored traffic, but its responses are not returned to the caller
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: churn-model-shadow  # hypothetical name; a manifest needs metadata.name to be applied
spec:
  predictors:
    - name: production
      traffic: 100
      graph:
        name: v1
        implementation: SKLEARN_SERVER
        modelUri: gs://models/v1

    - name: shadow
      shadow: true  # shadow mode: receives mirrored traffic
      traffic: 0
      graph:
        name: v2
        implementation: SKLEARN_SERVER
        modelUri: gs://models/v2
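
Because the shadow predictor's responses are discarded, evaluating it means logging both models' predictions and comparing them offline. The sketch below computes a simple agreement rate. It assumes the two lists are already aligned by request, and the function name `agreement_rate` is hypothetical.

```python
def agreement_rate(production_preds, shadow_preds):
    """Fraction of requests on which production and shadow predictions match."""
    if len(production_preds) != len(shadow_preds):
        raise ValueError("prediction lists must be aligned and of equal length")
    if not production_preds:
        raise ValueError("no predictions to compare")
    matches = sum(p == s for p, s in zip(production_preds, shadow_preds))
    return matches / len(production_preds)


rate = agreement_rate([1, 0, 1, 1], [1, 1, 1, 0])
print(rate)
```

A low agreement rate is not necessarily bad, since the new model may be correcting mistakes, but it indicates which requests are worth inspecting before promotion.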

A/B Testing Framework

import hashlib

class ABTestRouter:
    def __init__(self, experiment_config: dict):
        self.experiments = experiment_config

    def get_variant(self, user_id: str, experiment: str) -> str:
        config = self.experiments.get(experiment, {})
        variants = config.get("variants", ["control"])
        traffic = config.get("traffic", [100])

        # Deterministic routing: the same user_id always hashes to the same bucket
        hash_val = int(hashlib.md5(f"{user_id}:{experiment}".encode()).hexdigest(), 16) % 100

        cumulative = 0
        for variant, pct in zip(variants, traffic):
            cumulative += pct
            if hash_val < cumulative:
                return variant

        return variants[-1]

    def record_outcome(self, user_id: str, experiment: str, variant: str, metric: float):
        # Store the outcome in a database for later analysis
        pass

router = ABTestRouter({
    "churn_model": {
        "variants": ["control", "treatment"],
        "traffic": [50, 50],
    }
})

variant = router.get_variant("user-123", "churn_model")
# Serve the model version that corresponds to the returned variant
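
Once outcomes have been recorded, the variants still have to be compared. A minimal sketch of one common approach, a two-proportion z-test, is shown below. It assumes the recorded metric is binary (for example, whether the user churned); the function name and the sample counts are made up for illustration.

```python
import math


def two_proportion_z_test(successes_a, n_a, successes_b, n_b):
    """Return (z, two-sided p-value) for the difference between two proportions."""
    p_a = successes_a / n_a
    p_b = successes_b / n_b
    pooled = (successes_a + successes_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    if se == 0:
        # Both groups are all-success or all-failure: no measurable difference.
        return 0.0, 1.0
    z = (p_b - p_a) / se
    # Two-sided p-value under the standard normal distribution.
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Made-up counts: 200 of 1000 control users vs 250 of 1000 treatment users.
z, p_value = two_proportion_z_test(200, 1000, 250, 1000)
print(f"z={z:.2f}, p={p_value:.4f}")
```

The test is only valid if the sample size was fixed before the experiment started; checking repeatedly and stopping at the first significant result inflates the false-positive rate.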

Deployment Checklist

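Stage            Checks
Pre-deployment   Load testing, latency benchmarks
Canary           Monitor error rate and p99 latency
A/B testing      Statistical significance, business metrics
Full rollout     Increase traffic gradually
Post-deployment  Drift monitoring, feedback loops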
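
The canary row of the checklist can be turned into an automated gate. The sketch below compares a canary's error rate and p99 latency against the baseline. The thresholds, the `ReleaseMetrics` type, and the function name are all illustrative assumptions and would need tuning for a real service.

```python
from dataclasses import dataclass


@dataclass
class ReleaseMetrics:
    error_rate: float       # fraction of failed requests, between 0.0 and 1.0
    p99_latency_ms: float   # 99th percentile latency in milliseconds


def canary_passes(baseline, canary, max_error_increase=0.01, max_latency_ratio=1.2):
    """Return True if the canary stays within the allowed error and latency budgets."""
    error_ok = canary.error_rate <= baseline.error_rate + max_error_increase
    latency_ok = canary.p99_latency_ms <= baseline.p99_latency_ms * max_latency_ratio
    return error_ok and latency_ok


baseline = ReleaseMetrics(error_rate=0.010, p99_latency_ms=120.0)
healthy = ReleaseMetrics(error_rate=0.012, p99_latency_ms=130.0)
slow = ReleaseMetrics(error_rate=0.010, p99_latency_ms=200.0)
print(canary_passes(baseline, healthy), canary_passes(baseline, slow))
```

A gate like this would typically run after each traffic increase, with a failed check triggering a rollback to the previous weights.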