当前位置: 首页 > news >正文

08-MLOps与工程落地——模型注册表与模型服务

模型注册表与模型服务(MLflow Model Registry、Seldon Core)

一、模型注册表概述

1.1 什么是模型注册表?

importmatplotlib.pyplotaspltfrommatplotlib.patchesimportRectangle,FancyBboxPatchimportwarnings warnings.filterwarnings('ignore')print("="*60)print("模型注册表:集中管理模型版本")print("="*60)# 模型注册表架构图fig,ax=plt.subplots(figsize=(12,8))ax.axis('off')# 组件components={'训练流水线':(0.2,0.7),'模型注册表':(0.5,0.7),'模型服务':(0.8,0.7),'版本管理':(0.2,0.4),'阶段转换':(0.5,0.4),'监控告警':(0.8,0.4),}forname,(x,y)incomponents.items():circle=plt.Circle((x,y),0.08,color='lightblue',ec='black')ax.add_patch(circle)ax.text(x,y,name,ha='center',va='center',fontsize=8)# 连接ax.annotate('',xy=(0.4,0.7),xytext=(0.28,0.7),arrowprops=dict(arrowstyle='->',lw=2))ax.annotate('',xy=(0.72,0.7),xytext=(0.58,0.7),arrowprops=dict(arrowstyle='->',lw=2))ax.annotate('',xy=(0.5,0.62),xytext=(0.5,0.48),arrowprops=dict(arrowstyle='->',lw=1))ax.annotate('',xy=(0.8,0.62),xytext=(0.8,0.48),arrowprops=dict(arrowstyle='->',lw=1))ax.set_xlim(0,1)ax.set_ylim(0,1)ax.set_title('模型注册表架构',fontsize=14)plt.tight_layout()plt.show()print("\n💡 模型注册表核心功能:")print(" - 模型版本管理")print(" - 阶段转换(Staging/Production/Archived)")print(" - 模型元数据存储")print(" - 模型血缘追踪")print(" - 部署集成")

二、MLflow Model Registry

2.1 模型注册

defmlflow_registry():"""MLflow模型注册"""print("\n"+"="*60)print("MLflow Model Registry")print("="*60)code=""" import mlflow import mlflow.sklearn from mlflow.tracking import MlflowClient from sklearn.ensemble import RandomForestClassifier from sklearn.datasets import make_classification from sklearn.model_selection import train_test_split # 1. 设置MLflow mlflow.set_tracking_uri("http://localhost:5000") mlflow.set_experiment("model_registry_demo") # 2. 训练和注册模型 with mlflow.start_run(run_name="model_v1") as run: # 训练模型 X, y = make_classification(n_samples=1000, n_features=20, random_state=42) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2) model = RandomForestClassifier(n_estimators=100, max_depth=10) model.fit(X_train, y_train) # 记录参数和指标 mlflow.log_param("n_estimators", 100) mlflow.log_param("max_depth", 10) mlflow.log_metric("accuracy", model.score(X_test, y_test)) # 注册模型 mlflow.sklearn.log_model( model, "model", registered_model_name="production_model" ) run_id = run.info.run_id # 3. 使用Client管理模型 client = MlflowClient() # 创建注册模型 client.create_registered_model("production_model") # 添加模型版本 model_uri = f"runs:/{run_id}/model" client.create_model_version( name="production_model", source=model_uri, run_id=run_id, description="Random Forest model v1" ) # 4. 模型版本转换 # 转换到Staging client.transition_model_version_stage( name="production_model", version=1, stage="Staging" ) # 转换到Production client.transition_model_version_stage( name="production_model", version=1, stage="Production" ) # 5. 更新模型描述 client.update_model_version( name="production_model", version=1, description="Improved with feature engineering" ) # 6. 添加模型标签 client.set_model_version_tag( name="production_model", version=1, key="accuracy", value="0.95" ) # 7. 获取模型版本信息 model_version = client.get_model_version("production_model", 1) print(f"Version: {model_version.version}") print(f"Stage: {model_version.stage}") print(f"Description: {model_version.description}") # 8. 列出所有版本 versions = client.search_model_versions("name='production_model'") for v in versions: print(f"Version {v.version}: Stage {v.stage}") # 9. 加载生产模型 model = mlflow.pyfunc.load_model("models:/production_model/Production") """print(code)mlflow_registry()

