08-MLOps与工程落地——模型注册表与模型服务
模型注册表与模型服务(MLflow Model Registry、Seldon Core)
一、模型注册表概述
1.1 什么是模型注册表?
import warnings

import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, FancyBboxPatch

# Silence library warnings so the tutorial output stays readable.
warnings.filterwarnings('ignore')

print("=" * 60)
print("模型注册表:集中管理模型版本")
print("=" * 60)

# Architecture diagram of the model registry.
fig, ax = plt.subplots(figsize=(12, 8))
ax.axis('off')

# Components: label -> (x, y) centre in axes coordinates.
components = {
    '训练流水线': (0.2, 0.7),
    '模型注册表': (0.5, 0.7),
    '模型服务': (0.8, 0.7),
    '版本管理': (0.2, 0.4),
    '阶段转换': (0.5, 0.4),
    '监控告警': (0.8, 0.4),
}

for name, (x, y) in components.items():
    circle = plt.Circle((x, y), 0.08, color='lightblue', ec='black')
    ax.add_patch(circle)
    ax.text(x, y, name, ha='center', va='center', fontsize=8)

# Connections: two horizontal arrows along the top row, two vertical
# arrows pointing from the bottom row up to the top row.
ax.annotate('', xy=(0.4, 0.7), xytext=(0.28, 0.7),
            arrowprops=dict(arrowstyle='->', lw=2))
ax.annotate('', xy=(0.72, 0.7), xytext=(0.58, 0.7),
            arrowprops=dict(arrowstyle='->', lw=2))
ax.annotate('', xy=(0.5, 0.62), xytext=(0.5, 0.48),
            arrowprops=dict(arrowstyle='->', lw=1))
ax.annotate('', xy=(0.8, 0.62), xytext=(0.8, 0.48),
            arrowprops=dict(arrowstyle='->', lw=1))

ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_title('模型注册表架构', fontsize=14)

plt.tight_layout()
plt.show()

print("\n💡 模型注册表核心功能:")
print(" - 模型版本管理")
print(" - 阶段转换(Staging/Production/Archived)")
print(" - 模型元数据存储")
print(" - 模型血缘追踪")
print(" - 部署集成")

# Section heading carried over from the original document: 二、MLflow Model Registry
2.1 模型注册
def mlflow_registry():
    """Print a reference walkthrough of the MLflow Model Registry API.

    The function does not talk to MLflow itself: it prints a banner and
    then a sample script (kept as a plain string) that shows tracking
    setup, training and registering a model, stage transitions, tagging,
    version lookup and loading the production model.

    Returns:
        None. All output goes to stdout.

    NOTE(review): the sample registers ``production_model`` through
    ``log_model(registered_model_name=...)`` and afterwards calls
    ``client.create_registered_model`` with the same name; this looks
    like it would fail with "already exists" -- confirm against the
    MLflow documentation before running the sample verbatim.
    NOTE(review): the sample relies on the stage-based API
    (``transition_model_version_stage``); check whether the targeted
    MLflow version still supports it.
    """
    print("\n" + "=" * 60)
    print("MLflow Model Registry")
    print("=" * 60)

    # The sample is plain (non-f) string data: braces such as {run_id}
    # are printed literally and only evaluated if the sample is run.
    code = """
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# 1. 设置MLflow
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("model_registry_demo")

# 2. 训练和注册模型
with mlflow.start_run(run_name="model_v1") as run:
    # 训练模型
    X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    model = RandomForestClassifier(n_estimators=100, max_depth=10)
    model.fit(X_train, y_train)

    # 记录参数和指标
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)
    mlflow.log_metric("accuracy", model.score(X_test, y_test))

    # 注册模型
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="production_model"
    )

    run_id = run.info.run_id

# 3. 使用Client管理模型
client = MlflowClient()

# 创建注册模型
client.create_registered_model("production_model")

# 添加模型版本
model_uri = f"runs:/{run_id}/model"
client.create_model_version(
    name="production_model",
    source=model_uri,
    run_id=run_id,
    description="Random Forest model v1"
)

# 4. 模型版本转换
# 转换到Staging
client.transition_model_version_stage(
    name="production_model",
    version=1,
    stage="Staging"
)

# 转换到Production
client.transition_model_version_stage(
    name="production_model",
    version=1,
    stage="Production"
)

# 5. 更新模型描述
client.update_model_version(
    name="production_model",
    version=1,
    description="Improved with feature engineering"
)

# 6. 添加模型标签
client.set_model_version_tag(
    name="production_model",
    version=1,
    key="accuracy",
    value="0.95"
)

# 7. 获取模型版本信息
model_version = client.get_model_version("production_model", 1)
print(f"Version: {model_version.version}")
print(f"Stage: {model_version.stage}")
print(f"Description: {model_version.description}")

# 8. 列出所有版本
versions = client.search_model_versions("name='production_model'")
for v in versions:
    print(f"Version {v.version}: Stage {v.stage}")

# 9. 加载生产模型
model = mlflow.pyfunc.load_model("models:/production_model/Production")
"""
    print(code)


