torch.distributed package에서 제공하는 Multi-GPU training에 대해 코드를 보면서 알아보자. Accelerate/Ignite/Lightning 등의 library는 이런 distributed run을 좀 더 쉽게 해주는데, 이 중 HuggingFace의 Accelerate를 활용한 것도 코드를 보면서 알아보자.

0. Single GPU

PyTorch 공식에도 올라와있는 MNIST 분류 모델 코드이다. CUDA:0으로 GPU 하나 사용해서 학습하는데, 아래에서 동일하게 Multi-GPU로 학습해보기 전 baseline이다.

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

def main():
    device = "cuda"

    transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = DataLoader(test_dset, shuffle=False, batch_size=64)


    model = BasicNet().to(device)

    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

if __name__ == "__main__":
    main()

1. DP vs DDP

Pytorch Distributed Overview에 보면 다음과 같은 내용이 나온다.

Use single-machine multi-GPU DataParallel to make use of multiple GPUs on a single machine to speed up training with minimal code changes. Use single-machine multi-GPU DistributedDataParallel, if you would like to further speed up training and are willing to write a little more code to set it up.

torch.nn.DataParallel (DP)와 torch.nn.parallel.DistributedDataParallel (DDP)을 주로 쓰는데, DDP가 DP에 비해 세팅할 것들이 조금 더 있다. 번거롭다 기본적으로 DP는 single-process, multi-thread에 single node만 지원한다. 그러나 Python의 Global Interpreter Lock에 의해서 하나의 프로세스에서 동시에 여러 개의 thread가 작동할 수 없기에, 근본적으로는 multi-process 프로그램으로 만들어줘야 된다. 그리고 forward 연산 시 하나의 GPU에 모든 다음에 loss를 계산해서, 0번 GPU에 memory가 더 많이 사용되는 문제점이 있다.

위와 동일한 모델을 학습하면서, DP로 Multi-GPU training을 구현하면 다음과 같다. GPU 6번 7번 2개를 사용하였다.

    import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

def main():
    device = "cuda:6"

    transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = DataLoader(test_dset, shuffle=False, batch_size=64)

    model = BasicNet().to(device="cuda:6")
    model = nn.DataParallel(model, device_ids=[6, 7], output_device=6)
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.detach().cpu().eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

if __name__ == "__main__":
    main()

DDP로 Multi-GPU training을 구현하면 다음과 같다.

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
import argparse
import torch.distributed as dist
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP

from MySampler import DistributedEvalSampler
# See: https://github.com/SeungjunNah/DeepDeblur-PyTorch/blob/master/src/data/sampler.py

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        output = self.fc2(x)
        return output

def main(opts):
    # init dist
    init_for_distributed(opts)
    local_gpu_id = opts.gpu

    # data set
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
        ])

    train_set = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_set = datasets.MNIST('data', train=False, transform=transform)

    train_sampler = DistributedSampler(dataset=train_set, shuffle=True)
    '''
    https://github.com/SeungjunNah/DeepDeblur-PyTorch/blob/master/src/data/sampler.py

    DistributedEvalSampler is different from DistributedSampler.
    It does NOT add extra samples to make it evenly divisible.
    DistributedEvalSampler should NOT be used for training. The distributed processes could hang forever.
    See this issue for details: https://github.com/pytorch/pytorch/issues/22584
    '''
    test_sampler = DistributedEvalSampler(dataset=test_set, shuffle=False)
    # test_sampler2 = DistributedSampler(dataset=test_set, shuffle=False)

    train_loader = DataLoader(dataset=train_set,
                              batch_size=int(64 / opts.world_size), # batch_size=64
                               shuffle=False,
                               num_workers=int(opts.num_workers / opts.world_size),
                               sampler=train_sampler,
                               pin_memory=True)
    test_loader = DataLoader(dataset=test_set,
                             batch_size=int(64 / opts.world_size),
                             shuffle=False,
                             num_workers=int(opts.num_workers / opts.world_size),
                             sampler=test_sampler,
                             pin_memory=True)

    # model
    model = BasicNet()
    model = model.cuda(local_gpu_id)
    model = DDP(module=model,
                device_ids=[local_gpu_id])

    # criterion
    criterion = torch.nn.CrossEntropyLoss().to(local_gpu_id)

    # optimizer
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    # scheduler
    scheduler = StepLR(optimizer=optimizer, step_size=30, gamma=0.1)
    
    for epoch in range(1): # training for one epoch
        # train
        model.train()
        '''
        In distributed mode, calling the set_epoch() method at the beginning of each epoch 
        before creating the DataLoader iterator is necessary to make shuffling work properly 
        across multiple epochs. Otherwise, the same ordering will be always used.
        '''
        train_sampler.set_epoch(epoch)

        for _, (data, target) in enumerate(train_loader):
            data, target = data.to(local_gpu_id), target.to(local_gpu_id)
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        # test
        model.eval()

        correct = 0
        with torch.no_grad():
            for _, (data, target) in enumerate(test_loader):
                data = data.to(opts.rank)
                labels = target.to(opts.rank)
                outputs = model(data)
                pred = outputs.argmax(dim=1, keepdim=True)
                correct += pred.eq(labels.view_as(pred)).sum().item()
        correct = torch.Tensor([correct]).to(opts.rank)
        global_result = [torch.zeros_like(correct) for _ in range(opts.world_size)]
        
        
        if opts.rank == 0:
            dist.gather(correct, gather_list=global_result)
        else:
            dist.gather(correct, dst=0)

        if opts.rank == 0:
            global_result_tensor = torch.cat(global_result)
            global_correct = torch.sum(global_result_tensor)
            #print(train_sampler.total_size)
            #print(test_sampler.total_size)
            #print(test_sampler2.total_size)
            print(f'Accuracy: {100. * global_correct / len(test_loader.dataset)}')
            
            scheduler.step()

    cleanup()
    return