2.2 模型版本管理

defmodel_version_management():"""模型版本管理"""print("\n"+"="*60)print("模型版本管理")print("="*60)code=""" from mlflow.tracking import MlflowClient import mlflow client = MlflowClient() # 1. 注册多个版本 versions = [ {"n_estimators": 50, "max_depth": 5}, {"n_estimators": 100, "max_depth": 10}, {"n_estimators": 150, "max_depth": 15}, {"n_estimators": 200, "max_depth": 20} ] for i, params in enumerate(versions, 1): with mlflow.start_run(run_name=f"model_v{i}"): # 训练模型 model = RandomForestClassifier(**params) model.fit(X_train, y_train) # 记录参数 mlflow.log_params(params) mlflow.log_metric("accuracy", model.score(X_test, y_test)) # 注册模型 mlflow.sklearn.log_model( model, "model", registered_model_name="production_model" ) # 2. 比较模型版本 def compare_models(model_name): versions = client.search_model_versions(f"name='{model_name}'") results = [] for v in versions: # 获取运行ID run_id = v.run_id run = mlflow.get_run(run_id) results.append({ "version": v.version, "stage": v.stage, "accuracy": run.data.metrics.get("accuracy"), "params": run.data.params }) # 按准确率排序 results.sort(key=lambda x: x["accuracy"], reverse=True) for r in results: print(f"Version {r['version']} ({r['stage']}): accuracy={r['accuracy']:.4f}") return results # 3. 设置模型别名 client.set_registered_model_alias("production_model", "champion", "3") client.set_registered_model_alias("production_model", "challenger", "4") # 使用别名加载 champion_model = mlflow.pyfunc.load_model("models:/production_model@champion") challenger_model = mlflow.pyfunc.load_model("models:/production_model@challenger") # 4. A/B测试 def ab_test(champion_model, challenger_model, test_data, ratio=0.1): """A/B测试""" import random results = {"champion": 0, "challenger": 0} for x in test_data: if random.random() < ratio: # 使用挑战者模型 pred = challenger_model.predict(x) results["challenger"] += 1 else: # 使用冠军模型 pred = champion_model.predict(x) results["champion"] += 1 return results # 5. 模型回滚 def rollback_model(model_name, target_version): """回滚到指定版本""" # 获取当前生产版本 current = client.get_latest_versions(model_name, stages=["Production"])[0] # 将当前版本归档 client.transition_model_version_stage( name=model_name, version=current.version, stage="Archived" ) # 将目标版本提升为生产 client.transition_model_version_stage( name=model_name, version=target_version, stage="Production" ) print(f"Rolled back from version {current.version} to {target_version}") """print(code)model_version_management()

三、Seldon Core模型服务

3.1 Seldon Core部署

defseldon_core():"""Seldon Core部署"""print("\n"+"="*60)print("Seldon Core模型服务")print("="*60)code=""" # 1. Seldon Deployment定义 apiVersion: machinelearning.seldon.io/v1 kind: SeldonDeployment metadata: name: sklearn-model namespace: production spec: name: sklearn-deployment predictors: - name: default graph: name: classifier type: MODEL modelUri: gs://seldon-models/sklearn/iris implementation: SKLEARN_SERVER replicas: 2 traffic: 100 # 2. 多模型部署(金丝雀) apiVersion: machinelearning.seldon.io/v1 kind: SeldonDeployment metadata: name: canary-deployment spec: predictors: - name: stable graph: name: stable-model modelUri: gs://models/stable implementation: SKLEARN_SERVER replicas: 2 traffic: 90 - name: canary graph: name: canary-model modelUri: gs://models/canary implementation: SKLEARN_SERVER replicas: 1 traffic: 10 # 3. 自定义模型服务 apiVersion: machinelearning.seldon.io/v1 kind: SeldonDeployment metadata: name: custom-model spec: predictors: - name: custom graph: name: custom-model type: MODEL implementation: CUSTOM modelUri: gs://models/custom envSecretRefName: model-secrets serviceAccountName: seldon-service-account resources: requests: memory: 2Gi cpu: 1 limits: memory: 4Gi cpu: 2 replicas: 2 # 4. 推理图(Pipeline) apiVersion: machinelearning.seldon.io/v1 kind: SeldonDeployment metadata: name: inference-graph spec: predictors: - name: pipeline graph: name: transformer type: TRANSFORMER children: - name: model1 type: MODEL children: - name: model2 type: MODEL endpoint: type: REST """print(code)seldon_core()

