Hi Everyone,
I tried to make pytorch lightning model and train it using ddp in multi nodes and multi gpu.In below is a very brief version of the code that I believe covers them. I am submitting the job using on a IBM watson server using jsrun. I would be very grateful, If you could help
job submission
jsrun -bpacked:7 -g6 -a6 -c42 -r1 python train_model.py
python code
project_root = os.path.dirname(os.path.abspath(sys.argv[0]))
feature_path = '/gpfs/alpine/proj-shared/bif135/raj/hetero_data/features/'
label_path = '/gpfs/alpine/proj-shared/bif135/raj/hetero_data/labels/'
train_file = "../dataset/initial/het_std_train_list.txt"
val_file = "../dataset/initial/het_std_val_list.txt"
NODES =int (sys.argv[1])
GPU = int(sys.argv[2])
BATCH_SIZE = 1
VAL_BATCH_SIZE = BATCH_SIZE
class block(pl.LightningModule):
def __init__(
self, in_channels, intermediate_channels, identity_downsample=None, stride=1, _dilation_rate=1):
super(block, self).__init__()
self.conv1 = nn.Conv2d(
intermediate_channels, intermediate_channels, kernel_size=3, bias=False, padding=2 * _dilation_rate,
dilation=_dilation_rate)
self.ins1 = nn.InstanceNorm2d(intermediate_channels)
self.elu = nn.ELU()
self.dropout1 = nn.Dropout(0.15)
self.conv2 = nn.Conv2d(
intermediate_channels, intermediate_channels, kernel_size=3, bias=False, dilation=_dilation_rate)
self.ins2 = nn.InstanceNorm2d(intermediate_channels)
def forward(self, x):
identity = x.clone()
x = self.conv1(x)
x = self.ins1(x)
x = self.elu(x)
x = self.dropout1(x)
x = self.conv2(x)
x = self.ins2(x)
x += identity
x = self.elu(x)
return x
class ResNet(pl.LightningModule):
def __init__(self):
super(ResNet, self).__init__()
self.filter = 48
self.layers = [RESNET_DEPTH]
self.in_channels = 442
self.nThreads = 24
self.epoch = 0
self.conv1 = nn.Conv2d(self.in_channels, self.filter, kernel_size=1, stride=1, padding=0, bias=False)
self.ins = nn.InstanceNorm2d(self.filter)
self.elu = nn.ELU()
self.loss = nn.BCELoss()
self.list_acc_T5 = []
self.list_acc_T10 = []
self.list_acc_T20 = []
self.list_acc_T30 = []
self.list_acc_T50 = []
self.list_acc_l10 = []
self.list_acc_l30 = []
self.list_acc_l20 = []
self.list_acc_l5 = []
self.layer1 = self._make_layer(
block, self.layers[0], intermediate_channels=self.filter, stride=1
)
self.last_layer = nn.Conv2d(self.filter, 1, kernel_size=1, stride=1, padding=0, bias=False)
self.sigmoid = nn.Sigmoid()
def forward(self, x):
x = self.conv1(x)
x = self.ins(x)
x = self.elu(x)
x = self.layer1(x)
x = self.last_layer(x)
x = self.sigmoid(x)
return x
def _make_layer(self, block, num_residual_blocks, intermediate_channels, stride):
layers = []
dilation_rate = 1
for i in range(num_residual_blocks - 1):
layers.append(block(self.filter, intermediate_channels, _dilation_rate=dilation_rate))
dilation_rate = dilation_rate * 2
if dilation_rate > 16:
dilation_rate = 1
return nn.Sequential(*layers)
def configure_optimizers(self):
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=0)
return optimizer
def training_step(self, batch, batch_idx):
features = batch['feat'].float()
label = batch['ground_truth'].float()
output_val = model(features)
output_final = torch.squeeze(output_val)
j = self.loss(output_final, label)
self.log("loss",loss,on_step=True,on_epoch=True,sync_dist=True)
return {"loss": j}
def train_dataloader(self):
train_file_list = text_file_reader(train_file)
train_dataset = my_dataset()
train_dataset.initialize(_feat_path=feature_path, _label_path=label_path, _file_list=train_file_list,
_max_len=MAX_LENGTH)
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE,
num_workers=self.nThreads, )
return train_dataloader
def val_dataloader(self):
val_file_list = text_file_reader(val_file)
val_dataset = my_dataset()
val_dataset.initialize(_feat_path=feature_path, _label_path=label_path, _file_list=val_file_list,
_max_len=MAX_LENGTH)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=VAL_BATCH_SIZE, num_workers=self.nThreads)
return val_dataloader
def validation_step(self, batch, batch_idx):
features = batch['feat'].float()
label = batch['ground_truth'].float()
output_val = model(features)
pred = torch.squeeze(output_val)
j_val = self.loss(pred, label)
eval_length = math.sqrt(batch["len_a"].detach() * batch["len_b"].detach())
self.log("validation_loss",j_val,on_step=True,on_epoch=True,sync_dist=True)
return {"val_loss": j_val, "pred": pred, "true": label, "eval_len": eval_length}
def validation_step_end(self, batch_parts):
(prec_T5, prec_T10, prec_T20, prec_T30, prec_T50, prec_L30, prec_L20, prec_L10,
prec_L5) =
calculateEvaluationStats(_pred_cmap=batch_parts["pred"].detach().cpu().clone().numpy().squeeze(), _true_cmap=batch_parts["true"].detach().cpu().clone().numpy().squeeze(),
_L=int(batch_parts["eval_len"]))
self.list_acc_T5.append(prec_T5)
.
.
.
self.list_acc_l30.append(prec_L30)
self.list_acc_l20.append(prec_L20)
self.list_acc_l5.append(prec_L5)
return None
def on_validation_end(self):
val_acc_info = [len(self.list_acc_T5), Average(self.list_acc_T5), Average(self.list_acc_T10),
Average(self.list_acc_T20),Average(self.list_acc_T30), Average(self.list_acc_T50), Average(self.list_acc_l30),
Average(self.list_acc_l20), Average(self.list_acc_l10), Average(self.list_acc_l5)]
self.list_acc_T5, self.list_acc_T10, self.list_acc_l30, self.list_acc_l20, self.list_acc_l10, self.list_acc_l5, self.list_acc_T20, self.list_acc_
T30, self.list_acc_T50 = [], [], [], [], [], [], [], [], []
self.epoch = self.epoch + 1
global last_epoch
last_epoch = last_epoch + 1
return None
def calculateEvaluationStats(_pred_cmap, _true_cmap, _L):
.
.
return prec_T5, prec_T10, prec_T20, prec_T30, prec_T50, prec_L30, prec_L20, prec_L10, prec_L5
class my_dataset(data.Dataset):
def initialize(self, _feat_path, _label_path, _file_list, _max_len):
.
.
def __getitem__(self, index):
.
.
return {'feat': dist, 'ground_truth': padded_labels, 'len_a': self.len_a, 'len_b': self.len_b,
'name': self.name}
model = ResNet()
model
checkpoint_callback = ModelCheckpoint( dirpath=HISTORY_FILE, filename="hetero-{epoch:02d}", save_top_k=-1, monitor="val_loss")
trainer = pl.Trainer(callbacks=[checkpoint_callback], num_nodes=int(NODES), gpus=-1, accelerator="ddp2",resume_from_checkpoint=last_weight,log_every_
n_steps=5,precision=16,plugins=DDPPlugin(find_unused_parameters=False))
trainer.fit(model)
Error I get:
This DataLoader will create 24 worker processes in total. Our suggested max number of worker in current system is 1, which is smaller than what this DataLoader is going t
o create. Please be aware that excessive worker creation might get DataLoader running slow or even freeze, lower the worker number to avoid potential slowness/freeze if necessary.