SageMaker Pipelines与MLflow协同实现大模型实验工程化
1. 项目概述:当大模型实验撞上工程化瓶颈,我们到底在解决什么问题?
“Scaling LLM Experimentation with SageMaker Pipelines and MLflow”——这个标题不是一句技术口号,而是我在过去18个月里每天早上打开监控面板时看到的真实压力源。它直指当前大模型研发中最隐蔽、也最消耗团队战力的痛点:实验爆炸(experiment explosion)。你可能刚跑完一个LoRA微调任务,发现学习率0.0003比0.00025效果高0.8个点;转头又想试试Qwen-1.5B换掉Llama-3-8B基座模型,再叠加数据清洗策略A和B的组合;接着同事发来PR,说他用新的tokenization方式把上下文长度撑到了32K……这些都不是孤立动作,而是一张指数级增长的实验网。我带的团队去年平均每人每周启动47次训练作业,但其中只有不到12%的实验结果被真正记录、复现或用于决策。其余的,要么参数配置散落在Slack消息里,要么checkpoint文件名是“model_v3_final_really_final_20240521.pth”,要么连用的是哪个commit hash都查不到。
这就是SageMaker Pipelines + MLflow组合要切的硬骨头:把LLM实验从“手工作坊式试错”,变成可版本化、可审计、可回滚、可协作的软件工程实践。它不承诺让你的模型指标一夜暴涨,但它能确保当你在季度复盘会上被问到“上次那个提升2.3%的RAG优化方案,到底是哪次实验跑出来的?用的什么数据切片?谁改的prompt模板?”时,你能直接点开MLflow UI,3秒内给出带完整血缘图谱的答案。关键词很明确:SageMaker Pipelines(定义、调度、编排端到端ML工作流)、MLflow(跟踪实验、管理模型、部署服务)、Scaling(不是单次跑得快,而是支撑百人团队、千级并发实验、万次版本迭代的可持续能力)。它面向的不是单打独斗的研究员,而是需要交付稳定AI能力的产品团队、MLOps工程师、以及被实验噪音淹没的算法负责人。如果你还在用Jupyter Notebook+本地wandb+手动scp模型文件的方式管理LLM实验,这篇就是为你写的实操手册——不是概念科普,而是我把踩过的坑、调过的参数、写废的5版pipeline定义脚本,全摊开给你看。
2. 整体架构设计与核心选型逻辑:为什么是Pipeline+MLflow,而不是其他组合?
2.1 不是所有“自动化”都能解决LLM实验的规模化问题
先说结论:单纯用Airflow调度训练脚本,或者只用DVC管理数据版本,甚至只靠Weights & Biases做指标跟踪,都无法独立承载LLM实验规模化的真实需求。我见过太多团队在工具选型上走弯路,最后不是卡在某个环节,就是各系统之间数据孤岛严重。比如,用Airflow调度SageMaker训练任务,它确实能跑起来,但Airflow本身不理解“模型”是什么——它不会自动捕获训练输出的model.tar.gz路径,不会关联这次训练用的MLflow Experiment ID,更不会把评估报告生成为可交互的HTML artifact。结果就是:调度是自动的,但实验元数据是割裂的,你依然得人工去S3找日志、去MLflow查指标、去ECR确认镜像版本,三者之间没有自动链接。
SageMaker Pipelines的核心价值,在于它原生嵌入了AWS ML生态的血缘感知能力。当你定义一个TrainingStep时,Pipeline不只是提交一个训练任务,它会自动将该步骤的输入数据URI、输出模型URI、超参字典、甚至SageMaker Training Job的ARN,全部作为结构化元数据注入到Pipeline Execution的Execution Graph中。这个Graph不是静态快照,而是动态可查询的——你可以用boto3.client('sagemaker').list_pipeline_execution_steps()实时获取任意一次执行中每个步骤的状态、输入输出、耗时、失败原因。这解决了“实验过程不可见”的问题。
而MLflow的价值,则在于它补上了Pipeline缺失的语义层。SageMaker知道“这个步骤输出了一个模型”,但不知道“这个模型是针对金融客服场景微调的Qwen-1.5B,使用了包含127个拒答样本的对抗数据集,评估时在Banking77测试集上F1达到89.2%”。MLflow的mlflow.start_run()会把所有这些业务语义信息,连同指标、参数、代码版本、artifact(如tokenizer.json、eval_report.json),打包成一个逻辑完整的Run。更重要的是,MLflow Tracking Server可以部署在VPC内,支持细粒度RBAC权限控制,这满足了企业对实验数据合规性的硬性要求——不是所有团队都愿意把模型评估指标上传到公网SaaS服务。
所以,Pipeline + MLflow不是简单拼凑,而是分工明确的协同体:Pipeline负责“物理流程的确定性执行与可观测性”,MLflow负责“逻辑实验的语义化表达与可追溯性”。它们通过一个轻量级胶水层(即Pipeline中的ProcessingStep或TrainingStep内嵌的MLflow client调用)连接。这个设计规避了两个常见陷阱:一是避免了用Lambda函数做异步回调带来的状态同步复杂度;二是绕开了在SageMaker Training Image中强行集成Airflow Client导致的镜像臃肿和升级困难。
2.2 为什么不用SageMaker内置的Model Registry?MLflow Model Registry有何不可替代性?
这里必须澄清一个高频误解:SageMaker有Model Registry,为什么还要用MLflow的?答案是:Registry的定位不同,解决的问题域也不同。
SageMaker Model Registry本质是一个模型部署就绪中心(Deployment-Ready Hub)。它的核心字段是ModelPackageGroupName、ModelApprovalStatus(Approved/Pending/Rejected)、InferenceSpecification(容器镜像、启动命令)。它假设你已经有一个经过充分验证、符合上线标准的模型,现在要把它推送到生产环境。它的强项是与SageMaker Endpoint、SageMaker Projects深度集成,一键部署、A/B测试、影子测试都极其顺畅。
而MLflow Model Registry,是一个实验成果沉淀中心(Experiment Outcome Repository)。它的核心字段是Model Name、Version、Stage(None/Staging/Production/Archived)、Run ID(反向链接到完整实验记录)。它的设计哲学是:“任何一次成功的实验,无论是否上线,都值得被命名、被归档、被比较”。举个真实案例:我们曾为同一份客服对话数据,同时运行了三个实验分支——Branch A用LoRA微调,Branch B用QLoRA量化微调,Branch C用Adapter模块。三者在验证集上指标接近(88.1% vs 87.9% vs 88.3%),但推理延迟差异巨大(120ms vs 45ms vs 85ms)。SageMaker Model Registry只会收录最终上线的那个(比如Branch B),而MLflow Registry则把三个都存为customer-service-qwen-lora、customer-service-qwen-qlora、customer-service-qwen-adapter三个Model,并标记为Staging。当我们后续发现某类长尾问题(如多轮转账确认)在Branch B上表现更差时,能立刻切回Branch A的v3版本做对比分析,而无需重新训练——因为v3的所有输入数据、代码、评估报告,都通过Run ID牢牢绑定。
更关键的是,MLflow Registry支持跨平台模型格式。我们的部分实验用PyTorch Lightning,部分用Hugging Face Transformers,还有少量用DeepSpeed。SageMaker Model Registry要求模型必须打包为model.tar.gz并符合特定目录结构(code/、model/),而MLflow则原生支持mlflow.pytorch、mlflow.transformers、mlflow.huggingface等flavor,自动处理序列化/反序列化逻辑。这意味着,同一个mlflow.log_model()调用,在不同框架下生成的artifact,都能被统一注册、统一加载、统一服务化。这种抽象层,是SageMaker原生Registry目前不具备的。
2.3 架构全景图:数据流、控制流与元数据流如何交织?
整个系统的数据流向,可以用三个平行但交织的“流”来理解:
数据流(Data Flow):原始数据(S3://my-bucket/raw-data/) → Pipeline ProcessingStep(清洗、分词、构造instruction格式)→ 输出为S3://my-bucket/processed-data/{run_id}/ → TrainingStep读取该路径进行训练 → 输出模型至S3://my-bucket/models/{pipeline_exec_id}/{step_name}/。
控制流(Control Flow):开发者提交Pipeline Definition(JSON/YAML)→ SageMaker Pipelines Service解析依赖关系 → 按DAG顺序触发各Step(Processing/Training/Transform)→ 每个Step内部通过
boto3.client('sagemaker')调用对应API → 执行完成返回状态与输出URI。元数据流(Metadata Flow):这是最容易被忽视,却最核心的一环。它由两部分构成:
- Pipeline元数据:由SageMaker自动生成,包括
PipelineExecutionArn、StepName、StartTime、EndTime、InputParameters(传入Pipeline的参数,如--base-model-id)、OutputParameters(步骤输出,如--model-uri)。这些可通过describe_pipeline_execution()API实时查询。 - MLflow元数据:由Pipeline Step内嵌的Python代码显式记录,包括
mlflow.set_experiment("llm-finetuning")、mlflow.log_param("lora_r", 8)、mlflow.log_metric("eval_f1", 0.883)、mlflow.log_artifact("eval_report.json")、mlflow.pytorch.log_model(model, "model")。这些数据写入MLflow Tracking Server(我们部署在EKS上,后端PostgreSQL)。
- Pipeline元数据:由SageMaker自动生成,包括
这两股元数据流的交汇点,就是Run ID的双向绑定。我们在Pipeline的每个Step开始时,执行:
import mlflow mlflow.set_tracking_uri("http://mlflow-svc.mlflow.svc.cluster.local:5000") mlflow.set_experiment("llm-finetuning") # 关键:用Pipeline Execution ID + Step Name 生成唯一Run ID run_id = f"{pipeline_execution_id}_{step_name}" mlflow.start_run(run_id=run_id)这样,当我们在MLflow UI中查看这个Run时,就能在Tags里看到pipeline_execution_arn: arn:aws:sagemaker:us-east-1:123456789012:pipeline-execution/abc123。反过来,在SageMaker Console的Pipeline Execution详情页,我们也在Step的OutputParameters里显式写入mlflow_run_id: abc123_train_step。这种双向锚定,让“从Pipeline跳转到MLflow”和“从MLflow跳转回Pipeline”成为可能,彻底打通了工程链路与实验链路。
提示:不要依赖MLflow自动生成的随机Run ID。在Pipeline环境中,必须显式传入
run_id参数。否则,当Step因OOM重试时,会创建多个Run,导致指标混乱。我们曾因此误判过一次学习率衰减策略的效果,花了两天才定位到是重试产生的幽灵Run污染了平均值。
3. 核心细节解析与实操要点:从零搭建可复现的LLM实验流水线
3.1 环境准备:最小可行镜像与依赖治理
很多团队一上来就想构建一个“全能”镜像,把PyTorch、Transformers、DeepSpeed、FlashAttention、vLLM全塞进去。结果是镜像体积超过8GB,每次Pipeline更新都要等待15分钟拉取,CI/CD流水线频繁超时。我的经验是:为不同类型的LLM实验,定义专用精简镜像。我们目前维护三个核心镜像:
| 镜像名称 | 基础镜像 | 核心依赖 | 典型用途 | 镜像大小 |
|---|---|---|---|---|
llm-train-pytorch | pytorch/pytorch:2.1.0-cuda11.8-cudnn8-runtime | transformers==4.38.2,datasets==2.16.1,peft==0.8.2 | LoRA/QLoRA微调 | 3.2GB |
llm-infer-vllm | vllm/vllm-cu118:0.3.2 | mlflow==2.10.1,boto3==1.28.59 | 批量推理、评估 | 2.1GB |
llm-eval-metrics | python:3.11-slim-bookworm | scikit-learn==1.3.0,evaluate==0.4.0,mlflow==2.10.1 | 独立评估脚本 | 840MB |
关键操作细节:
- 基础镜像选择:严格匹配SageMaker Training Instance的CUDA版本。例如,
ml.p4d.24xlarge实例预装CUDA 11.8,那么你的PyTorch镜像就必须是cuda11.8版本。我们曾因使用cuda12.1镜像导致torch.cuda.is_available()返回False,错误日志里只有一行Failed to load library: libcudnn.so.8,排查了6小时才发现是CUDA小版本不匹配。 - 依赖版本锁定:
requirements.txt中必须使用==精确指定版本,禁用>=。特别是transformers和datasets,小版本升级常带来tokenization行为变更。例如,transformers==4.37.0和4.38.0对<|eot_id|>特殊token的处理逻辑不同,会导致同一份prompt在不同版本下生成结果不一致。我们在requirements.txt顶部加注释:# DO NOT UPGRADE: Pinning critical for reproducibility。 - 镜像构建优化:使用多阶段构建(Multi-stage Build)分离构建依赖与运行时依赖。例如,在
llm-train-pytorch中,第一阶段安装flash-attn需要ninja、cmake等编译工具,第二阶段只COPY编译好的.so文件和pip install的纯Python包。这使最终镜像体积减少40%,且无编译工具残留,提升安全性。
注意:SageMaker Training Job默认以
root用户运行,但MLflow Tracking Server通常要求非root用户访问。因此,在Dockerfile中必须添加USER 1001指令,并确保/opt/ml目录权限对UID 1001可写。否则,mlflow.log_artifact()会因权限拒绝而静默失败,日志里只显示Permission denied: /opt/ml/output/artifacts,非常难排查。
3.2 Pipeline定义:用Python SDK而非YAML,掌控每一个执行细节
SageMaker Pipelines支持两种定义方式:Python SDK(推荐)和YAML DSL。我强烈建议全程使用Python SDK,原因有三:一是YAML无法表达动态逻辑(如根据数据集大小自动调整per_device_train_batch_size);二是Python SDK的类型提示(type hints)能提前捕获参数错误(如把str类型的instance_type误传为int);三是调试体验天壤之别——你可以在本地用pipeline.upsert()前,打印出完整的pipeline.definition()JSON,逐行检查DAG结构。
一个典型的LLM微调Pipeline定义,核心骨架如下:
from sagemaker.workflow.pipeline import Pipeline from sagemaker.workflow.steps import TrainingStep, ProcessingStep from sagemaker.workflow.parameters import ParameterString, ParameterInteger from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.pytorch import PyTorch # 1. 定义参数(所有可变输入) base_model_id = ParameterString(name="BaseModelId", default_value="Qwen/Qwen1.5-1.8B") dataset_version = ParameterString(name="DatasetVersion", default_value="v20240501") lora_r = ParameterInteger(name="LoraR", default_value=8) max_steps = ParameterInteger(name="MaxSteps", default_value=1000) # 2. 数据预处理Step sklearn_processor = SKLearnProcessor( framework_version="1.0-1", role=role, instance_type="ml.m5.xlarge", instance_count=1, env={"MLFLOW_TRACKING_URI": "http://mlflow-svc.mlflow.svc.cluster.local:5000"} ) processing_step = ProcessingStep( name="PreprocessData", processor=sklearn_processor, inputs=[ ProcessingInput(source=f"s3://my-bucket/raw-data/{dataset_version}/", destination="/opt/ml/processing/input/"), ], outputs=[ ProcessingOutput(output_name="train_data", source="/opt/ml/processing/output/train/", destination=f"s3://my-bucket/processed-data/{dataset_version}/train/"), ProcessingOutput(output_name="eval_data", source="/opt/ml/processing/output/eval/", destination=f"s3://my-bucket/processed-data/{dataset_version}/eval/"), ], code="preprocess.py" # 该脚本内会调用mlflow.start_run() ) # 3. 训练Step(核心) estimator = PyTorch( entry_point="train.py", source_dir="src/", role=role, instance_count=1, instance_type="ml.g5.12xlarge", py_version="py311", framework_version="2.1.0", hyperparameters={ "model_id": base_model_id, "lora_r": lora_r, "max_steps": max_steps, "mlflow_tracking_uri": "http://mlflow-svc.mlflow.svc.cluster.local:5000" } ) training_step = TrainingStep( name="TrainLLM", estimator=estimator, inputs={ "train": TrainingInput(s3_data=processing_step.properties.Outputs["train_data"].S3OutputLocation), "eval": TrainingInput(s3_data=processing_step.properties.Outputs["eval_data"].S3OutputLocation), } ) # 4. 组装Pipeline pipeline = Pipeline( name="LLM-Finetuning-Pipeline", parameters=[base_model_id, dataset_version, lora_r, max_steps], steps=[processing_step, training_step], # 关键:启用Pipeline Execution日志的详细级别 configuration={"LogLevel": "All"} )这里有几个魔鬼细节:
ProcessingOutput的destination必须是完整S3路径,且以/结尾。如果写成s3://bucket/processed-data/{version}/train(无尾部斜杠),SageMaker会把整个train/目录当成一个文件名,导致下游TrainingStep读取时路径拼接错误。TrainingStep的inputs字典key(如"train")会自动映射为训练脚本的--train命令行参数。因此,你的train.py必须能接收--train参数,并将其值(即S3 URI)传递给datasets.load_from_disk()或类似方法。我们曾因参数名不匹配,导致训练脚本始终读取默认路径,浪费了3次p4d实例的费用。configuration={"LogLevel": "All"}是调试神器。默认日志级别是Error,很多Step失败时只显示Failed,没有堆栈。开启All后,CloudWatch Logs中会输出完整的boto3调用请求/响应,能快速定位是IAM权限不足、S3路径不存在,还是网络策略阻断。
3.3 MLflow集成:不止是log_param,而是构建实验DNA
在Pipeline Step中集成MLflow,绝不是简单地在train.py开头加mlflow.start_run()。真正的价值在于,把实验的每一个原子要素,都转化为可查询、可比较、可复现的结构化数据。我们定义了一套强制性的MLflow Logging规范,所有团队成员必须遵守:
3.3.1 必须记录的5类核心元数据
| 类别 | 记录方式 | 示例 | 为什么重要 |
|---|---|---|---|
| 代码快照 | mlflow.log_artifact(".git") | 将整个.git目录作为artifact上传 | 精确还原训练时的代码状态,比git commit hash更可靠(包含未提交的临时修改) |
| 数据指纹 | mlflow.log_dict(data_fingerprint, "data_fingerprint.json") | {"train_rows": 12450, "eval_rows": 1245, "hash": "a1b2c3..."} | 避免“数据漂移”误判。当指标下降时,先查data_fingerprint是否变更,再查模型 |
| 硬件配置 | mlflow.log_dict(hardware_info, "hardware.json") | {"instance_type": "g5.12xlarge", "gpu_count": 4, "cuda_version": "11.8"} | GPU型号不同可能导致数值精度差异(如A10G的FP16与A100的TF32),影响结果可比性 |
| 训练轨迹 | mlflow.log_metric("train_loss", value, step=step) | 每10步记录一次loss,step参数必填 | 支持在MLflow UI中绘制平滑的loss曲线,step是X轴,没有它曲线就是一堆离散点 |
| 模型卡片 | mlflow.log_text(model_card, "model_card.md") | 包含模型用途、限制、偏见声明、测试集表现的Markdown | 满足内部AI治理要求,也是新成员快速理解模型的入口 |
3.3.2 模型序列化的最佳实践
对于Hugging Face模型,我们弃用model.save_pretrained(),而采用mlflow.transformers.log_model():
from transformers import AutoModelForCausalLM, AutoTokenizer import mlflow.transformers model = AutoModelForCausalLM.from_pretrained(base_model_id) tokenizer = AutoTokenizer.from_pretrained(base_model_id) # 关键:传入tokenizer,mlflow会自动保存其配置 mlflow.transformers.log_model( transformers_model={ "model": model, "tokenizer": tokenizer, "task": "text-generation" }, artifact_path="model", # 这个参数至关重要:它告诉mlflow,加载时要用transformers_pipeline # 而不是简单的torch.load(),从而保证tokenizer和model的兼容性 signature=mlflow.models.infer_signature( model_input=tokenizer("Hello", return_tensors="pt"), model_output=model.generate(**tokenizer("Hello", return_tensors="pt"), max_new_tokens=10) ) )这样做的好处是,后续用mlflow.pyfunc.load_model()加载时,会自动构建一个transformers.pipeline对象,你只需调用predict({"inputs": "Hello"}),无需关心tokenizer的pad_token_id、eos_token_id等细节。我们曾因手动保存model.bin和config.json,导致加载时tokenizer.pad_token_id为None,引发generate()报错,排查了整整一天。
实操心得:在
train.py末尾,务必添加mlflow.end_run()。我们曾因忘记这行,在Pipeline Step重试时,新的Run会继承上一次的active_run,导致所有log_param都写到旧Run里,造成数据污染。现在,我们的模板脚本强制在try...finally块中包裹训练主逻辑,确保end_run()必然执行。
4. 实操过程与核心环节实现:一次端到端的LoRA微调全流程详解
4.1 从零启动:参数配置、数据准备与首次Pipeline提交
假设我们要对Qwen-1.5-1.8B模型,在自有的客服对话数据集上进行LoRA微调。以下是我在终端中实际执行的每一步命令和背后的思考:
第一步:准备数据
# 1. 将原始CSV数据上传到S3 aws s3 cp ./data/customer-dialogs-v20240501.csv s3://my-bucket/raw-data/v20240501/ # 2. 生成数据指纹(使用sha256sum,但仅对内容哈希,排除元数据) # 我们写了一个小脚本data_fingerprint.py,它会: # - 读取CSV,按行排序(消除导出顺序影响) # - 对每一行JSONL格式化(确保空格、引号一致) # - 计算整个文件的sha256 python data_fingerprint.py ./data/customer-dialogs-v20240501.csv # 输出:a1b2c3d4e5f67890...为什么花时间做数据指纹?因为数据集版本管理是LLM实验的基石。我们曾遇到过:算法同学A用v20240401数据训练,同学B用v20240415(新增了200条拒答样本),两人指标对比时发现B高0.5%,但实际是数据差异,而非模型改进。有了指纹,v20240401和v20240415的哈希值不同,一眼就能识别。
第二步:配置Pipeline参数
# 在pipeline_definition.py中,设置参数 base_model_id = ParameterString(name="BaseModelId", default_value="Qwen/Qwen1.5-1.8B") dataset_version = ParameterString(name="DatasetVersion", default_value="v20240501") lora_r = ParameterInteger(name="LoraR", default_value=8) lora_alpha = ParameterInteger(name="LoraAlpha", default_value=16) learning_rate = ParameterFloat(name="LearningRate", default_value=2e-4)参数选择依据:
lora_r=8:基于Qwen-1.5-1.8B的层数(40层)和注意力头数(32),r=8能在参数增量(约0.1%)和性能损失(<0.3% F1)间取得平衡。我们做过网格搜索:r=4时收敛慢,r=16时显存占用接近全参数微调。lora_alpha=16:alpha/r=2是Hugging Face PEFT库的推荐比例,能保持LoRA权重的缩放稳定性。learning_rate=2e-4:不是拍脑袋。我们先用lr_finder在1%数据上跑了100步,观察loss下降拐点,确定1e-4到5e-4是有效区间,最终选中间值2e-4。
第三步:提交Pipeline
# 1. 创建Pipeline(首次) pipeline.upsert(role_arn=role) # 2. 启动一次执行 execution = pipeline.start( parameters={ "BaseModelId": "Qwen/Qwen1.5-1.8B", "DatasetVersion": "v20240501", "LoraR": 8, "LoraAlpha": 16, "LearningRate": 2e-4 } ) # 3. 实时监控(我习惯用这个命令,比Console更直观) watch -n 5 'aws sagemaker describe-pipeline-execution --pipeline-execution-arn '$execution.arn' --query "PipelineExecutionStatus" --output text'watch命令每5秒刷新一次状态,PipelineExecutionStatus会依次显示Executing→Stopping→Stopped。当看到Stopped时,立即去MLflow UI,用v20240501作为Search Filter,应该能看到一个新Run,其params.base_model_id为Qwen/Qwen1.5-1.8B,metrics.eval_f1约为0.852(这是我们预估的基线值)。
4.2 训练Step深度解析:train.py中的关键代码与避坑指南
train.py是整个Pipeline的引擎核心。下面是我实际使用的、经过生产验证的简化版(去除了日志和异常处理):
import os import torch from transformers import ( AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, DataCollatorForSeq2Seq ) import mlflow import argparse from peft import LoraConfig, get_peft_model def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--model_id", type=str, required=True) parser.add_argument("--train", type=str, required=True) # S3 URI parser.add_argument("--eval", type=str, required=True) # S3 URI parser.add_argument("--lora_r", type=int, default=8) parser.add_argument("--lora_alpha", type=int, default=16) parser.add_argument("--learning_rate", type=float, default=2e-4) parser.add_argument("--mlflow_tracking_uri", type=str, required=True) return parser.parse_args() def main(): args = parse_args() # 1. 初始化MLflow Run(关键:用Pipeline Execution ID生成唯一ID) pipeline_exec_id = os.getenv("SM_PIPELINE_EXECUTION_ID", "local-test") run_id = f"{pipeline_exec_id}_train" mlflow.set_tracking_uri(args.mlflow_tracking_uri) mlflow.set_experiment("llm-finetuning") mlflow.start_run(run_id=run_id) # 2. 记录所有输入参数 mlflow.log_params({ "model_id": args.model_id, "lora_r": args.lora_r, "lora_alpha": args.lora_alpha, "learning_rate": args.learning_rate, "train_s3_uri": args.train, "eval_s3_uri": args.eval }) # 3. 下载数据(SageMaker自动挂载S3到本地,但需确认路径) # SageMaker Training Job会将S3 URI映射到/opt/ml/input/data/{channel_name}/ # 这里channel_name是TrainingInput的key,即"train"和"eval" train_dataset = load_dataset("json", data_files=f"/opt/ml/input/data/train/train.jsonl") eval_dataset = load_dataset("json", data_files=f"/opt/ml/input/data/eval/eval.jsonl") # 4. 加载模型和tokenizer model = AutoModelForCausalLM.from_pretrained( args.model_id, torch_dtype=torch.bfloat16, # 关键:bfloat16比float16更稳定,尤其对Qwen device_map="auto", # 自动分配到多GPU trust_remote_code=True # Qwen需要 ) tokenizer = AutoTokenizer.from_pretrained( args.model_id, trust_remote_code=True, padding_side="left" # 关键:left padding,因为causal LM需要eos在末尾 ) tokenizer.pad_token = tokenizer.eos_token # 必须设置,否则collator报错 # 5. 配置LoRA peft_config = LoraConfig( r=args.lora_r, lora_alpha=args.lora_alpha, target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], # Qwen的注意力层名 lora_dropout=0.05, bias="none", task_type="CAUSAL_LM" ) model = get_peft_model(model, peft_config) # 6. 定义训练参数(重点:梯度检查点和Flash Attention) training_args = TrainingArguments( output_dir="/opt/ml/model", # SageMaker要求模型输出到此路径 per_device_train_batch_size=4, # g5.12xlarge有4个A10G,总batch=16 per_device_eval_batch_size=4, gradient_accumulation_steps=4, # 等效总batch=64,适配Qwen-1.8B learning_rate=args.learning_rate, num_train_epochs=1, # LLM微调通常1 epoch足够 warmup_ratio=0.03, logging_steps=10, save_steps=100, evaluation_strategy="steps", eval_steps=100, save_total_limit=2, load_best_model_at_end=True, metric_for_best_model="eval_f1", greater_is_better=True, report_to="none", # 关闭wandb,只用mlflow # 关键:启用Flash Attention(Qwen官方支持) torch_compile=True, # PyTorch 2.1+的graph mode compile # 关键:梯度检查点,节省显存 fp16=False, # 不用fp16,用bfloat16 bf16=True, gradient_checkpointing=True, # 关键:指定tokenizer,让Trainer自动处理padding pad_to_multiple_of=8, remove_unused_columns=False, ) # 7. 数据整理(将对话转为instruction格式) def format_chat(example): # 将[{"role":"user","content":"..."},{"role":"assistant","content":"..."}]转为 # "<|im_start|>user\n{user}\n<|im_end|><|im_start|>assistant\n{assistant}\n<|im_end|>" messages = example["messages"] formatted = "" for msg in messages: formatted += f"<|im_start|>{msg['role']}\n{msg['content']}\n<|im_end|>" return {"text": formatted} train_dataset = train_dataset.map(format_chat, batched=False) eval_dataset = eval_dataset.map(format_chat, batched=False) # 8. 数据整理器(关键:使用tokenizer的chat template) data_collator = DataCollatorForSeq2Seq( tokenizer=tokenizer, model=model, padding=True, return_tensors="pt" ) # 9. 初始化Trainer trainer = Trainer( model=model, args=training_args, train_dataset=train_dataset, eval_dataset=eval_dataset, tokenizer=tokenizer, data_collator=data_collator, compute_metrics=compute_f1_metric # 自定义F1计算函数 ) #