import os
import torch
import torch.distributed as dist
from datasets import load_dataset
from datasets import Split
from transformers import AutoTokenizer, AutoModelForCausalLM
from torch.utils.data import IterableDataset, DataLoader
import deepspeed
from torch.utils.tensorboard import SummaryWriter
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Distributed setup. The DeepSpeed/torchrun launcher normally exports these
# variables; setdefault keeps the launcher's values and only fills in
# single-node defaults when the script is run standalone.
local_rank = int(os.getenv("LOCAL_RANK", "0"))
world_size = int(os.getenv("WORLD_SIZE", "1"))
os.environ.setdefault("RANK", str(local_rank))
os.environ.setdefault("WORLD_SIZE", str(world_size))
os.environ.setdefault("MASTER_ADDR", "localhost")
os.environ.setdefault("MASTER_PORT", "23333")

# NCCL debugging / error-handling knobs; these must be set before the process
# group is created to have any effect.
os.environ["NCCL_BLOCKING_WAIT"] = "1"
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
os.environ["NCCL_DEBUG"] = "INFO"
os.environ["NCCL_TIMEOUT"] = "600"

# Initialize torch.distributed (NCCL). deepspeed.init_distributed() is a no-op
# if a process group already exists, so the explicit guard is just a fallback.
deepspeed.init_distributed()
if not dist.is_initialized():
    dist.init_process_group(backend="nccl")
model_path = "./models/meta-llama/Llama-3.2-3B"
dataset_path = "./dataset/LLM/yahma___alpaca-cleaned"
ds_config = "./ds_config.json"
log_dir = "./logs"
max_length = 1024
batch_size = 8
train_epochs = 3
dataset_stream = load_dataset(dataset_path, split=Split.TRAIN, streaming=True)
tokenizer = AutoTokenizer.from_pretrained(model_path, token=True)
# Llama has no pad token; reuse EOS and pad on the right so that the
# prompt-length-based label masking below lines up with the input ids.
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"

model = AutoModelForCausalLM.from_pretrained(model_path, token=True)
model.to(f"cuda:{local_rank}")
class StreamDataset(IterableDataset):
    """Streams Alpaca-style records and tokenizes them on the fly."""

    def __init__(self, data_stream, tokenizer, max_length=max_length):
        self.data_stream = data_stream
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __iter__(self):
        for data in self.data_stream:
            instruction = data["instruction"]
            input_text = data["input"]
            output = data["output"]

            # Alpaca-style prompt; the "Input" block is optional.
            if input_text.strip():
                prompt = (
                    f"### Instruction:\n{instruction}\n\n"
                    f"### Input:\n{input_text}\n\n### Response:\n"
                )
            else:
                prompt = f"### Instruction:\n{instruction}\n\n### Response:\n"
            full_text = prompt + output

            encoded = self.tokenizer(
                full_text,
                truncation=True,
                max_length=self.max_length,
                padding="max_length",
            )
            input_ids = encoded["input_ids"]
            attention_mask = encoded["attention_mask"]

            # Supervise only the response: mask the prompt tokens with -100.
            prompt_length = len(
                self.tokenizer(prompt, padding="do_not_pad", truncation=False)[
                    "input_ids"
                ]
            )
            labels = [-100] * prompt_length + input_ids[prompt_length:]
            labels = labels[: self.max_length]
            labels = labels + [-100] * (self.max_length - len(labels))
            # Also mask padding positions. Because pad_token == eos_token, the
            # attention mask (0 on padding) is used instead of the token id.
            labels = [
                label if mask == 1 else -100
                for label, mask in zip(labels, attention_mask)
            ]

            yield {
                "input_ids": torch.tensor(input_ids, dtype=torch.long),
                "attention_mask": torch.tensor(attention_mask, dtype=torch.long),
                "labels": torch.tensor(labels, dtype=torch.long),
            }

    def __len__(self):
        # Size of yahma/alpaca-cleaned; lets len(DataLoader) report steps per epoch.
        return 51760


def train():
    train_dataset = StreamDataset(dataset_stream, tokenizer)
    # NOTE: every rank currently iterates the full stream, so with multiple GPUs
    # each example is seen world_size times per epoch (see the sharding sketch
    # below the script).
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
    writer = SummaryWriter(log_dir=log_dir) if dist.get_rank() == 0 else None

    # DeepSpeed wraps the model and builds the optimizer from ds_config.json.
    # We iterate our own DataLoader, so training_data is not passed here:
    # DeepSpeed's internal loader attaches a sampler, which IterableDataset
    # does not support.
    model_engine, optimizer, _, _ = deepspeed.initialize(
        model=model,
        model_parameters=model.parameters(),
        config=ds_config,
    )

    for epoch in range(train_epochs):
        for step, batch in enumerate(train_dataloader):
            inputs = {
                key: value.to(f"cuda:{local_rank}") for key, value in batch.items()
            }
            outputs = model_engine(**inputs)
            loss = outputs.loss
            model_engine.backward(loss)
            model_engine.step()

            if dist.get_rank() == 0 and step % 10 == 0:
                print(
                    f"Epoch: {epoch}, Step: {step}/{len(train_dataloader)}, "
                    f"Loss: {loss.item()}"
                )
                writer.add_scalar(
                    "Loss/train", loss.item(), epoch * len(train_dataloader) + step
                )

        # Save a DeepSpeed checkpoint at the end of each epoch.
        # save_checkpoint must be called on every rank.
        checkpoint_path = "./checkpoints"
        model_engine.save_checkpoint(checkpoint_path, tag=f"epoch_{epoch}")
        if dist.get_rank() == 0:
            print(f"Checkpoint saved at {checkpoint_path}")

    if writer:
        writer.close()


if __name__ == "__main__":
    train()
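The script reads a DeepSpeed configuration from ./ds_config.json, but that file is not shown above. Below is a minimal sketch of what such a config could look like, written as a small Python snippet that dumps the JSON; the ZeRO stage, precision, and optimizer settings are illustrative assumptions, not values from the original setup. The one hard constraint is that train_micro_batch_size_per_gpu must match the batch_size = 8 used by the DataLoader.

# Sketch only: writes an example ./ds_config.json. Every value below is an
# assumption to be tuned for your hardware, except the micro-batch size, which
# must equal the DataLoader's batch_size.
import json

example_ds_config = {
    "train_micro_batch_size_per_gpu": 8,   # must match batch_size in the script
    "gradient_accumulation_steps": 1,
    "bf16": {"enabled": True},             # assumes Ampere-or-newer GPUs
    "zero_optimization": {"stage": 2},     # ZeRO-2: shard optimizer state and gradients
    "optimizer": {
        "type": "AdamW",
        "params": {"lr": 2e-5, "weight_decay": 0.0},
    },
    "gradient_clipping": 1.0,
}

with open("./ds_config.json", "w") as f:
    json.dump(example_ds_config, f, indent=2)

With a config in place, the script is normally started through the DeepSpeed launcher, e.g. deepspeed --num_gpus=2 train_llama.py (substitute the real script name), which exports LOCAL_RANK and WORLD_SIZE for every process.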
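A caveat with streaming=True: every rank iterates the full alpaca-cleaned stream, so on multiple GPUs each example is processed world_size times per epoch. One way to avoid that, sketched below under the assumption that the installed datasets library is recent enough to provide split_dataset_by_node, is to shard the stream per rank before wrapping it in StreamDataset; the names reuse those from the script.

# Sketch only: give each rank its own shard of the streaming dataset.
from datasets.distributed import split_dataset_by_node

rank = dist.get_rank()
world_size = dist.get_world_size()

dataset_stream_sharded = split_dataset_by_node(
    dataset_stream, rank=rank, world_size=world_size
)
# Use the sharded stream inside train() instead of the global dataset_stream.
train_dataset = StreamDataset(dataset_stream_sharded, tokenizer)

# Note: StreamDataset.__len__ still reports the full 51760 examples, so the
# step total printed from len(train_dataloader) becomes an overestimate of the
# per-rank step count.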