def init_for_distributed(opts):

    if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
        opts.rank = int(os.environ["RANK"])
        opts.world_size = int(os.environ['WORLD_SIZE'])
        opts.gpu = int(os.environ['LOCAL_RANK'])

    torch.cuda.set_device(opts.gpu)

    #os.environ['MASTER_ADDR'] = 'localhost'
    #os.environ['MASTER_PORT'] = '23456'

    dist.init_process_group(backend='nccl', 
                            world_size=opts.world_size, 
                            rank=opts.rank)

    dist.barrier()
    setup_for_distributed(opts.rank == 0)


def setup_for_distributed(is_master):
    """
    Disable printing when not in master process
    """
    import builtins as __builtin__
    builtin_print = __builtin__.print

    def print(*args, **kwargs):
        force = kwargs.pop('force', False)
        if is_master or force:
            builtin_print(*args, **kwargs)

    __builtin__.print = print

def cleanup():
    dist.destroy_process_group()

if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('--rank', type=int, default=0)
    parser.add_argument('--gpu_ids', nargs="+", default=['1', '2', '3', '4', '5', '6', '7'])
    args = parser.parse_args()

    os.environ["OMP_NUM_THREADS"] = "1" # Single Thread
    os.environ["CUDA_VISIBLE_DEVICES"] = ','.join([str(v) for v in args.gpu_ids])

    args.world_size = len(args.gpu_ids)
    args.num_workers = len(args.gpu_ids) * 4
    main(args)

서버에서 Docker container 상에서 코드를 실행하는데, MASTER_ADDR이랑 MASTER_PORT 부분을 주석 제거하고 실행하니 port 접속이 안 된다는 오류가 계속 떠서 제거하고 실행하였다. 도커 컨테이너 실행 시에 포트 포워딩 관련 옵션을 넣어주어야 되는 것 같은데, 관리자가 아니라서 못한다는…

이후 터미널 창에서 다음과 같이 실행해주면 된다. “python 파일명” 이렇게 실행하면 안되고, 멀티프로세스라 torchrun을 사용해야 한다. nproc_per_node 7는 GPU 7개를 사용한다는 것이고, nnodes 1은 single node라는 것이다.

torchrun –nproc_per_node=7 –nnodes=1 example_script.py

Cautions

  1. DP 사용시, model과 data는 가장 첫 번째 GPU에 있어야 된다.
  2. num_workers는 GPU 대수당 4가 적절. 물론 이 또한 Hyperparameter가 될 수 있긴 하다.
  3. Process Group init 시 Distributed GPU training을 위해서는 backend로 “nccl”을 사용한다.
  4. DataLoader 시 Batch_size는 원하는 batch size를 gpu 대수로 나누어 준 수를 넣고, Sampler는 torch.utils.data.distributed.DistributedSampler를 사용한다.
  5. Evaluation을 위해서는 dist.gather(또는 all_gather 등)를 이용한다. 그리고 rank==0 조건을 이용해서 한 번만 계산하는데, 이 때 DistributedSampler를 조심해야 된다. 가령 MNIST의 경우 training dataset 6만 개, test dataset 1만 개인데 GPU를 7개 사용 시 7로 개수가 나눠지기 위해 extra sample를 더해준다. (drop_last 파라미터를 True로 주면 sample를 몇 개 빼지만, 여전히 7로 나누어 지게 한다.) 그래서 위 코드에서 print(test_sampler2.total_size)의 결과는 1만이 아니라 10003이 된다. 그래서 정확한 결과가 아니게 되는데, 따라서 [DistributedEvalSampler]에 나와있는 버젼의 DistributedEvalSampler를 사용했다. 이것을 사용하면 extra padding 없이 print(test_sampler.total_size)의 결과는 정확히 1만이 된다.

