大模型Lora微调实践(思维链版)

大模型Lora微调实践(思维链版)

上一篇已经介绍基础版的Lora微调实现过程,这一篇记录如何微调大模型使其输出带思维链。

准备工作

要求大模型输出带思维链,那么就要求微调数据集本身带有思维链,所以需要选择带有思维链的数据集

数据集来自魔搭社区的 medical-o1-reasoning-SFT:

https://modelscope.cn/datasets/AI-ModelScope/medical-o1-reasoning-SFT

数据集格式为:

1
2
3
4
5
{
"Question": "在发生医疗事故或事件且临床诊断不能明确死亡原因的情况下,患者死亡后应该在多长时间内进行尸检?",
"Complex_CoT": "哎,发生医疗事故的时候,真是比较棘手啊。\n\n如果这个时候患者去世了,但具体的死亡原因不清楚,那真得赶紧弄个尸检看看。\n\n尸检不仅能帮助我们找出死亡原因,还有助于解决医疗事故责任问题,保护家属的权益。\n\n其实,我们国家应该对这种情况有相关规定吧,比如《医疗事故处理条例》之类的。\n\n听说很多法律都建议在患者死亡后尽快进行尸检。\n\n对哦,通常法律上是让在48小时内搞定尸检,这样可以保证结果的准确性。\n\n不过有时候,也可能因为家庭的原因或者特殊法律程序的需求,哪怕规定是48小时,实际操作可能会稍有延迟。\n\n但不管怎样,根据法规还是希望尽量在这个时间线内完成检查,以免耽误事情。\n\n除此之外,如果家属同意的话,也许还可以商量延迟的可能性。\n\n所以说,在出现医疗事故这样的敏感情况下,还是真得按法律走,48小时内完成尸检最为稳妥。",
"Response": "根据中国《医疗事故处理条例》的相关规定,在发生医疗事故或事件并且无法明确死亡原因的情况下,通常建议在患者死亡后的48小时内进行尸检。这个时间限制主要是为了保证尸检结果的准确性,因为在过长的时间后,尸体组织可能发生变化,影响结果的精确性。\n\n不过,确切的时间可能会因为个案的特殊性而略有不同,比如遇到复杂的法律程序或家庭方面的考虑。通常在这种情况下,会在充分沟通的基础上,尽力在法律规定的时间内达成一致。\n\n所以,一般情况下,48小时内进行尸检是一个比较合理和法律支持的时间框架。"
}

本实验环境:

GPU 显存 >= 8GB

pytorch==2.5.0+cu118

transformers==4.47.1

peft==0.14.0

参考资料:

https://mp.weixin.qq.com/s/hOgeu6EPbuaQgVHyjij-kg

数据预处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from datasets import load_dataset


data_path = "../modelscope/medical-o1-reasoning-SFT/medical_o1_sft_Chinese.json"

def process_data(tokenizer):
# 筛选1万条微调,总量24772条
dataset = load_dataset("json", data_files=data_path, split="train[:10000]")
max_source_length = 256
max_cot_length = 512
max_target_length = 256

def format_example(example):
# 利用标准对话格式进行生成
messages = [
{"role": "system", "content": "你是一个医疗方面的专家,可以根据患者的问题进行解答。"},
{"role": "user", "content": example['Question']}
]
prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
# 对prompt 进行 tokenize
instruction = tokenizer(prompt, add_special_tokens=False, max_length=max_source_length)
# 对 cot 和 answer 进行 tokenize
cot_tokens = tokenizer(f"详细分析:{example['Complex_CoT']}", add_special_tokens=False, max_length=max_cot_length,
trust_remote_code=True)
answer_tokens = tokenizer(f"答案:{example['Response']}", add_special_tokens=False, max_length=max_target_length,
trust_remote_code=True)
# 输入是整个对话内容
input_ids = (instruction["input_ids"] + cot_tokens["input_ids"] +
answer_tokens["input_ids"] + [tokenizer.pad_token_id])
attention_mask = instruction["input_ids"] + cot_tokens["input_ids"] + answer_tokens["input_ids"] + [1]

