Shortcuts

Tutorial 6: Distributed Training

COALA enables federated learning (FL) training over multiple GPUs. We define the following variables to further illustrate the idea:

  • K: the number of clients who participated in training each round

  • N: the number of available GPUs

When K == N, each selected client is allocated to a GPU to train.

When K > N, multiple clients are allocated to a GPU, then they execute training sequentially in the GPU.

When K < N, you can adjust to use fewer GPUs in training.

We make it easy to use distributed training. You just need to modify the configs, without changing the core implementations. In particular, you need to set the number of GPUs in gpu and specific distributed settings in the distributed configs.

Distributed Training with Pytorch Multiprocessing

The following is an example of distributed training on multiple gpus using PyTorch Multiprocessing.

import logging

logger = logging.getLogger(__name__)

import argparse
import coala
import torch.multiprocessing as mp


def run(rank, args):
    config = {
        "task_id": "cifar10",
        "data": {
            "dataset": "cifar10",
            "split_type": "iid",
            "num_of_clients": args.num_of_clients,
        },
        "server": {
            "rounds": args.rounds,
            "clients_per_round": args.clients_per_round,
            "test_every": args.test_every,
            "test_all": True,
            "random_selection": True,
        },
        "client": {
            "local_epoch": args.local_epoch,
            "rounds": args.rounds,
            "test_batch_size": 32,
            "batch_size": args.batch_size,
        },

        "model": "simple_cnn",
        "test_mode": "test_in_server",

        "is_remote": False,
        "local_port": 22,

        "distributed": {
            "world_size": args.gpus
        },

        "gpu": args.gpus,
    }
    if args.gpus > 1:
        config.update({
            "distributed": {
                "rank": rank,
                "local_rank": rank,
                "world_size": args.gpus,
                "init_method": "tcp://127.0.0.1:8123"
            }
        })
    coala.init(config)
    coala.run()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Application')
    parser.add_argument("--task_id", type=str, default="")
    parser.add_argument('--gpus', default=4, type=int)
    parser.add_argument('--batch_size', default=32, type=int)
    parser.add_argument('--local_epoch', default=5, type=int)
    parser.add_argument('--rounds', default=100, type=int)
    parser.add_argument('--num_of_clients', default=50, type=int)
    parser.add_argument('--clients_per_round', default=10, type=int)
    parser.add_argument('--lr', default=0.001, type=float)
    parser.add_argument('--test_every', default=5, type=int, help='Test every x rounds')
    args = parser.parse_args()
    print("arguments: ", args)
    
    if args.gpus <= 1:
        run(0, args)
    else:
        mp.set_start_method("spawn")
        processes = []
        for rank in range(args.gpus):
            p = mp.Process(target=run, args=(rank, args))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()

Distributed Training with Slurm

The following is an example of distributed training on a GPU cluster managed by slurm.

import coala
from coala.distributed import slurm

# Get the distributed settings.
rank, local_rank, world_size, host_addr = slurm.setup()
# Set the distributed training settings.
config = {
    "gpu": world_size,
    "distributed": {
        "rank": rank, 
        "local_rank": local_rank, 
        "world_size": world_size, 
        "init_method": host_addr
    },
}
# Initialize COALA.
coala.init(config)
# Execute training with distributed training.
coala.run()

We will further provide scripts to set up distributed training using multiprocess. Pull requests are also welcomed.