DDP2 with multi-node, multi-GPU training failing on PyTorch Lightning

Hi Everyone,

I tried to build a PyTorch Lightning model and train it with DDP2 across multiple nodes and multiple GPUs. Below is a very brief version of the code that I believe covers the relevant parts. I am submitting the job on an IBM Watson server using jsrun. I would be very grateful if you could help.

Job submission:
jsrun -bpacked:7 -g6 -a6 -c42 -r1 python train_model.py
Python code:
project_root = os.path.dirname(os.path.abspath(sys.argv[0]))
feature_path = '/gpfs/alpine/proj-shared/bif135/raj/hetero_data/features/'
label_path = '/gpfs/alpine/proj-shared/bif135/raj/hetero_data/labels/'

train_file = "../dataset/initial/het_std_train_list.txt"
val_file = "../dataset/initial/het_std_val_list.txt"


NODES = int(sys.argv[1])
GPU = int(sys.argv[2])

BATCH_SIZE = 1
VAL_BATCH_SIZE = BATCH_SIZE
class block(pl.LightningModule):
   def __init__(
           self, in_channels, intermediate_channels, identity_downsample=None, stride=1, _dilation_rate=1):
       super(block, self).__init__()
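       # note: conv1 pads by 2*dilation, so its output grows by 2*dilation in each spatial dim;
       # conv2 uses no padding and shrinks it back, keeping the residual addition shape-compatible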
       self.conv1 = nn.Conv2d(
           intermediate_channels, intermediate_channels, kernel_size=3, bias=False, padding=2 * _dilation_rate,
           dilation=_dilation_rate)
       self.ins1 = nn.InstanceNorm2d(intermediate_channels)
       self.elu = nn.ELU()
       self.dropout1 = nn.Dropout(0.15)
       self.conv2 = nn.Conv2d(
           intermediate_channels, intermediate_channels, kernel_size=3, bias=False, dilation=_dilation_rate)
       self.ins2 = nn.InstanceNorm2d(intermediate_channels)

   def forward(self, x):
       identity = x.clone()
       x = self.conv1(x)
       x = self.ins1(x)
       x = self.elu(x)
       x = self.dropout1(x)
       x = self.conv2(x)
       x = self.ins2(x)
       x += identity
       x = self.elu(x)
       return x


class ResNet(pl.LightningModule):

   def __init__(self):
       super(ResNet, self).__init__()
       self.filter = 48
       self.layers = [RESNET_DEPTH]
       self.in_channels = 442
       self.nThreads = 24
       self.epoch = 0
       self.conv1 = nn.Conv2d(self.in_channels, self.filter, kernel_size=1, stride=1, padding=0, bias=False)
       self.ins = nn.InstanceNorm2d(self.filter)
       self.elu = nn.ELU()
       self.loss = nn.BCELoss()
       self.list_acc_T5 = []
       self.list_acc_T10 = []
       self.list_acc_T20 = []
       self.list_acc_T30 = []
       self.list_acc_T50 = []
       self.list_acc_l10 = []
       self.list_acc_l30 = []
       self.list_acc_l20 = []
       self.list_acc_l5 = []

       self.layer1 = self._make_layer(
           block, self.layers[0], intermediate_channels=self.filter, stride=1
       )
       self.last_layer = nn.Conv2d(self.filter, 1, kernel_size=1, stride=1, padding=0, bias=False)
       self.sigmoid = nn.Sigmoid()

   def forward(self, x):
       x = self.conv1(x)
       x = self.ins(x)
       x = self.elu(x)
       x = self.layer1(x)
       x = self.last_layer(x)
       x = self.sigmoid(x)
       return x

   def _make_layer(self, block, num_residual_blocks, intermediate_channels, stride):
       layers = []
       dilation_rate = 1
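       # the dilation rate doubles for each residual block (1, 2, 4, 8, 16) and then resets to 1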
       for i in range(num_residual_blocks - 1):
           layers.append(block(self.filter, intermediate_channels, _dilation_rate=dilation_rate))
           dilation_rate = dilation_rate * 2
           if dilation_rate > 16:
               dilation_rate = 1
       return nn.Sequential(*layers)

   def configure_optimizers(self):
       optimizer = optim.Adam(self.parameters(), lr=0.0001, weight_decay=0)
       return optimizer

   def training_step(self, batch, batch_idx):
       features = batch['feat'].float()
       label = batch['ground_truth'].float()
       output_val = self(features)
       output_final = torch.squeeze(output_val)
       j = self.loss(output_final, label)

       self.log("loss",loss,on_step=True,on_epoch=True,sync_dist=True)
       return {"loss": j}

   def train_dataloader(self):
       train_file_list = text_file_reader(train_file)
       train_dataset = my_dataset()
       train_dataset.initialize(_feat_path=feature_path, _label_path=label_path, _file_list=train_file_list,
                                _max_len=MAX_LENGTH)
       train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE,
                                                      num_workers=self.nThreads, )
       return train_dataloader

   def val_dataloader(self):

       val_file_list = text_file_reader(val_file)
       val_dataset = my_dataset()
       val_dataset.initialize(_feat_path=feature_path, _label_path=label_path, _file_list=val_file_list,
                              _max_len=MAX_LENGTH)
       val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=VAL_BATCH_SIZE, num_workers=self.nThreads)
       return val_dataloader

   def validation_step(self, batch, batch_idx):

       features = batch['feat'].float()
       label = batch['ground_truth'].float()
       output_val = self(features)
       pred = torch.squeeze(output_val)
       j_val = self.loss(pred, label)
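       # evaluation length: geometric mean of the two sequence lengths, later passed as _L to calculateEvaluationStats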
       eval_length = math.sqrt(batch["len_a"].detach() * batch["len_b"].detach())
       self.log("validation_loss",j_val,on_step=True,on_epoch=True,sync_dist=True)
       return {"val_loss": j_val, "pred": pred, "true": label, "eval_len": eval_length}

   def validation_step_end(self, batch_parts):

       (prec_T5, prec_T10, prec_T20, prec_T30, prec_T50, prec_L30, prec_L20, prec_L10,
        prec_L5) = calculateEvaluationStats(
           _pred_cmap=batch_parts["pred"].detach().cpu().clone().numpy().squeeze(),
           _true_cmap=batch_parts["true"].detach().cpu().clone().numpy().squeeze(),
           _L=int(batch_parts["eval_len"]))

       self.list_acc_T5.append(prec_T5)
       .
       .
       .
       self.list_acc_l30.append(prec_L30)
       self.list_acc_l20.append(prec_L20)
       self.list_acc_l5.append(prec_L5)

       return None

   def on_validation_end(self):
       val_acc_info = [len(self.list_acc_T5), Average(self.list_acc_T5), Average(self.list_acc_T10),
                       Average(self.list_acc_T20),Average(self.list_acc_T30), Average(self.list_acc_T50), Average(self.list_acc_l30),
                       Average(self.list_acc_l20), Average(self.list_acc_l10), Average(self.list_acc_l5)]
     

       (self.list_acc_T5, self.list_acc_T10, self.list_acc_l30, self.list_acc_l20, self.list_acc_l10,
        self.list_acc_l5, self.list_acc_T20, self.list_acc_T30, self.list_acc_T50) = [], [], [], [], [], [], [], [], []
       self.epoch = self.epoch + 1
       global last_epoch
       last_epoch = last_epoch + 1
       return None


def calculateEvaluationStats(_pred_cmap, _true_cmap, _L):
.
.

   return prec_T5, prec_T10, prec_T20, prec_T30, prec_T50, prec_L30, prec_L20, prec_L10, prec_L5

class my_dataset(data.Dataset):
   def initialize(self, _feat_path, _label_path, _file_list, _max_len):
       .
       .

   def __getitem__(self, index):
     .
     .
       return {'feat': dist, 'ground_truth': padded_labels, 'len_a': self.len_a, 'len_b': self.len_b,
               'name': self.name}

model = ResNet()
checkpoint_callback = ModelCheckpoint(dirpath=HISTORY_FILE, filename="hetero-{epoch:02d}", save_top_k=-1,
                                      monitor="validation_loss")
trainer = pl.Trainer(callbacks=[checkpoint_callback], num_nodes=int(NODES), gpus=-1, accelerator="ddp2",
                     resume_from_checkpoint=last_weight, log_every_n_steps=5, precision=16,
                     plugins=DDPPlugin(find_unused_parameters=False))
trainer.fit(model)

Error I get:
This DataLoader will create 24 worker processes in total. Our suggested max number of worker in current system is 1, which is smaller than what this DataLoader is going to create. Please be aware that excessive worker creation might get DataLoader running slow or even freeze, lower the worker number to avoid potential slowness/freeze if necessary.
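
For what it's worth, here is a rough sketch of how I could cap the worker count to the CPUs actually visible to each process instead of hard-coding self.nThreads = 24 (safe_num_workers is just an illustrative helper name, not something from my actual code):

import os

def safe_num_workers(requested=24):
    # clamp the DataLoader worker count to the CPUs visible to this process
    try:
        available = len(os.sched_getaffinity(0))  # honours the CPU mask set by jsrun / cgroups
    except AttributeError:
        available = os.cpu_count() or 1           # fallback where sched_getaffinity is unavailable
    return max(1, min(requested, available))

# e.g. in train_dataloader() / val_dataloader():
#   num_workers=safe_num_workers(self.nThreads)

I am not sure whether something like this would address the DDP2 failure itself or only silence the worker warning, so any guidance is appreciated.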