# 输出部分,忽略 question部分的损失计算,计算cot 和 answer 部分的损失
labels = ([-100] * len(instruction["input_ids"]) + cot_tokens["input_ids"] +
answer_tokens["input_ids"] + [tokenizer.pad_token_id])
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels
}

return dataset.map(format_example, remove_columns=dataset.column_names)

其实这里依然采用的指令微调数据集格式,只不过把cot+answer拼在一起作为输出。
在训练时通过”详细分析“和”答案“这两个标记来区分思考过程和实际答案部分,那么微调后的模型在输出时就会分为这两个部分。

模型训练

这里与上一篇略有不同的地方在于,使用transformer的Trainer类,可以省去很多自己写的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
# 这个代码不知为啥,在Linux上运行时,会自动占用0号卡显存和算力,所以通过环境变量限制(一定要放在torch导入之前)
os.environ["CUDA_VISIBLE_DEVICES"] = '0'
import pdb
import torch
import matplotlib.pyplot as plt
from transformers import (
AutoTokenizer,
AutoModelForCausalLM,
TrainingArguments,
Trainer,
TrainerCallback
)
from peft import LoraConfig, get_peft_model
from datasets import load_dataset
from torch.nn.utils.rnn import pad_sequence


# 配置路径
model_path = "../modelscope/Qwen/Qwen2.5-1.5B-Instruct"
data_path = "../modelscope/medical-o1-reasoning-SFT/medical_o1_sft_Chinese.json"
output_path = "./output/"

# 强制使用GPU
assert torch.cuda.is_available()
# 当使用环境变量限制只有3号卡可见时,这里的0就代表3号卡
device = torch.device("cuda:0")


class LossCallback(TrainerCallback):
def __init__(self):
self.losses = []

def on_log(self, args, state, control, logs=None, **kwargs):
if "loss" in logs:
self.losses.append(logs["loss"])


# LoRA配置
peft_config = LoraConfig(
r=8,
lora_alpha=32,
target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"], # 可以设置要微调哪些层
lora_dropout=0.05,
bias="none",
task_type="CAUSAL_LM"
)

# 训练参数配置
training_args = TrainingArguments(
output_dir=output_path,
per_device_train_batch_size=2, # 8G显存上无法设置大于1
gradient_accumulation_steps=8, # 累计梯度相当于batch_size=per_device_train_batch_size*gradient_accumulation_steps
num_train_epochs=5, # 5个轮次
learning_rate=3e-4,
fp16=True, # 开启混合精度
logging_steps=20,
save_strategy="no",
report_to="none",
optim="adamw_torch",
no_cuda=False, # 强制使用CUDA
dataloader_pin_memory=False, # 加速数据加载
remove_unused_columns=False, # 防止删除未使用的列
)


def main():
os.makedirs(output_path, exist_ok=True)
tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token

# 加载模型
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()
model = model.to(device)

# 准备数据
dataset = process_data(tokenizer)
# 训练回调
loss_callback = LossCallback()

# 数据加载器
def data_collator(data):
# batch = {
# "input_ids": torch.stack([torch.tensor(d["input_ids"]) for d in data]).to(device),
# "attention_mask": torch.stack([torch.tensor(d["attention_mask"]) for d in data]).to(device),
# "labels": torch.stack([torch.tensor(d["labels"]) for d in data]).to(device) # 使用input_ids作为labels
# }
# return batch
# 当设置batch_size > 1,防止每条数据的长度不一致
input_ids = [torch.tensor(d["input_ids"]) for d in data]
attention_mask = [torch.tensor(d["attention_mask"]) for d in data]
labels = [torch.tensor(d["labels"]) for d in data]

# 对 input_ids 进行填充
input_ids = pad_sequence(input_ids, batch_first=True, padding_value=tokenizer.pad_token_id).to(device)
# 对 attention_mask 进行填充
attention_mask = pad_sequence(attention_mask, batch_first=True, padding_value=0).to(device)
# 对 labels 进行填充
labels = pad_sequence(labels, batch_first=True, padding_value=-100).to(device)

batch = {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels
}
return batch

