
Deploying Machine Learning Models with Kubernetes
Seldon Core Deployment

    # seldon-deployment.yaml
    apiVersion: machinelearning.seldon.io/v1
    kind: SeldonDeployment
    metadata:
      name: churn-model
      namespace: ml-serving
    spec:
      name: churn-predictor
      predictors:
        - name: default
          replicas: 3
          graph:
            name: classifier
            implementation: SKLEARN_SERVER
            modelUri: gs://my-models/churn-v1.2
            envSecretRefName: gcp-credentials
          componentSpecs:
            - spec:
                containers:
                  - name: classifier
                    resources:
                      requests:
                        cpu: "0.5"
                        memory: "1Gi"
                      limits:
                        cpu: "1"
                        memory: "2Gi"
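
Once a deployment like this is running, clients usually reach it through the cluster ingress with a Seldon protocol REST call. The sketch below only builds the URL and JSON body, so it runs without a cluster. The host `ingress.example.com` and the feature values are placeholders, and the path assumes the default Seldon route exposed behind an Istio or Ambassador ingress.

```python
import json


def seldon_prediction_request(host: str, namespace: str, deployment: str, rows: list) -> tuple:
    """Return (url, body) for a Seldon protocol REST prediction call."""
    url = f"http://{host}/seldon/{namespace}/{deployment}/api/v1.0/predictions"
    # The Seldon protocol wraps tabular input in data.ndarray (one inner list per row).
    body = json.dumps({"data": {"ndarray": rows}})
    return url, body


# Placeholder host and feature values, for illustration only.
url, body = seldon_prediction_request(
    "ingress.example.com", "ml-serving", "churn-model", [[0.3, 12, 1, 79.5]]
)
print(url)
print(body)
```

The returned pair can be passed to any HTTP client as a POST with `Content-Type: application/json`.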

KServe InferenceService

    # kserve-inference.yaml
    apiVersion: serving.kserve.io/v1beta1
    kind: InferenceService
    metadata:
      name: sklearn-classifier
    spec:
      predictor:
        sklearn:
          storageUri: "gs://my-models/sklearn/v1"
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
      transformer:
        containers:
          - name: preprocessor
            image: myregistry/preprocessor:v1
            resources:
              requests:
                cpu: "250m"
                memory: "512Mi"
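
For comparison, a KServe InferenceService that speaks the V1 inference protocol expects a different path and body shape. This is a minimal sketch that builds the request without sending it. The ingress host is a placeholder, and in a real cluster the hostname comes from the InferenceService status URL. Because the manifest above defines a transformer, requests would pass through the `preprocessor` container before reaching the predictor.

```python
import json


def kserve_v1_request(model_name: str, instances: list, ingress: str = "ingress.example.com") -> dict:
    """Build the URL and JSON body for a KServe V1 protocol predict call."""
    return {
        "url": f"http://{ingress}/v1/models/{model_name}:predict",
        # The V1 protocol carries inputs under the "instances" key.
        "body": json.dumps({"instances": instances}),
    }


# Placeholder feature values, for illustration only.
request = kserve_v1_request("sklearn-classifier", [[0.3, 12, 1, 79.5]])
print(request["url"])
print(request["body"])
```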

Canary Deployment

    apiVersion: machinelearning.seldon.io/v1
    kind: SeldonDeployment
    metadata:
      name: churn-model-canary
    spec:
      predictors:
        - name: main
          replicas: 4
          traffic: 90  # 90% of traffic
          graph:
            name: v1
            implementation: SKLEARN_SERVER
            modelUri: gs://models/churn-v1
        - name: canary
          replicas: 1
          traffic: 10  # 10% canary traffic
          graph:
            name: v2
            implementation: SKLEARN_SERVER
            modelUri: gs://models/churn-v2
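
Whether to promote a canary is usually decided by comparing its error rate and tail latency against the main predictor. The following is a rough sketch of such a gate, not part of Seldon. The function names, the input dictionary shape, and both thresholds are assumptions chosen for illustration.

```python
import math


def p99(latencies_ms: list) -> float:
    """Nearest-rank 99th percentile of a non-empty list of latencies."""
    ordered = sorted(latencies_ms)
    rank = max(1, math.ceil(len(ordered) * 99 / 100))
    return ordered[rank - 1]


def canary_is_healthy(main: dict, canary: dict,
                      max_error_delta: float = 0.01,
                      max_p99_ratio: float = 1.2) -> bool:
    """Each dict holds 'errors', 'requests', and 'latencies_ms' (hypothetical shape)."""
    main_err = main["errors"] / main["requests"]
    canary_err = canary["errors"] / canary["requests"]
    # The canary may exceed the main error rate by at most max_error_delta.
    error_ok = canary_err - main_err <= max_error_delta
    # The canary p99 may be at most max_p99_ratio times the main p99.
    latency_ok = p99(canary["latencies_ms"]) <= max_p99_ratio * p99(main["latencies_ms"])
    return error_ok and latency_ok


main_stats = {"errors": 9, "requests": 900, "latencies_ms": [20] * 98 + [40, 45]}
good_canary = {"errors": 1, "requests": 100, "latencies_ms": [22] * 98 + [44, 60]}
bad_canary = {"errors": 5, "requests": 100, "latencies_ms": [22] * 98 + [44, 60]}
print(canary_is_healthy(main_stats, good_canary))
print(canary_is_healthy(main_stats, bad_canary))
```

In practice the counts and latencies would come from the monitoring system rather than hard-coded dictionaries.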

Python Model Server

    from seldon_core.user_model import SeldonComponent
    import joblib
    import numpy as np


    class ChurnPredictor(SeldonComponent):
        def __init__(self):
            # Artifacts are loaded in load(), not in the constructor.
            self.model = None
            self.scaler = None

        def load(self):
            self.model = joblib.load("/mnt/models/model.pkl")
            self.scaler = joblib.load("/mnt/models/scaler.pkl")

        def predict(self, X: np.ndarray, features_names: list = None) -> np.ndarray:
            # Apply the fitted scaler, then return class probabilities.
            X_scaled = self.scaler.transform(X)
            return self.model.predict_proba(X_scaled)

        def health_status(self) -> dict:
            return {"status": "OK", "model_loaded": self.model is not None}

Model Monitoring with Prometheus

    import time

    import numpy as np
    from prometheus_client import Counter, Gauge, Histogram, start_http_server

    prediction_counter = Counter("predictions_total", "Total predictions", ["model_version", "outcome"])
    prediction_latency = Histogram("prediction_latency_seconds", "Prediction latency")
    feature_drift = Gauge("feature_drift_score", "Feature drift score", ["feature"])


    class MonitoredPredictor:
        def __init__(self, model, version: str, feature_names: list, drift_detector):
            # drift_detector is any object exposing score(values) -> float.
            self.model = model
            self.version = version
            self.feature_names = feature_names
            self.drift_detector = drift_detector

        def predict(self, X: np.ndarray) -> np.ndarray:
            start = time.perf_counter()
            predictions = self.model.predict(X)
            prediction_latency.observe(time.perf_counter() - start)

            for pred in predictions:
                prediction_counter.labels(model_version=self.version, outcome=str(pred)).inc()

            # Check feature drift, one column at a time
            for i, col in enumerate(self.feature_names):
                drift = self.drift_detector.score(X[:, i])
                feature_drift.labels(feature=col).set(drift)

            return predictions

    # Call start_http_server(<port>) once at startup to expose these metrics for scraping.
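
The monitoring code relies on a drift detector with a `score` method but does not define one. One simple possibility, offered as an assumption rather than the author's method, is a standardized mean shift: how far the mean of the incoming batch has moved from a reference sample, measured in reference standard deviations. The class name `MeanShiftDriftDetector` is hypothetical.

```python
import statistics


class MeanShiftDriftDetector:
    """Hypothetical single-feature detector based on standardized mean shift."""

    def __init__(self, reference: list):
        # Reference statistics, e.g. taken from the training data for this feature.
        self.ref_mean = statistics.fmean(reference)
        self.ref_std = statistics.pstdev(reference)

    def score(self, values) -> float:
        batch_mean = statistics.fmean(values)
        if self.ref_std == 0:
            # Constant reference feature: any change in the mean counts as unbounded drift.
            return 0.0 if batch_mean == self.ref_mean else float("inf")
        return abs(batch_mean - self.ref_mean) / self.ref_std


detector = MeanShiftDriftDetector([1.0, 2.0, 3.0, 4.0, 5.0])
print(detector.score([3.0, 3.0, 3.0]))  # batch centered on the reference mean
print(detector.score([5.0, 6.0, 7.0]))  # batch shifted upward
```

Because this detector holds the reference statistics for a single feature, a multi-feature model would need one instance per column, or a wrapper that selects the right instance by feature name.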

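Shadow Mode Testing

    # Shadow mode: the shadow predictor receives mirrored traffic,
    # but its responses are not returned to the caller
    apiVersion: machinelearning.seldon.io/v1
    kind: SeldonDeployment
    metadata:
      name: churn-model-shadow  # placeholder name
    spec:
      predictors:
        - name: production
          traffic: 100
          graph:
            name: v1
            implementation: SKLEARN_SERVER
            modelUri: gs://models/v1
        - name: shadow
          shadow: true  # shadow mode - receives mirrored traffic
          traffic: 0
          graph:
            name: v2
            implementation: SKLEARN_SERVER
            modelUri: gs://models/v2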
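
Since shadow responses never reach the caller, the comparison between the two models has to happen offline. A minimal sketch follows, assuming predictions from both predictors have already been logged and aligned by request. That logging step is outside the scope of this sketch, and the function name is hypothetical.

```python
def shadow_agreement(production_preds: list, shadow_preds: list) -> tuple:
    """Return (agreement_rate, mismatched_indices) for two aligned prediction lists."""
    if len(production_preds) != len(shadow_preds):
        raise ValueError("prediction lists must be aligned and equal in length")
    if not production_preds:
        raise ValueError("no predictions to compare")
    mismatches = [
        i for i, (prod, shadow) in enumerate(zip(production_preds, shadow_preds))
        if prod != shadow
    ]
    rate = 1 - len(mismatches) / len(production_preds)
    return rate, mismatches


rate, mismatches = shadow_agreement([0, 1, 1, 0], [0, 1, 0, 0])
print(rate, mismatches)
```

The mismatched indices point at the requests worth inspecting by hand before the shadow model is promoted.
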
A/B Testing Framework

    import hashlib


    class ABTestRouter:
        def __init__(self, experiment_config: dict):
            self.experiments = experiment_config

        def get_variant(self, user_id: str, experiment: str) -> str:
            config = self.experiments.get(experiment, {})
            variants = config.get("variants", ["control"])
            traffic = config.get("traffic", [100])
            # Deterministic routing based on user_id
            hash_val = int(hashlib.md5(f"{user_id}:{experiment}".encode()).hexdigest(), 16) % 100
            cumulative = 0
            for variant, pct in zip(variants, traffic):
                cumulative += pct
                if hash_val < cumulative:
                    return variant
            return variants[-1]

        def record_outcome(self, user_id: str, experiment: str, variant: str, metric: float):
            # Persist to a database for later analysis
            pass


    router = ABTestRouter({
        "churn_model": {
            "variants": ["control", "treatment"],
            "traffic": [50, 50],
        }
    })
    variant = router.get_variant("user-123", "churn_model")
    # Serve the model version that corresponds to the variant
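
Once outcomes have been recorded per variant, the experiment still needs a significance check. As one possible approach for a binary outcome such as "customer retained", here is a sketch of a two-sided two-proportion z-test using only the standard library. The counts in the example are invented, and this test does not apply to continuous metrics.

```python
import math


def two_proportion_z_test(success_a: int, n_a: int, success_b: int, n_b: int) -> tuple:
    """Return (z, two_sided_p_value) comparing the success rate of B against A."""
    p_a = success_a / n_a
    p_b = success_b / n_b
    pooled = (success_a + success_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    if se == 0:
        raise ValueError("pooled rate is 0 or 1; the test is undefined")
    z = (p_b - p_a) / se
    # Two-sided p-value from the standard normal distribution.
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Invented counts: 200/2000 successes for control, 250/2000 for treatment.
z, p_value = two_proportion_z_test(200, 2000, 250, 2000)
print(f"z={z:.3f}, p={p_value:.4f}")
```

A small p-value suggests the difference between variants is unlikely to be noise, but the decision should also weigh the business metrics and the sample size planned before the experiment started.
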
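
Deployment Checklist

| Stage | Checks |
|---|---|
| Pre-deployment | Load testing, latency benchmarks |
| Canary | Monitor error rate and p99 latency |
| A/B testing | Statistical significance, business metrics |
| Full rollout | Increase traffic gradually |
| Post-deployment | Drift monitoring, feedback loop |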