mlflow_registry()

# Section heading carried over from the original document: 2.2 模型版本管理
def model_version_management():
    """Print a reference script for managing MLflow model versions.

    The printed sample covers registering several versions in a loop,
    comparing versions by their logged accuracy, assigning and loading
    aliases, a simple random-split A/B test, and rolling the Production
    stage back to an earlier version.

    Returns:
        None. All output goes to stdout.

    NOTE(review): the sample uses ``RandomForestClassifier``, ``X_train``
    and friends without defining them; it presumably continues from the
    previous section's sample -- confirm before running it stand-alone.
    """
    print("\n" + "=" * 60)
    print("模型版本管理")
    print("=" * 60)

    # The sample itself contains triple-double-quoted docstrings, so the
    # enclosing literal must use ''' -- with """ the literal would end at
    # the first inner docstring and the file would not parse.
    code = '''
from mlflow.tracking import MlflowClient
import mlflow

client = MlflowClient()

# 1. 注册多个版本
versions = [
    {"n_estimators": 50, "max_depth": 5},
    {"n_estimators": 100, "max_depth": 10},
    {"n_estimators": 150, "max_depth": 15},
    {"n_estimators": 200, "max_depth": 20}
]

for i, params in enumerate(versions, 1):
    with mlflow.start_run(run_name=f"model_v{i}"):
        # 训练模型
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)

        # 记录参数
        mlflow.log_params(params)
        mlflow.log_metric("accuracy", model.score(X_test, y_test))

        # 注册模型
        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name="production_model"
        )

# 2. 比较模型版本
def compare_models(model_name):
    versions = client.search_model_versions(f"name='{model_name}'")

    results = []
    for v in versions:
        # 获取运行ID
        run_id = v.run_id
        run = mlflow.get_run(run_id)

        results.append({
            "version": v.version,
            "stage": v.stage,
            "accuracy": run.data.metrics.get("accuracy"),
            "params": run.data.params
        })

    # 按准确率排序
    results.sort(key=lambda x: x["accuracy"], reverse=True)

    for r in results:
        print(f"Version {r['version']} ({r['stage']}): accuracy={r['accuracy']:.4f}")

    return results

# 3. 设置模型别名
client.set_registered_model_alias("production_model", "champion", "3")
client.set_registered_model_alias("production_model", "challenger", "4")

# 使用别名加载
champion_model = mlflow.pyfunc.load_model("models:/production_model@champion")
challenger_model = mlflow.pyfunc.load_model("models:/production_model@challenger")

# 4. A/B测试
def ab_test(champion_model, challenger_model, test_data, ratio=0.1):
    """A/B测试"""
    import random

    results = {"champion": 0, "challenger": 0}

    for x in test_data:
        if random.random() < ratio:
            # 使用挑战者模型
            pred = challenger_model.predict(x)
            results["challenger"] += 1
        else:
            # 使用冠军模型
            pred = champion_model.predict(x)
            results["champion"] += 1

    return results

# 5. 模型回滚
def rollback_model(model_name, target_version):
    """回滚到指定版本"""
    # 获取当前生产版本
    current = client.get_latest_versions(model_name, stages=["Production"])[0]

    # 将当前版本归档
    client.transition_model_version_stage(
        name=model_name,
        version=current.version,
        stage="Archived"
    )

    # 将目标版本提升为生产
    client.transition_model_version_stage(
        name=model_name,
        version=target_version,
        stage="Production"
    )

    print(f"Rolled back from version {current.version} to {target_version}")
'''
    print(code)


model_version_management()

