Converting to a Python script¶
All we need to do is copy the important cells from the previous notebook into a single Python file. I have collated all the important steps below; open an empty text file and copy everything over.
import tensorflow as tf
import numpy as np
import os
import pickle
import pandas as pd
from sklearn.model_selection import train_test_split
#Ensures your random number generators start with the same seed every time the script is run.
from numpy.random import seed
seed(1)
tf.random.set_seed(2)
def make_model_VGG(output=1, l_rate=0.01, loss='mean_squared_error'):
    '''
    Creates a CNN with the VGG architecture.
    Params:
    -------
    output: int
        The number of output neurons.
    l_rate: float
        The learning rate for the given loss function.
    loss: str
        Loss function to use, only accepts TF loss functions.
    Returns:
    --------
    A compiled TensorFlow Sequential model.
    '''
    initializer = tf.keras.initializers.GlorotNormal()
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.Conv2D(64, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.Conv2D(64, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.Conv2D(128, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.Conv2D(128, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.Conv2D(256, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.Conv2D(256, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(1024, kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.Dense(1024, kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.Dense(1024, kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.Dense(output, kernel_initializer=initializer, use_bias=False))
    model.compile(loss=loss,
                  optimizer=tf.keras.optimizers.Adam(learning_rate=l_rate),
                  metrics=[tf.keras.metrics.RootMeanSquaredError()])
    return model
#Load in dataset
df_dataset = pd.read_pickle("df_recode.gzip",compression={'method': 'gzip', 'compresslevel': 1, 'mtime': 1})
#Split into training and test sets
df_train, df_test = train_test_split(df_dataset, test_size=.2, shuffle=True, random_state=42)
#Perform preprocessing
df_train = df_train.drop(df_train[df_train['xHI'] > 0.99].index)
df_train = df_train.drop(df_train[df_train['xHI'] < 0.01].index)
df_test = df_test.drop(df_test[df_test['xHI'] > 0.99].index)
df_test = df_test.drop(df_test[df_test['xHI'] < 0.01].index)
#Separate into images and labels
x_train = np.array(list(df_train['maps'].values)).reshape(len(df_train),200,200,1)
y_train = df_train['xHI'].values
x_val = np.array(list(df_test['maps'].values)).reshape(len(df_test),200,200,1)
y_val = df_test['xHI'].values
#Standardise datasets
means_x = np.mean(x_train)
stds_x = np.std(x_train)
x_train = (x_train - means_x)/stds_x
x_val = (x_val - means_x)/stds_x
# Create a callback that saves the full model at every epoch
# (save_weights_only=False saves the entire model, not just the weights)
checkpoint_path = "training/cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=False,
                                                 verbose=1)
#Model parameters
output_neurons = 1
learning_rate = 0.01
batch_size = 128
model = make_model_VGG(output_neurons, learning_rate)
print('----------------------------------------------------------')
print(' Fitting Model ')
print('----------------------------------------------------------')
history = model.fit(x_train, y_train,
                    batch_size=batch_size,
                    validation_data=(x_val, y_val),
                    epochs=10,
                    verbose=1,
                    callbacks=[cp_callback])
#Save the training loss history for later inspection
pickle.dump(history.history['loss'], open(checkpoint_dir + "/loss.p", "wb"))
Save this text file as filename_cpu.py, where you can substitute filename for any name of your choice. With that, your script is good to go on any CPU-based cluster. TensorFlow and NumPy are automatically parallelised across CPUs, so when you run this file it should use all the CPUs you have access to.
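If you need to match TensorFlow's CPU usage to the cores your job was actually allocated (some clusters penalise jobs that oversubscribe), you can cap its thread pools near the top of the script. A minimal sketch, assuming an allocation of 8 cores (the number is a placeholder you would match to your job request):
import tensorflow as tf
# Placeholder: set both pools to the number of CPU cores your job requested.
# These must be called before TensorFlow executes any operations.
tf.config.threading.set_intra_op_parallelism_threads(8)  # threads within a single op
tf.config.threading.set_inter_op_parallelism_threads(8)  # threads across independent ops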
You may want to run this script on the Imperial HPC, which requires you to write a job file to submit. I will show an example below; for a more in-depth tutorial on the Imperial HPC, see the graduate school course: https://www.imperial.ac.uk/students/academic-support/graduate-school/students/doctoral/professional-development/research-computing-data-science/courses/introduction-to-hpc/
Example job file¶
The first two lines ask the server for the resources you require and the maximum time you need them for. The third line loads the Anaconda package into your working environment. You would then activate your Python virtual environment using conda (you would need to create this virtual environment beforehand and install all the Python packages you need). The penultimate line changes directory to your current working directory, which is the directory the Python script is saved in, and the final line runs the script.
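A minimal sketch of what such a job file might look like on a PBS-based system like the Imperial HPC (the resource numbers, walltime, module name, and environment name tf_env are all placeholders you would adapt to your own setup):
#PBS -l select=1:ncpus=8:mem=32gb
#PBS -l walltime=08:00:00
module load anaconda3/personal
source activate tf_env
cd $PBS_O_WORKDIR
python filename_cpu.py
You would then submit this file with qsub and monitor the job's progress with qstat.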
With access to the Imperial HPC, you also have access to GPUs, which will greatly reduce the time it takes to train deep network models. To make use of these GPUs, we need to make a few adjustments to our initial script. These are shown below:
import tensorflow as tf
import numpy as np
import os
import pickle
import pandas as pd
from sklearn.model_selection import train_test_split
#Ensures your random number generators start with the same seed every time the script is run.
from numpy.random import seed
seed(1)
tf.random.set_seed(2)
def make_model_VGG(output=1, l_rate=0.01, loss='mean_squared_error'):
    '''
    Creates a CNN with the VGG architecture.
    Params:
    -------
    output: int
        The number of output neurons.
    l_rate: float
        The learning rate for the given loss function.
    loss: str
        Loss function to use, only accepts TF loss functions.
    Returns:
    --------
    A compiled TensorFlow Sequential model.
    '''
    initializer = tf.keras.initializers.GlorotNormal()
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.Conv2D(64, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.Conv2D(64, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.Conv2D(128, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.Conv2D(128, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.Conv2D(256, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.Conv2D(256, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(1024, kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.Dense(1024, kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.Dense(1024, kernel_initializer=initializer, use_bias=False))
    model.add(tf.keras.layers.BatchNormalization(beta_initializer=initializer, momentum=0.9))
    model.add(tf.keras.layers.Activation('relu'))
    model.add(tf.keras.layers.Dense(output, kernel_initializer=initializer, use_bias=False))
    model.compile(loss=loss,
                  optimizer=tf.keras.optimizers.Adam(learning_rate=l_rate),
                  metrics=[tf.keras.metrics.RootMeanSquaredError()])
    return model
#Looks for all GPUs we have access to
device_type = 'GPU'
devices = tf.config.experimental.list_physical_devices(device_type)
devices_names = [d.name.split('e:')[1] for d in devices]
print('----------------------------------------------------------')
print(devices_names)
print('----------------------------------------------------------')
#Enables all GPUs to be used by TF
strategy = tf.distribute.MirroredStrategy(
    devices=devices_names,
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
#Load in dataset
df_dataset = pd.read_pickle("df_recode.gzip",compression={'method': 'gzip', 'compresslevel': 1, 'mtime': 1})
#Split into training and test sets
df_train, df_test = train_test_split(df_dataset, test_size=.2, shuffle=True, random_state=42)
#Perform preprocessing
df_train = df_train.drop(df_train[df_train['xHI'] > 0.99].index)
df_train = df_train.drop(df_train[df_train['xHI'] < 0.01].index)
df_test = df_test.drop(df_test[df_test['xHI'] > 0.99].index)
df_test = df_test.drop(df_test[df_test['xHI'] < 0.01].index)
#Separate into images and labels
x_train = np.array(list(df_train['maps'].values)).reshape(len(df_train),200,200,1)
y_train = df_train['xHI'].values
x_val = np.array(list(df_test['maps'].values)).reshape(len(df_test),200,200,1)
y_val = df_test['xHI'].values
#Standardise datasets
means_x = np.mean(x_train)
stds_x = np.std(x_train)
x_train = (x_train - means_x)/stds_x
x_val = (x_val - means_x)/stds_x
# Create a callback that saves the full model at every epoch
# (save_weights_only=False saves the entire model, not just the weights)
checkpoint_path = "training/cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=False,
                                                 verbose=1)
#Model parameters
output_neurons = 1
learning_rate = 0.01
batch_size = 128
#Make the model and load it onto the GPUs
with strategy.scope():
    model = make_model_VGG(output_neurons, learning_rate)
#Wrap the training and validation data in tf.data Datasets (batched below)
train_data = tf.data.Dataset.from_tensor_slices((x_train, y_train))
val_data = tf.data.Dataset.from_tensor_slices((x_val, y_val))
#May or may not be needed, depending on the GPU implementation.
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
train_data = train_data.with_options(options)
val_data = val_data.with_options(options)
train_data = train_data.batch(batch_size)
val_data = val_data.batch(batch_size)
print('----------------------------------------------------------')
print(' Fitting Model ')
print('----------------------------------------------------------')
#Trains model on GPUs
model.fit(train_data,
          validation_data=val_data,
          epochs=500,
          verbose=1,
          callbacks=[cp_callback])
Save this script as filename_gpu.py. With this new script, you would also need a new job script to ask the HPC for GPUs:
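Again, this is only a sketch, assuming a PBS system; the GPU count, gpu_type value, and other resource numbers are placeholders you should check against the cluster's documentation:
#PBS -l select=1:ncpus=4:mem=24gb:ngpus=1:gpu_type=RTX6000
#PBS -l walltime=08:00:00
module load anaconda3/personal
source activate tf_env
cd $PBS_O_WORKDIR
python filename_gpu.py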
Almost everything remains the same here, except now in the first two lines you request some GPUs.
This is now everything you will need to go from an image dataset to a script that can be run on a cluster!