大模型Lora微调实践(思维链版)
上一篇已经介绍基础版的Lora微调实现过程,这一篇记录如何微调大模型使其输出带思维链。
准备工作
要求大模型输出带思维链,那么就要求微调数据集本身带有思维链,所以需要选择带有思维链的数据集
数据集来自魔搭社区的 medical-o1-reasoning-SFT:
https://modelscope.cn/datasets/AI-ModelScope/medical-o1-reasoning-SFT
数据集格式为:1
2
3
4
5{
"Question": "在发生医疗事故或事件且临床诊断不能明确死亡原因的情况下,患者死亡后应该在多长时间内进行尸检?",
"Complex_CoT": "哎,发生医疗事故的时候,真是比较棘手啊。\n\n如果这个时候患者去世了,但具体的死亡原因不清楚,那真得赶紧弄个尸检看看。\n\n尸检不仅能帮助我们找出死亡原因,还有助于解决医疗事故责任问题,保护家属的权益。\n\n其实,我们国家应该对这种情况有相关规定吧,比如《医疗事故处理条例》之类的。\n\n听说很多法律都建议在患者死亡后尽快进行尸检。\n\n对哦,通常法律上是让在48小时内搞定尸检,这样可以保证结果的准确性。\n\n不过有时候,也可能因为家庭的原因或者特殊法律程序的需求,哪怕规定是48小时,实际操作可能会稍有延迟。\n\n但不管怎样,根据法规还是希望尽量在这个时间线内完成检查,以免耽误事情。\n\n除此之外,如果家属同意的话,也许还可以商量延迟的可能性。\n\n所以说,在出现医疗事故这样的敏感情况下,还是真得按法律走,48小时内完成尸检最为稳妥。",
"Response": "根据中国《医疗事故处理条例》的相关规定,在发生医疗事故或事件并且无法明确死亡原因的情况下,通常建议在患者死亡后的48小时内进行尸检。这个时间限制主要是为了保证尸检结果的准确性,因为在过长的时间后,尸体组织可能发生变化,影响结果的精确性。\n\n不过,确切的时间可能会因为个案的特殊性而略有不同,比如遇到复杂的法律程序或家庭方面的考虑。通常在这种情况下,会在充分沟通的基础上,尽力在法律规定的时间内达成一致。\n\n所以,一般情况下,48小时内进行尸检是一个比较合理和法律支持的时间框架。"
}
本实验环境:
GPU 显存 >= 8GB
pytorch==2.5.0+cu118
transformers==4.47.1
peft==0.14.0
参考资料:
https://mp.weixin.qq.com/s/hOgeu6EPbuaQgVHyjij-kg
数据预处理
1 | from datasets import load_dataset |
其实这里依然采用的指令微调数据集格式,只不过把cot+answer拼在一起作为输出。
在训练时通过”详细分析“和”答案“这两个标记来区分思考过程和实际答案部分,那么微调后的模型在输出时就会分为这两个部分。
模型训练
这里与上一篇略有不同的地方在于,使用transformer的Trainer类,可以省去很多自己写的代码。
1 | import os |
不过Trainer类型好像会自动把数据分发到其他卡,当在多卡机器上单卡微调时,目前发现只能通过环境变量来屏蔽其他的卡;否则虽然可以控制模型只加载到某张卡,但是Trainer好像还是会把数据分发到其他卡上,导致报错。
但是如果想要实现DDP分布式训练则方便许多,只需要将上述代码中涉及device设备指定的地方全部去除,并且也不要通过环境变量来隐藏可见显卡设备。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104# LoRA配置
peft_config = LoraConfig(
r=8,
lora_alpha=32,
target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"],
lora_dropout=0.05,
bias="none",
task_type="CAUSAL_LM"
)
# 训练参数配置
training_args = TrainingArguments(
output_dir=output_path,
per_device_train_batch_size=1, # 8G显存上无法设置大于1
gradient_accumulation_steps=8, # 累计梯度相当于batch_size=per_device_train_batch_size*gradient_accumulation_steps
num_train_epochs=5, # 5个轮次
learning_rate=3e-4,
fp16=True, # 开启混合精度
logging_steps=20,
save_strategy="no",
report_to="none",
optim="adamw_torch",
no_cuda=False, # 强制使用CUDA
dataloader_pin_memory=False, # 加速数据加载
remove_unused_columns=False, # 防止删除未使用的列
label_names=["labels"], # 手动指定标签字段名,根据分布式训练时的警告提示而设定的
ddp_find_unused_parameters=False # 明确设置为 False,根据分布式训练时的警告提示而设定的
)
def main():
os.makedirs(output_path, exist_ok=True)
tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token
# 加载模型 device_map='auto' 只适合推理,不适合Trainer训练
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()
# 准备数据
dataset = process_data(tokenizer)
# 训练回调
loss_callback = LossCallback()
# 数据加载器
def data_collator(data):
# batch = {
# "input_ids": torch.stack([torch.tensor(d["input_ids"]) for d in data]).to(device),
# "attention_mask": torch.stack([torch.tensor(d["attention_mask"]) for d in data]).to(device),
# "labels": torch.stack([torch.tensor(d["labels"]) for d in data]).to(device) # 使用input_ids作为labels
# }
# return batch
# 当设置batch_size > 1,防止每条数据的长度不一致
input_ids = [torch.tensor(d["input_ids"]) for d in data]
attention_mask = [torch.tensor(d["attention_mask"]) for d in data]
labels = [torch.tensor(d["labels"]) for d in data]
# 对 input_ids 进行填充
input_ids = pad_sequence(input_ids, batch_first=True, padding_value=tokenizer.pad_token_id) # .to(device)
# 对 attention_mask 进行填充
attention_mask = pad_sequence(attention_mask, batch_first=True, padding_value=0) # .to(device)
# 对 labels 进行填充
labels = pad_sequence(labels, batch_first=True, padding_value=-100) # .to(device)
batch = {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels
}
return batch
# 创建Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=dataset,
data_collator=data_collator,
callbacks=[loss_callback]
)
# 开始训练
print("开始训练...")
# 修改 Trainer 的模型、优化器和数据加载器
trainer.train()
# 保存最终模型
trainer.model.save_pretrained(output_path)
print(f"模型已保存至:{output_path}")
# 绘制训练集损失Loss曲线
plt.figure(figsize=(10, 6))
plt.plot(loss_callback.losses)
plt.title("Training Loss Curve")
plt.xlabel("Steps")
plt.ylabel("Loss")
plt.savefig(os.path.join(output_path, "loss_curve.png"))
print("Loss曲线已保存")
if __name__ == "__main__":
main()
然后直接使用torchrun
命令启动即可:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
同样的也可以使用accelerate实现DDP、FSDP策略训练
```python
def main():
os.makedirs(output_path, exist_ok=True)
tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token
# 初始化 Accelerator
accelerator = Accelerator()
# 加载模型
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.bfloat16)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()
# model = model.to(device)
# 准备数据
dataset = process_data(tokenizer)
# 训练回调
loss_callback = LossCallback()
# 数据加载器
def data_collator(data):
# batch = {
# "input_ids": torch.stack([torch.tensor(d["input_ids"]) for d in data]).to(device),
# "attention_mask": torch.stack([torch.tensor(d["attention_mask"]) for d in data]).to(device),
# "labels": torch.stack([torch.tensor(d["labels"]) for d in data]).to(device) # 使用input_ids作为labels
# }
# return batch
# 当设置batch_size > 1,防止每条数据的长度不一致
input_ids = [torch.tensor(d["input_ids"]) for d in data]
attention_mask = [torch.tensor(d["attention_mask"]) for d in data]
labels = [torch.tensor(d["labels"]) for d in data]
# 对 input_ids 进行填充
input_ids = pad_sequence(input_ids, batch_first=True, padding_value=tokenizer.pad_token_id) # .to(device)
# 对 attention_mask 进行填充
attention_mask = pad_sequence(attention_mask, batch_first=True, padding_value=0) # .to(device)
# 对 labels 进行填充
labels = pad_sequence(labels, batch_first=True, padding_value=-100) # .to(device)
batch = {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels
}
return batch
# 创建Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=dataset,
data_collator=data_collator,
callbacks=[loss_callback]
)
# 使用 Accelerator 准备模型、优化器和数据加载器
model, optimizer, train_dataloader = accelerator.prepare(
trainer.model,
trainer.optimizer,
trainer.get_train_dataloader()
)
# 开始训练
print("开始训练...")
# 修改 Trainer 的模型、优化器和数据加载器
trainer.model = model
trainer.optimizer = optimizer
trainer.get_train_dataloader = lambda: train_dataloader
trainer.train()
# 保存最终模型
trainer.model.save_pretrained(output_path)
print(f"模型已保存至:{output_path}")
# 绘制训练集损失Loss曲线
plt.figure(figsize=(10, 6))
plt.plot(loss_callback.losses)
plt.title("Training Loss Curve")
plt.xlabel("Steps")
plt.ylabel("Loss")
plt.savefig(os.path.join(output_path, "loss_curve.png"))
print("Loss曲线已保存")
DDP,accelerate launch --config_file ddp_config.yaml model_parallelism.py
FSDP,accelerate launch --config_file fsdp_config.yaml model_parallelism.py
同样为了测试DDP、FSDP策略确实生效,进行了以下对比测试
1.5B模型,上下文长度-1k,精度bf16
torchrun
训练需要20分钟,单卡占用显存11G+
ddp
训练需要20分钟,单卡占用显存11G+
fsdp
训练需要6小时+,单卡占用显存11G+
7B模型 上下文长度-1k,精度bf16torchrun
显存不够ddp
显存不够fsdp
显存也不够
7B模型 上下文长度-512,精度bf16torchrun
显存不够ddp
显存不够fsdp
训练需要28小时+,单卡占用显存23G+
FSDP训练速度太慢了,除非单卡显存无法满足的情况下,否则不要使用fsdp
。
One More Thing
除了使用上一篇和这一篇的代码进行大模型Lora微调,还可以使用更为成熟和完善的LLamaFactory
进行大模型微调,其也全面支持分布式训练:
https://llamafactory.readthedocs.io/zh-cn/latest/advanced/distributed.html
而关于模型的部署和推理优化可以学习以下两个库:
vLLM:https://www.llamafactory.cn/vllm/