# Section heading carried over from the original document: 三、Seldon Core模型服务
3.1 Seldon Core部署
def seldon_core():
    """Print example SeldonDeployment manifests for Seldon Core.

    Four YAML examples are printed: a single scikit-learn model, a
    stable/canary traffic split, a custom model server with resource
    settings, and a chained inference graph.

    Returns:
        None. All output goes to stdout.

    NOTE(review): the source text had lost all line breaks and
    indentation, so the YAML nesting below is reconstructed from key
    order (in particular where ``envSecretRefName``,
    ``serviceAccountName`` and ``resources`` sit in example 3) --
    verify against the SeldonDeployment reference before applying.
    """
    print("\n" + "=" * 60)
    print("Seldon Core模型服务")
    print("=" * 60)

    code = """
# 1. Seldon Deployment定义
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: sklearn-model
  namespace: production
spec:
  name: sklearn-deployment
  predictors:
  - name: default
    graph:
      name: classifier
      type: MODEL
      modelUri: gs://seldon-models/sklearn/iris
      implementation: SKLEARN_SERVER
    replicas: 2
    traffic: 100

# 2. 多模型部署(金丝雀)
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: canary-deployment
spec:
  predictors:
  - name: stable
    graph:
      name: stable-model
      modelUri: gs://models/stable
      implementation: SKLEARN_SERVER
    replicas: 2
    traffic: 90
  - name: canary
    graph:
      name: canary-model
      modelUri: gs://models/canary
      implementation: SKLEARN_SERVER
    replicas: 1
    traffic: 10

# 3. 自定义模型服务
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: custom-model
spec:
  predictors:
  - name: custom
    graph:
      name: custom-model
      type: MODEL
      implementation: CUSTOM
      modelUri: gs://models/custom
      envSecretRefName: model-secrets
      serviceAccountName: seldon-service-account
    resources:
      requests:
        memory: 2Gi
        cpu: 1
      limits:
        memory: 4Gi
        cpu: 2
    replicas: 2

# 4. 推理图(Pipeline)
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: inference-graph
spec:
  predictors:
  - name: pipeline
    graph:
      name: transformer
      type: TRANSFORMER
      children:
      - name: model1
        type: MODEL
        children:
        - name: model2
          type: MODEL
          endpoint:
            type: REST
"""
    print(code)


seldon_core()

# Section heading carried over from the original document: 3.2 模型服务集成
def seldon_integration():
    """Print reference snippets for integrating a model with Seldon Core.

    The printed sample contains a custom Python model class, a
    Dockerfile, REST and gRPC client helpers, and a small latency /
    throughput benchmark loop.

    Returns:
        None. All output goes to stdout.

    NOTE(review): in the sample, ``benchmark_seldon`` passes an already
    nested list to ``predict_rest``, which wraps it in another list --
    confirm the payload shape the service expects.
    """
    print("\n" + "=" * 60)
    print("Seldon Core集成")
    print("=" * 60)

    # The sample contains triple-double-quoted docstrings, so the
    # enclosing literal uses ''' to avoid terminating early.
    code = '''
# 1. Python自定义模型服务
import numpy as np
import joblib
from seldon_core.user_model import SeldonComponent

class CustomModel(SeldonComponent):
    def __init__(self, model_path):
        self.model = joblib.load(model_path)

    def predict(self, X, features_names=None, **kwargs):
        """预测接口"""
        predictions = self.model.predict(X)
        return predictions

    def predict_proba(self, X, features_names=None, **kwargs):
        """概率预测接口"""
        probabilities = self.model.predict_proba(X)
        return probabilities

# 2. Dockerfile
FROM python:3.9-slim
RUN pip install seldon-core scikit-learn joblib
COPY model.pkl /model.pkl
COPY model_server.py /model_server.py
CMD ["python", "/model_server.py"]

# 3. 服务请求示例
import requests
import json

# REST请求
def predict_rest(features):
    url = "http://seldon-service.production.svc.cluster.local/v1/models/default/predict"
    payload = {
        "data": {
            "ndarray": [features]
        }
    }
    response = requests.post(url, json=payload)
    return response.json()

# gRPC请求
import grpc
from seldon_core.proto import prediction_pb2, prediction_pb2_grpc

def predict_grpc(features):
    channel = grpc.insecure_channel('seldon-service.production:8000')
    stub = prediction_pb2_grpc.SeldonStub(channel)

    request = prediction_pb2.SeldonMessage(
        data=prediction_pb2.DefaultData(
            ndarray=prediction_pb2.NDArray(values=features)
        )
    )

    response = stub.Predict(request)
    return response

# 4. 性能测试
def benchmark_seldon():
    import time

    features = [[1, 2, 3, 4]]
    n_requests = 100

    start = time.time()
    for _ in range(n_requests):
        predict_rest(features)
    elapsed = time.time() - start

    print(f"Average latency: {elapsed/n_requests*1000:.2f}ms")
    print(f"Throughput: {n_requests/elapsed:.2f} req/s")
'''
    print(code)