# 创建Trainer

trainer = Trainer(
model=model,
args=training_args,
train_dataset=dataset,
data_collator=data_collator,
callbacks=[loss_callback]
)

# 开始训练
print("开始训练...")
trainer.train()

# 保存最终模型
trainer.model.save_pretrained(output_path)
print(f"模型已保存至:{output_path}")

# 绘制训练集损失Loss曲线
plt.figure(figsize=(10, 6))
plt.plot(loss_callback.losses)
plt.title("Training Loss Curve")
plt.xlabel("Steps")
plt.ylabel("Loss")
plt.savefig(os.path.join(output_path, "loss_curve.png"))
print("Loss曲线已保存")


if __name__ == "__main__":
main()

不过Trainer类型好像会自动把数据分发到其他卡,当在多卡机器上单卡微调时,目前发现只能通过环境变量来屏蔽其他的卡;否则虽然可以控制模型只加载到某张卡,但是Trainer好像还是会把数据分发到其他卡上,导致报错。

但是如果想要实现DDP分布式训练则方便许多,只需要将上述代码中涉及device设备指定的地方全部去除,并且也不要通过环境变量来隐藏可见显卡设备。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# LoRA配置
peft_config = LoraConfig(
r=8,
lora_alpha=32,
target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"],
lora_dropout=0.05,
bias="none",
task_type="CAUSAL_LM"
)

# 训练参数配置
training_args = TrainingArguments(
output_dir=output_path,
per_device_train_batch_size=1, # 8G显存上无法设置大于1
gradient_accumulation_steps=8, # 累计梯度相当于batch_size=per_device_train_batch_size*gradient_accumulation_steps
num_train_epochs=5, # 5个轮次
learning_rate=3e-4,
fp16=True, # 开启混合精度
logging_steps=20,
save_strategy="no",
report_to="none",
optim="adamw_torch",
no_cuda=False, # 强制使用CUDA
dataloader_pin_memory=False, # 加速数据加载
remove_unused_columns=False, # 防止删除未使用的列
label_names=["labels"], # 手动指定标签字段名,根据分布式训练时的警告提示而设定的
ddp_find_unused_parameters=False # 明确设置为 False,根据分布式训练时的警告提示而设定的
)


def main():
os.makedirs(output_path, exist_ok=True)
tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token


# 加载模型 device_map='auto' 只适合推理,不适合Trainer训练
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()

# 准备数据
dataset = process_data(tokenizer)
# 训练回调
loss_callback = LossCallback()

# 数据加载器
def data_collator(data):
# batch = {
# "input_ids": torch.stack([torch.tensor(d["input_ids"]) for d in data]).to(device),
# "attention_mask": torch.stack([torch.tensor(d["attention_mask"]) for d in data]).to(device),
# "labels": torch.stack([torch.tensor(d["labels"]) for d in data]).to(device) # 使用input_ids作为labels
# }
# return batch
# 当设置batch_size > 1,防止每条数据的长度不一致
input_ids = [torch.tensor(d["input_ids"]) for d in data]
attention_mask = [torch.tensor(d["attention_mask"]) for d in data]
labels = [torch.tensor(d["labels"]) for d in data]

# 对 input_ids 进行填充
input_ids = pad_sequence(input_ids, batch_first=True, padding_value=tokenizer.pad_token_id) # .to(device)
# 对 attention_mask 进行填充
attention_mask = pad_sequence(attention_mask, batch_first=True, padding_value=0) # .to(device)
# 对 labels 进行填充
labels = pad_sequence(labels, batch_first=True, padding_value=-100) # .to(device)

batch = {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels
}
return batch

# 创建Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=dataset,
data_collator=data_collator,
callbacks=[loss_callback]
)

# 开始训练
print("开始训练...")
# 修改 Trainer 的模型、优化器和数据加载器

trainer.train()

# 保存最终模型
trainer.model.save_pretrained(output_path)
print(f"模型已保存至:{output_path}")

