Create Batchjob projects

<aside> 💡 Make sure to create an environment first before creating a batch job. You will need to

</aside>

Creating batch job is very simple. On the home page, click on Create + and select Job. You can create an empty job or upload one from your computer or from your Drive or from GitHub. Here is the screenshot of the modal window where you choose:

Once you select the project source - Empty, Filesystem, Drive or Github, you will then pick the project source/root directory and a main file. The main file is where the execution begins. The platform will launch the project by calling:

YOUR_ENV/python YOUR_MAIN_FILE.py YOUR_ARGUMENTS

In addition, you must select an Environment. Environment is a user defined environment that is essentially a conda yaml file. In addition you can also attach secrets to an environment. The platform will install the conda environment and store secrets as environment variables. It is important to note that you as a user must first define an environment. You can create an environment from the sidebar by going into Environments and pressing Create + at the top left.

Besides Job Settings, there are a few other settings, a user can control - Compute Settings, Distributed Settings and Collaboration Settings.

Under Compute Settings, the user can choose what type of compute (CPU, Memory) and GPUs they need for the job. A user can also choose to add more disk capacity.

Distributed Settings

Under Distributed Settings, the user can toggle on Distributed Training. This is a special feature for pytorch and tensorflow distributed training modules. User can simply toggle it on and select how many nodes they want to train a model on.

Untitled

<aside> 💡 The training code simply needs to use torch.ddp or tensorflow.distributed package. The Ploutos platform is deeply integrated with both those packages and will set up the rank, master, ports and NCCL communication etc.

</aside>

An example of Single Click Multi GPU Multi Node training with Pytorch

Here is an example script with pytorch. Note that there is no change in the code to use this automation. It just works like magic:

import os

import numpy as np

import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import argparse

import random

print ("Starting Disgributed Training:\\n")

def set_random_seeds(random_seed=0):
    torch.manual_seed (random_seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed (random_seed)
    random.seed (random_seed)

def evaluate(model , device , test_loader):
    model.eval ()

    correct = 0
    total = 0
    with torch.no_grad ():
        for data in test_loader:
            images , labels = data[ 0 ].to (device) , data[ 1 ].to (device)
            outputs = model (images)
            _ , predicted = torch.max (outputs.data , 1)
            total += labels.size (0)
            correct += (predicted == labels).sum ().item ()

    accuracy = correct / total

    return accuracy

def main():
    num_epochs_default = 10
    batch_size_default = 256  # 1024
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"
    print ("Comment 1 \\n")
    # Each process runs on 1 GPU device specified by the local_rank argument.
    parser = argparse.ArgumentParser (formatter_class=argparse.ArgumentDefaultsHelpFormatter)
   
    parser.add_argument ("--num_epochs" , type=int , help="Number of training epochs." , default=num_epochs_default)
    parser.add_argument ("--batch_size" , type=int , help="Training batch size for one process." ,
                         default=batch_size_default)
    parser.add_argument ("--learning_rate" , type=float , help="Learning rate." , default=learning_rate_default)
    parser.add_argument ("--random_seed" , type=int , help="Random seed." , default=random_seed_default)
    parser.add_argument ("--model_dir" , type=str , help="Directory for saving models." , default=model_dir_default)
    parser.add_argument ("--model_filename" , type=str , help="Model filename." , default=model_filename_default)
    parser.add_argument ("--resume" , action="store_true" , help="Resume training from saved checkpoint.")
    argv = parser.parse_args ()

    local_rank = int(os.environ["LOCAL_RANK"])
    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = argv.learning_rate
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume
    print ("Creating directories")
    # Create directories outside the PyTorch program
    # Do not create directory here because it is not multiprocess safe
    '''
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    '''
    print ("Setting seeds \\n")
    # model_filepath = os.path.join(model_dir, model_filename)

    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds (random_seed=random_seed)

    # Initializes the distributed backend which will take care of sychronizing nodes/GPUs
    torch.distributed.init_process_group (backend="nccl")
    # torch.distributed.init_process_group(backend="gloo")
    print ("Downloading base model \\n")
    # Encapsulate the model on the GPU assigned to the current process
    model = torchvision.models.resnet18 (pretrained=False)
    print ("Loading on CUDA \\n")
    device = torch.device ("cuda:{}".format (local_rank))
    model = model.to (device)
    ddp_model = torch.nn.parallel.DistributedDataParallel (model , device_ids=[ local_rank ] , output_device=local_rank)

    # We only save the model who uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    # if resume == True:
    # map_location = {"cuda:0": "cuda:{}".format(local_rank)}
    # ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))

    # Prepare dataset and dataloader
    transform = transforms.Compose ([
        transforms.RandomCrop (32 , padding=4) ,
        transforms.RandomHorizontalFlip () ,
        transforms.ToTensor () ,
        transforms.Normalize ((0.4914 , 0.4822 , 0.4465) , (0.2023 , 0.1994 , 0.2010)) ,
    ])
    print ("Fetching Data \\n")
    # Data should be prefetched
    # Download should be set to be False, because it is not multiprocess safe
    train_set = torchvision.datasets.CIFAR10 (root="data" , train=True , download=True , transform=transform)
    test_set = torchvision.datasets.CIFAR10 (root="data" , train=False , download=True , transform=transform)

    # Restricts data loading to a subset of the dataset exclusive to the current process
    train_sampler = DistributedSampler (dataset=train_set)

    train_loader = DataLoader (dataset=train_set , batch_size=batch_size , sampler=train_sampler , num_workers=8)
    # Test loader does not have to follow distributed sampling strategy
    test_loader = DataLoader (dataset=test_set , batch_size=128 , shuffle=False , num_workers=8)

    criterion = nn.CrossEntropyLoss ()
    optimizer = optim.SGD (ddp_model.parameters () , lr=learning_rate , momentum=0.9 , weight_decay=1e-5)
    print ("Starting the training \\n")
    # Loop over the dataset multiple times
    for epoch in range (num_epochs):

        print ("Local Rank: {}, Epoch: {}, Training ...".format (local_rank , epoch))

        # Save and evaluate model routinely
        if epoch % 2 == 0:
            if local_rank == 0:
                accuracy = evaluate (model=ddp_model , device=device , test_loader=test_loader)
                # torch.save(ddp_model.state_dict(), model_filepath)
                print ("-" * 75)
                print ("Epoch: {}, Accuracy: {}".format (epoch , accuracy))
                print ("-" * 75)

        ddp_model.train ()

        for data in train_loader:
            inputs , labels = data[ 0 ].to (device) , data[ 1 ].to (device)
            optimizer.zero_grad ()
            outputs = ddp_model (inputs)
            loss = criterion (outputs , labels)
            loss.backward ()
            optimizer.step ()
    print ("All done \\n")

if __name__ == "__main__":
    main ()

An example of Single Click Multi GPU Multi Node training with Tensorflow

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=20, steps_per_epoch=70)
import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model