seldon_integration()

# Section heading carried over from the original document: 四、端到端工作流
4.1 完整MLOps流程
defend_to_end_workflow():"""端到端MLOps工作流"""print("\n"+"="*60)print("端到端MLOps工作流")print("="*60)code=""" import mlflow from mlflow.tracking import MlflowClient import requests import json class MLOpsPipeline: def __init__(self, model_name, tracking_uri="http://mlflow:5000"): self.model_name = model_name mlflow.set_tracking_uri(tracking_uri) self.client = MlflowClient() def train_and_register(self, training_func, version, params): """训练并注册模型""" with mlflow.start_run(run_name=f"train_v{version}"): # 训练模型 model = training_func(**params) # 记录参数和指标 mlflow.log_params(params) accuracy = evaluate_model(model) mlflow.log_metric("accuracy", accuracy) # 注册模型 mlflow.sklearn.log_model( model, "model", registered_model_name=self.model_name ) # 获取run_id run_id = mlflow.active_run().info.run_id # 添加标签 self.client.set_model_version_tag( name=self.model_name, version=version, key="accuracy", value=str(accuracy) ) return run_id def promote_to_staging(self, version): """提升到Staging环境""" self.client.transition_model_version_stage( name=self.model_name, version=version, stage="Staging" ) # 部署到Staging self._deploy_to_environment("staging", version) def promote_to_production(self, version): """提升到Production环境""" # 获取当前生产版本 current = self.client.get_latest_versions(self.model_name, stages=["Production"]) # 验证新模型性能 if not self._validate_model(version): raise Exception("Model validation failed") # 执行金丝雀部署 self._canary_deploy(version, current[0].version if current else None) # 转换阶段 self.client.transition_model_version_stage( name=self.model_name, version=version, stage="Production" ) if current: self.client.transition_model_version_stage( name=self.model_name, version=current[0].version, stage="Archived" ) def _deploy_to_environment(self, env, version): """部署到指定环境""" model_uri = f"models:/{self.model_name}/{version}" # 创建Seldon部署 deployment = { "apiVersion": "machinelearning.seldon.io/v1", "kind": "SeldonDeployment", "metadata": { "name": f"{self.model_name}-{env}", "namespace": env }, "spec": { 
"predictors": [{ "name": "default", "graph": { "name": "classifier", "implementation": "SKLEARN_SERVER", "modelUri": model_uri }, "replicas": 2 }] } } # 应用部署 # kubectl.apply(deployment) def _validate_model(self, version): """验证模型性能""" model = mlflow.pyfunc.load_model(f"models:/{self.model_name}/{version}") # 在验证集上测试 accuracy = test_model(model) return accuracy > 0.85 def _canary_deploy(self, new_version, old_version): """金丝雀部署""" # 创建金丝雀部署配置 pass # 使用示例 pipeline = MLOpsPipeline("production_model") # 训练新版本 pipeline.train_and_register(train_func, version=5, params={"n_estimators": 150}) # 提升到Staging pipeline.promote_to_staging(5) # 测试验证后提升到Production pipeline.promote_to_production(5) """print(code)end_to_end_workflow()五、总结
| 组件 | 功能 | 工具 |
|---|---|---|
| 模型注册 | 版本管理 | MLflow Registry |
| 阶段管理 | Staging/Production | MLflow |
| 模型服务 | 推理部署 | Seldon Core |
| 金丝雀 | 灰度发布 | Seldon Core |
| 监控 | 性能监控 | Prometheus |
最佳实践:
- 使用MLflow Registry管理模型版本
- 实施阶段转换流程
- 使用Seldon Core进行生产部署
- 实施金丝雀发布策略
- 集成监控和告警