3.2 模型服务集成

defseldon_integration():"""Seldon Core集成"""print("\n"+"="*60)print("Seldon Core集成")print("="*60)code=""" # 1. Python自定义模型服务 import numpy as np import joblib from seldon_core.user_model import SeldonComponent class CustomModel(SeldonComponent): def __init__(self, model_path): self.model = joblib.load(model_path) def predict(self, X, features_names=None, **kwargs): """预测接口""" predictions = self.model.predict(X) return predictions def predict_proba(self, X, features_names=None, **kwargs): """概率预测接口""" probabilities = self.model.predict_proba(X) return probabilities # 2. Dockerfile FROM python:3.9-slim RUN pip install seldon-core scikit-learn joblib COPY model.pkl /model.pkl COPY model_server.py /model_server.py CMD ["python", "/model_server.py"] # 3. 服务请求示例 import requests import json # REST请求 def predict_rest(features): url = "http://seldon-service.production.svc.cluster.local/v1/models/default/predict" payload = { "data": { "ndarray": [features] } } response = requests.post(url, json=payload) return response.json() # gRPC请求 import grpc from seldon_core.proto import prediction_pb2, prediction_pb2_grpc def predict_grpc(features): channel = grpc.insecure_channel('seldon-service.production:8000') stub = prediction_pb2_grpc.SeldonStub(channel) request = prediction_pb2.SeldonMessage( data=prediction_pb2.DefaultData( ndarray=prediction_pb2.NDArray(values=features) ) ) response = stub.Predict(request) return response # 4. 性能测试 def benchmark_seldon(): import time features = [[1, 2, 3, 4]] n_requests = 100 start = time.time() for _ in range(n_requests): predict_rest(features) elapsed = time.time() - start print(f"Average latency: {elapsed/n_requests*1000:.2f}ms") print(f"Throughput: {n_requests/elapsed:.2f} req/s") """print(code)seldon_integration()

四、端到端工作流

4.1 完整MLOps流程

defend_to_end_workflow():"""端到端MLOps工作流"""print("\n"+"="*60)print("端到端MLOps工作流")print("="*60)code=""" import mlflow from mlflow.tracking import MlflowClient import requests import json class MLOpsPipeline: def __init__(self, model_name, tracking_uri="http://mlflow:5000"): self.model_name = model_name mlflow.set_tracking_uri(tracking_uri) self.client = MlflowClient() def train_and_register(self, training_func, version, params): """训练并注册模型""" with mlflow.start_run(run_name=f"train_v{version}"): # 训练模型 model = training_func(**params) # 记录参数和指标 mlflow.log_params(params) accuracy = evaluate_model(model) mlflow.log_metric("accuracy", accuracy) # 注册模型 mlflow.sklearn.log_model( model, "model", registered_model_name=self.model_name ) # 获取run_id run_id = mlflow.active_run().info.run_id # 添加标签 self.client.set_model_version_tag( name=self.model_name, version=version, key="accuracy", value=str(accuracy) ) return run_id def promote_to_staging(self, version): """提升到Staging环境""" self.client.transition_model_version_stage( name=self.model_name, version=version, stage="Staging" ) # 部署到Staging self._deploy_to_environment("staging", version) def promote_to_production(self, version): """提升到Production环境""" # 获取当前生产版本 current = self.client.get_latest_versions(self.model_name, stages=["Production"]) # 验证新模型性能 if not self._validate_model(version): raise Exception("Model validation failed") # 执行金丝雀部署 self._canary_deploy(version, current[0].version if current else None) # 转换阶段 self.client.transition_model_version_stage( name=self.model_name, version=version, stage="Production" ) if current: self.client.transition_model_version_stage( name=self.model_name, version=current[0].version, stage="Archived" ) def _deploy_to_environment(self, env, version): """部署到指定环境""" model_uri = f"models:/{self.model_name}/{version}" # 创建Seldon部署 deployment = { "apiVersion": "machinelearning.seldon.io/v1", "kind": "SeldonDeployment", "metadata": { "name": f"{self.model_name}-{env}", "namespace": env }, "spec": { "predictors": [{ "name": "default", "graph": { "name": "classifier", "implementation": "SKLEARN_SERVER", "modelUri": model_uri }, "replicas": 2 }] } } # 应用部署 # kubectl.apply(deployment) def _validate_model(self, version): """验证模型性能""" model = mlflow.pyfunc.load_model(f"models:/{self.model_name}/{version}") # 在验证集上测试 accuracy = test_model(model) return accuracy > 0.85 def _canary_deploy(self, new_version, old_version): """金丝雀部署""" # 创建金丝雀部署配置 pass # 使用示例 pipeline = MLOpsPipeline("production_model") # 训练新版本 pipeline.train_and_register(train_func, version=5, params={"n_estimators": 150}) # 提升到Staging pipeline.promote_to_staging(5) # 测试验证后提升到Production pipeline.promote_to_production(5) """print(code)end_to_end_workflow()