2. HuggingFace Accelerate

[HuggingFace Accelerate]

🤗 Accelerate is a library that enables the same PyTorch code to be run across any distributed configuration by adding just four lines of code! In short, training and inference at scale made simple, efficient and adaptable.

from accelerate import Accelerator
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import StepLR

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        output = self.fc2(x) 
        return output

def main():
    transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    training_dataloader = DataLoader(train_dset, shuffle=True, batch_size=64)
    validation_datalaoder = DataLoader(test_dset, shuffle=False, batch_size=64)

    model = BasicNet()

    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    scheduler = StepLR(optimizer=optimizer, step_size=30, gamma=0.1)

    accelerator = Accelerator()

    model, optimizer, training_dataloader, scheduler = accelerator.prepare(
        model, optimizer, training_dataloader, scheduler
    )

    model.train()
    criterion = torch.nn.CrossEntropyLoss()
    # train - 1 epoch
    for batch in training_dataloader:
        optimizer.zero_grad()
        inputs, targets = batch
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        accelerator.backward(loss)
        optimizer.step()
    scheduler.step()

    validation_datalaoder = accelerator.prepare(validation_datalaoder)
    # test - 1 epoch
    model.eval()
    correct = 0
    with torch.no_grad():
        for batch in validation_datalaoder:
            inputs, targets = batch
            outputs = model(inputs)
            pred = outputs.argmax(dim=1, keepdim=True)
            # Gathers tensor and potentially drops duplicates in the last batch if on a distributed system.
            all_preds, all_targets = accelerator.gather_for_metrics((pred, targets))
            if accelerator.is_main_process:
                correct += all_preds.eq(all_targets.view_as(all_preds)).sum().item()
    if accelerator.is_main_process:            
        accelerator.print(f'Accuracy: {100. * correct / len(validation_datalaoder.dataset)}')   

if __name__ == "__main__":
    main()

이후 터미널 창에서 다음과 같이 실행해주면 된다.

CUDA_VISIBLE_DEVICES=”6,7” accelerate launch –config_file ./my_config_file.yaml example_script.py

여기서 my_config_file.yaml은 Multi-GPU training 관련 설정들을 지정해주는 파일로, 다음과 같이 설정해주었다. 공식 문서에 나와있는 사항 그대로이며, FP16 Mixed-precision이 활용되었다.

compute_environment: LOCAL_MACHINE
deepspeed_config: {}
distributed_type: MULTI_GPU
fsdp_config: {}
machine_rank: 0
main_process_ip: null
main_process_port: null
main_training_function: main
mixed_precision: fp16
num_machines: 1
num_processes: 2
use_cpu: false

Cautions

  1. .cuda(), .to(device) 등을 없애고 accelerator.prepare()에 모두 보내면 된다.
  2. loss.backward()가 아니라 accelerator.backward(loss)로 보내준다.
  3. DDP와 유사하게, evaluation 시 전체를 모두 모으는 것이 필요하다. accelerator.gather() 혹은 위 코드처럼 accelerator.gather_for_metrics() 등을 사용하여 처리한다. DDP보다 나은 점은, DDP는 extra sample padding 때문에 또 다른 후속 처리가 필요했으나, accelerator의 경우 자동으로 해당 처리를 해준다.
  4. accelerator.is_main_process 는 local_rank==0과 동일하다. 딱 한 번 실행된다.
  5. 공식문서에도 나와있듯이, accelerate config를 accelerate launch 이전에 실행하기를 highly 권장하고 있다. (그렇지 않으면 Accelerate가 자동으로 다 해버린다고 경고하고 있다.) 그래서 위 실행 명령어 처럼 config_file을 따로 저장해두기를 권장하는 것 같다. hyperparameter 지정하는 yaml 파일 하나, Multi-gpu training option 지정하는 yaml 파일 하나 이렇게 사용하면 될 것 같다.
  6. torchrun 써도 된다고 한다.

Summary

[Multi GPU Training Examples]

Reference

Websites

[사이트 출처 1] (https://huggingface.co/docs/accelerate/main/en/package_reference/accelerator)

[사이트 출처 2] (https://huggingface.co/blog/pytorch-ddp-accelerate-transformers)

[사이트 출처 3] (https://github.com/huggingface/accelerate/blob/main/examples/nlp_example.py)

[사이트 출처 4] (https://velog.io/@mmsori/PyTorch-Distributed-Sampler-in-evaluation)

[사이트 출처 5] (https://github.com/SeungjunNah/DeepDeblur-PyTorch/blob/master/src/data/sampler.py)

Papers

태그:

카테고리:

업데이트:

댓글남기기