# 绘制训练集损失Loss曲线
plt.figure(figsize=(10, 6))
plt.plot(loss_callback.losses)
plt.title("Training Loss Curve")
plt.xlabel("Steps")
plt.ylabel("Loss")
plt.savefig(os.path.join(output_path, "loss_curve.png"))
print("Loss曲线已保存")


if __name__ == "__main__":
main()

然后直接使用torchrun命令启动即可:

--nproc_per_node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

同样的也可以使用accelerate实现DDP、FSDP策略训练
```python
def main():
os.makedirs(output_path, exist_ok=True)
tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token

# 初始化 Accelerator
accelerator = Accelerator()

# 加载模型
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.bfloat16)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()
# model = model.to(device)

# 准备数据
dataset = process_data(tokenizer)
# 训练回调
loss_callback = LossCallback()

# 数据加载器
def data_collator(data):
# batch = {
# "input_ids": torch.stack([torch.tensor(d["input_ids"]) for d in data]).to(device),
# "attention_mask": torch.stack([torch.tensor(d["attention_mask"]) for d in data]).to(device),
# "labels": torch.stack([torch.tensor(d["labels"]) for d in data]).to(device) # 使用input_ids作为labels
# }
# return batch
# 当设置batch_size > 1,防止每条数据的长度不一致
input_ids = [torch.tensor(d["input_ids"]) for d in data]
attention_mask = [torch.tensor(d["attention_mask"]) for d in data]
labels = [torch.tensor(d["labels"]) for d in data]

# 对 input_ids 进行填充
input_ids = pad_sequence(input_ids, batch_first=True, padding_value=tokenizer.pad_token_id) # .to(device)
# 对 attention_mask 进行填充
attention_mask = pad_sequence(attention_mask, batch_first=True, padding_value=0) # .to(device)
# 对 labels 进行填充
labels = pad_sequence(labels, batch_first=True, padding_value=-100) # .to(device)

batch = {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels
}
return batch

# 创建Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=dataset,
data_collator=data_collator,
callbacks=[loss_callback]
)
# 使用 Accelerator 准备模型、优化器和数据加载器
model, optimizer, train_dataloader = accelerator.prepare(
trainer.model,
trainer.optimizer,
trainer.get_train_dataloader()
)

# 开始训练
print("开始训练...")
# 修改 Trainer 的模型、优化器和数据加载器
trainer.model = model
trainer.optimizer = optimizer
trainer.get_train_dataloader = lambda: train_dataloader
trainer.train()

# 保存最终模型
trainer.model.save_pretrained(output_path)
print(f"模型已保存至:{output_path}")

# 绘制训练集损失Loss曲线
plt.figure(figsize=(10, 6))
plt.plot(loss_callback.losses)
plt.title("Training Loss Curve")
plt.xlabel("Steps")
plt.ylabel("Loss")
plt.savefig(os.path.join(output_path, "loss_curve.png"))
print("Loss曲线已保存")

DDP,accelerate launch --config_file ddp_config.yaml model_parallelism.py

FSDP,accelerate launch --config_file fsdp_config.yaml model_parallelism.py

同样为了测试DDP、FSDP策略确实生效,进行了以下对比测试

1.5B模型,上下文长度-1k,精度bf16

torchrun 训练需要20分钟,单卡占用显存11G+

ddp训练需要20分钟,单卡占用显存11G+

fsdp 训练需要6小时+,单卡占用显存11G+

7B模型 上下文长度-1k,精度bf16
torchrun 显存不够
ddp 显存不够
fsdp 显存也不够

7B模型 上下文长度-512,精度bf16
torchrun 显存不够
ddp 显存不够
fsdp 训练需要28小时+,单卡占用显存23G+

FSDP训练速度太慢了,除非单卡显存无法满足的情况下,否则不要使用fsdp

One More Thing

除了使用上一篇和这一篇的代码进行大模型Lora微调,还可以使用更为成熟和完善的LLamaFactory进行大模型微调,其也全面支持分布式训练:

https://llamafactory.readthedocs.io/zh-cn/latest/advanced/distributed.html

而关于模型的部署和推理优化可以学习以下两个库:
vLLM:https://www.llamafactory.cn/vllm/

SGLang:https://www.llamafactory.cn/sglang/