五、总结

组件功能工具
模型注册版本管理MLflow Registry
阶段管理Staging/ProductionMLflow
模型服务推理部署Seldon Core
金丝雀灰度发布Seldon Core
监控性能监控Prometheus

最佳实践:

  1. 使用MLflow Registry管理模型版本
  2. 实施阶段转换流程
  3. 使用Seldon Core进行生产部署
  4. 实施金丝雀发布策略
  5. 集成监控和告警
http://www.jsqmd.com/news/772965/

相关文章:

  • 如何通过3步解锁QQ群聊天记录的隐藏价值:ChatLog完整指南
  • 重构搜索范式:阿里云 Elasticsearch 开启“Agent 原生”时代,打造企业级 AI 记忆湖
  • 【新人专属】OpenClaw 2.6.6 Windows 11 一键部署完整教程(包含安装包)
  • PySide6实战:手把手教你用SQLite+QTableView打造个人数据管理工具(附源码)
  • 3分钟终极指南:qmcdump轻松解锁QQ音乐加密文件,实现音乐自由播放
  • 5分钟搞定AI文本生成:oobabooga一键安装完全指南
  • 终极指南:如何用markdownReader插件彻底改变你的Markdown阅读体验
  • 集团首都公报:继美国谷歌公司、苹果公司之后,世界第三家手机控制系统公司(即     武汉市放飞炬人控制系统有限公司)今天2026年5月6日9点36分获得官方批准。
  • 昆山老房翻新装修公司哪家靠谱?2026年口碑推荐与避坑指南 - 速递信息
  • AI Agent团队数字档案库:用工程化方法管理角色人格与长期记忆
  • 大语言模型结构化剪枝实战:基于LLM-Pruner的模型压缩与部署优化
  • Windows热键冲突终极指南:三步快速定位被占用的快捷键
  • XnConvert v1.111.0 图像格式转换调整
  • 如何在XSLT中将动态字段值(如姓名)安全注入HTML链接的URL参数
  • HTML怎么标注回收估价规则_HTML估价逻辑说明折叠区【指南】
  • Install-TidGi-Windows-x64安装步骤详解(附TidGi知识库搭建教程)
  • 2026年昆山装修公司全包价格性价比最高排行榜推荐与避坑指南 - 速递信息
  • 中国词元:构建自主AI生态的“云-端“协同战略
  • AI_Agent记忆系统设计与实现
  • JavaScript中Object-getOwnPropertySymbols获取方法
  • 别再死记硬背三环了!用Arduino+伺服电机做个机械臂,实战理解位置、速度、力矩模式
  • 血清替代物(人血小板裂解液)从工艺到细胞扩增性能替代FBS的可行性分析
  • 从硬件到解决方案:2026年全球人形机器人及智能机器狗二次开发服务商全景解析 - 速递信息
  • WarcraftHelper:魔兽争霸3终极兼容性修复指南,让经典游戏在现代电脑流畅运行
  • 利用Taotoken多模型聚合能力为AIGC应用动态选择最佳性价比模型
  • RAG系统优化实战
  • Linux 自由诱惑大,但别冲动,切换前自问这5个问题
  • 2026郑州装修公司全包价格性价比最高排名推荐与省钱攻略 - 速递信息
  • SPSSAU文本分析新手入门:从数据上传到生成第一个词云图的全流程指南
  • 论文解读:生成式智能体让25个AI小人自己组织了一场情人节派对